KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > ConnectionTable


1 // $Id: ConnectionTable.java,v 1.23 2005/04/18 09:54:44 belaban Exp $
2

3 package org.jgroups.blocks;
4
5 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.Address;
9 import org.jgroups.Message;
10 import org.jgroups.Version;
11 import org.jgroups.stack.IpAddress;
12 import org.jgroups.util.Util;
13
14 import java.io.DataInputStream JavaDoc;
15 import java.io.DataOutputStream JavaDoc;
16 import java.io.EOFException JavaDoc;
17 import java.io.IOException JavaDoc;
18 import java.net.*;
19 import java.util.HashMap JavaDoc;
20 import java.util.Iterator JavaDoc;
21 import java.util.Map JavaDoc;
22 import java.util.Vector JavaDoc;
23
24
25 /**
26  * Manages incoming and outgoing TCP connections. For each outgoing message to destination P, if there
27  * is not yet a connection for P, one will be created. Subsequent outgoing messages will use this
28  * connection. For incoming messages, one server socket is created at startup. For each new incoming
29  * client connecting, a new thread from a thread pool is allocated and listens for incoming messages
30  * until the socket is closed by the peer.<br>Sockets/threads with no activity will be killed
31  * after some time.<br> Incoming messages from any of the sockets can be received by setting the
32  * message listener.
33  * @author Bela Ban
34  */

35 public class ConnectionTable implements Runnable JavaDoc {
36     final HashMap JavaDoc conns=new HashMap JavaDoc(); // keys: Addresses (peer address), values: Connection
37
Receiver receiver=null;
38     ServerSocket srv_sock=null;
39     boolean reuse_addr=false;
40     InetAddress bind_addr=null;
41
42     /**
43      * The address which will be broadcast to the group (the externally visible address which this host should
44      * be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to.
45      */

46     InetAddress external_addr=null;
47     Address local_addr=null; // bind_addr + port of srv_sock
48
int srv_port=7800;
49     int max_port=0; // maximum port to bind to (if < srv_port, no limit)
50
Thread JavaDoc acceptor=null; // continuously calls srv_sock.accept()
51
static final int backlog=20; // 20 conn requests are queued by ServerSocket (addtl will be discarded)
52
int recv_buf_size=120000;
53     int send_buf_size=60000;
54     final Vector JavaDoc conn_listeners=new Vector JavaDoc(); // listeners to be notified when a conn is established/torn down
55
final Object JavaDoc recv_mutex=new Object JavaDoc(); // to serialize simultaneous access to receive() from multiple Connections
56
Reaper reaper=null; // closes conns that have been idle for more than n secs
57
long reaper_interval=60000; // reap unused conns once a minute
58
long conn_expire_time=300000; // connections can be idle for 5 minutes before they are reaped
59
boolean use_reaper=false; // by default we don't reap idle conns
60
int sock_conn_timeout=1000; // max time in millis to wait for Socket.connect() to return
61
ThreadGroup JavaDoc thread_group=null;
62     protected final Log log=LogFactory.getLog(getClass());
63     static int javaVersion=0;
64     final byte[] cookie={'b', 'e', 'l', 'a'};
65
66
67     static {
68         javaVersion=Util.getJavaVersion();
69     }
70
71
72     /** Used for message reception */
73     public interface Receiver {
74         void receive(Message msg);
75     }
76
77
78
79     /** Used to be notified about connection establishment and teardown */
80     public interface ConnectionListener {
81         void connectionOpened(Address peer_addr);
82         void connectionClosed(Address peer_addr);
83     }
84
85
/**
 * Regular ConnectionTable without expiration of idle connections. The table is started
 * (see {@link #start()}) before the constructor returns.
 * @param srv_port The port on which the server will listen. If this port is reserved, the next
 * free port will be taken (incrementing srv_port).
 * @throws Exception If start() fails, e.g. the server socket could not be created
 */
public ConnectionTable(int srv_port) throws Exception {
    this.srv_port=srv_port;
    this.start();
}
95
96
/**
 * ConnectionTable including a connection reaper. Connections that have been idle for more than
 * conn_expire_time milliseconds will be closed and removed from the connection table. On next access
 * they will be re-created. The table is started before the constructor returns.
 * @param srv_port The port on which the server will listen
 * @param reaper_interval Number of milliseconds to wait for reaper between attempts to reap idle connections
 * @param conn_expire_time Number of milliseconds a connection can be idle (no traffic sent or received)
 * until it will be reaped
 * @throws Exception If start() fails, e.g. the server socket could not be created
 */
public ConnectionTable(int srv_port, long reaper_interval, long conn_expire_time) throws Exception {
    // the reaper settings must be in place before start(), which reads use_reaper
    this.use_reaper=true;
    this.reaper_interval=reaper_interval;
    this.conn_expire_time=conn_expire_time;
    this.srv_port=srv_port;
    this.start();
}
112
113
/**
 * Create a ConnectionTable without a connection reaper. The table is started before the
 * constructor returns.
 * @param r A reference to a receiver of all messages received by this class. Method <code>receive()</code>
 * will be called.
 * @param bind_addr The host name or IP address of the interface to which the server socket will bind.
 * This is interesting only in multi-homed systems. If bind_addr is null, the
 * server socket will bind to the first available interface (e.g. /dev/hme0 on
 * Solaris or /dev/eth0 on Linux systems).
 * @param external_addr The address which will be broadcast to the group (the externally visible address
 * which this host should be contacted on). If external_addr is null, it will default to
 * the same address that the server socket is bound to.
 * @param srv_port The port to which the server socket will bind to. If this port is reserved, the next
 * free port will be taken (incrementing srv_port).
 * @param max_port The largest port number that the server socket will be bound to. If max_port &lt; srv_port
 * then there is no limit.
 * @throws Exception If start() fails, e.g. the server socket could not be created
 */
public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port) throws Exception {
    this.setReceiver(r);
    this.srv_port=srv_port;
    this.max_port=max_port;
    this.bind_addr=bind_addr;
    this.external_addr=external_addr;
    this.start();
}
138
139
/**
 * ConnectionTable including a connection reaper. Connections that have been idle for more than
 * conn_expire_time milliseconds will be closed and removed from the connection table. On next access
 * they will be re-created. The table is started before the constructor returns.
 *
 * @param r A reference to a receiver of all messages received by this class. Method <code>receive()</code>
 * will be called.
 * @param bind_addr The host name or IP address of the interface to which the server socket will bind.
 * This is interesting only in multi-homed systems. If bind_addr is null, the
 * server socket will bind to the first available interface (e.g. /dev/hme0 on
 * Solaris or /dev/eth0 on Linux systems).
 * @param external_addr The address which will be broadcast to the group (the externally visible address
 * which this host should be contacted on). If external_addr is null, it will default to
 * the same address that the server socket is bound to.
 * @param srv_port The port to which the server socket will bind to. If this port is reserved, the next
 * free port will be taken (incrementing srv_port).
 * @param max_port The largest port number that the server socket will be bound to. If max_port &lt; srv_port
 * then there is no limit.
 * @param reaper_interval Number of milliseconds to wait for reaper between attempts to reap idle connections
 * @param conn_expire_time Number of milliseconds a connection can be idle (no traffic sent or received)
 * until it will be reaped
 * @throws Exception If start() fails, e.g. the server socket could not be created
 */
