KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > BasicConnectionTable


1 package org.jgroups.blocks;
2
3 import org.apache.commons.logging.Log;
4 import org.apache.commons.logging.LogFactory;
5 import org.jgroups.Address;
6 import org.jgroups.Global;
7 import org.jgroups.Version;
8 import org.jgroups.stack.IpAddress;
9 import org.jgroups.util.Util;
10
11 import java.io.*;
12 import java.net.InetAddress JavaDoc;
13 import java.net.ServerSocket JavaDoc;
14 import java.net.Socket JavaDoc;
15 import java.net.SocketException JavaDoc;
16 import java.util.*;
17 import java.util.concurrent.BlockingQueue JavaDoc;
18 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
19 import java.util.concurrent.locks.Lock JavaDoc;
20 import java.util.concurrent.locks.ReentrantLock JavaDoc;
21
22 /**
23  * Shared class for TCP connection tables.
24  * @author Scott Marlow
25  */

26 public abstract class BasicConnectionTable {
27     final Map<Address,Connection> conns=new HashMap<Address,Connection>(); // keys: Addresses (peer address), values: Connection
28
Receiver receiver=null;
29     boolean use_send_queues=false; // max number of messages in a send queue
30
int send_queue_size=10000;
31     InetAddress JavaDoc bind_addr=null;
32     Address local_addr=null; // bind_addr + port of srv_sock
33
int srv_port=7800;
34     int recv_buf_size=120000;
35     int send_buf_size=60000;
36     final Vector<ConnectionListener> conn_listeners=new Vector<ConnectionListener>(); // listeners to be notified when a conn is established/torn down
37
Reaper reaper=null; // closes conns that have been idle for more than n secs
38
long reaper_interval=60000; // reap unused conns once a minute
39
long conn_expire_time=300000; // connections can be idle for 5 minutes before they are reaped
40
int sock_conn_timeout=1000; // max time in millis to wait for Socket.connect() to return
41
ThreadGroup JavaDoc thread_group=null;
42     protected final Log log= LogFactory.getLog(getClass());
43     final byte[] cookie={'b', 'e', 'l', 'a'};
44     boolean use_reaper=false; // by default we don't reap idle conns
45
static final int backlog=20; // 20 conn requests are queued by ServerSocket (addtl will be discarded)
46
volatile ServerSocket JavaDoc srv_sock=null;
47     boolean tcp_nodelay=false;
48     int linger=-1;
49
50    /**
51     * The address which will be broadcast to the group (the externally visible address which this host should
52     * be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to.
53     */

54     InetAddress JavaDoc external_addr=null;
55     int max_port=0; // maximum port to bind to (if < srv_port, no limit)
56
Thread JavaDoc acceptor=null; // continuously calls srv_sock.accept()
57
boolean running=false;
58
59     final static long MAX_JOIN_TIMEOUT=Global.THREAD_SHUTDOWN_WAIT_TIME;
60
61
62     public final void setReceiver(Receiver r) {
63         receiver=r;
64     }
65
66    public void addConnectionListener(ConnectionListener l) {
67        if(l != null && !conn_listeners.contains(l))
68            conn_listeners.addElement(l);
69    }
70
71    public void removeConnectionListener(ConnectionListener l) {
72        if(l != null) conn_listeners.removeElement(l);
73    }
74
75    public Address getLocalAddress() {
76        if(local_addr == null)
77            local_addr=bind_addr != null ? new IpAddress(bind_addr, srv_port) : null;
78        return local_addr;
79    }
80
81    public int getSendBufferSize() {
82        return send_buf_size;
83    }
84
85    public void setSendBufferSize(int send_buf_size) {
86        this.send_buf_size=send_buf_size;
87    }
88
89    public int getReceiveBufferSize() {
90        return recv_buf_size;
91    }
92
93    public void setReceiveBufferSize(int recv_buf_size) {
94        this.recv_buf_size=recv_buf_size;
95    }
96
97    public int getSocketConnectionTimeout() {
98        return sock_conn_timeout;
99    }
100
101    public void setSocketConnectionTimeout(int sock_conn_timeout) {
102        this.sock_conn_timeout=sock_conn_timeout;
103    }
104
105    public int getNumConnections() {
106        return conns.size();
107    }
108
109     public boolean getTcpNodelay() {
110         return tcp_nodelay;
111     }
112
113     public void setTcpNodelay(boolean tcp_nodelay) {
114         this.tcp_nodelay=tcp_nodelay;
115     }
116
117     public int getLinger() {
118         return linger;
119     }
120
121     public void setLinger(int linger) {
122         this.linger=linger;
123     }
124
125     public boolean getUseSendQueues() {return use_send_queues;}
126
127    public void setUseSendQueues(boolean flag) {this.use_send_queues=flag;}
128
129     public int getSendQueueSize() {
130         return send_queue_size;
131     }
132
133     public void setSendQueueSize(int send_queue_size) {
134         this.send_queue_size=send_queue_size;
135     }
136
137     public void start() throws Exception JavaDoc {
138        running=true;
139    }
140
141    public void stop() {
142        running=false;
143    }
144
145    /**
146     Remove <code>addr</code>from connection table. This is typically triggered when a member is suspected.
147     */

148    public void remove(Address addr) {
149        Connection conn;
150
151        synchronized(conns) {
152            conn=conns.remove(addr);
153        }
154
155        if(conn != null) {
156            try {
157                conn.destroy(); // won't do anything if already destroyed
158
}
159            catch(Exception JavaDoc e) {
160            }
161        }
162        if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString());
163    }
164
165    /**
166     * Calls the receiver callback. We do not serialize access to this method, and it may be called concurrently
167     * by several Connection handler threads. Therefore the receiver needs to be reentrant.
168     */

