KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > ConnectionTableNIO


1 // $Id: ConnectionTableNIO.java,v 1.32 2007/07/06 08:07:05 belaban Exp $
2

3 package org.jgroups.blocks;
4
5 import org.apache.commons.logging.Log;
6 import org.jgroups.Address;
7 import org.jgroups.Global;
8 import org.jgroups.stack.IpAddress;
9 import org.jgroups.util.DirectExecutor;
10 import org.jgroups.util.Util;
11
12 import java.io.IOException JavaDoc;
13 import java.net.*;
14 import java.nio.ByteBuffer JavaDoc;
15 import java.nio.channels.*;
16 import java.nio.channels.spi.SelectorProvider JavaDoc;
17 import java.util.*;
18 import java.util.concurrent.*;
19
20 /**
21  * Manages incoming and outgoing TCP connections. For each outgoing message to destination P, if there
22  * is not yet a connection for P, one will be created. Subsequent outgoing messages will use this
23  * connection. For incoming messages, one server socket is created at startup. For each new incoming
24  * client connecting, a new thread from a thread pool is allocated and listens for incoming messages
25  * until the socket is closed by the peer.<br>Sockets/threads with no activity will be killed
26  * after some time.
27  * <p/>
28  * Incoming messages from any of the sockets can be received by setting the message listener.
29  *
30  * We currently require use_incoming_packet_handler=true (release 2.4 will support use_incoming_packet_handler=false
31  * due to threadless stack support).
32  *
33  * @author Bela Ban, Scott Marlow, Alex Fu
34  */

35 public class ConnectionTableNIO extends BasicConnectionTable implements Runnable JavaDoc {
36
37    private ServerSocketChannel m_serverSocketChannel;
38    private Selector m_acceptSelector;
39
40    private WriteHandler[] m_writeHandlers;
41    private int m_nextWriteHandler = 0;
42    private final Object JavaDoc m_lockNextWriteHandler = new Object JavaDoc();
43
44    private ReadHandler[] m_readHandlers;
45    private int m_nextReadHandler = 0;
46    private final Object JavaDoc m_lockNextReadHandler = new Object JavaDoc();
47
48    // thread pool for processing read requests
49
private Executor m_requestProcessors;
50    private volatile boolean serverStopping=false;
51
52    private final List<Thread JavaDoc> m_backGroundThreads = new LinkedList<Thread JavaDoc>(); // Collection of all created threads
53

54    private int m_reader_threads = 3;
55
56    private int m_writer_threads = 3;
57
58    private int m_processor_threads = 5; // PooledExecutor.createThreads()
59
private int m_processor_minThreads = 5; // PooledExecutor.setMinimumPoolSize()
60
private int m_processor_maxThreads = 5; // PooledExecutor.setMaxThreads()
61
private int m_processor_queueSize=100; // Number of queued requests that can be pending waiting
62
// for a background thread to run the request.
63
private long m_processor_keepAliveTime = Long.MAX_VALUE; // PooledExecutor.setKeepAliveTime( milliseconds);
64
// negative value used to mean to wait forever, instead set to Long.MAX_VALUE to wait forever
65

66
67
68    /**
69     * @param srv_port
70     * @throws Exception
71     */

72    public ConnectionTableNIO(int srv_port) throws Exception JavaDoc {
73       this.srv_port=srv_port;
74       start();
75    }
76
77    /**
78     * @param srv_port
79     * @param reaper_interval
80     * @param conn_expire_time
81     * @throws Exception
82     */

83    public ConnectionTableNIO(int srv_port, long reaper_interval,
84                              long conn_expire_time) throws Exception JavaDoc {
85       this.srv_port=srv_port;
86       this.reaper_interval=reaper_interval;
87       this.conn_expire_time=conn_expire_time;
88       start();
89    }
90
91    /**
92     * @param r
93     * @param bind_addr
94     * @param external_addr
95     * @param srv_port
96     * @param max_port
97     * @throws Exception
98     */

99    public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port
100    )
101       throws Exception JavaDoc
102    {
103       setReceiver(r);
104       this.external_addr=external_addr;
105       this.bind_addr=bind_addr;
106       this.srv_port=srv_port;
107       this.max_port=max_port;
108       use_reaper=true;
109       start();
110    }
111
112
113     public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
114                               boolean doStart
115     )
116             throws Exception JavaDoc
117     {
118         setReceiver(r);
119         this.external_addr=external_addr;
120         this.bind_addr=bind_addr;
121         this.srv_port=srv_port;
122         this.max_port=max_port;
123         use_reaper=true;
124         if(doStart)
125             start();
126     }
127
128
129    /**
130     * @param r
131     * @param bind_addr
132     * @param external_addr
133     * @param srv_port
134     * @param max_port
135     * @param reaper_interval
136     * @param conn_expire_time
137     * @throws Exception
138     */

139    public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
140                              long reaper_interval, long conn_expire_time
141                              ) throws Exception JavaDoc
142    {
143       setReceiver(r);
144       this.bind_addr=bind_addr;
145       this.external_addr=external_addr;
146       this.srv_port=srv_port;
147       this.max_port=max_port;
148       this.reaper_interval=reaper_interval;
149       this.conn_expire_time=conn_expire_time;
150       use_reaper=true;
151       start();
152    }
153
154
155     public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
156                               long reaper_interval, long conn_expire_time, boolean doStart
157     ) throws Exception JavaDoc
158     {
159         setReceiver(r);
160         this.bind_addr=bind_addr;
161         this.external_addr=external_addr;
162         this.srv_port=srv_port;
163         this.max_port=max_port;
164         this.reaper_interval=reaper_interval;
165         this.conn_expire_time=conn_expire_time;
166         use_reaper=true;
167         if(doStart)
168             start();
169     }
170
171
172
173     public int getReaderThreads() { return m_reader_threads; }
174
175     public void setReaderThreads(int m_reader_threads) {
176         this.m_reader_threads=m_reader_threads;
177     }
178
179     public int getWriterThreads() { return m_writer_threads; }
180
181     public void setWriterThreads(int m_writer_threads) {
182         this.m_writer_threads=m_writer_threads;
183     }
184
185     public int getProcessorThreads() { return m_processor_threads; }
186
187     public void setProcessorThreads(int m_processor_threads) {
188         this.m_processor_threads=m_processor_threads;
189     }
190
191     public int getProcessorMinThreads() { return m_processor_minThreads;}
192
193     public void setProcessorMinThreads(int m_processor_minThreads) {
194         this.m_processor_minThreads=m_processor_minThreads;
195     }
196
197     public int getProcessorMaxThreads() { return m_processor_maxThreads;}
198
199     public void setProcessorMaxThreads(int m_processor_maxThreads) {
200         this.m_processor_maxThreads=m_processor_maxThreads;
201     }
202
203     public int getProcessorQueueSize() { return m_processor_queueSize; }
204
205     public void setProcessorQueueSize(int m_processor_queueSize) {
206         this.m_processor_queueSize=m_processor_queueSize;
207     }
208
209     public long getProcessorKeepAliveTime() { return m_processor_keepAliveTime; }
210
211     public void setProcessorKeepAliveTime(long m_processor_keepAliveTime) {
212         this.m_processor_keepAliveTime=m_processor_keepAliveTime;
213     }
214
215
216     /**
217     * Try to obtain correct Connection (or create one if not yet existent)
218     */