public ConnectionTable(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,
                       long reaper_interval, long conn_expire_time) throws Exception {
    this.setReceiver(r);
    // the reaper settings must be in place before start(), which reads use_reaper
    this.use_reaper=true;
    this.reaper_interval=reaper_interval;
    this.conn_expire_time=conn_expire_time;
    this.srv_port=srv_port;
    this.max_port=max_port;
    this.bind_addr=bind_addr;
    this.external_addr=external_addr;
    this.start();
}
173
174
175     public void setReceiver(Receiver r) {
176         receiver=r;
177     }
178
179
180     public void addConnectionListener(ConnectionListener l) {
181         if(l != null && !conn_listeners.contains(l))
182             conn_listeners.addElement(l);
183     }
184
185
186     public void removeConnectionListener(ConnectionListener l) {
187         if(l != null) conn_listeners.removeElement(l);
188     }
189
190
191     public Address getLocalAddress() {
192         if(local_addr == null)
193             local_addr=bind_addr != null ? new IpAddress(bind_addr, srv_port) : null;
194         return local_addr;
195     }
196
197
198     public int getSendBufferSize() {
199         return send_buf_size;
200     }
201
202     public void setSendBufferSize(int send_buf_size) {
203         this.send_buf_size=send_buf_size;
204     }
205
206     public int getReceiveBufferSize() {
207         return recv_buf_size;
208     }
209
210     public void setReceiveBufferSize(int recv_buf_size) {
211         this.recv_buf_size=recv_buf_size;
212     }
213
214     public int getSocketConnectionTimeout() {
215         return sock_conn_timeout;
216     }
217
218     public void setSocketConnectionTimeout(int sock_conn_timeout) {
219         this.sock_conn_timeout=sock_conn_timeout;
220     }
221
222     /** Sends a message to a unicast destination. The destination has to be set
223      * @param msg The message to send
224      * @throws SocketException Thrown if connection cannot be established
225      */