169    public void receive(Address sender, byte[] data, int offset, int length) {
170        if(receiver != null) {
171            receiver.receive(sender, data, offset, length);
172        }
173        else
174            if(log.isErrorEnabled()) log.error("receiver is null (not set) !");
175    }
176
177    public String JavaDoc toString() {
178        StringBuilder JavaDoc ret=new StringBuilder JavaDoc();
179        Address key;
180        Connection val;
181        Map.Entry entry;
182        HashMap<Address,Connection> copy;
183
184        synchronized(conns) {
185            copy=new HashMap<Address,Connection>(conns);
186        }
187        ret.append("local_addr=" + local_addr).append("\n");
188        ret.append("connections (" + copy.size() + "):\n");
189        for(Iterator it=copy.entrySet().iterator(); it.hasNext();) {
190            entry=(Map.Entry)it.next();
191            key=(Address)entry.getKey();
192            val=(Connection)entry.getValue();
193            ret.append("key: " + key + ": " + val + '\n');
194        }
195        ret.append('\n');
196        return ret.toString();
197    }
198
199    void notifyConnectionOpened(Address peer) {
200        if(peer == null) return;
201        for(int i=0; i < conn_listeners.size(); i++)
202            conn_listeners.elementAt(i).connectionOpened(peer);
203    }
204
205    void notifyConnectionClosed(Address peer) {
206        if(peer == null) return;
207        for(int i=0; i < conn_listeners.size(); i++)
208            conn_listeners.elementAt(i).connectionClosed(peer);
209    }
210
211    void addConnection(Address peer, Connection c) {
212        conns.put(peer, c);
213        if(reaper != null && !reaper.isRunning())
214            reaper.start();
215    }
216
217    public void send(Address dest, byte[] data, int offset, int length) throws Exception JavaDoc {
218        Connection conn;
219        if(dest == null) {
220            if(log.isErrorEnabled())
221                log.error("destination is null");
222            return;
223        }
224
225        if(data == null) {
226            log.warn("data is null; discarding packet");
227            return;
228        }
229
230        if(!running) {
231            if(log.isWarnEnabled())
232                log.warn("connection table is not running, discarding message to " + dest);
233            return;
234        }
235
236        if(dest.equals(local_addr)) {
237            receive(local_addr, data, offset, length);
238            return;
239        }
240
241        // 1. Try to obtain correct Connection (or create one if not yet existent)
242
try {
243            conn=getConnection(dest);
244            if(conn == null) return;
245        }
246        catch(Throwable JavaDoc ex) {
247            throw new Exception JavaDoc("connection to " + dest + " could not be established", ex);
248        }
249
250        // 2. Send the message using that connection
251
try {
252            conn.send(data, offset, length);
253        }
254        catch(Throwable JavaDoc ex) {
255            if(log.isTraceEnabled())
256                log.trace("sending msg to " + dest + " failed (" + ex.getClass().getName() + "); removing from connection table", ex);
257            remove(dest);
258        }
259    }
260
261    abstract Connection getConnection(Address dest) throws Exception JavaDoc;
262
263    /**
264     * Removes all connections from ConnectionTable which are not in c
265     * @param c
266     */

