KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > ConnectionTable1_4


// $Id: ConnectionTable1_4.java,v 1.9 2005/03/23 15:42:40 belaban Exp $
2

3 package org.jgroups.blocks;
4
5 import org.jgroups.Address;
6 import org.jgroups.Message;
7 import org.jgroups.stack.IpAddress;
8 import org.jgroups.util.Util;
9 import org.jgroups.util.Util1_4;
10
11 import java.io.IOException JavaDoc;
12 import java.net.*;
13 import java.nio.ByteBuffer JavaDoc;
14 import java.nio.channels.SelectionKey JavaDoc;
15 import java.nio.channels.Selector JavaDoc;
16 import java.nio.channels.ServerSocketChannel JavaDoc;
17 import java.nio.channels.SocketChannel JavaDoc;
18 import java.util.ArrayList JavaDoc;
19 import java.util.Iterator JavaDoc;
20 import java.util.Set JavaDoc;
21
22
23 /**
24  * Manages incoming and outgoing TCP connections. For each outgoing message to destination P, if there
25  * is not yet a connection for P, one will be created. Subsequent outgoing messages will use this
26  * connection. For incoming messages, one server socket is created at startup. For each new incoming
27  * client connecting, a new thread from a thread pool is allocated and listens for incoming messages
28  * until the socket is closed by the peer.<br>Sockets/threads with no activity will be killed
29  * after some time.<br> Incoming messages from any of the sockets can be received by setting the
30  * message listener.
31  *
32  * @author Bela Ban
33  */