226     public void send(Message msg) throws SocketException {
227         Address dest=msg != null ? msg.getDest() : null;
228         Connection conn;
229
230         if(dest == null) {
231             if(log.isErrorEnabled())
232                 log.error("msg is null or message's destination is null");
233             return;
234         }
235
236         // 1. Try to obtain correct Connection (or create one if not yet existent)
237
try {
238             conn=getConnection(dest);
239             if(conn == null) return;
240         }
241         catch(SocketException sock_ex) {
242             // log.error("exception sending message to " + dest, sock_ex);
243
throw sock_ex;
244         }
245         catch(Throwable JavaDoc ex) {
246             if(log.isInfoEnabled()) log.info("connection to " + dest + " could not be established: " + ex);
247             throw new SocketException(ex.toString());
248         }
249
250         // 2. Send the message using that connection
251
try {
252             conn.send(msg);
253         }
254         catch(Throwable JavaDoc ex) {
255             if(log.isTraceEnabled())
256                 log.trace("sending msg to " + dest + " failed (" + ex.getClass().getName() + "); removing from connection table");
257             remove(dest);
258         }
259     }
260
261
262     /** Try to obtain correct Connection (or create one if not yet existent) */
263     Connection getConnection(Address dest) throws Exception JavaDoc {
264         Connection conn=null;
265         Socket sock;
266
267         synchronized(conns) {
268             conn=(Connection)conns.get(dest);
269             if(conn == null) {
270                 // changed by bela Jan 18 2004: use the bind address for the client sockets as well
271

272                 if(javaVersion >= 14) {
273                     SocketAddress tmpBindAddr=new InetSocketAddress(bind_addr, 0);
274                     InetAddress tmpDest=((IpAddress)dest).getIpAddress();
275                     SocketAddress destAddr=new InetSocketAddress(tmpDest, ((IpAddress)dest).getPort());
276                     sock=new Socket();
277                     sock.bind(tmpBindAddr);
278                     sock.connect(destAddr, sock_conn_timeout);
279                 }
280                 else {
281                     sock=new Socket(((IpAddress)dest).getIpAddress(), ((IpAddress)dest).getPort(), bind_addr, 0);
282                 }
283
284                 try {
285                     sock.setSendBufferSize(send_buf_size);
286                 }
287                 catch(IllegalArgumentException JavaDoc ex) {
288                     if(log.isErrorEnabled()) log.error("exception setting send buffer size to " +
289                                                        send_buf_size + " bytes: " + ex);
290                 }
291                 try {
292                     sock.setReceiveBufferSize(recv_buf_size);
293                 }
294                 catch(IllegalArgumentException JavaDoc ex) {
295                     if(log.isErrorEnabled()) log.error("exception setting receive buffer size to " +
296                             send_buf_size + " bytes: " + ex);
297                 }
298                 conn=new Connection(sock, dest);
299                 conn.sendLocalAddress(local_addr);
300                 notifyConnectionOpened(dest);
301                 // conns.put(dest, conn);
302
addConnection(dest, conn);
303                 conn.init();
304                 if(log.isInfoEnabled()) log.info("created socket to " + dest);
305             }
306             return conn;
307         }
308     }
309
310
311     public void start() throws Exception JavaDoc {
312         srv_sock=createServerSocket(srv_port, max_port);
313
314         if (external_addr!=null)
315             local_addr=new IpAddress(external_addr, srv_sock.getLocalPort());
316         else if (bind_addr != null)
317             local_addr=new IpAddress(bind_addr, srv_sock.getLocalPort());
318         else
319             local_addr=new IpAddress(srv_sock.getLocalPort());
320
321         if(log.isInfoEnabled()) log.info("server socket created on " + local_addr);
322
323         //Roland Kurmann 4/7/2003, build new thread group
324
thread_group = new ThreadGroup JavaDoc(Thread.currentThread().getThreadGroup(), "ConnectionTableGroup");
325         //Roland Kurmann 4/7/2003, put in thread_group
326
acceptor=new Thread JavaDoc(thread_group, this, "ConnectionTable.AcceptorThread");
327         acceptor.setDaemon(true);
328         acceptor.start();
329
330         // start the connection reaper - will periodically remove unused connections
331
if(use_reaper && reaper == null) {
332             reaper=new Reaper();
333             reaper.start();
334         }
335     }
336
337
338     /** Closes all open sockets, the server socket and all threads waiting for incoming messages */
339     public void stop() {
340         Iterator JavaDoc it=null;
341         Connection conn;
342         ServerSocket tmp;
343
344         // 1. close the server socket (this also stops the acceptor thread)
345
if(srv_sock != null) {
346             try {
347                 tmp=srv_sock;
348                 srv_sock=null;
349                 tmp.close();
350             }
351             catch(Exception JavaDoc e) {
352             }
353         }
354
355
356         // 2. then close the connections
357
synchronized(conns) {
358             it=conns.values().iterator();
359             while(it.hasNext()) {
360                 conn=(Connection)it.next();
361                 conn.destroy();
362             }
363             conns.clear();
364         }
365         local_addr=null;
366     }
367
368
369     /**
370      Remove <code>addr</code>from connection table. This is typically triggered when a member is suspected.
371      */