267    //public void retainAll(Collection c) {
268
// conns.keySet().retainAll(c);
269
//}
270

271
272       /**
273        * Removes all connections from ConnectionTable which are not in current_mbrs
274        * @param current_mbrs
275        */

276       public void retainAll(Collection current_mbrs) {
277           if(current_mbrs == null) return;
278           HashMap<Address,Connection> copy;
279           synchronized(conns) {
280               copy=new HashMap<Address,Connection>(conns);
281               conns.keySet().retainAll(current_mbrs);
282           }
283
284           // All of the connections that were not retained must be destroyed
285
// so that their resources are cleaned up.
286
Map.Entry entry;
287           for(Iterator it=copy.entrySet().iterator(); it.hasNext();) {
288               entry=(Map.Entry)it.next();
289               Object JavaDoc oKey=entry.getKey();
290               if(!current_mbrs.contains(oKey)) { // This connection NOT in the resultant connection set
291
Connection conn=(Connection)entry.getValue();
292                   if(null != conn) { // Destroy this connection
293
if(log.isTraceEnabled())
294                           log.trace("Destroy this orphaned connection: " + conn);
295                       conn.destroy();
296                   }
297               }
298           }
299           copy.clear();
300       }
301
302
303
304     /** Used for message reception. */
305     public interface Receiver {
306        void receive(Address sender, byte[] data, int offset, int length);
307    }
308
309    /** Used to be notified about connection establishment and teardown. */
310    public interface ConnectionListener {
311        void connectionOpened(Address peer_addr);
312        void connectionClosed(Address peer_addr);
313    }
314
315    class Connection implements Runnable JavaDoc {
316        Socket JavaDoc sock=null; // socket to/from peer (result of srv_sock.accept() or new Socket())
317
String JavaDoc sock_addr=null; // used for Thread.getName()
318
DataOutputStream out=null; // for sending messages
319
DataInputStream in=null; // for receiving messages
320
Thread JavaDoc receiverThread=null; // thread for receiving messages
321
Address peer_addr=null; // address of the 'other end' of the connection
322
final Lock JavaDoc send_lock=new ReentrantLock JavaDoc(); // serialize send()
323
long last_access=System.currentTimeMillis(); // last time a message was sent or received
324

325        /** Bounded queue of data to be sent to the peer of this connection */
326        BlockingQueue JavaDoc<byte[]> send_queue=null;
327        Sender sender=null;
328        boolean is_running=false;
329
330
331        private String JavaDoc getSockAddress() {
332            if(sock_addr != null)
333                return sock_addr;
334            if(sock != null) {
335                StringBuffer JavaDoc sb;
336                sb=new StringBuffer JavaDoc();
337                sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort());
338                sb.append(" - ").append(sock.getInetAddress().getHostAddress()).append(':').append(sock.getPort());
339                sock_addr=sb.toString();
340            }
341            return sock_addr;
342        }
343
344
345
346
347        Connection(Socket JavaDoc s, Address peer_addr) {
348            sock=s;
349            this.peer_addr=peer_addr;
350
351            if(use_send_queues) {
352                send_queue=new LinkedBlockingQueue JavaDoc<byte[]>(send_queue_size);
353                sender=new Sender();
354            }
355
356            try {
357                // out=new DataOutputStream(sock.getOutputStream());
358
// in=new DataInputStream(sock.getInputStream());
359

360                // The change to buffered input and output stream yielded a 400% performance gain !
361
// bela Sept 7 2006
362
out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
363                in=new DataInputStream(new BufferedInputStream(sock.getInputStream()));
364                if(sender != null)
365                    sender.start();
366            }
367            catch(Exception JavaDoc ex) {
368                if(log.isErrorEnabled()) log.error("exception is " + ex);
369            }
370        }
371
372
373        boolean established() {
374            return receiverThread != null;
375        }
376
377
378        void setPeerAddress(Address peer_addr) {
379            this.peer_addr=peer_addr;
380        }
381
382        Address getPeerAddress() {return peer_addr;}
383
384        void updateLastAccessed() {
385            last_access=System.currentTimeMillis();
386        }
387
388        void init() {
389            is_running=true;
390            if(receiverThread == null || !receiverThread.isAlive()) {
391                // Roland Kurmann 4/7/2003, put in thread_group
392
receiverThread=new Thread JavaDoc(thread_group, this, "ConnectionTable.Connection.Receiver [" + getSockAddress() + "]");
393                receiverThread.setDaemon(true);
394                receiverThread.start();
395                if(log.isTraceEnabled())
396                    log.trace("receiver started: " + receiverThread);
397            }
398
399        }
400
401
402        void destroy() {
403            is_running=false;
404            closeSocket(); // should terminate handler as well
405
if(sender != null)
406                sender.stop();
407            Thread JavaDoc tmp=receiverThread;
408            receiverThread=null;
409            if(tmp != null) {
410                Util.interruptAndWaitToDie(tmp);
411            }
412        }
413
414
415        /**
416         *
417         * @param data Guaranteed to be non null
418         * @param offset
419         * @param length
420         */