219    ConnectionTable.Connection getConnection(Address dest) throws Exception JavaDoc
220    {
221       Connection conn;
222       SocketChannel sock_ch;
223
224       synchronized (conns)
225       {
226          conn = (Connection) conns.get(dest);
227          if (conn == null)
228          {
229 //Thread.dumpStack();
230
//System.out.println("getconnection failed to find " + ((IpAddress)dest).toString() + " conns size= "+conns.size());
231
// for(Iterator iter = conns.keySet().iterator(); iter.hasNext(); ) {
232
// System.out.println("keyset key=" + iter.next());
233
// }
234
InetSocketAddress destAddress = new InetSocketAddress(((IpAddress) dest).getIpAddress(),
235                ((IpAddress) dest).getPort());
236             sock_ch = SocketChannel.open(destAddress);
237              sock_ch.socket().setTcpNoDelay(tcp_nodelay);
238             conn = new Connection(sock_ch, dest);
239
240             conn.sendLocalAddress(local_addr);
241             // This outbound connection is ready
242

243             sock_ch.configureBlocking(false);
244
245             try
246             {
247                if (log.isTraceEnabled())
248                   log.trace("About to change new connection send buff size from " + sock_ch.socket().getSendBufferSize() + " bytes");
249                sock_ch.socket().setSendBufferSize(send_buf_size);
250                if (log.isTraceEnabled())
251                   log.trace("Changed new connection send buff size to " + sock_ch.socket().getSendBufferSize() + " bytes");
252             }
253             catch (IllegalArgumentException JavaDoc ex)
254             {
255                if (log.isErrorEnabled()) log.error("exception setting send buffer size to " +
256                   send_buf_size + " bytes: " + ex);
257             }
258             try
259             {
260                if (log.isTraceEnabled())
261                   log.trace("About to change new connection receive buff size from " + sock_ch.socket().getReceiveBufferSize() + " bytes");
262                sock_ch.socket().setReceiveBufferSize(recv_buf_size);
263                if (log.isTraceEnabled())
264                   log.trace("Changed new connection receive buff size to " + sock_ch.socket().getReceiveBufferSize() + " bytes");
265             }
266             catch (IllegalArgumentException JavaDoc ex)
267             {
268                if (log.isErrorEnabled()) log.error("exception setting receive buffer size to " +
269                   send_buf_size + " bytes: " + ex);
270             }
271
272             int idx;
273             synchronized (m_lockNextWriteHandler)
274             {
275                idx = m_nextWriteHandler = (m_nextWriteHandler + 1) % m_writeHandlers.length;
276             }
277             conn.setupWriteHandler(m_writeHandlers[idx]);
278
279             // Put the new connection to the queue
280
try
281             {
282                synchronized (m_lockNextReadHandler)
283                {
284                   idx = m_nextReadHandler = (m_nextReadHandler + 1) % m_readHandlers.length;
285                }
286                m_readHandlers[idx].add(conn);
287
288             } catch (InterruptedException JavaDoc e)
289             {
290                if (log.isWarnEnabled())
291                   log.warn("Thread (" +Thread.currentThread().getName() + ") was interrupted, closing connection", e);
292                // What can we do? Remove it from table then.
293
conn.destroy();
294                throw e;
295             }
296
297             // Add connection to table
298
addConnection(dest, conn);
299
300             notifyConnectionOpened(dest);
301             if (log.isTraceEnabled()) log.trace("created socket to " + dest);
302          }
303          return conn;
304       }
305    }
306
307    public final void start() throws Exception JavaDoc {
308        super.start();
309        //Roland Kurmann 4/7/2003, build new thread group
310
thread_group = new ThreadGroup JavaDoc(Util.getGlobalThreadGroup(), "ConnectionTableThreads");
311        init();
312        srv_sock=createServerSocket(srv_port, max_port);
313
314        if (external_addr!=null)
315            local_addr=new IpAddress(external_addr, srv_sock.getLocalPort());
316        else if (bind_addr != null)
317            local_addr=new IpAddress(bind_addr, srv_sock.getLocalPort());
318        else
319            local_addr=new IpAddress(srv_sock.getLocalPort());
320
321        if(log.isDebugEnabled()) log.debug("server socket created on " + local_addr);
322
323
324        //Roland Kurmann 4/7/2003, put in thread_group
325
acceptor=new Thread JavaDoc(thread_group, this, "ConnectionTable.AcceptorThread");
326        acceptor.setDaemon(true);
327        acceptor.start();
328        m_backGroundThreads.add(acceptor);
329
330        // start the connection reaper - will periodically remove unused connections
331
if(use_reaper && reaper == null) {
332            reaper=new Reaper();
333            reaper.start();
334        }
335    }
336
337    protected void init()
338       throws Exception JavaDoc
339    {
340
341       // use directExector if max thread pool size is less than or equal to zero.
342
if(getProcessorMaxThreads() <= 0) {
343          m_requestProcessors = new DirectExecutor();
344       }
345       else
346       {
347          // Create worker thread pool for processing incoming buffers
348
ThreadPoolExecutor requestProcessors = new ThreadPoolExecutor(getProcessorMinThreads(), getProcessorMaxThreads(),
349                                                                         getProcessorKeepAliveTime(), TimeUnit.MILLISECONDS,
350                                                                         new LinkedBlockingQueue<Runnable JavaDoc>(getProcessorQueueSize()));
351
352           requestProcessors.setThreadFactory(new ThreadFactory() {
353               public Thread JavaDoc newThread(Runnable JavaDoc runnable) {
354                   Thread JavaDoc new_thread=new Thread JavaDoc(thread_group, runnable);
355                   new_thread.setDaemon(true);
356                   new_thread.setName("ConnectionTableNIO.Thread");
357                   m_backGroundThreads.add(new_thread);
358                   return new_thread;
359               }
360           });
361          m_requestProcessors = requestProcessors;
362       }
363
364       m_writeHandlers = WriteHandler.create(getWriterThreads(), thread_group, m_backGroundThreads, log);
365       m_readHandlers = ReadHandler.create(getReaderThreads(), this, thread_group, m_backGroundThreads, log);
366    }
367
368
369    /**
370     * Closes all open sockets, the server socket and all threads waiting for incoming messages
371     */