372     public void remove(Address addr) {
373         Connection conn;
374
375         synchronized(conns) {
376             conn=(Connection)conns.remove(addr);
377         }
378
379         if(conn != null) {
380             try {
381                 conn.destroy(); // won't do anything if already destroyed
382
}
383             catch(Exception JavaDoc e) {
384             }
385         }
386         if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString());
387     }
388
389
390     /**
391      * Acceptor thread. Continuously accept new connections. Create a new thread for each new
392      * connection and put it in conns. When the thread should stop, it is
393      * interrupted by the thread creator.
394      */

395     public void run() {
396         Socket client_sock;
397         Connection conn=null;
398         Address peer_addr;
399
400         while(srv_sock != null) {
401             try {
402                 client_sock=srv_sock.accept();
403                 if(log.isTraceEnabled())
404                     log.trace("accepted connection from " + client_sock.getInetAddress() + ":" + client_sock.getPort());
405
406                 // create new thread and add to conn table
407
conn=new Connection(client_sock, null); // will call receive(msg)
408
// get peer's address
409
peer_addr=conn.readPeerAddress(client_sock);
410
411                 // client_addr=new IpAddress(client_sock.getInetAddress(), client_port);
412
conn.setPeerAddress(peer_addr);
413
414                 synchronized(conns) {
415                     if(conns.containsKey(peer_addr)) {
416                         if(log.isTraceEnabled())
417                             log.trace(peer_addr + " is already there, will reuse connection");
418                         //conn.destroy();
419
//continue; // return; // we cannot terminate the thread (bela Sept 2 2004)
420
}
421                     else {
422                         // conns.put(peer_addr, conn);
423
addConnection(peer_addr, conn);
424                         notifyConnectionOpened(peer_addr);
425                     }
426                 }
427
428                 conn.init(); // starts handler thread on this socket
429
}
430             catch(SocketException sock_ex) {
431                 if(log.isInfoEnabled()) log.info("exception is " + sock_ex);
432                 if(conn != null)
433                     conn.destroy();
434                 if(srv_sock == null)
435                     break; // socket was closed, therefore stop
436
}
437             catch(Throwable JavaDoc ex) {
438                 if(log.isWarnEnabled()) log.warn("exception is " + ex);
439                 if(srv_sock == null)
440                     break; // socket was closed, therefore stop
441
}
442         }
443         if(log.isTraceEnabled())
444             log.trace(Thread.currentThread().getName() + " terminated");
445     }
446
447
448     /**
449      * Calls the receiver callback. We serialize access to this method because it may be called concurrently
450      * by several Connection handler threads. Therefore the receiver doesn't need to synchronize.
451      */

452     public void receive(Message msg) {
453         if(receiver != null) {
454             synchronized(recv_mutex) {
455                 receiver.receive(msg);
456             }
457         }
458         else
459             if(log.isErrorEnabled()) log.error("receiver is null (not set) !");
460     }
461
462
463     public String JavaDoc toString() {
464         StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
465         Address key;
466         Connection val;
467         Map.Entry JavaDoc entry;
468         HashMap JavaDoc copy;
469
470         synchronized(conns) {
471             copy=new HashMap JavaDoc(conns);
472         }
473         ret.append("connections (" + copy.size() + "):\n");
474         for(Iterator JavaDoc it=copy.entrySet().iterator(); it.hasNext();) {
475             entry=(Map.Entry JavaDoc)it.next();
476             key=(Address)entry.getKey();
477             val=(Connection)entry.getValue();
478             ret.append("key: " + key + ": " + val + '\n');
479         }
480         ret.append('\n');
481         return ret.toString();
482     }
483
484
485     /** Finds first available port starting at start_port and returns server socket.
486       * Will not bind to port >end_port. Sets srv_port */