421        void send(byte[] data, int offset, int length) {
422            if(!is_running) {
423                if(log.isWarnEnabled())
424                    log.warn("Connection is not running, discarding message");
425                return;
426            }
427            if(use_send_queues) {
428                try {
429                    // we need to copy the byte[] buffer here because the original buffer might get changed meanwhile
430
byte[] tmp=new byte[length];
431                    System.arraycopy(data, offset, tmp, 0, length);
432                    send_queue.put(tmp);
433                }
434                catch(InterruptedException JavaDoc e) {
435                    Thread.currentThread().interrupt();
436                }
437            }
438            else
439                _send(data, offset, length, true);
440        }
441
442
443        /**
444         * Sends data using the 'out' output stream of the socket
445         * @param data
446         * @param offset
447         * @param length
448         * @param acquire_lock
449         */

450        private void _send(byte[] data, int offset, int length, boolean acquire_lock) {
451            if(acquire_lock)
452                send_lock.lock();
453
454            try {
455                doSend(data, offset, length);
456                updateLastAccessed();
457            }
458            catch(InterruptedException JavaDoc iex) {
459                Thread.currentThread().interrupt(); // set interrupt flag again
460
}
461            catch(Throwable JavaDoc ex) {
462                if(log.isErrorEnabled()) log.error("exception is " + ex);
463            }
464            finally {
465                if(acquire_lock)
466                    send_lock.unlock();
467            }
468        }
469
470
471        void doSend(byte[] data, int offset, int length) throws Exception JavaDoc {
472            try {
473                // we're using 'double-writes', sending the buffer to the destination in 2 pieces. this would
474
// ensure that, if the peer closed the connection while we were idle, we would get an exception.
475
// this won't happen if we use a single write (see Stevens, ch. 5.13).
476
if(out != null) {
477                    out.writeInt(length); // write the length of the data buffer first
478
Util.doubleWrite(data, offset, length, out);
479                    out.flush(); // may not be very efficient (but safe)
480
}
481            }
482            catch(Exception JavaDoc ex) {
483                remove(peer_addr);
484                throw ex;
485            }
486        }
487
488
489        /**
490         * Reads the peer's address. First a cookie has to be sent which has to match my own cookie, otherwise
491         * the connection will be refused
492         */

493        Address readPeerAddress(Socket JavaDoc client_sock) throws Exception JavaDoc {
494            Address client_peer_addr=null;
495            byte[] input_cookie=new byte[cookie.length];
496            int client_port=client_sock != null? client_sock.getPort() : 0;
497            short version;
498            InetAddress JavaDoc client_addr=client_sock != null? client_sock.getInetAddress() : null;
499
500            if(in != null) {
501                initCookie(input_cookie);
502
503                // read the cookie first
504
in.read(input_cookie, 0, input_cookie.length);
505                if(!matchCookie(input_cookie))
506                    throw new SocketException JavaDoc("ConnectionTable.Connection.readPeerAddress(): cookie sent by " +
507                                              client_peer_addr + " does not match own cookie; terminating connection");
508                // then read the version
509
version=in.readShort();
510
511                if(Version.isBinaryCompatible(version) == false) {
512                    if(log.isWarnEnabled())
513                        log.warn(new StringBuffer JavaDoc("packet from ").append(client_addr).append(':').append(client_port).
514                               append(" has different version (").append(Version.print(version)).append(") from ours (").
515                                 append(Version.printVersion()).append("). This may cause problems"));
516                }
517                client_peer_addr=new IpAddress();
518                client_peer_addr.readFrom(in);
519
520                updateLastAccessed();
521            }
522            return client_peer_addr;
523        }
524
525
526        /**
527         * Send the cookie first, then the our port number. If the cookie doesn't match the receiver's cookie,
528         * the receiver will reject the connection and close it.
529         */