372    public void stop()
373    {
374        super.stop();
375       serverStopping = true;
376
377        if(reaper != null)
378            reaper.stop();
379
380       // Stop the main selector
381
m_acceptSelector.wakeup();
382
383       // Stop selector threads
384
for (int i = 0; i < m_readHandlers.length; i++)
385       {
386          try
387          {
388             m_readHandlers[i].add(new Shutdown JavaDoc());
389          } catch (InterruptedException JavaDoc e)
390          {
391             log.error("Thread ("+Thread.currentThread().getName() +") was interrupted, failed to shutdown selector", e);
392          }
393       }
394       for (int i = 0; i < m_writeHandlers.length; i++)
395       {
396          try
397          {
398             m_writeHandlers[i].queue.put(new Shutdown JavaDoc());
399             m_writeHandlers[i].selector.wakeup();
400          } catch (InterruptedException JavaDoc e)
401          {
402             log.error("Thread ("+Thread.currentThread().getName() +") was interrupted, failed to shutdown selector", e);
403          }
404       }
405
406       // Stop the callback thread pool
407
if(m_requestProcessors instanceof ThreadPoolExecutor)
408          ((ThreadPoolExecutor)m_requestProcessors).shutdownNow();
409
410        if(m_requestProcessors instanceof ThreadPoolExecutor){
411         try{
412         ((ThreadPoolExecutor) m_requestProcessors).awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME,
413                                         TimeUnit.MILLISECONDS);
414         }catch(InterruptedException JavaDoc e){
415         }
416     }
417
418       // then close the connections
419
synchronized(conns) {
420           Iterator it=conns.values().iterator();
421           while(it.hasNext()) {
422               Connection conn=(Connection)it.next();
423               conn.destroy();
424           }
425           conns.clear();
426       }
427
428       while(!m_backGroundThreads.isEmpty()) {
429           Thread JavaDoc t =m_backGroundThreads.remove(0);
430           try {
431             t.join();
432           } catch(InterruptedException JavaDoc e) {
433             log.error("Thread ("+Thread.currentThread().getName() +") was interrupted while waiting on thread " + t.getName() + " to finish.");
434           }
435       }
436       m_backGroundThreads.clear();
437
438    }
439
440    /**
441     * Acceptor thread. Continuously accept new connections and assign readhandler/writehandler
442     * to them.
443     */

444    public void run() {
445        Connection conn;
446
447        while(m_serverSocketChannel.isOpen() && !serverStopping) {
448            int num;
449            try {
450                num=m_acceptSelector.select();
451            }
452            catch(IOException JavaDoc e) {
453                if(log.isWarnEnabled())
454                    log.warn("Select operation on listening socket failed", e);
455                continue; // Give up this time
456
}
457
458            if(num > 0) {
459                Set<SelectionKey> readyKeys=m_acceptSelector.selectedKeys();
460                for(Iterator<SelectionKey> i=readyKeys.iterator(); i.hasNext();) {
461                    SelectionKey key=i.next();
462                    i.remove();
463                    // We only deal with new incoming connections
464

465                    ServerSocketChannel readyChannel=(ServerSocketChannel)key.channel();
466                    SocketChannel client_sock_ch;
467                    try {
468                        client_sock_ch=readyChannel.accept();
469                    }
470                    catch(IOException JavaDoc e) {
471                        if(log.isWarnEnabled())
472                            log.warn("Attempt to accept new connection from listening socket failed", e);
473                        // Give up this connection
474
continue;
475                    }
476
477                    if(log.isTraceEnabled())
478                        log.trace("accepted connection, client_sock=" + client_sock_ch.socket());
479
480                    try {
481                        client_sock_ch.socket().setSendBufferSize(send_buf_size);
482                    }
483                    catch(IllegalArgumentException JavaDoc ex) {
484                        if(log.isErrorEnabled()) log.error("exception setting send buffer size to " + send_buf_size + " bytes: ", ex);
485                    }
486                    catch(SocketException e) {
487                        if(log.isErrorEnabled()) log.error("exception setting send buffer size to " + send_buf_size + " bytes: ", e);
488                    }
489
490                    try {
491                        client_sock_ch.socket().setReceiveBufferSize(recv_buf_size);
492                    }
493                    catch(IllegalArgumentException JavaDoc ex) {
494                        if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " + send_buf_size + " bytes: ", ex);
495                    }
496                    catch(SocketException e) {
497                        if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " + recv_buf_size + " bytes: ", e);
498                    }
499
500                    conn=new Connection(client_sock_ch, null);
501                    try {
502                        conn.peer_addr=conn.readPeerAddress(client_sock_ch.socket());
503                        synchronized(conns) {
504                            if(conns.containsKey(conn.getPeerAddress())) {
505                                if(log.isTraceEnabled())
506                                    log.trace(conn.peer_addr + " is already there, will reuse connection");
507                                continue;
508
509 //
510
// if(conn.getPeerAddress().equals(getLocalAddress())) {
511
// if(log.isTraceEnabled())
512
// log.trace(conn.getPeerAddress() + " is myself, not put it in table twice, but still read from it");
513
// }
514
// else {
515
// if(log.isWarnEnabled())
516
// log.warn(conn.getPeerAddress() + " is already there, will terminate connection");
517
// // keep existing connection, close this new one
518
// conn.destroy();
519
// continue;
520
// }
521
}
522                            else {
523                                addConnection(conn.getPeerAddress(), conn);
524                            }
525                        }
526                        notifyConnectionOpened(conn.getPeerAddress());
527                        client_sock_ch.configureBlocking(false);
528                    }
529                    catch(IOException JavaDoc e) {
530                        if(log.isWarnEnabled())
531                            log.warn("Attempt to configure non-blocking mode failed", e);
532                        conn.destroy();
533                        continue;
534                    }
535                    catch(Exception JavaDoc e) {
536                        if(log.isWarnEnabled())
537                            log.warn("Attempt to handshake with other peer failed", e);
538                        conn.destroy();
539                        continue;
540                    }
541
542                    int idx;
543                    synchronized(m_lockNextWriteHandler) {
544                        idx=m_nextWriteHandler=(m_nextWriteHandler + 1) % m_writeHandlers.length;
545                    }
546                    conn.setupWriteHandler(m_writeHandlers[idx]);
547
548                    try {
549                        synchronized(m_lockNextReadHandler) {
550                            idx=m_nextReadHandler=(m_nextReadHandler + 1) % m_readHandlers.length;
551                        }
552                        m_readHandlers[idx].add(conn);
553
554                    }
555                    catch(InterruptedException JavaDoc e) {
556                        if(log.isWarnEnabled())
557                            log.warn("Attempt to configure read handler for accepted connection failed", e);
558                        // close connection
559
conn.destroy();
560                    }
561                } // end of iteration
562
} // end of selected key > 0
563
} // end of thread
564

565        if(m_serverSocketChannel.isOpen()) {
566            try {
567                m_serverSocketChannel.close();
568            }
569            catch(Exception JavaDoc e) {
570                log.error("exception closing server listening socket", e);
571            }
572        }
573        if(log.isTraceEnabled())
574            log.trace("acceptor thread terminated");
575
576    }
577
578
579     /**
580     * Finds first available port starting at start_port and returns server socket. Sets srv_port
581     */