487     protected ServerSocket createServerSocket(int start_port, int end_port) throws Exception JavaDoc {
488         ServerSocket ret=null;
489
490         while(true) {
491             try {
492                 if(bind_addr == null)
493                     ret=new ServerSocket(start_port);
494                 else {
495
496                     ret=new ServerSocket(start_port, backlog, bind_addr);
497                 }
498             }
499             catch(BindException bind_ex) {
500                 if (start_port==end_port) throw new BindException("No available port to bind to");
501                 if(bind_addr != null && javaVersion >= 14) {
502                     NetworkInterface nic=NetworkInterface.getByInetAddress(bind_addr);
503                     if(nic == null)
504                         throw new BindException("bind_addr " + bind_addr + " is not a valid interface");
505                 }
506                 start_port++;
507                 continue;
508             }
509             catch(IOException JavaDoc io_ex) {
510                 if(log.isErrorEnabled()) log.error("exception is " + io_ex);
511             }
512             srv_port=start_port;
513             break;
514         }
515         return ret;
516     }
517
518
519     void notifyConnectionOpened(Address peer) {
520         if(peer == null) return;
521         for(int i=0; i < conn_listeners.size(); i++)
522             ((ConnectionListener)conn_listeners.elementAt(i)).connectionOpened(peer);
523     }
524
525     void notifyConnectionClosed(Address peer) {
526         if(peer == null) return;
527         for(int i=0; i < conn_listeners.size(); i++)
528             ((ConnectionListener)conn_listeners.elementAt(i)).connectionClosed(peer);
529     }
530
531
532     void addConnection(Address peer, Connection c) {
533         conns.put(peer, c);
534         if(reaper != null && !reaper.isRunning())
535             reaper.start();
536     }
537
538
539
540
541     class Connection implements Runnable JavaDoc {
542         Socket sock=null; // socket to/from peer (result of srv_sock.accept() or new Socket())
543
String JavaDoc sock_addr=null; // used for Thread.getName()
544
DataOutputStream JavaDoc out=null; // for sending messages
545
DataInputStream JavaDoc in=null; // for receiving messages
546
Thread JavaDoc receiverThread=null; // thread for receiving messages
547
Address peer_addr=null; // address of the 'other end' of the connection
548
final Object JavaDoc send_mutex=new Object JavaDoc(); // serialize sends
549
long last_access=System.currentTimeMillis(); // last time a message was sent or received
550
LinkedQueue send_queue=new LinkedQueue();
551         Sender sender=new Sender();
552         final long POLL_TIMEOUT=30000;
553
554
555         String JavaDoc getSockAddress() {
556             if(sock_addr != null)
557                 return sock_addr;
558             if(sock != null) {
559                 StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
560                 sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort());
561                 sb.append(" - ").append(sock.getInetAddress().getHostAddress()).append(':').append(sock.getPort());
562                 sock_addr=sb.toString();
563             }
564             return sock_addr;
565         }
566
567         class Sender implements Runnable JavaDoc {
568             Thread JavaDoc senderThread;
569             private boolean running=false;
570
571             void start() {
572                 if(senderThread == null || !senderThread.isAlive()) {
573                     senderThread=new Thread JavaDoc(thread_group, this, "ConnectionTable.Connection.Sender [" + getSockAddress() + "]");
574                     senderThread.setDaemon(true);
575                     senderThread.start();
576                     running=true;
577                     if(log.isTraceEnabled())
578                         log.trace("ConnectionTable.Connection.Sender thread started");
579                 }
580             }
581
582             void stop() {
583                 if(senderThread != null) {
584                     senderThread.interrupt();
585                     senderThread=null;
586                     running=false;
587                 }
588             }
589
590             boolean isRunning() {
591                 return running && senderThread != null;
592             }
593
594             public void run() {
595                 Message msg;
596                 while(senderThread != null && senderThread.equals(Thread.currentThread())) {
597                     try {
598                         msg=(Message)send_queue.poll(POLL_TIMEOUT);
599                         if(msg == null)
600                             break;
601                         _send(msg);
602                     }
603                     catch(InterruptedException JavaDoc e) {
604                         break;
605                     }
606                 }
607                 running=false;
608                 if(log.isTraceEnabled())
609                     log.trace("ConnectionTable.Connection.Sender thread terminated");
610             }
611         }
612
613
/**
 * Wraps a connected socket and opens data streams on it. If the streams cannot be obtained,
 * the failure is only logged and <code>in</code>/<code>out</code> stay null.
 * @param s The connected socket
 * @param peer_addr Address of the peer; may be null and set later via setPeerAddress()
 */
