KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > GossipRouter


1 // $Id: GossipRouter.java,v 1.11 2005/02/19 10:40:43 ovidiuf Exp $
2

3 package org.jgroups.stack;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.Address;
8 import org.jgroups.util.Promise;
9 import org.jgroups.util.Util;
10
11 import java.io.*;
12 import java.net.InetAddress JavaDoc;
13 import java.net.ServerSocket JavaDoc;
14 import java.net.Socket JavaDoc;
15 import java.util.*;
16
17 /**
18  * Router for TCP based group comunication (using layer TCP instead of UDP).
19  * Instead of the TCP layer sending packets point-to-point to each other
20  * member, it sends the packet to the router which - depending on the target
21  * address - multicasts or unicasts it to the group / or single member.<p>
22  * This class is especially interesting for applets which cannot directly make
23  * connections (neither UDP nor TCP) to a host different from the one they were
24  * loaded from. Therefore, an applet would create a normal channel plus
25  * protocol stack, but the bottom layer would have to be the TCP layer which
26  * sends all packets point-to-point (over a TCP connection) to the router,
27  * which in turn forwards them to their end location(s) (also over TCP). A
28  * centralized router would therefore have to be running on the host the applet
29  * was loaded from.<p>
30  * An alternative for running JGroups in an applet (IP multicast is not allows
31  * in applets as of 1.2), is to use point-to-point UDP communication via the
32  * gossip server. However, then the appplet has to be signed which involves
33  * additional administrative effort on the part of the user.<p>
34  * Since 2.1.1 the GossipRouter is also able to answer Gossip requests. Instead
35  * of running different Router and GossipServer processes, is enough just to
36  * run a single GossipRouter. This should simplify the administration of a
37  * JG realm that has needs gossip and routing services.
38  *
39  * @since 2.1.1
40  *
41  * @author Bela Ban
42  * @author Ovidiu Feodorov <ovidiuf@users.sourceforge.net>
43  */