582    protected ServerSocket createServerSocket(int start_port, int end_port) throws Exception JavaDoc
583    {
584       this.m_acceptSelector = Selector.open();
585       m_serverSocketChannel = ServerSocketChannel.open();
586       m_serverSocketChannel.configureBlocking(false);
587       while (true)
588       {
589          try
590          {
591             SocketAddress sockAddr;
592             if (bind_addr == null)
593             {
594                sockAddr=new InetSocketAddress(start_port);
595                m_serverSocketChannel.socket().bind(sockAddr);
596             }
597             else
598             {
599                sockAddr=new InetSocketAddress(bind_addr, start_port);
600                m_serverSocketChannel.socket().bind(sockAddr, backlog);
601             }
602          }
603          catch (BindException bind_ex)
604          {
605             if (start_port == end_port)
606                throw (BindException) ((new BindException("No available port to bind to")).initCause(bind_ex));
607             start_port++;
608             continue;
609          }
610          catch (SocketException bind_ex)
611          {
612             if (start_port == end_port)
613                throw (BindException) ((new BindException("No available port to bind to")).initCause(bind_ex));
614             start_port++;
615             continue;
616          }
617          catch (IOException JavaDoc io_ex)
618          {
619             if (log.isErrorEnabled()) log.error("Attempt to bind serversocket failed, port="+start_port+", bind addr=" + bind_addr ,io_ex);
620             throw io_ex;
621          }
622          srv_port = start_port;
623          break;
624       }
625       m_serverSocketChannel.register(this.m_acceptSelector, SelectionKey.OP_ACCEPT);
626       return m_serverSocketChannel.socket();
627    }
628
629    protected void runRequest(Address addr, ByteBuffer JavaDoc buf) throws InterruptedException JavaDoc {
630       m_requestProcessors.execute(new ExecuteTask(addr, buf));
631    }
632
633
634    // Represents shutdown
635
private static class Shutdown {
636    }
637
638    // ReadHandler has selector to deal with read, it runs in seperated thread
639
private static class ReadHandler implements Runnable JavaDoc {
640       private final Selector selector= initHandler();
641       private final LinkedBlockingQueue<Object JavaDoc> queue= new LinkedBlockingQueue<Object JavaDoc>();
642       private final ConnectionTableNIO connectTable;
643        private final Log log;
644
645       ReadHandler(ConnectionTableNIO ct, Log log) {
646          connectTable= ct;
647           this.log=log;
648       }
649
650       public Selector initHandler()
651       {
652          // Open the selector
653
try
654          {
655             return Selector.open();
656          } catch (IOException JavaDoc e)
657          {
658             if (log.isErrorEnabled()) log.error(e);
659             throw new IllegalStateException JavaDoc(e.getMessage());
660          }
661
662       }
663
664       /**
665        * create instances of ReadHandler threads for receiving data.
666        *
667        * @param workerThreads is the number of threads to create.
668        */

669       private static ReadHandler[] create(int workerThreads, ConnectionTableNIO ct, ThreadGroup JavaDoc tg, List<Thread JavaDoc> backGroundThreads, Log log)
670       {
671          ReadHandler[] handlers = new ReadHandler[workerThreads];
672          for (int looper = 0; looper < workerThreads; looper++)
673          {
674             handlers[looper] = new ReadHandler(ct, log);
675
676             Thread JavaDoc thread = new Thread JavaDoc(tg, handlers[looper], "nioReadHandlerThread");
677             thread.setDaemon(true);
678             thread.start();
679             backGroundThreads.add(thread);
680          }
681          return handlers;
682       }
683
684
685       private void add(Object JavaDoc conn) throws InterruptedException JavaDoc
686       {
687          queue.put(conn);
688          wakeup();
689       }
690
691       private void wakeup()
692       {
693          selector.wakeup();
694       }
695
696       public void run()
697       {
698          while (true)
699          { // m_s can be closed by the management thread
700
int events;
701             try
702             {
703                events = selector.select();
704             } catch (IOException JavaDoc e)
705             {
706                if (log.isWarnEnabled())
707                   log.warn("Select operation on socket failed", e);
708                continue; // Give up this time
709
} catch (ClosedSelectorException e)
710             {
711                if (log.isWarnEnabled())
712                   log.warn("Select operation on socket failed" , e);
713                return; // Selector gets closed, thread stops
714
}
715
716             if (events > 0)
717             { // there are read-ready channels
718
Set readyKeys = selector.selectedKeys();
719                try
720                {
721                    for (Iterator i = readyKeys.iterator(); i.hasNext();)
722                    {
723                       SelectionKey key = (SelectionKey) i.next();
724                       i.remove();
725                       // Do partial read and handle call back
726
Connection conn = (Connection) key.attachment();
727                       if(conn != null && conn.getSocketChannel() != null)
728                       {
729                         try
730                         {
731                             if (conn.getSocketChannel().isOpen())
732                                 readOnce(conn);
733                             else
734                             { // socket connection is already closed, clean up connection state
735
conn.closed();
736                             }
737                         } catch (IOException JavaDoc e)
738                         {
739                             if (log.isTraceEnabled()) log.trace("Read operation on socket failed" , e);
740                             // The connection must be bad, cancel the key, close socket, then
741
// remove it from table!
742
key.cancel();
743                             conn.destroy();
744                             conn.closed();
745                         }
746                       }
747                    }
748                }
749                catch(ConcurrentModificationException e) {
750                    if (log.isTraceEnabled()) log.trace("Selection set changed", e);
751                    // valid events should still be in the selection set the next time
752
}
753             }
754
755             // Now we look at the connection queue to get any new connections added
756
Object JavaDoc o;
757             try
758             {
759                o = queue.poll(0L, TimeUnit.MILLISECONDS); // get a connection
760
} catch (InterruptedException JavaDoc e)
761             {
762                if (log.isTraceEnabled()) log.trace("Thread ("+Thread.currentThread().getName() +") was interrupted while polling queue" ,e);
763                // We must give up
764
continue;
765             }
766             if (null == o)
767                continue;
768             if (o instanceof Shutdown JavaDoc) { // shutdown command?
769
try {
770                   selector.close();
771                } catch(IOException JavaDoc e) {
772                   if (log.isTraceEnabled()) log.trace("Read selector close operation failed" , e);
773                }
774                return; // stop reading
775
}
776             Connection conn = (Connection) o;// must be a new connection
777
SocketChannel sc = conn.getSocketChannel();
778             try
779             {
780                sc.register(selector, SelectionKey.OP_READ, conn);
781             } catch (ClosedChannelException e)
782             {
783                if (log.isTraceEnabled()) log.trace("Socket channel was closed while we were trying to register it to selector" , e);
784                // Channel becomes bad. The connection must be bad,
785
// close socket, then remove it from table!
786
conn.destroy();
787                conn.closed();
788             }
789          } // end of the while true loop
790
}
791
792       private void readOnce(Connection conn)
793          throws IOException JavaDoc
794       {
795          ConnectionReadState readState = conn.getReadState();
796          if (!readState.isHeadFinished())
797          { // a brand new message coming or header is not completed
798
// Begin or continue to read header
799
int size = readHeader(conn);
800             if (0 == size)
801             { // header is not completed
802
return;
803             }
804          }
805          // Begin or continue to read body
806
if (readBody(conn) > 0)
807          { // not finish yet
808
return;
809          }
810          Address addr = conn.getPeerAddress();
811          ByteBuffer JavaDoc buf = readState.getReadBodyBuffer();
812          // Clear status
813
readState.bodyFinished();
814          // Assign worker thread to execute call back
815
try
816          {
817             connectTable.runRequest(addr, buf);
818          } catch (InterruptedException JavaDoc e)
819          {
820             // Cannot do call back, what can we do?
821
// Give up handling the message then
822
log.error("Thread ("+Thread.currentThread().getName() +") was interrupted while assigning executor to process read request" , e);
823          }
824       }
825
826       /**
827        * Read message header from channel. It doesn't try to complete. If there is nothing in
828        * the channel, the method returns immediately.
829        *
830        * @param conn The connection
831        * @return 0 if header hasn't been read completely, otherwise the size of message body
832        * @throws IOException
833        */

834       private int readHeader(Connection conn)
835          throws IOException JavaDoc
836       {
837          ConnectionReadState readState = conn.getReadState();
838          ByteBuffer JavaDoc headBuf = readState.getReadHeadBuffer();
839
840          SocketChannel sc = conn.getSocketChannel();
841          while (headBuf.remaining() > 0)
842          {
843             int num = sc.read(headBuf);
844             if (-1 == num)
845             {// EOS
846
throw new IOException JavaDoc("Peer closed socket");
847             }
848             if (0 == num) // no more data
849
return 0;
850          }
851          // OK, now we get the whole header, change the status and return message size
852
return readState.headFinished();
853       }
854
855       /**
856        * Read message body from channel. It doesn't try to complete. If there is nothing in
857        * the channel, the method returns immediately.
858        *
859        * @param conn The connection
860        * @return remaining bytes for the message
861        * @throws IOException
862        */

863       private int readBody(Connection conn)
864          throws IOException JavaDoc
865       {
866          ByteBuffer JavaDoc bodyBuf = conn.getReadState().getReadBodyBuffer();
867
868          SocketChannel sc = conn.getSocketChannel();
869          while (bodyBuf.remaining() > 0)
870          {
871             int num = sc.read(bodyBuf);
872             if (-1 == num) // EOS
873
throw new IOException JavaDoc("Couldn't read from socket as peer closed the socket");
874             if (0 == num) // no more data
875
return bodyBuf.remaining();
876          }
877          // OK, we finished reading the whole message! Flip it (not necessary though)
878
bodyBuf.flip();
879          return 0;
880       }
881    }
882
883    private class ExecuteTask implements Runnable JavaDoc {
884       Address m_addr = null;
885       ByteBuffer JavaDoc m_buf = null;
886
887       public ExecuteTask(Address addr, ByteBuffer JavaDoc buf)
888       {
889          m_addr = addr;
890          m_buf = buf;
891       }
892
893       public void run()
894       {
895          receive(m_addr, m_buf.array(), m_buf.arrayOffset(), m_buf.limit());
896       }
897    }
898
899    private class ConnectionReadState {
900       private final Connection m_conn;
901
902       // Status for receiving message
903
private boolean m_headFinished = false;
904       private ByteBuffer JavaDoc m_readBodyBuf = null;
905       private final ByteBuffer JavaDoc m_readHeadBuf = ByteBuffer.allocate(Connection.HEADER_SIZE);
906
907       public ConnectionReadState(Connection conn)
908       {
909          m_conn = conn;
910       }
911
912       ByteBuffer JavaDoc getReadBodyBuffer()
913       {
914          return m_readBodyBuf;
915       }
916
917       ByteBuffer JavaDoc getReadHeadBuffer()
918       {
919          return m_readHeadBuf;
920       }
921
922       void bodyFinished()
923       {
924          m_headFinished = false;
925          m_readHeadBuf.clear();
926          m_readBodyBuf = null;
927          m_conn.updateLastAccessed();
928       }
929
930       /**
931        * Status change for finishing reading the message header (data already in buffer)
932        *
933        * @return message size
934        */

935       int headFinished()
936       {
937          m_headFinished = true;
938          m_readHeadBuf.flip();
939          int messageSize = m_readHeadBuf.getInt();
940          m_readBodyBuf = ByteBuffer.allocate(messageSize);
941          m_conn.updateLastAccessed();
942          return messageSize;
943       }
944
945       boolean isHeadFinished()
946       {
947          return m_headFinished;
948       }
949    }
950
951    class Connection extends ConnectionTable.Connection {
952       private SocketChannel sock_ch = null;
953       private WriteHandler m_writeHandler;
954       private SelectorWriteHandler m_selectorWriteHandler;
955       private final ConnectionReadState m_readState;
956
957       private static final int HEADER_SIZE = 4;
958       final ByteBuffer JavaDoc headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
959
960       Connection(SocketChannel s, Address peer_addr)
961       {
962          super(s.socket(), peer_addr);
963          sock_ch = s;
964          m_readState = new ConnectionReadState(this);
965           is_running=true;
966       }
967
968       private ConnectionReadState getReadState()
969       {
970          return m_readState;
971       }
972
973       private void setupWriteHandler(WriteHandler hdlr)
974       {
975          m_writeHandler = hdlr;
976          m_selectorWriteHandler = hdlr.add(sock_ch);
977       }
978
979
980
981       void doSend(byte[] buffie, int offset, int length) throws Exception JavaDoc
982       {
983          MyFuture result = new MyFuture();
984          m_writeHandler.write(sock_ch, ByteBuffer.wrap(buffie, offset, length), result, m_selectorWriteHandler);
985           Object JavaDoc ex = result.get();
986          if (ex instanceof Exception JavaDoc)
987          {
988              if (log.isErrorEnabled())
989                  log.error("failed sending message", (Exception JavaDoc)ex);
990              if (((Exception JavaDoc)ex).getCause() instanceof IOException JavaDoc)
991                  throw (IOException JavaDoc) ((Exception JavaDoc)ex).getCause();
992              throw (Exception JavaDoc)ex;
993          }
994          result.get();
995       }
996
997
998       SocketChannel getSocketChannel()
999       {
1000         return sock_ch;
1001      }
1002
1003      void closeSocket()
1004      {
1005
1006         if (sock_ch != null)
1007         {
1008            try
1009            {
1010               if(sock_ch.isConnected() && sock_ch.isOpen()) {
1011                  sock_ch.close();
1012               }
1013            }
1014            catch (Exception JavaDoc e)
1015            {
1016               log.error("error closing socket connection", e);
1017            }
1018            sock_ch = null;
1019         }
1020      }
1021
1022
1023      void closed()
1024      {
1025         Address peerAddr = getPeerAddress();
1026         synchronized (conns)
1027         {
1028            conns.remove(peerAddr);
1029         }
1030         notifyConnectionClosed(peerAddr);
1031      }
1032   }
1033
1034
1035   /**
1036    * Handle writing to non-blocking NIO connection.
1037    */

