KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > kernel > AcceptThread


1 package com.ubermq.kernel;
2
3 import java.net.*;
4 import java.nio.channels.*;
5
6 import java.io.IOException JavaDoc;
7 import java.util.Iterator JavaDoc;
8 import java.util.Set JavaDoc;
9 import java.util.Arrays JavaDoc;
10
11 /**
12  * AcceptThread waits for incoming connections and adds them to a
13  * connection list.
14  */

15 public class AcceptThread extends Thread JavaDoc
16 {
17     private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AcceptThread.class);
18     
19     private ServerSocketChannel ssc;
20     private SelectionKey listenKey;
21     private Selector connectSelector;
22
23     private IConnectionInfo.ConnectionAcceptor a;
24
25     private IDatagramFactory factory;
26     private IMessageProcessor incomingProc;
27
28     /**
29      * Instantiate an accept thread.
30      * @param connectSelector the Selector to use to wait for connections.
31      * @param cxns[] connections are added to these ConnectionList's in a round robin fashion as they arrive.
32      * @param bindAddress the SocketAddress to bind to locally.
33      * @param factory the datagram factory to use when creating SocketConnectionInfo objects
34      * @param incomingProc the processor to use when creating SocketConnectionInfo objects
35      */

36     public AcceptThread(Selector connectSelector,
37                         IConnectionInfo.ConnectionAcceptor a,
38                         SocketAddress bindAddress,
39                         IDatagramFactory factory,
40                         IMessageProcessor incomingProc)
41         throws IOException JavaDoc
42     {
43         super("Acceptor");
44
45         this.connectSelector = connectSelector;
46         this.a = a;
47         this.factory = factory;
48         this.incomingProc = incomingProc;
49
50         ssc = ServerSocketChannel.open();
51         ssc.configureBlocking(false);
52         ssc.socket().bind(bindAddress);
53
54         log.debug("Bound to " + bindAddress);
55         listenKey = ssc.register(this.connectSelector, SelectionKey.OP_ACCEPT);
56     }
57
58     /**
59      * Instantiate an accept thread. This is a convenience constructor
60      * that binds to <code>0.0.0.0</code> on the port specified.
61      *
62      * @param connectSelector the Selector to use to wait for connections.
63      * @param cxns[] connections are added to these ConnectionList's in a round robin fashion as they arrive.
64      * @param port the port to listen on, or zero to use any port.
65      * @param factory the datagram factory to use when creating SocketConnectionInfo objects
66      * @param incomingProc the processor to use when creating SocketConnectionInfo objects
67      */

68     public AcceptThread(Selector connectSelector,
69                         IConnectionInfo.ConnectionAcceptor a,
70                         int port,
71                         IDatagramFactory factory,
72                         IMessageProcessor incomingProc)
73         throws IOException JavaDoc
74     {
75         this(connectSelector,
76              a,
77              new InetSocketAddress(port),
78              factory,
79              incomingProc);
80     }
81
82     public void run() {
83         while(!isInterrupted()) {
84             try {
85                 int n = connectSelector.select();
86
87                 if (n > 0)
88                     acceptPendingConnections();
89             } catch(Exception JavaDoc ex) {
90                 log.error("", ex);
91             }
92         }
93
94         // stop listening
95
try
96         {
97             listenKey.cancel();
98             ssc.socket().close();
99             ssc.close();
100
101             System.out.println(ssc.socket().isClosed());
102         }
103         catch (IOException JavaDoc e) {
104             log.error("", e);
105         }
106
107         // we are done
108
log.info("Server listener shut down.");
109     }
110
111     private void acceptPendingConnections() throws IOException JavaDoc {
112         Set JavaDoc readyKeys = connectSelector.selectedKeys();
113
114         for(Iterator JavaDoc i = readyKeys.iterator(); i.hasNext(); ) {
115             SelectionKey key = (SelectionKey)i.next();
116             i.remove();
117
118             ServerSocketChannel readyChannel = (ServerSocketChannel)key.channel();
119
120             SocketChannel incomingChannel = readyChannel.accept();
121             incomingChannel.configureBlocking(false);
122             configureSocket(incomingChannel.socket());
123             log.debug("AcceptThread: Connection from " + incomingChannel.socket().getInetAddress());
124
125             // create a socket connection info instance
126
// and add it to the connection list monitored by the Read write transform thread
127
SocketConnectionInfo conn = new SocketChannelConnectionInfo(incomingChannel, factory, incomingProc);
128             incomingProc.accept(conn);
129             a.acceptIncomingConnection(conn);
130         }
131     }
132
133     private void configureSocket(Socket s)
134         throws SocketException
135     {
136         s.setTcpNoDelay(true);
137     }
138
139 }
140
141
Popular Tags