Connection(Socket s, Address peer_addr) {
    this.sock=s;
    this.peer_addr=peer_addr;
    try {
        this.out=new DataOutputStream(s.getOutputStream());
        this.in=new DataInputStream(s.getInputStream());
    }
    catch(Exception ex) {
        if(log.isErrorEnabled())
            log.error("exception is " + ex);
    }
}
625
626
627         boolean established() {
628             return receiverThread != null;
629         }
630
631
632         void setPeerAddress(Address peer_addr) {
633             this.peer_addr=peer_addr;
634         }
635
636         void updateLastAccessed() {
637             last_access=System.currentTimeMillis();
638         }
639
640         void init() {
641             // if(log.isInfoEnabled()) log.info("connection was created to " + peer_addr);
642
if(receiverThread == null || !receiverThread.isAlive()) {
643                 // Roland Kurmann 4/7/2003, put in thread_group
644
receiverThread=new Thread JavaDoc(thread_group, this, "ConnectionTable.Connection.Receiver [" + getSockAddress() + "]");
645                 receiverThread.setDaemon(true);
646                 receiverThread.start();
647                 if(log.isTraceEnabled())
648                     log.trace("ConnectionTable.Connection.Receiver started");
649             }
650         }
651
652
653         void destroy() {
654             closeSocket(); // should terminate handler as well
655
sender.stop();
656             receiverThread=null;
657         }
658
659
660         void send(Message msg) {
661             try {
662                 send_queue.put(msg);
663                 if(!sender.isRunning())
664                     sender.start();
665             }
666             catch(InterruptedException JavaDoc e) {
667                 log.error("failed adding message to send_queue", e);
668             }
669         }
670
671         private void _send(Message msg) {
672             synchronized(send_mutex) {
673                 try {
674                     doSend(msg);
675                     updateLastAccessed();
676                 }
677                 catch(IOException JavaDoc io_ex) {
678                     if(log.isWarnEnabled())
679                         log.warn("peer closed connection, trying to re-establish connection and re-send msg");
680                     try {
681                         doSend(msg);
682                         updateLastAccessed();
683                     }
684                     catch(IOException JavaDoc io_ex2) {
685                          if(log.isErrorEnabled()) log.error("2nd attempt to send data failed too");
686                     }
687                     catch(Exception JavaDoc ex2) {
688                          if(log.isErrorEnabled()) log.error("exception is " + ex2);
689                     }
690                 }
691                 catch(Exception JavaDoc ex) {
692                      if(log.isErrorEnabled()) log.error("exception is " + ex);
693                 }
694             }
695         }
696
697
698         void doSend(Message msg) throws Exception JavaDoc {
699             IpAddress dst_addr=(IpAddress)msg.getDest();
700             byte[] buffie=null;
701
702             if(dst_addr == null || dst_addr.getIpAddress() == null) {
703                 if(log.isErrorEnabled()) log.error("the destination address is null; aborting send");
704                 return;
705             }
706
707             try {
708                 // set the source address if not yet set
709
if(msg.getSrc() == null)
710                     msg.setSrc(local_addr);
711
712                 buffie=Util.objectToByteBuffer(msg);
713                 if(buffie.length <= 0) {
714                     if(log.isErrorEnabled()) log.error("buffer.length is 0. Will not send message");
715                     return;
716                 }
717
718                 // we're using 'double-writes', sending the buffer to the destination in 2 pieces. this would
719
// ensure that, if the peer closed the connection while we were idle, we would get an exception.
720
// this won't happen if we use a single write (see Stevens, ch. 5.13).
721
if(out != null) {
722                     out.writeInt(buffie.length); // write the length of the data buffer first
723
Util.doubleWrite(buffie, out);
724                     out.flush(); // may not be very efficient (but safe)
725
}
726             }
727             catch(Exception JavaDoc ex) {
728                 if(log.isErrorEnabled())
729                     log.error("failure sending to " + dst_addr, ex);
730                 remove(dst_addr);
731                 throw ex;
732             }
733         }
734
735
736         /**
737          * Reads the peer's address. First a cookie has to be sent which has to match my own cookie, otherwise
738          * the connection will be refused
739          */