1038   private static class WriteHandler implements Runnable JavaDoc {
1039      // Create a queue for write requests (unbounded)
1040
private final LinkedBlockingQueue<Object JavaDoc> queue= new LinkedBlockingQueue<Object JavaDoc>();
1041
1042      private final Selector selector= initSelector();
1043      private int m_pendingChannels; // count of the number of channels that have pending writes
1044
// note that this variable is only accessed by one thread.
1045

1046      // allocate and reuse the header for all buffer write operations
1047
private ByteBuffer JavaDoc m_headerBuffer = ByteBuffer.allocate(Connection.HEADER_SIZE);
1048       private final Log log;
1049
1050
1051       public WriteHandler(Log log) {
1052           this.log=log;
1053       }
1054
1055       Selector initSelector() {
1056         try
1057         {
1058            return SelectorProvider.provider().openSelector();
1059         }
1060         catch (IOException JavaDoc e)
1061         {
1062            if (log.isErrorEnabled()) log.error(e);
1063            throw new IllegalStateException JavaDoc(e.getMessage());
1064         }
1065      }
1066
1067      /**
1068       * create instances of WriteHandler threads for sending data.
1069       *
1070       * @param workerThreads is the number of threads to create.
1071       */

1072      private static WriteHandler[] create(int workerThreads, ThreadGroup JavaDoc tg, List<Thread JavaDoc> backGroundThreads, Log log)
1073      {
1074         WriteHandler[] handlers = new WriteHandler[workerThreads];
1075         for (int looper = 0; looper < workerThreads; looper++)
1076         {
1077            handlers[looper] = new WriteHandler(log);
1078
1079            Thread JavaDoc thread = new Thread JavaDoc(tg, handlers[looper], "nioWriteHandlerThread");
1080            thread.setDaemon(true);
1081            thread.start();
1082            backGroundThreads.add(thread);
1083         }
1084         return handlers;
1085      }
1086
1087      /**
1088       * Add a new channel to be handled.
1089       *
1090       * @param channel
1091       */

1092      private SelectorWriteHandler add(SocketChannel channel)
1093      {
1094          return new SelectorWriteHandler(channel, selector, m_headerBuffer);
1095      }
1096
1097      /**
1098       * Writes buffer to the specified socket connection. This is always performed asynchronously. If you want
1099       * to perform a synchrounous write, call notification.`get() which will block until the write operation is complete.
1100       * Best practice is to call notification.getException() which may return any exceptions that occured during the write
1101       * operation.
1102       *
1103       * @param channel is where the buffer is written to.
1104       * @param buffer is what we write.
1105       * @param notification may be specified if you want to know how many bytes were written and know if an exception
1106       * occurred.
1107       */

1108      private void write(SocketChannel channel, ByteBuffer JavaDoc buffer, MyFuture notification, SelectorWriteHandler hdlr) throws InterruptedException JavaDoc
1109      {
1110         queue.put(new WriteRequest(channel, buffer, notification, hdlr));
1111      }
1112
1113      private static void close(SelectorWriteHandler entry)
1114      {
1115         entry.cancel();
1116      }
1117
1118      private static void handleChannelError( SelectorWriteHandler entry, Throwable JavaDoc error)
1119      {
1120         // notify callers of the exception and drain all of the send buffers for this channel.
1121
do
1122         {
1123            if (error != null)
1124               entry.notifyError(error);
1125         }
1126         while (entry.next());
1127         close(entry);
1128      }
1129
1130      // process the write operation
1131
private void processWrite(Selector selector)
1132      {
1133         Set keys = selector.selectedKeys();
1134         Object JavaDoc arr[] = keys.toArray();
1135          for (Object JavaDoc anArr : arr) {
1136              SelectionKey key = (SelectionKey) anArr;
1137              SelectorWriteHandler entry = (SelectorWriteHandler) key.attachment();
1138              boolean needToDecrementPendingChannels = false;
1139              try {
1140                  if (0 == entry.write()) { // write the buffer and if the remaining bytes is zero,
1141
// notify the caller of number of bytes written.
1142
entry.notifyObject(entry.getBytesWritten());
1143                      // switch to next write buffer or clear interest bit on socket channel.
1144
if (!entry.next()) {
1145                          needToDecrementPendingChannels = true;
1146                      }
1147                  }
1148
1149              }
1150              catch (IOException JavaDoc e) {
1151                  needToDecrementPendingChannels = true;
1152                  // connection must of closed
1153
handleChannelError(entry, e);
1154              }
1155              finally {
1156                  if (needToDecrementPendingChannels)
1157                      m_pendingChannels--;
1158              }
1159          }
1160          keys.clear();
1161      }
1162
1163      public void run()
1164      {
1165         while (selector.isOpen())
1166         {
1167            try
1168            {
1169               WriteRequest queueEntry;
1170               Object JavaDoc o;
1171
1172               // When there are no more commands in the Queue, we will hit the blocking code after this loop.
1173
while (null != (o = queue.poll(0L, TimeUnit.MILLISECONDS)))
1174               {
1175                  if (o instanceof Shutdown JavaDoc) // Stop the thread
1176
{
1177                     try {
1178                        selector.close();
1179                     } catch(IOException JavaDoc e) {
1180                        if (log.isTraceEnabled()) log.trace("Write selector close operation failed" , e);
1181                     }
1182                     return;
1183                  }
1184                  queueEntry = (WriteRequest) o;
1185
1186                  if (queueEntry.getHandler().add(queueEntry))
1187                  {
1188                     // If the add operation returns true, than means that a buffer is available to be written to the
1189
// corresponding channel and channel's selection key has been modified to indicate interest in the
1190
// 'write' operation.
1191
// If the add operation threw an exception, we will not increment m_pendingChannels which
1192
// seems correct as long as a new buffer wasn't added to be sent.
1193
// Another way to view this is that we don't have to protect m_pendingChannels on the increment
1194
// side, only need to protect on the decrement side (this logic of this run() will be incorrect
1195
// if m_pendingChannels is set incorrectly).
1196
m_pendingChannels++;
1197                  }
1198
1199                  try
1200                  {
1201                     // process any connections ready to be written to.
1202
if (selector.selectNow() > 0)
1203                     {
1204                        processWrite(selector);
1205                     }
1206                  }
1207                  catch (IOException JavaDoc e)
1208                  { // need to understand what causes this error so we can handle it properly
1209
if (log.isErrorEnabled()) log.error("SelectNow operation on write selector failed, didn't expect this to occur, please report this", e);
1210                     return; // if select fails, give up so we don't go into a busy loop.
1211
}
1212               }
1213
1214               // if there isn't any pending work to do, block on queue to get next request.
1215
if (m_pendingChannels == 0)
1216               {
1217                  o = queue.take();
1218                  if (o instanceof Shutdown JavaDoc){ // Stop the thread
1219
try {
1220                        selector.close();
1221                     } catch(IOException JavaDoc e) {
1222                        if (log.isTraceEnabled()) log.trace("Write selector close operation failed" , e);
1223                     }
1224                     return;
1225                  }
1226                  queueEntry = (WriteRequest) o;
1227                  if (queueEntry.getHandler().add(queueEntry))
1228                     m_pendingChannels++;
1229               }
1230               // otherwise do a blocking wait select operation.
1231
else
1232               {
1233                  try
1234                  {
1235                     if ((selector.select()) > 0)
1236                     {
1237                        processWrite(selector);
1238                     }
1239                  }
1240                  catch (IOException JavaDoc e)
1241                  { // need to understand what causes this error
1242
if (log.isErrorEnabled()) log.error("Failure while writing to socket",e);
1243                  }
1244               }
1245            }
1246            catch (InterruptedException JavaDoc e)
1247            {
1248               if (log.isErrorEnabled()) log.error("Thread ("+Thread.currentThread().getName() +") was interrupted", e);
1249            }
1250            catch (Throwable JavaDoc e) // Log throwable rather than terminating this thread.
1251
{ // We are a daemon thread so we shouldn't prevent the process from terminating if
1252
// the controlling thread decides that should happen.
1253
if (log.isErrorEnabled()) log.error("Thread ("+Thread.currentThread().getName() +") caught Throwable" , e);
1254            }
1255         }
1256      }
1257   }
1258
1259
1260   // Wrapper class for passing Write requests. There will be an instance of this class for each socketChannel
1261
// mapped to a Selector.
1262
public static class SelectorWriteHandler {
1263
1264      private final List<WriteRequest> m_writeRequests = new LinkedList<WriteRequest>(); // Collection of writeRequests
1265
private boolean m_headerSent = false;
1266      private SocketChannel m_channel;
1267      private SelectionKey m_key;
1268      private Selector m_selector;
1269      private int m_bytesWritten = 0;
1270      private boolean m_enabled = false;
1271      private ByteBuffer JavaDoc m_headerBuffer;
1272
1273      SelectorWriteHandler(SocketChannel channel, Selector selector, ByteBuffer JavaDoc headerBuffer)
1274      {
1275         m_channel = channel;
1276         m_selector = selector;
1277         m_headerBuffer = headerBuffer;
1278      }
1279
1280      private void register(Selector selector, SocketChannel channel) throws ClosedChannelException
1281      {
1282         // register the channel but don't enable OP_WRITE until we have a write request.
1283
m_key = channel.register(selector, 0, this);
1284      }
1285
1286      // return true if selection key is enabled when it wasn't previous to call.
1287
private boolean enable()
1288      {
1289         boolean rc = false;
1290
1291         try
1292         {
1293            if (m_key == null)
1294            { // register the socket on first access,
1295
// we are the only thread using this variable, so no sync needed.
1296
register(m_selector, m_channel);
1297            }
1298         }
1299         catch (ClosedChannelException e)
1300         {
1301            return rc;
1302         }
1303
1304         if (!m_enabled)
1305         {
1306            rc = true;
1307            try
1308            {
1309               m_key.interestOps(SelectionKey.OP_WRITE);
1310            }
1311            catch (CancelledKeyException e)
1312            { // channel must of closed
1313
return false;
1314            }
1315            m_enabled = true;
1316         }
1317         return rc;
1318      }
1319
1320      private void disable()
1321      {
1322         if (m_enabled)
1323         {
1324            try
1325            {
1326               m_key.interestOps(0); // pass zero which means that we are not interested in being
1327
// notified of anything for this channel.
1328
}
1329            catch (CancelledKeyException eat) // If we finished writing and didn't get an exception, then
1330
{ // we probably don't need to throw this exception (if they try to write
1331
// again, we will then throw an exception).
1332
}
1333            m_enabled = false;
1334         }
1335      }
1336
1337      private void cancel()
1338      {
1339         m_key.cancel();
1340      }
1341
1342      boolean add(WriteRequest entry)
1343      {
1344         m_writeRequests.add(entry);
1345         return enable();
1346      }
1347
1348      WriteRequest getCurrentRequest()
1349      {
1350         return m_writeRequests.get(0);
1351      }
1352
1353      SocketChannel getChannel()
1354      {
1355         return m_channel;
1356      }
1357
1358      ByteBuffer JavaDoc getBuffer()
1359      {
1360         return getCurrentRequest().getBuffer();
1361      }
1362
1363      MyFuture getCallback()
1364      {
1365         return getCurrentRequest().getCallback();
1366      }
1367
1368      int getBytesWritten()
1369      {
1370         return m_bytesWritten;
1371      }
1372
1373      void notifyError(Throwable JavaDoc error)
1374      {
1375         if (getCallback() != null)
1376            getCallback().setException(error);
1377      }
1378
1379      void notifyObject(Object JavaDoc result)
1380      {
1381         if (getCallback() != null)
1382            getCallback().set(result);
1383      }
1384
1385      /**
1386       * switch to next request or disable write interest bit if there are no more buffers.
1387       *
1388       * @return true if another request was found to be processed.
1389       */

1390      boolean next()
1391      {
1392         m_headerSent = false;
1393         m_bytesWritten = 0;
1394
1395         m_writeRequests.remove(0); // remove current entry
1396
boolean rc = !m_writeRequests.isEmpty();
1397         if (!rc) // disable select for this channel if no more entries
1398
disable();
1399         return rc;
1400      }
1401
1402      /**
1403       * @return bytes remaining to write. This function will only throw IOException, unchecked exceptions are not
1404       * expected to be thrown from here. It is very important for the caller to know if an unchecked exception can
1405       * be thrown in here. Please correct the following throws list to include any other exceptions and update
1406       * caller to handle them.
1407       * @throws IOException
1408       */

1409      int write() throws IOException JavaDoc
1410      {
1411         // Send header first. Note that while we are writing the shared header buffer,
1412
// no other threads can access the header buffer as we are the only thread that has access to it.
1413
if (!m_headerSent)
1414         {
1415            m_headerSent = true;
1416            m_headerBuffer.clear();
1417            m_headerBuffer.putInt(getBuffer().remaining());
1418            m_headerBuffer.flip();
1419            do
1420            {
1421               getChannel().write(m_headerBuffer);
1422            } // we should be able to handle writing the header in one action but just in case, just do a busy loop
1423
while (m_headerBuffer.remaining() > 0);
1424
1425         }
1426
1427         m_bytesWritten += (getChannel().write(getBuffer()));
1428
1429         return getBuffer().remaining();
1430      }
1431
1432   }
1433
1434   public static class WriteRequest {
1435      private final SocketChannel m_channel;
1436      private final ByteBuffer JavaDoc m_buffer;
1437      private final MyFuture m_callback;
1438      private final SelectorWriteHandler m_hdlr;
1439
1440      WriteRequest(SocketChannel channel, ByteBuffer JavaDoc buffer, MyFuture callback, SelectorWriteHandler hdlr)
1441      {
1442         m_channel = channel;
1443         m_buffer = buffer;
1444         m_callback = callback;
1445         m_hdlr = hdlr;
1446      }
1447
1448      SelectorWriteHandler getHandler()
1449      {
1450         return m_hdlr;
1451      }
1452
1453      SocketChannel getChannel()
1454      {
1455         return m_channel;
1456      }
1457
1458      ByteBuffer JavaDoc getBuffer()
1459      {
1460         return m_buffer;
1461      }
1462
1463      MyFuture getCallback()
1464      {
1465         return m_callback;
1466      }
1467
1468   }
1469
1470    private static class NullCallable implements Callable {
1471
1472        public Object JavaDoc call() {
1473            System.out.println("nullCallable.call invoked");
1474            return null;
1475        }
1476    }
1477    private static final NullCallable NULLCALL = new NullCallable();
1478
1479    public static class MyFuture extends FutureTask { // make FutureTask work like the old FutureResult
1480
public MyFuture() {
1481            super(NULLCALL);
1482        }
1483
1484        protected void set(Object JavaDoc o) {
1485            super.set(o);
1486        }
1487
1488
1489        protected void setException(Throwable JavaDoc t) {
1490            super.setException(t);
1491        }
1492    }
1493
1494}
1495
Popular Tags