530        void sendLocalAddress(Address local_addr) {
531            if(local_addr == null) {
532                if(log.isWarnEnabled()) log.warn("local_addr is null");
533                return;
534            }
535            if(out != null) {
536                try {
537                    // write the cookie
538
out.write(cookie, 0, cookie.length);
539
540                    // write the version
541
out.writeShort(Version.version);
542                    local_addr.writeTo(out);
543                    out.flush(); // needed ?
544
updateLastAccessed();
545                }
546                catch(Throwable JavaDoc t) {
547                    if(log.isErrorEnabled()) log.error("exception is " + t);
548                }
549            }
550        }
551
552
553        void initCookie(byte[] c) {
554            if(c != null)
555                for(int i=0; i < c.length; i++)
556                    c[i]=0;
557        }
558
559        boolean matchCookie(byte[] input) {
560            if(input == null || input.length < cookie.length) return false;
561            for(int i=0; i < cookie.length; i++)
562                if(cookie[i] != input[i]) return false;
563            return true;
564        }
565
566
567        String JavaDoc printCookie(byte[] c) {
568            if(c == null) return "";
569            return new String JavaDoc(c);
570        }
571
572
573        public void run() {
574            byte[] buf=new byte[256]; // start with 256, increase as we go
575
int len=0;
576
577            while(receiverThread != null && receiverThread.equals(Thread.currentThread()) && is_running) {
578                try {
579                    if(in == null) {
580                        if(log.isErrorEnabled()) log.error("input stream is null !");
581                        break;
582                    }
583                    len=in.readInt();
584                    if(len > buf.length)
585                        buf=new byte[len];
586                    in.readFully(buf, 0, len);
587                    updateLastAccessed();
588                    receive(peer_addr, buf, 0, len); // calls receiver.receive(msg)
589
}
590                catch(OutOfMemoryError JavaDoc mem_ex) {
591                    if(log.isWarnEnabled()) log.warn("dropped invalid message, closing connection");
592                    break; // continue;
593
}
594                catch(EOFException eof_ex) { // peer closed connection
595
if(log.isTraceEnabled()) log.trace("exception is " + eof_ex);
596                    notifyConnectionClosed(peer_addr);
597                    break;
598                }
599                catch(IOException io_ex) {
600                    if(log.isTraceEnabled()) log.trace("exception is " + io_ex);
601                    notifyConnectionClosed(peer_addr);
602                    break;
603                }
604                catch(Throwable JavaDoc e) {
605                    if(log.isWarnEnabled()) log.warn("exception is " + e);
606                }
607            }
608            if(log.isTraceEnabled())
609                log.trace("ConnectionTable.Connection.Receiver terminated");
610            receiverThread=null;
611            closeSocket();
612            // remove(peer_addr);
613
}
614
615
616        public String JavaDoc toString() {
617            StringBuilder JavaDoc ret=new StringBuilder JavaDoc();
618            InetAddress JavaDoc local=null, remote=null;
619            String JavaDoc local_str, remote_str;
620
621            if(sock == null)
622                ret.append("<null socket>");
623            else {
624                //since the sock variable gets set to null we want to make
625
//make sure we make it through here without a nullpointer exception
626
Socket JavaDoc tmp_sock=sock;
627                local=tmp_sock.getLocalAddress();
628                remote=tmp_sock.getInetAddress();
629                local_str=local != null ? Util.shortName(local) : "<null>";
630                remote_str=remote != null ? Util.shortName(remote) : "<null>";
631                ret.append('<' + local_str + ':' + tmp_sock.getLocalPort() +
632                           " --> " + remote_str + ':' + tmp_sock.getPort() + "> (" +
633                           ((System.currentTimeMillis() - last_access) / 1000) + " secs old)");
634                tmp_sock=null;
635            }
636
637            return ret.toString();
638        }
639
640
641        void closeSocket() {
642            Util.close(sock); // should actually close in/out (so we don't need to close them explicitly)
643
sock=null;
644            Util.close(out); // flushes data
645
// removed 4/22/2003 (request by Roland Kurmann)
646
// out=null;
647
Util.close(in);
648        }
649
650
651        class Sender implements Runnable JavaDoc {
652            Thread JavaDoc senderThread;
653            private boolean is_it_running=false;
654
655            void start() {
656                if(senderThread == null || !senderThread.isAlive()) {
657                    senderThread=new Thread JavaDoc(thread_group, this, "ConnectionTable.Connection.Sender local_addr=" + local_addr + " [" + getSockAddress() + "]");
658                    senderThread.setDaemon(true);
659                    is_it_running=true;
660                    senderThread.start();
661                    if(log.isTraceEnabled())
662                        log.trace("sender thread started: " + senderThread);
663                }
664            }
665
666            void stop() {
667                is_it_running=false;
668                if(send_queue != null)
669                    send_queue.clear();
670                if(senderThread != null) {
671                    Thread JavaDoc tmp=senderThread;
672                    senderThread=null;
673                    Util.interruptAndWaitToDie(tmp);
674                }
675            }
676
677            boolean isRunning() {
678                return is_it_running && senderThread != null;
679            }
680
681            public void run() {
682                byte[] data;
683                while(senderThread != null && senderThread.equals(Thread.currentThread()) && is_it_running) {
684                    try {
685                        data=send_queue.take();
686                        if(data == null)
687                            continue;
688                        // we don't need to serialize access to 'out' as we're the only thread sending messages
689
_send(data, 0, data.length, false);
690                    }
691                    catch(InterruptedException JavaDoc e) {
692                        ;
693                    }
694                }
695                is_it_running=false;
696                if(log.isTraceEnabled())
697                    log.trace("ConnectionTable.Connection.Sender thread terminated");
698            }
699        }
700
701
702    }
703
704    class Reaper implements Runnable JavaDoc {
705        Thread JavaDoc t=null;
706
707        Reaper() {
708            ;
709        }
710
711        // return true if we have zero connections
712
private boolean haveZeroConnections() {
713            synchronized(conns) {
714                return conns.isEmpty();
715            }
716        }
717
718        public void start() {
719
720            if(haveZeroConnections())
721                return;
722            if(t != null && !t.isAlive())
723                t=null;
724            if(t == null) {
725                //RKU 7.4.2003, put in threadgroup
726
t=new Thread JavaDoc(thread_group, this, "ConnectionTable.ReaperThread");
727                t.setDaemon(true); // will allow us to terminate if all remaining threads are daemons
728
t.start();
729            }
730        }
731
732        public void stop() {
733            Thread JavaDoc tmp=t;
734            if(t != null)
735                t=null;
736            if(tmp != null) {
737                Util.interruptAndWaitToDie(tmp);
738            }
739        }
740
741
742        public boolean isRunning() {
743            return t != null;
744        }
745
746        public void run() {
747            Connection value;
748            Map.Entry entry;
749            long curr_time;
750
751            if(log.isDebugEnabled()) log.debug("connection reaper thread was started. Number of connections=" +
752                    conns.size() + ", reaper_interval=" + reaper_interval + ", conn_expire_time=" +
753                    conn_expire_time);
754
755            while(!haveZeroConnections() && t != null && t.equals(Thread.currentThread())) {
756                Util.sleep(reaper_interval);
757                if(t == null || !Thread.currentThread().equals(t))
758                    break;
759                synchronized(conns) {
760                    curr_time=System.currentTimeMillis();
761                    for(Iterator it=conns.entrySet().iterator(); it.hasNext();) {
762                        entry=(Map.Entry)it.next();
763                        value=(Connection)entry.getValue();
764                        if(log.isTraceEnabled()) log.trace("connection is " +
765                                                         ((curr_time - value.last_access) / 1000) + " seconds old (curr-time=" +
766                                                         curr_time + ", last_access=" + value.last_access + ')');
767                        if(value.last_access + conn_expire_time < curr_time) {
768                            if(log.isTraceEnabled()) log.trace("connection " + value +
769                                                             " has been idle for too long (conn_expire_time=" + conn_expire_time +
770                                                             "), will be removed");
771                            value.destroy();
772                            it.remove();
773                        }
774                    }
775                }
776            }
777            if(log.isDebugEnabled()) log.debug("reaper terminated");
778            t=null;
779        }
780    }
781 }
782
Popular Tags