740         Address readPeerAddress(Socket client_sock) throws Exception JavaDoc {
741             Address client_peer_addr=null;
742             byte[] version, buf, input_cookie=new byte[cookie.length];
743             int len=0, client_port=client_sock != null? client_sock.getPort() : 0;
744             InetAddress client_addr=client_sock != null? client_sock.getInetAddress() : null;
745
746             if(in != null) {
747                 initCookie(input_cookie);
748
749                 // read the cookie first
750
in.read(input_cookie, 0, input_cookie.length);
751                 if(!matchCookie(input_cookie))
752                     throw new SocketException("ConnectionTable.Connection.readPeerAddress(): cookie sent by " +
753                                               client_peer_addr + " does not match own cookie; terminating connection");
754                 // then read the version
755
version=new byte[Version.version_id.length];
756                 in.read(version, 0, version.length);
757
758                 if(Version.compareTo(version) == false) {
759                     if(log.isWarnEnabled()) log.warn("packet from " + client_addr + ':' + client_port +
760                                " has different version (" +
761                                Version.printVersionId(version, Version.version_id.length) +
762                                ") from ours (" + Version.printVersionId(Version.version_id) +
763                                "). This may cause problems");
764                 }
765
766                 // read the length of the address
767
len=in.readInt();
768
769                 // finally read the address itself
770
buf=new byte[len];
771                 in.readFully(buf, 0, len);
772                 client_peer_addr=(Address)Util.objectFromByteBuffer(buf);
773                 updateLastAccessed();
774             }
775             return client_peer_addr;
776         }
777
778
779         /**
780          * Send the cookie first, then the our port number. If the cookie doesn't match the receiver's cookie,
781          * the receiver will reject the connection and close it.
782          */