44 public class GossipRouter {
45
46     public static final int GET = -10;
47     public static final int REGISTER = -11;
48     public static final int DUMP = -21;
49     public static final int SHUTDOWN = -1;
50     public static final int SHUTDOWN_OK = -2;
51
52     public static final int PORT = 8980;
53     public static final long EXPIRY_TIME = 30000;
54     public static final long GOSSIP_REQUEST_TIMEOUT = 1000;
55     public static final long ROUTING_CLIENT_REPLY_TIMEOUT = 120000;
56
57     // BufferedInputStream mark buffer size
58
private final int MARK_BUFFER_SIZE = 2048;
59
60     private static final Object JavaDoc GOSSIP_REQUEST = new Object JavaDoc();
61     private static final Object JavaDoc GOSSIP_FAILURE = new Object JavaDoc();
62
63     private int port;
64     private String JavaDoc bindAddressString;
65
66     // time (in msecs) until a cached 'gossip' member entry expires
67
private long expiryTime;
68
69     // number of millisecs the main thread waits to receive a gossip request
70
// after connection was established; upon expiration, the router initiates
71
// the routing protocol on the connection. Don't set the interval too big,
72
// otherwise the router will appear slow in answering routing requests.
73
private long gossipRequestTimeout;
74
75     // time (in ms) main thread waits for a router client to send the routing
76
// request type and the group afiliation before it declares the request
77
// failed.
78
private long routingClientReplyTimeout;
79
80     // HashMap<String,List<Address>. Maintains associations between groups and their members
81
private final Hashtable routingTable=new Hashtable();
82
83     // (groupname - vector of AddressEntry's)
84
private final Map gossipTable = new HashMap();
85
86     private ServerSocket JavaDoc srvSock = null;
87     private InetAddress JavaDoc bindAddress = null;
88
89     // the cache sweeper
90
Timer timer = null;
91
92     protected final Log log=LogFactory.getLog(this.getClass());
93
94     //
95
// JMX INSTRUMENTATION - MANAGEMENT INTERFACE
96
//
97

98     public GossipRouter() {
99         this(PORT);
100     }
101
102     public GossipRouter(int port) {
103         this(port, null);
104     }
105
106     public GossipRouter(int port, String JavaDoc bindAddressString) {
107         this(port, bindAddressString, EXPIRY_TIME);
108     }
109
110     public GossipRouter(int port, String JavaDoc bindAddressString,
111                         long expiryTime) {
112         this(port, bindAddressString, expiryTime,
113              GOSSIP_REQUEST_TIMEOUT,
114              ROUTING_CLIENT_REPLY_TIMEOUT);
115     }
116
117     public GossipRouter(int port, String JavaDoc bindAddressString,
118                         long expiryTime, long gossipRequestTimeout,
119                         long routingClientReplyTimeout) {
120         this.port=port;
121         this.bindAddressString=bindAddressString;
122         this.expiryTime = expiryTime;
123         this.gossipRequestTimeout = gossipRequestTimeout;
124         this.routingClientReplyTimeout = routingClientReplyTimeout;
125     }
126
127
128     //
129
// MANAGED ATTRIBUTES
130
//
131

132     public void setPort(int port) {
133         this.port = port;
134     }
135
136     public int getPort() {
137          return port;
138     }
139
140     public void setBindAddress(String JavaDoc bindAddress) {
141         bindAddressString = bindAddress;
142     }
143
144     public String JavaDoc getBindAddress() {
145          return bindAddressString;
146     }
147
148     public void setExpiryTime(long expiryTime) {
149         this.expiryTime = expiryTime;
150     }
151
152     public long getExpiryTime() {
153          return expiryTime;
154     }
155
156     public void setGossipRequestTimeout(long gossipRequestTimeout) {
157         this.gossipRequestTimeout = gossipRequestTimeout;
158     }
159
160     public long getGossipRequestTimeout() {
161          return gossipRequestTimeout;
162     }
163
164     public void setRoutingClientReplyTimeout(long routingClientReplyTimeout) {
165         this.routingClientReplyTimeout = routingClientReplyTimeout;
166     }
167
168     public long getRoutingClientReplyTimeout() {
169          return routingClientReplyTimeout;
170     }
171
172     public boolean isStarted() {
173         return srvSock!=null;
174     }
175
176     //
177
// JBoss MBean LIFECYCLE OPERATIONS
178
//
179

180
181     /**
182      * JBoss MBean lifecycle operation.
183      **/

184     public void create() throws Exception JavaDoc {
185         // not used
186
}
187
188     /**
189      * JBoss MBean lifecycle operation. Called after create(). When this method
190      * is called, the managed attributes have already been set.<br>
191      * Brings the Router in fully functional state.
192      **/

193     public void start() throws Exception JavaDoc {
194
195         if (srvSock!=null) {
196             throw new Exception JavaDoc("Router already started.");
197         }
198
199         if (bindAddressString!=null) {
200             bindAddress = InetAddress.getByName(bindAddressString);
201             srvSock = new ServerSocket JavaDoc(port, 50, bindAddress);
202         }
203         else {
204             srvSock = new ServerSocket JavaDoc(port, 50);
205         }
206
207         // start the main server thread
208
new Thread JavaDoc(new Runnable JavaDoc() {
209                 public void run() {
210                     mainLoop();
211                     cleanup();
212                 }
213             }, "JGroups Router Main Thread").start();
214
215         // starts the cache sweeper as daemon thread, so we won't block on it
216
// upon termination
217
timer = new Timer(true);
218         timer.schedule(new TimerTask() {
219                 public void run() {
220                     sweep();
221                 }
222             }, expiryTime, expiryTime);
223     }
224
225     /**
226      * JBoss MBean lifecycle operation. The JMX agent allways calls this method
227      * before destroy(). Close connections and frees resources.
228      **/

229     public void stop() {
230
231         if (srvSock==null) {
232             if(log.isWarnEnabled()) log.warn("Router already stopped");
233             return;
234         }
235
236         timer.cancel();
237         shutdown();
238         try {
239             srvSock.close();
240         }
241         catch(Exception JavaDoc e) {
242             if(log.isErrorEnabled()) log.error("Failed to close server socket: "+e);
243         }
244         // exiting the mainLoop will clean the tables
245
srvSock = null;
246         if(log.isInfoEnabled()) log.info("Router stopped");
247     }
248
249     /**
250      * JBoss MBean lifecycle operation.
251      **/

252     public void destroy() {
253         // not used
254
}
255
256
257     //
258
// ORDINARY OPERATIONS
259
//
260

261     public String JavaDoc dumpRoutingTable() {
262         return dumpTable(routingTable);
263     }
264
265     public String JavaDoc dumpGossipTable() {
266         return dumpTable(gossipTable);
267     }
268
269
270
271     //
272
// END OF MANAGEMENT INTERFACE
273
//
274

275     public static String JavaDoc requestTypeToString(int type) {
276         return
277             type == GET ? "GET" :
278                 (type == REGISTER ? "REGISTER" :
279                     (type == DUMP ? "DUMP" :
280                         (type == SHUTDOWN ? "SHUTDOWN" : "UNKNOWN REQUEST: "+type)));
281     }
282
283
284     /**
285      * The main server loop. Runs on the JGroups Router Main Thread.
286      **/

287     private void mainLoop() {
288         Socket JavaDoc sock = null;
289         DataInputStream input = null;
290         DataOutputStream output = null;
291         Address peer_addr = null;
292         byte[] buf;
293         int len, type = -1;
294         String JavaDoc gname = null;
295         Date d;
296         boolean up = true;
297
298         if(bindAddress == null) {
299             bindAddress=srvSock.getInetAddress();
300         }
301         d=new Date();
302         System.out.println("GossipRouter started at " + d +
303                            "\nListening on port " + port + " bound on address " + bindAddress + '\n');
304         d=null;
305
306         while(up) {
307
308             try {
309                 sock=srvSock.accept();
310                 sock.setSoLinger(true, 500);
311
312                 if(log.isTraceEnabled()) {
313                     log.trace("router accepted connection from "+sock);
314                 }
315
316                 final BufferedInputStream bis = new BufferedInputStream(sock.getInputStream());
317                 final Promise waitArea = new Promise();
318                 final Socket JavaDoc s = sock;
319
320                 // @since 2.2.1
321
// Handling of gossip requests on a different thread allows
322
// the GossipRouter to serve both routing and gossip requests.
323
// The GossipRouter stays backward compatible, old clients
324
// shouldn't be aware they talk to a merged Router/GossipServer.
325

326                Thread JavaDoc t = new Thread JavaDoc(new Runnable JavaDoc() {
327                         public void run() {
328                             ObjectInputStream ois = null;
329                             try {
330                                 bis.mark(MARK_BUFFER_SIZE);
331                                 // blocks until gossip request or 'forever'
332
ois = new ObjectInputStream(bis);
333                                 GossipData gossip_req = (GossipData)ois.readObject();
334
335                                 // it is a gossip request, set the main thread free
336
waitArea.setResult(GOSSIP_REQUEST);
337
338                                 GossipData gresp = processGossip(gossip_req);
339                                 if (gresp != null) {
340                                     ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
341                                     oos.writeObject(gresp);
342                                     oos.close();
343                                 }
344                                 bis.close();
345                                 s.close();
346                             }
347                             catch(Exception JavaDoc e) {
348                                 if(log.isDebugEnabled()) log.debug("gossip thread exception :"+e);
349                                 waitArea.setResult(GOSSIP_FAILURE);
350                             }
351                             finally {
352                                 try {
353                                     ois.close();
354                                 }
355                                 catch(Exception JavaDoc e) {
356                                     // OK
357
}
358                             }
359                         }
360                     }, "Gossip Request Thread");
361
362                 t.start();
363
364                 Object JavaDoc waitResult = waitArea.getResult(gossipRequestTimeout);
365                 waitArea.reset();
366
367                 if (waitResult != null) {
368                     // gossip request, let the gossip thread deal with it
369
continue;
370                 }
371
372                 // timeout, this is a routing request
373

374                 peer_addr = new IpAddress(sock.getInetAddress(), sock.getPort());
375                 output = new DataOutputStream(sock.getOutputStream());
376                 
377                 // return the address of the peer so it can set it
378
buf = Util.objectToByteBuffer(peer_addr);
379                 output.writeInt(buf.length);
380                 output.write(buf, 0, buf.length);
381
382                 // The gossip thread still waits for a serialized object, so
383
// wait that read to fail. If it actually gets a GossipData,
384
// that's an error condition we should handle here
385
waitResult = waitArea.getResult(routingClientReplyTimeout);
386
387                 if (waitResult == null) {
388                     // timeout
389
throw new Exception JavaDoc("Timeout waiting for router client answer");
390                 }
391                 else if (waitResult == GOSSIP_REQUEST) {
392                     // lazy gossip client, let it handle its business, it will
393
// fail anyway
394
output.close();
395                     continue;
396                 }
397
398                 bis.reset();
399                 input=new DataInputStream(bis);
400
401                 type=input.readInt();
402                 if(log.isTraceEnabled()) {
403                     log.trace("request of type "+requestTypeToString(type));
404                 }
405
406                 gname=input.readUTF();
407
408
409                 // We can have 2 kinds of messages at this point: GET requests or REGISTER requests.
410
// GET requests are processed right here, REGISTRATION requests cause the spawning of
411
// a separate thread handling it (long running thread as it will do the message routing
412
// on behalf of that client for the duration of the client's lifetime).
413

414                 switch(type) {
415                 case GossipRouter.GET:
416                     processGetRequest(sock, output, gname); // closes sock after processing
417
break;
418                 case GossipRouter.DUMP:
419                     processDumpRequest(sock, output); // closes sock after processing
420
break;
421                 case GossipRouter.REGISTER:
422                     Address addr;
423                     len=input.readInt();
424                     buf=new byte[len];
425                     input.readFully(buf, 0, buf.length); // read Address
426
addr=(Address)Util.objectFromByteBuffer(buf);
427                     SocketThread st = new SocketThread(sock, input, addr);
428                     addEntry(gname, new AddressEntry(addr, sock, st, output));
429                     st.start();
430                     break;
431                 case GossipRouter.SHUTDOWN:
432                     if(log.isInfoEnabled()) log.info("router shutting down");
433                     output.writeInt(SHUTDOWN_OK);
434                     output.flush();
435                     try {
436                         sock.close();
437                     }
438                     catch(Exception JavaDoc e) {
439                         // OK, going down anyway
440
}
441                     up = false;
442                     continue;
443                 default:
444                     if(log.isErrorEnabled()) log.error("request of type " + type + " not recognized");
445                     continue;
446                 }
447             }
448             catch(Exception JavaDoc e) {
449                 if(log.isErrorEnabled()) log.error("failure handling a client connection: " + e.getMessage(), e);
450                 try {
451                     sock.close();
452                 }
453                 catch(IOException e2) {
454                     if(log.isWarnEnabled()) log.warn("failed to close socket "+sock);
455                 }
456                 continue;
457             }
458         }
459     }
460
461
462     /**
463      * Cleans the routing tables while the Router is going down.
464      **/

465     private void cleanup() {
466
467         // shutdown the routing threads and cleanup the tables
468
synchronized(routingTable) {
469             for(Iterator i=routingTable.keySet().iterator(); i.hasNext();) {
470                 String JavaDoc gname=(String JavaDoc)i.next();
471                 List l=(List)routingTable.get(gname);
472                 if (l!=null) {
473                     for(Iterator j=l.iterator(); j.hasNext(); ) {
474                         AddressEntry e = (AddressEntry)j.next();
475                         e.destroy();
476                     }
477                 }
478             }
479             routingTable.clear();
480             if(log.isInfoEnabled()) log.info("routing table cleared");
481         }
482         synchronized(gossipTable) {
483             gossipTable.clear();
484             if(log.isInfoEnabled()) log.info("gossip table cleared");
485         }
486
487     }
488
489     /**
490      * Connects to the ServerSocket and sends the shutdown header.
491      **/

492     private void shutdown() {
493         try {
494             Socket JavaDoc s = new Socket JavaDoc(srvSock.getInetAddress(),
495                                   srvSock.getLocalPort());
496             DataInputStream dis = new DataInputStream(s.getInputStream());
497             int len = dis.readInt();
498             byte[] buf = new byte[len];
499             dis.readFully(buf, 0, buf.length);
500             DataOutputStream dos = new DataOutputStream(s.getOutputStream());
501             dos.writeInt(SHUTDOWN);
502             dos.writeUTF("");
503             // waits until the server replies
504
dis.readInt();
505             dos.flush();
506             dos.close();
507             s.close();
508         }
509         catch(Exception JavaDoc e) {
510             if(log.isErrorEnabled()) log.error("shutdown failed: "+e);
511         }
512         
513     }
514
515     //
516
// GOSSIPING
517
//
518

519     /**
520      * @since 2.2.1
521      **/

522     private GossipData processGossip(GossipData gossip) {
523
524         if(log.isTraceEnabled()) log.trace("gossip is "+gossip);
525
526         if (gossip==null) {
527             if(log.isWarnEnabled()) log.warn("null gossip request");
528             return null;
529         }
530
531         String JavaDoc group = gossip.getGroup();
532         Address mbr = null;
533
534         synchronized(gossipTable) {
535
536             switch(gossip.getType()) {
537                 
538             case GossipData.REGISTER_REQ:
539                 mbr=gossip.getMbr();
540                 if(group == null || mbr == null) {
541                     if(log.isErrorEnabled()) log.error("group or member is null, cannot register member");
542                     return null;
543                 }
544                 addGossipEntry(group, new AddressEntry(mbr));
545                 return null;
546
547             case GossipData.GET_REQ:
548                 if(group == null) {
549                     if(log.isErrorEnabled()) log.error("group is null, cannot get membership");
550                     return null;
551                 }
552                 Vector mbrs = null;
553                 List l = (List)gossipTable.get(group);
554                 if (l != null) {
555                     mbrs = new Vector();
556                     for(Iterator i = l.iterator(); i.hasNext(); ) {
557                         AddressEntry e = (AddressEntry)i.next();
558                         mbrs.add(e.addr);
559                     }
560                 }
561                 return new GossipData(GossipData.GET_RSP, group, null, mbrs);
562
563             case GossipData.GET_RSP:
564                 if(log.isWarnEnabled()) log.warn("received a GET_RSP. Should not be received by server");
565                 return null;
566
567             default:
568                 if(log.isWarnEnabled()) log.warn("received unkown gossip request (gossip=" + gossip + ')');
569                 return null;
570             }
571         }
572     }
573
574
575     /**
576      * Adds a new member to the group in the gossip table or renews the
577      * membership where is the case.
578      *
579      * @since 2.2.1
580      **/

581     private void addGossipEntry(String JavaDoc groupname, AddressEntry e) {
582
583         List val;
584
585         if(groupname == null) {
586             if(log.isErrorEnabled()) log.error("groupname was null, not added !");
587             return;
588         }
589
590         synchronized(gossipTable) {
591
592             val=(List)gossipTable.get(groupname);
593             if(val == null) {
594                 val=Collections.synchronizedList(new ArrayList());
595                 gossipTable.put(groupname, val);
596             }
597             int index = val.indexOf(e);
598             if (index==-1) {
599                 val.add(e);
600                 return;
601             }
602             ((AddressEntry)val.get(index)).update();
603         }
604     }
605
606
607     /**
608      * Removes expired gossip entries (entries older than EXPIRY_TIME msec).
609      * @since 2.2.1
610      */

611     private void sweep() {
612
613         long diff, currentTime=System.currentTimeMillis();
614         int num_entries_removed=0;
615         String JavaDoc key=null;
616         List val;
617
618         if(log.isTraceEnabled()) log.trace("running sweep");
619
620         synchronized(gossipTable) {
621             for(Iterator i=gossipTable.keySet().iterator(); i.hasNext();) {
622                 key=(String JavaDoc)i.next();
623                 val=(List)gossipTable.get(key);
624                 if(val != null) {
625                     for(Iterator j=val.iterator(); j.hasNext();) {
626                         AddressEntry ae = (AddressEntry)j.next();
627                         diff=currentTime - ae.timestamp;
628                         if(diff > expiryTime) {
629                             j.remove();
630                             if(log.isTraceEnabled())
631                                 log.trace("Removed member " + ae + " from group " + key + '(' + diff + " msecs old)");
632                             num_entries_removed++;
633                         }
634                     }
635                 }
636             }
637         }
638         
639         if(num_entries_removed > 0) {
640             if(log.isTraceEnabled()) log.trace("done (removed " + num_entries_removed + " entries)");
641         }
642     }
643
644     //
645
// ROUTING
646
//
647

648     /**
649      Gets the members of group 'groupname'. Returns them as a List of Addresses.
650      */

651     private void processGetRequest(Socket JavaDoc sock, DataOutputStream output, String JavaDoc groupname) {
652
653         List grpmbrs=(List)routingTable.get(groupname);
654         org.jgroups.util.List ret=null;
655         AddressEntry entry;
656         byte[] buf;
657
658         if(log.isTraceEnabled()) {
659             log.trace("groupname=" + groupname + ", result=" + grpmbrs);
660         }
661
662         if(grpmbrs != null && grpmbrs.size() > 0) {
663             ret=new org.jgroups.util.List();
664             for(Iterator i=grpmbrs.iterator(); i.hasNext(); ) {
665                 entry=(AddressEntry)i.next();
666                 ret.add(entry.addr);
667             }
668         }
669         try {
670             if(ret == null || ret.size() == 0) {
671                 output.writeInt(0);
672             }
673             else {
674                 buf=Util.objectToByteBuffer(ret);
675                 output.writeInt(buf.length);
676                 output.write(buf, 0, buf.length);
677             }
678         }
679         catch(Exception JavaDoc e) {
680             if(log.isErrorEnabled()) log.error("exception=" + e);
681         }
682         finally {
683             try {
684                 if(output != null)
685                     output.close();
686                 sock.close();
687             }
688             catch(Exception JavaDoc e) {
689             }
690         }
691     }
692
693
694     /**
695      * Dumps the routing table as String to the socket's OutputStream.
696      **/

697     private void processDumpRequest(Socket JavaDoc sock, DataOutputStream output) {
698
699         try {
700             output.writeUTF(dumpRoutingTable());
701         }
702         catch(Exception JavaDoc e) {
703             if(log.isErrorEnabled()) log.error("error sending the answer back to the client: " + e);
704         }
705         finally {
706             try {
707                 if(output != null) {
708                     output.close();
709                 }
710             }
711             catch(Exception JavaDoc e) {
712                 if(log.isErrorEnabled()) log.error("error closing the output stream: " + e);
713             }
714             try {
715                 sock.close();
716             }
717             catch(Exception JavaDoc e) {
718                 if(log.isErrorEnabled()) log.error("error closing the socket: " + e);
719             }
720         }
721     }
722
723
724     private String JavaDoc dumpTable(Map map) {
725
726         String JavaDoc label = (map instanceof Hashtable)?"routing":"gossip";
727         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
728         synchronized(map) {
729             if(map.size() == 0) {
730                 sb.append("empty ");
731                 sb.append(label);
732                 sb.append(" table");
733             }
734             else {
735                 for(Iterator i=map.keySet().iterator(); i.hasNext();) {
736                     String JavaDoc gname=(String JavaDoc)i.next();
737                     sb.append("GROUP: '" + gname + "'\n");
738                     List l=(List)map.get(gname);
739                     if(l == null) {
740                         sb.append("\tnull list of addresses\n");
741                     }
742                     else
743                         if(l.size() == 0) {
744                             sb.append("\tempty list of addresses\n");
745                         }
746                         else {
747                             for(Iterator j=l.iterator(); j.hasNext();) {
748                                 AddressEntry ae=(AddressEntry)j.next();
749                                 sb.append('\t');
750                                 sb.append(ae.toString());
751                                 sb.append('\n');
752                             }
753                         }
754                 }
755             }
756         }
757         return sb.toString();
758     }
759
760
761
762     private void route(Address dest, String JavaDoc dest_group, byte[] msg, Address sender) {
763         if(log.isTraceEnabled()) {
764             int len=msg != null? msg.length : 0;
765             log.trace("routing request from " + sender + " for "+dest_group+" to " +
766                       (dest==null?"ALL":dest.toString())+", " + len + " bytes");
767         }
768
769         if(dest == null) {
770             // send to all members in group dest.getChannelName()
771
if(dest_group == null) {
772                 if(log.isErrorEnabled()) log.error("both dest address and group are null");
773                 return;
774             }
775             else {
776                 sendToAllMembersInGroup(dest_group, msg, sender);
777             }
778         }
779         else {
780             // send to destination address
781
AddressEntry ae = findAddressEntry(dest);
782             if (ae == null) {
783                 if(log.isErrorEnabled()) log.error("cannot find address "+dest+" in the routing table");
784                 return;
785             }
786             if (ae.output==null) {
787                 if(log.isErrorEnabled()) log.error("address "+dest+" is associated with a null output stream");
788                 return;
789             }
790             try {
791                 sendToMember(ae.output, msg);
792             }
793             catch(Exception JavaDoc e) {
794                 if(log.isErrorEnabled()) log.error("failed sending message to "+dest+": "+e.getMessage());
795                 removeEntry(ae.sock); // will close socket
796
}
797         }
798     }
799
8