34 public class ConnectionTable1_4 extends ConnectionTable implements Runnable JavaDoc {
35
36     private ServerSocketChannel JavaDoc srv_sock_ch=null;
37     private Selector JavaDoc selector=null;
38     private ArrayList JavaDoc pendingSocksList=null;
39
40     /**
41      * @param srv_port
42      * @throws Exception
43      */

44     public ConnectionTable1_4(int srv_port) throws Exception JavaDoc {
45         super(srv_port);
46     }
47
48     /**
49      * @param srv_port
50      * @param reaper_interval
51      * @param conn_expire_time
52      * @throws Exception
53      */

54     public ConnectionTable1_4(int srv_port, long reaper_interval,
55                               long conn_expire_time) throws Exception JavaDoc {
56         super(srv_port, reaper_interval, conn_expire_time);
57     }
58
59     /**
60      * @param r
61      * @param bind_addr
62      * @param external_addr
63      * @param srv_port
64      * @param max_port
65      * @throws Exception
66      */

67     public ConnectionTable1_4(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port)
68             throws Exception JavaDoc {
69         super(r, bind_addr, external_addr, srv_port, max_port);
70     }
71
72     /**
73      * @param r
74      * @param bind_addr
75      * @param external_addr
76      * @param srv_port
77      * @param max_port
78      * @param reaper_interval
79      * @param conn_expire_time
80      * @throws Exception
81      */

82     public ConnectionTable1_4(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
83                               long reaper_interval, long conn_expire_time) throws Exception JavaDoc {
84         super(r, bind_addr, external_addr, srv_port, max_port, reaper_interval, conn_expire_time);
85     }
86
87     /**
88      * Try to obtain correct Connection (or create one if not yet existent)
89      */

90     ConnectionTable.Connection getConnection(Address dest) throws Exception JavaDoc {
91         Connection conn=null;
92         SocketChannel JavaDoc sock_ch;
93
94         synchronized(conns) {
95             conn=(Connection)conns.get(dest);
96             if(conn == null) {
97                 InetSocketAddress destAddress=
98                         new InetSocketAddress(((IpAddress)dest).getIpAddress(),
99                                 ((IpAddress)dest).getPort());
100                 sock_ch=SocketChannel.open(destAddress);
101                 conn=new Connection(sock_ch, dest);
102                 conn.sendLocalAddress(local_addr);
103                 // conns.put(dest, conn);
104
addConnection(dest, conn);
105                 pendingSocksList.add(conn);
106                 selector.wakeup();
107                 notifyConnectionOpened(dest);
108                 if(log.isInfoEnabled()) log.info("created socket to " + dest);
109             }
110             return conn;
111         }
112     }
113
114     /**
115      * Closes all open sockets, the server socket and all threads waiting for incoming messages
116      */

117     public void stop() {
118         if(srv_sock_ch != null) {
119             try {
120                 ServerSocketChannel JavaDoc temp=srv_sock_ch;
121                 srv_sock_ch=null;
122                 temp.close();
123             }
124             catch(Exception JavaDoc ex) {
125             }
126         }
127         super.stop();
128     }
129
130     /**
131      * Acceptor thread. Continuously accept new connections. Create a new
132      * thread for each new connection and put it in conns. When the thread
133      * should stop, it is interrupted by the thread creator.
134      */

135     public void run() {
136         Socket client_sock;
137         Connection conn=null;
138         Address peer_addr;
139
140         while(srv_sock_ch != null) {
141             try {
142                 if(selector.select() > 0) {
143                     Set JavaDoc readyKeys=selector.selectedKeys();
144                     for(Iterator JavaDoc i=readyKeys.iterator(); i.hasNext();) {
145                         SelectionKey JavaDoc key=(SelectionKey JavaDoc)i.next();
146                         i.remove();
147                         if((key.readyOps() & SelectionKey.OP_ACCEPT)
148                                 == SelectionKey.OP_ACCEPT) {
149                             ServerSocketChannel JavaDoc readyChannel=
150                                     (ServerSocketChannel JavaDoc)key.channel();
151
152                             SocketChannel JavaDoc client_sock_ch=
153                                     readyChannel.accept();
154                             client_sock=client_sock_ch.socket();
155
156                             if(log.isInfoEnabled())
157                                 log.info("accepted connection, client_sock="
158                                         + client_sock);
159
160                             conn=new Connection(client_sock_ch, null);
161                             // will call receive(msg)
162
// get peer's address
163
peer_addr=conn.readPeerAddress(client_sock);
164
165                             conn.setPeerAddress(peer_addr);
166
167                             synchronized(conns) {
168                                 if(conns.containsKey(peer_addr)) {
169
170                                     if(log.isWarnEnabled())
171                                         log.warn(peer_addr
172                                                 + " is already there, will terminate connection");
173                                     conn.destroy();
174                                     return;
175                                 }
176                                 addConnection(peer_addr, conn);
177                             }
178                             conn.init();
179                             notifyConnectionOpened(peer_addr);
180                         }
181                         else
182                             if(
183                                     (key.readyOps() & SelectionKey.OP_READ)
184                                     == SelectionKey.OP_READ) {
185                                 conn=(Connection)key.attachment();
186                                 ByteBuffer JavaDoc buff=conn.getNIOMsgReader().readCompleteMsgBuffer();
187                                 if(buff != null) {
188                                     receive((Message)Util.objectFromByteBuffer(buff.array()));
189                                     conn.getNIOMsgReader().reset();
190                                 }
191                             }
192                     }
193                 }
194                 else {
195                     /*In addition to the accepted Sockets, we must registe
196                      * sockets opend by this for OP_READ, because peer may
197                      * use the same socket to sends data using the same socket,
198                      * instead of opening a new connection. We can not register
199                      * with this selectior from a different thread. set pending
200                      * and wakeup this selector.
201                      */

202                     synchronized(conns) {
203                         Connection pendingConnection;
204                         while((pendingSocksList.size() > 0) && (null != (pendingConnection=(Connection)pendingSocksList.remove(0)))) {
205                             pendingConnection.init();
206                         }
207                     }
208                 }
209             }
210             catch(SocketException sock_ex) {
211                 if(log.isInfoEnabled()) log.info("exception is " + sock_ex);
212                 if(conn != null)
213                     conn.destroy();
214                 if(srv_sock == null)
215                     break; // socket was closed, therefore stop
216
}
217             catch(Throwable JavaDoc ex) {
218
219                 if(log.isWarnEnabled()) log.warn("exception is " + ex);
220             }
221         }
222     }
223
224     /**
225      * Finds first available port starting at start_port and returns server socket. Sets srv_port
226      */

227     protected ServerSocket createServerSocket(int start_port) throws Exception JavaDoc {
228         this.selector=Selector.open();
229         srv_sock_ch=ServerSocketChannel.open();
230         srv_sock_ch.configureBlocking(false);
231         while(true) {
232             try {
233                 if(bind_addr == null)
234                     srv_sock_ch.socket().bind(new InetSocketAddress(start_port));
235                 else
236                     srv_sock_ch.socket().bind(new InetSocketAddress(bind_addr, start_port), backlog);
237             }
238             catch(BindException bind_ex) {
239                 start_port++;
240                 continue;
241             }
242             catch(IOException JavaDoc io_ex) {
243                 if(log.isErrorEnabled()) log.error("exception is " + io_ex);
244             }
245             srv_port=start_port;
246             break;
247         }
248         pendingSocksList=new ArrayList JavaDoc();
249         srv_sock_ch.register(this.selector, SelectionKey.OP_ACCEPT);
250         return srv_sock_ch.socket();
251     }
252
253     class Connection extends ConnectionTable.Connection {
254         private SocketChannel JavaDoc sock_ch=null;
255         private static final int HEADER_SIZE=4;
256         private static final int DEFAULT_BUFF_SIZE=256;
257         final ByteBuffer JavaDoc headerBuffer=ByteBuffer.allocate(HEADER_SIZE);
258         NBMessageForm1_4 nioMsgReader=null;
259
260         Connection(SocketChannel JavaDoc s, Address peer_addr) {
261             super(s.socket(), peer_addr);
262             sock_ch=s;
263         }
264
265         void init() {
266             in=null;
267             out=null;
268             try {
269                 sock_ch.configureBlocking(false);
270                 nioMsgReader=new NBMessageForm1_4(DEFAULT_BUFF_SIZE, sock_ch);
271                 sock_ch.register(selector, SelectionKey.OP_READ, this);
272             }
273             catch(IOException JavaDoc e) {
274             }
275
276             if(log.isInfoEnabled()) log.info("connection was created to " + peer_addr);
277
278         }
279
280         void destroy() {
281             closeSocket();
282             nioMsgReader=null;
283         }
284
285         void doSend(Message msg) throws Exception JavaDoc {
286             IpAddress dst_addr=(IpAddress)msg.getDest();
287             byte[] buffie=null;
288
289             if(dst_addr == null || dst_addr.getIpAddress() == null) {
290                 if(log.isErrorEnabled()) log.error("the destination address is null; aborting send");
291                 return;
292             }
293
294             try {
295                 // set the source address if not yet set
296
if(msg.getSrc() == null)
297                     msg.setSrc(local_addr);
298
299                 buffie=Util.objectToByteBuffer(msg);
300                 if(buffie.length <= 0) {
301                     if(log.isErrorEnabled()) log.error("buffer.length is 0. Will not send message");
302                     return;
303                 }
304
305                 headerBuffer.clear();
306                 headerBuffer.putInt(buffie.length);
307                 headerBuffer.flip();
308                 Util1_4.writeFully(headerBuffer, sock_ch);
309                 ByteBuffer JavaDoc sendBuffer=ByteBuffer.wrap(buffie);
310                 Util1_4.writeFully(sendBuffer, sock_ch);
311             }
312             catch(Exception JavaDoc ex) {
313
314                 if(log.isErrorEnabled())
315                     log.error("to " + dst_addr + ", exception is " + ex + ", stack trace:\n" +
316                             Util.printStackTrace(ex));
317                 remove(dst_addr);
318                 throw ex;
319             }
320         }
321
322         void closeSocket() {
323             if(sock != null) {
324                 try {
325                     sock_ch.close();
326                 }
327                 catch(Exception JavaDoc e) {
328                 }
329                 sock=null;
330             }
331         }
332
333         NBMessageForm1_4 getNIOMsgReader() {
334             return nioMsgReader;
335         }
336     }
337 }
Popular Tags