783         void sendLocalAddress(Address local_addr) {
784             byte[] buf;
785
786             if(local_addr == null) {
787                 if(log.isWarnEnabled()) log.warn("local_addr is null");
788                 return;
789             }
790             if(out != null) {
791                 try {
792                     buf=Util.objectToByteBuffer(local_addr);
793
794                     // write the cookie
795
out.write(cookie, 0, cookie.length);
796
797                     // write the version
798
out.write(Version.version_id, 0, Version.version_id.length);
799
800                     // write the length of the buffer
801
out.writeInt(buf.length);
802
803                     // and finally write the buffer itself
804
out.write(buf, 0, buf.length);
805                     out.flush(); // needed ?
806
updateLastAccessed();
807                 }
808                 catch(Throwable JavaDoc t) {
809                     if(log.isErrorEnabled()) log.error("exception is " + t);
810                 }
811             }
812         }
813
814
815         void initCookie(byte[] c) {
816             if(c != null)
817                 for(int i=0; i < c.length; i++)
818                     c[i]=0;
819         }
820
821         boolean matchCookie(byte[] input) {
822             if(input == null || input.length < cookie.length) return false;
823             if(log.isInfoEnabled()) log.info("input_cookie is " + printCookie(input));
824             for(int i=0; i < cookie.length; i++)
825                 if(cookie[i] != input[i]) return false;
826             return true;
827         }
828
829
830         String JavaDoc printCookie(byte[] c) {
831             if(c == null) return "";
832             return new String JavaDoc(c);
833         }
834
835
836         public void run() {
837             Message msg;
838             byte[] buf=new byte[256];
839             int len=0;
840
841             while(receiverThread != null && receiverThread.equals(Thread.currentThread())) {
842                 try {
843                     if(in == null) {
844                         if(log.isErrorEnabled()) log.error("input stream is null !");
845                         break;
846                     }
847                     len=in.readInt();
848                     if(len > buf.length)
849                         buf=new byte[len];
850                     in.readFully(buf, 0, len);
851                     updateLastAccessed();
852                     msg=(Message)Util.objectFromByteBuffer(buf);
853                     receive(msg); // calls receiver.receiver(msg)
854
}
855                 catch(OutOfMemoryError JavaDoc mem_ex) {
856                     if(log.isWarnEnabled()) log.warn("dropped invalid message, closing connection");
857                     break; // continue;
858
}
859                 catch(EOFException JavaDoc eof_ex) { // peer closed connection
860
if(log.isInfoEnabled()) log.info("exception is " + eof_ex);
861                     notifyConnectionClosed(peer_addr);
862                     break;
863                 }
864                 catch(IOException JavaDoc io_ex) {
865                     if(log.isInfoEnabled()) log.info("exception is " + io_ex);
866                     notifyConnectionClosed(peer_addr);
867                     break;
868                 }
869                 catch(Throwable JavaDoc e) {
870                     if(log.isWarnEnabled()) log.warn("exception is " + e);
871                 }
872             }
873             if(log.isTraceEnabled())
874                 log.trace("ConnectionTable.Connection.Receiver terminated");
875             receiverThread=null;
876             closeSocket();
877             remove(peer_addr);
878         }
879
880
881         public String JavaDoc toString() {
882             StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
883             InetAddress local=null, remote=null;
884             String JavaDoc local_str, remote_str;
885
886             if(sock == null)
887                 ret.append("<null socket>");
888             else {
889                 //since the sock variable gets set to null we want to make
890
//make sure we make it through here without a nullpointer exception
891
Socket tmp_sock=sock;
892                 local=tmp_sock.getLocalAddress();
893                 remote=tmp_sock.getInetAddress();
894                 local_str=local != null ? Util.shortName(local) : "<null>";
895                 remote_str=remote != null ? Util.shortName(remote) : "<null>";
896                 ret.append('<' + local_str + ':' + tmp_sock.getLocalPort() +
897                            " --> " + remote_str + ':' + tmp_sock.getPort() + "> (" +
898                            ((System.currentTimeMillis() - last_access) / 1000) + " secs old)");
899                 tmp_sock=null;
900             }
901
902             return ret.toString();
903         }
904
905
906         void closeSocket() {
907             if(sock != null) {
908                 try {
909                     sock.close(); // should actually close in/out (so we don't need to close them explicitly)
910
}
911                 catch(Exception JavaDoc e) {
912                 }
913                 sock=null;
914             }
915             if(out != null) {
916                 try {
917                     out.close(); // flushes data
918
}
919                 catch(Exception JavaDoc e) {
920                 }
921                 // removed 4/22/2003 (request by Roland Kurmann)
922
// out=null;
923
}
924             if(in != null) {
925                 try {
926                     in.close();
927                 }
928                 catch(Exception JavaDoc ex) {
929                 }
930                 in=null;
931             }
932         }
933     }
934
935
936     class Reaper implements Runnable JavaDoc {
937         Thread JavaDoc t=null;
938
939         Reaper() {
940             ;
941         }
942
943         public void start() {
944             if(conns.size() == 0)
945                 return;
946             if(t != null && !t.isAlive())
947                 t=null;
948             if(t == null) {
949                 //RKU 7.4.2003, put in threadgroup
950
t=new Thread JavaDoc(thread_group, this, "ConnectionTable.ReaperThread");
951                 t.setDaemon(true); // will allow us to terminate if all remaining threads are daemons
952
t.start();
953             }
954         }
955
956         public void stop() {
957             if(t != null)
958                 t=null;
959         }
960
961
962         public boolean isRunning() {
963             return t != null;
964         }
965
966         public void run() {
967             Connection value;
968             Map.Entry JavaDoc entry;
969             long curr_time;
970
971             if(log.isInfoEnabled()) log.info("connection reaper thread was started. Number of connections=" +
972                                              conns.size() + ", reaper_interval=" + reaper_interval + ", conn_expire_time=" +
973                                              conn_expire_time);
974
975             while(conns.size() > 0 && t != null && t.equals(Thread.currentThread())) {
976                 // first sleep
977
Util.sleep(reaper_interval);
978                 synchronized(conns) {
979                     curr_time=System.currentTimeMillis();
980                     for(Iterator JavaDoc it=conns.entrySet().iterator(); it.hasNext();) {
981                         entry=(Map.Entry JavaDoc)it.next();
982                         value=(Connection)entry.getValue();
983                         if(log.isInfoEnabled()) log.info("connection is " +
984                                                          ((curr_time - value.last_access) / 1000) + " seconds old (curr-time=" +
985                                                          curr_time + ", last_access=" + value.last_access + ')');
986                         if(value.last_access + conn_expire_time < curr_time) {
987                             if(log.isInfoEnabled()) log.info("connection " + value +
988                                                              " has been idle for too long (conn_expire_time=" + conn_expire_time +
989                                                              "), will be removed");
990                             value.destroy();
991                             it.remove();
992                         }
993                     }
994                 }
995             }
996             if(log.isInfoEnabled()) log.info("reaper terminated");
997             t=null;
998         }
999     }
1000
1001
1002}
1003
1004
1005
Popular Tags