KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > GossipRouter


1 // $Id: GossipRouter.java,v 1.11 2005/02/19 10:40:43 ovidiuf Exp $
2

3 package org.jgroups.stack;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.Address;
8 import org.jgroups.util.Promise;
9 import org.jgroups.util.Util;
10
11 import java.io.*;
12 import java.net.InetAddress JavaDoc;
13 import java.net.ServerSocket JavaDoc;
14 import java.net.Socket JavaDoc;
15 import java.util.*;
16
/**
 * Router for TCP based group communication (using layer TCP instead of UDP).
 * Instead of the TCP layer sending packets point-to-point to each other
 * member, it sends the packet to the router which - depending on the target
 * address - multicasts or unicasts it to the group / or single member.<p>
 * This class is especially interesting for applets which cannot directly make
 * connections (neither UDP nor TCP) to a host different from the one they were
 * loaded from. Therefore, an applet would create a normal channel plus
 * protocol stack, but the bottom layer would have to be the TCP layer which
 * sends all packets point-to-point (over a TCP connection) to the router,
 * which in turn forwards them to their end location(s) (also over TCP). A
 * centralized router would therefore have to be running on the host the applet
 * was loaded from.<p>
 * An alternative for running JGroups in an applet (IP multicast is not allowed
 * in applets as of 1.2), is to use point-to-point UDP communication via the
 * gossip server. However, then the applet has to be signed which involves
 * additional administrative effort on the part of the user.<p>
 * Since 2.1.1 the GossipRouter is also able to answer Gossip requests. Instead
 * of running different Router and GossipServer processes, it is enough just to
 * run a single GossipRouter. This should simplify the administration of a
 * JG realm that needs both gossip and routing services.
 *
 * @since 2.1.1
 *
 * @author Bela Ban
 * @author Ovidiu Feodorov <ovidiuf@users.sourceforge.net>
 */

44 public class GossipRouter {
45
46     public static final int GET = -10;
47     public static final int REGISTER = -11;
48     public static final int DUMP = -21;
49     public static final int SHUTDOWN = -1;
50     public static final int SHUTDOWN_OK = -2;
51
52     public static final int PORT = 8980;
53     public static final long EXPIRY_TIME = 30000;
54     public static final long GOSSIP_REQUEST_TIMEOUT = 1000;
55     public static final long ROUTING_CLIENT_REPLY_TIMEOUT = 120000;
56
57     // BufferedInputStream mark buffer size
58
private final int MARK_BUFFER_SIZE = 2048;
59
60     private static final Object JavaDoc GOSSIP_REQUEST = new Object JavaDoc();
61     private static final Object JavaDoc GOSSIP_FAILURE = new Object JavaDoc();
62
63     private int port;
64     private String JavaDoc bindAddressString;
65
66     // time (in msecs) until a cached 'gossip' member entry expires
67
private long expiryTime;
68
69     // number of millisecs the main thread waits to receive a gossip request
70
// after connection was established; upon expiration, the router initiates
71
// the routing protocol on the connection. Don't set the interval too big,
72
// otherwise the router will appear slow in answering routing requests.
73
private long gossipRequestTimeout;
74
75     // time (in ms) main thread waits for a router client to send the routing
76
// request type and the group afiliation before it declares the request
77
// failed.
78
private long routingClientReplyTimeout;
79
80     // HashMap<String,List<Address>. Maintains associations between groups and their members
81
private final Hashtable routingTable=new Hashtable();
82
83     // (groupname - vector of AddressEntry's)
84
private final Map gossipTable = new HashMap();
85
86     private ServerSocket JavaDoc srvSock = null;
87     private InetAddress JavaDoc bindAddress = null;
88
89     // the cache sweeper
90
Timer timer = null;
91
92     protected final Log log=LogFactory.getLog(this.getClass());
93
94     //
95
// JMX INSTRUMENTATION - MANAGEMENT INTERFACE
96
//
97

/** Creates a router configured with the default {@link #PORT}. */
public GossipRouter() {
    this(PORT);
}

/** Creates a router for the given port and no explicit bind address. */
public GossipRouter(int port) {
    this(port, null);
}

/** Creates a router using the default gossip entry expiry time. */
public GossipRouter(int port, String bindAddressString) {
    this(port, bindAddressString, EXPIRY_TIME);
}

/** Creates a router using the default gossip and routing timeouts. */
public GossipRouter(int port, String bindAddressString,
                    long expiryTime) {
    this(port, bindAddressString, expiryTime,
         GOSSIP_REQUEST_TIMEOUT, ROUTING_CLIENT_REPLY_TIMEOUT);
}

/**
 * Stores the configuration; no socket is opened until start() is called.
 *
 * @param port                      port the server socket will listen on
 * @param bindAddressString         host name/address to bind to, or null
 * @param expiryTime                msecs until a gossip entry expires
 * @param gossipRequestTimeout      msecs to wait for a gossip request
 * @param routingClientReplyTimeout msecs to wait for a routing client
 */
public GossipRouter(int port, String bindAddressString,
                    long expiryTime, long gossipRequestTimeout,
                    long routingClientReplyTimeout) {
    this.port = port;
    this.bindAddressString = bindAddressString;
    this.expiryTime = expiryTime;
    this.gossipRequestTimeout = gossipRequestTimeout;
    this.routingClientReplyTimeout = routingClientReplyTimeout;
}
126
127
128     //
129
// MANAGED ATTRIBUTES
130
//
131

132     public void setPort(int port) {
133         this.port = port;
134     }
135
136     public int getPort() {
137          return port;
138     }
139
140     public void setBindAddress(String JavaDoc bindAddress) {
141         bindAddressString = bindAddress;
142     }
143
144     public String JavaDoc getBindAddress() {
145          return bindAddressString;
146     }
147
148     public void setExpiryTime(long expiryTime) {
149         this.expiryTime = expiryTime;
150     }
151
152     public long getExpiryTime() {
153          return expiryTime;
154     }
155
156     public void setGossipRequestTimeout(long gossipRequestTimeout) {
157         this.gossipRequestTimeout = gossipRequestTimeout;
158     }
159
160     public long getGossipRequestTimeout() {
161          return gossipRequestTimeout;
162     }
163
164     public void setRoutingClientReplyTimeout(long routingClientReplyTimeout) {
165         this.routingClientReplyTimeout = routingClientReplyTimeout;
166     }
167
168     public long getRoutingClientReplyTimeout() {
169          return routingClientReplyTimeout;
170     }
171
172     public boolean isStarted() {
173         return srvSock!=null;
174     }
175
176     //
177
// JBoss MBean LIFECYCLE OPERATIONS
178
//
179

180
181     /**
182      * JBoss MBean lifecycle operation.
183      **/

184     public void create() throws Exception JavaDoc {
185         // not used
186
}
187
188     /**
189      * JBoss MBean lifecycle operation. Called after create(). When this method
190      * is called, the managed attributes have already been set.<br>
191      * Brings the Router in fully functional state.
192      **/

193     public void start() throws Exception JavaDoc {
194
195         if (srvSock!=null) {
196             throw new Exception JavaDoc("Router already started.");
197         }
198
199         if (bindAddressString!=null) {
200             bindAddress = InetAddress.getByName(bindAddressString);
201             srvSock = new ServerSocket JavaDoc(port, 50, bindAddress);
202         }
203         else {
204             srvSock = new ServerSocket JavaDoc(port, 50);
205         }
206
207         // start the main server thread
208
new Thread JavaDoc(new Runnable JavaDoc() {
209                 public void run() {
210                     mainLoop();
211                     cleanup();
212                 }
213             }, "JGroups Router Main Thread").start();
214
215         // starts the cache sweeper as daemon thread, so we won't block on it
216
// upon termination
217
timer = new Timer(true);
218         timer.schedule(new TimerTask() {
219                 public void run() {
220                     sweep();
221                 }
222             }, expiryTime, expiryTime);
223     }
224
225     /**
226      * JBoss MBean lifecycle operation. The JMX agent allways calls this method
227      * before destroy(). Close connections and frees resources.
228      **/

229     public void stop() {
230
231         if (srvSock==null) {
232             if(log.isWarnEnabled()) log.warn("Router already stopped");
233             return;
234         }
235
236         timer.cancel();
237         shutdown();
238         try {
239             srvSock.close();
240         }
241         catch(Exception JavaDoc e) {
242             if(log.isErrorEnabled()) log.error("Failed to close server socket: "+e);
243         }
244         // exiting the mainLoop will clean the tables
245
srvSock = null;
246         if(log.isInfoEnabled()) log.info("Router stopped");
247     }
248
249     /**
250      * JBoss MBean lifecycle operation.
251      **/

252     public void destroy() {
253         // not used
254
}
255
256
257     //
258
// ORDINARY OPERATIONS
259
//
260

261     public String JavaDoc dumpRoutingTable() {
262         return dumpTable(routingTable);
263     }
264
265     public String JavaDoc dumpGossipTable() {
266         return dumpTable(gossipTable);
267     }
268
269
270
271     //
272
// END OF MANAGEMENT INTERFACE
273
//
274

275     public static String JavaDoc requestTypeToString(int type) {
276         return
277             type == GET ? "GET" :
278                 (type == REGISTER ? "REGISTER" :
279                     (type == DUMP ? "DUMP" :
280                         (type == SHUTDOWN ? "SHUTDOWN" : "UNKNOWN REQUEST: "+type)));
281     }
282
283
284     /**
285      * The main server loop. Runs on the JGroups Router Main Thread.
286      **/

287     private void mainLoop() {
288         Socket JavaDoc sock = null;
289         DataInputStream input = null;
290         DataOutputStream output = null;
291         Address peer_addr = null;
292         byte[] buf;
293         int len, type = -1;
294         String JavaDoc gname = null;
295         Date d;
296         boolean up = true;
297
298         if(bindAddress == null) {
299             bindAddress=srvSock.getInetAddress();
300         }
301         d=new Date();
302         System.out.println("GossipRouter started at " + d +
303                            "\nListening on port " + port + " bound on address " + bindAddress + '\n');
304         d=null;
305
306         while(up) {
307
308             try {
309                 sock=srvSock.accept();
310                 sock.setSoLinger(true, 500);
311
312                 if(log.isTraceEnabled()) {
313                     log.trace("router accepted connection from "+sock);
314                 }
315
316                 final BufferedInputStream bis = new BufferedInputStream(sock.getInputStream());
317                 final Promise waitArea = new Promise();
318                 final Socket JavaDoc s = sock;
319
320                 // @since 2.2.1
321
// Handling of gossip requests on a different thread allows
322
// the GossipRouter to serve both routing and gossip requests.
323
// The GossipRouter stays backward compatible, old clients
324
// shouldn't be aware they talk to a merged Router/GossipServer.
325

326                Thread JavaDoc t = new Thread JavaDoc(new Runnable JavaDoc() {
327                         public void run() {
328                             ObjectInputStream ois = null;
329                             try {
330                                 bis.mark(MARK_BUFFER_SIZE);
331                                 // blocks until gossip request or 'forever'
332
ois = new ObjectInputStream(bis);
333                                 GossipData gossip_req = (GossipData)ois.readObject();
334
335                                 // it is a gossip request, set the main thread free
336
waitArea.setResult(GOSSIP_REQUEST);
337
338                                 GossipData gresp = processGossip(gossip_req);
339                                 if (gresp != null) {
340                                     ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
341                                     oos.writeObject(gresp);
342                                     oos.close();
343                                 }
344                                 bis.close();
345                                 s.close();
346                             }
347                             catch(Exception JavaDoc e) {
348                                 if(log.isDebugEnabled()) log.debug("gossip thread exception :"+e);
349                                 waitArea.setResult(GOSSIP_FAILURE);
350                             }
351                             finally {
352                                 try {
353                                     ois.close();
354                                 }
355                                 catch(Exception JavaDoc e) {
356                                     // OK
357
}
358                             }
359                         }
360                     }, "Gossip Request Thread");
361
362                 t.start();
363
364                 Object JavaDoc waitResult = waitArea.getResult(gossipRequestTimeout);
365                 waitArea.reset();
366
367                 if (waitResult != null) {
368                     // gossip request, let the gossip thread deal with it
369
continue;
370                 }
371
372                 // timeout, this is a routing request
373

374                 peer_addr = new IpAddress(sock.getInetAddress(), sock.getPort());
375                 output = new DataOutputStream(sock.getOutputStream());
376                 
377                 // return the address of the peer so it can set it
378
buf = Util.objectToByteBuffer(peer_addr);
379                 output.writeInt(buf.length);
380                 output.write(buf, 0, buf.length);
381
382                 // The gossip thread still waits for a serialized object, so
383
// wait that read to fail. If it actually gets a GossipData,
384
// that's an error condition we should handle here
385
waitResult = waitArea.getResult(routingClientReplyTimeout);
386
387                 if (waitResult == null) {
388                     // timeout
389
throw new Exception JavaDoc("Timeout waiting for router client answer");
390                 }
391                 else if (waitResult == GOSSIP_REQUEST) {
392                     // lazy gossip client, let it handle its business, it will
393
// fail anyway
394
output.close();
395                     continue;
396                 }
397
398                 bis.reset();
399                 input=new DataInputStream(bis);
400
401                 type=input.readInt();
402                 if(log.isTraceEnabled()) {
403                     log.trace("request of type "+requestTypeToString(type));
404                 }
405
406                 gname=input.readUTF();
407
408
409                 // We can have 2 kinds of messages at this point: GET requests or REGISTER requests.
410
// GET requests are processed right here, REGISTRATION requests cause the spawning of
411
// a separate thread handling it (long running thread as it will do the message routing
412
// on behalf of that client for the duration of the client's lifetime).
413

414                 switch(type) {
415                 case GossipRouter.GET:
416                     processGetRequest(sock, output, gname); // closes sock after processing
417
break;
418                 case GossipRouter.DUMP:
419                     processDumpRequest(sock, output); // closes sock after processing
420
break;
421                 case GossipRouter.REGISTER:
422                     Address addr;
423                     len=input.readInt();
424                     buf=new byte[len];
425                     input.readFully(buf, 0, buf.length); // read Address
426
addr=(Address)Util.objectFromByteBuffer(buf);
427                     SocketThread st = new SocketThread(sock, input, addr);
428                     addEntry(gname, new AddressEntry(addr, sock, st, output));
429                     st.start();
430                     break;
431                 case GossipRouter.SHUTDOWN:
432                     if(log.isInfoEnabled()) log.info("router shutting down");
433                     output.writeInt(SHUTDOWN_OK);
434                     output.flush();
435                     try {
436                         sock.close();
437                     }
438                     catch(Exception JavaDoc e) {
439                         // OK, going down anyway
440
}
441                     up = false;
442                     continue;
443                 default:
444                     if(log.isErrorEnabled()) log.error("request of type " + type + " not recognized");
445                     continue;
446                 }
447             }
448             catch(Exception JavaDoc e) {
449                 if(log.isErrorEnabled()) log.error("failure handling a client connection: " + e.getMessage(), e);
450                 try {
451                     sock.close();
452                 }
453                 catch(IOException e2) {
454                     if(log.isWarnEnabled()) log.warn("failed to close socket "+sock);
455                 }
456                 continue;
457             }
458         }
459     }
460
461
462     /**
463      * Cleans the routing tables while the Router is going down.
464      **/

465     private void cleanup() {
466
467         // shutdown the routing threads and cleanup the tables
468
synchronized(routingTable) {
469             for(Iterator i=routingTable.keySet().iterator(); i.hasNext();) {
470                 String JavaDoc gname=(String JavaDoc)i.next();
471                 List l=(List)routingTable.get(gname);
472                 if (l!=null) {
473                     for(Iterator j=l.iterator(); j.hasNext(); ) {
474                         AddressEntry e = (AddressEntry)j.next();
475                         e.destroy();
476                     }
477                 }
478             }
479             routingTable.clear();
480             if(log.isInfoEnabled()) log.info("routing table cleared");
481         }
482         synchronized(gossipTable) {
483             gossipTable.clear();
484             if(log.isInfoEnabled()) log.info("gossip table cleared");
485         }
486
487     }
488
489     /**
490      * Connects to the ServerSocket and sends the shutdown header.
491      **/

492     private void shutdown() {
493         try {
494             Socket JavaDoc s = new Socket JavaDoc(srvSock.getInetAddress(),
495                                   srvSock.getLocalPort());
496             DataInputStream dis = new DataInputStream(s.getInputStream());
497             int len = dis.readInt();
498             byte[] buf = new byte[len];
499             dis.readFully(buf, 0, buf.length);
500             DataOutputStream dos = new DataOutputStream(s.getOutputStream());
501             dos.writeInt(SHUTDOWN);
502             dos.writeUTF("");
503             // waits until the server replies
504
dis.readInt();
505             dos.flush();
506             dos.close();
507             s.close();
508         }
509         catch(Exception JavaDoc e) {
510             if(log.isErrorEnabled()) log.error("shutdown failed: "+e);
511         }
512         
513     }
514
515     //
516
// GOSSIPING
517
//
518

519     /**
520      * @since 2.2.1
521      **/

522     private GossipData processGossip(GossipData gossip) {
523
524         if(log.isTraceEnabled()) log.trace("gossip is "+gossip);
525
526         if (gossip==null) {
527             if(log.isWarnEnabled()) log.warn("null gossip request");
528             return null;
529         }
530
531         String JavaDoc group = gossip.getGroup();
532         Address mbr = null;
533
534         synchronized(gossipTable) {
535
536             switch(gossip.getType()) {
537                 
538             case GossipData.REGISTER_REQ:
539                 mbr=gossip.getMbr();
540                 if(group == null || mbr == null) {
541                     if(log.isErrorEnabled()) log.error("group or member is null, cannot register member");
542                     return null;
543                 }
544                 addGossipEntry(group, new AddressEntry(mbr));
545                 return null;
546
547             case GossipData.GET_REQ:
548                 if(group == null) {
549                     if(log.isErrorEnabled()) log.error("group is null, cannot get membership");
550                     return null;
551                 }
552                 Vector mbrs = null;
553                 List l = (List)gossipTable.get(group);
554                 if (l != null) {
555                     mbrs = new Vector();
556                     for(Iterator i = l.iterator(); i.hasNext(); ) {
557                         AddressEntry e = (AddressEntry)i.next();
558                         mbrs.add(e.addr);
559                     }
560                 }
561                 return new GossipData(GossipData.GET_RSP, group, null, mbrs);
562
563             case GossipData.GET_RSP:
564                 if(log.isWarnEnabled()) log.warn("received a GET_RSP. Should not be received by server");
565                 return null;
566
567             default:
568                 if(log.isWarnEnabled()) log.warn("received unkown gossip request (gossip=" + gossip + ')');
569                 return null;
570             }
571         }
572     }
573
574
575     /**
576      * Adds a new member to the group in the gossip table or renews the
577      * membership where is the case.
578      *
579      * @since 2.2.1
580      **/

581     private void addGossipEntry(String JavaDoc groupname, AddressEntry e) {
582
583         List val;
584
585         if(groupname == null) {
586             if(log.isErrorEnabled()) log.error("groupname was null, not added !");
587             return;
588         }
589
590         synchronized(gossipTable) {
591
592             val=(List)gossipTable.get(groupname);
593             if(val == null) {
594                 val=Collections.synchronizedList(new ArrayList());
595                 gossipTable.put(groupname, val);
596             }
597             int index = val.indexOf(e);
598             if (index==-1) {
599                 val.add(e);
600                 return;
601             }
602             ((AddressEntry)val.get(index)).update();
603         }
604     }
605
606
607     /**
608      * Removes expired gossip entries (entries older than EXPIRY_TIME msec).
609      * @since 2.2.1
610      */

611     private void sweep() {
612
613         long diff, currentTime=System.currentTimeMillis();
614         int num_entries_removed=0;
615         String JavaDoc key=null;
616         List val;
617
618         if(log.isTraceEnabled()) log.trace("running sweep");
619
620         synchronized(gossipTable) {
621             for(Iterator i=gossipTable.keySet().iterator(); i.hasNext();) {
622                 key=(String JavaDoc)i.next();
623                 val=(List)gossipTable.get(key);
624                 if(val != null) {
625                     for(Iterator j=val.iterator(); j.hasNext();) {
626                         AddressEntry ae = (AddressEntry)j.next();
627                         diff=currentTime - ae.timestamp;
628                         if(diff > expiryTime) {
629                             j.remove();
630                             if(log.isTraceEnabled())
631                                 log.trace("Removed member " + ae + " from group " + key + '(' + diff + " msecs old)");
632                             num_entries_removed++;
633                         }
634                     }
635                 }
636             }
637         }
638         
639         if(num_entries_removed > 0) {
640             if(log.isTraceEnabled()) log.trace("done (removed " + num_entries_removed + " entries)");
641         }
642     }
643
644     //
645
// ROUTING
646
//
647

648     /**
649      Gets the members of group 'groupname'. Returns them as a List of Addresses.
650      */

651     private void processGetRequest(Socket JavaDoc sock, DataOutputStream output, String JavaDoc groupname) {
652
653         List grpmbrs=(List)routingTable.get(groupname);
654         org.jgroups.util.List ret=null;
655         AddressEntry entry;
656         byte[] buf;
657
658         if(log.isTraceEnabled()) {
659             log.trace("groupname=" + groupname + ", result=" + grpmbrs);
660         }
661
662         if(grpmbrs != null && grpmbrs.size() > 0) {
663             ret=new org.jgroups.util.List();
664             for(Iterator i=grpmbrs.iterator(); i.hasNext(); ) {
665                 entry=(AddressEntry)i.next();
666                 ret.add(entry.addr);
667             }
668         }
669         try {
670             if(ret == null || ret.size() == 0) {
671                 output.writeInt(0);
672             }
673             else {
674                 buf=Util.objectToByteBuffer(ret);
675                 output.writeInt(buf.length);
676                 output.write(buf, 0, buf.length);
677             }
678         }
679         catch(Exception JavaDoc e) {
680             if(log.isErrorEnabled()) log.error("exception=" + e);
681         }
682         finally {
683             try {
684                 if(output != null)
685                     output.close();
686                 sock.close();
687             }
688             catch(Exception JavaDoc e) {
689             }
690         }
691     }
692
693
694     /**
695      * Dumps the routing table as String to the socket's OutputStream.
696      **/

697     private void processDumpRequest(Socket JavaDoc sock, DataOutputStream output) {
698
699         try {
700             output.writeUTF(dumpRoutingTable());
701         }
702         catch(Exception JavaDoc e) {
703             if(log.isErrorEnabled()) log.error("error sending the answer back to the client: " + e);
704         }
705         finally {
706             try {
707                 if(output != null) {
708                     output.close();
709                 }
710             }
711             catch(Exception JavaDoc e) {
712                 if(log.isErrorEnabled()) log.error("error closing the output stream: " + e);
713             }
714             try {
715                 sock.close();
716             }
717             catch(Exception JavaDoc e) {
718                 if(log.isErrorEnabled()) log.error("error closing the socket: " + e);
719             }
720         }
721     }
722
723
724     private String JavaDoc dumpTable(Map map) {
725
726         String JavaDoc label = (map instanceof Hashtable)?"routing":"gossip";
727         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
728         synchronized(map) {
729             if(map.size() == 0) {
730                 sb.append("empty ");
731                 sb.append(label);
732                 sb.append(" table");
733             }
734             else {
735                 for(Iterator i=map.keySet().iterator(); i.hasNext();) {
736                     String JavaDoc gname=(String JavaDoc)i.next();
737                     sb.append("GROUP: '" + gname + "'\n");
738                     List l=(List)map.get(gname);
739                     if(l == null) {
740                         sb.append("\tnull list of addresses\n");
741                     }
742                     else
743                         if(l.size() == 0) {
744                             sb.append("\tempty list of addresses\n");
745                         }
746                         else {
747                             for(Iterator j=l.iterator(); j.hasNext();) {
748                                 AddressEntry ae=(AddressEntry)j.next();
749                                 sb.append('\t');
750                                 sb.append(ae.toString());
751                                 sb.append('\n');
752                             }
753                         }
754                 }
755             }
756         }
757         return sb.toString();
758     }
759
760
761
762     private void route(Address dest, String JavaDoc dest_group, byte[] msg, Address sender) {
763         if(log.isTraceEnabled()) {
764             int len=msg != null? msg.length : 0;
765             log.trace("routing request from " + sender + " for "+dest_group+" to " +
766                       (dest==null?"ALL":dest.toString())+", " + len + " bytes");
767         }
768
769         if(dest == null) {
770             // send to all members in group dest.getChannelName()
771
if(dest_group == null) {
772                 if(log.isErrorEnabled()) log.error("both dest address and group are null");
773                 return;
774             }
775             else {
776                 sendToAllMembersInGroup(dest_group, msg, sender);
777             }
778         }
779         else {
780             // send to destination address
781
AddressEntry ae = findAddressEntry(dest);
782             if (ae == null) {
783                 if(log.isErrorEnabled()) log.error("cannot find address "+dest+" in the routing table");
784                 return;
785             }
786             if (ae.output==null) {
787                 if(log.isErrorEnabled()) log.error("address "+dest+" is associated with a null output stream");
788                 return;
789             }
790             try {
791                 sendToMember(ae.output, msg);
792             }
793             catch(Exception JavaDoc e) {
794                 if(log.isErrorEnabled()) log.error("failed sending message to "+dest+": "+e.getMessage());
795                 removeEntry(ae.sock); // will close socket
796
}
797         }
798     }
799
800
801     /**
802      * Adds a new member to the routing group.
803      **/

804     private void addEntry(String JavaDoc groupname, AddressEntry e) {
805         List val;
806
807         if(groupname == null) {
808             if(log.isErrorEnabled()) log.error("groupname was null, not added !");
809             return;
810         }
811
812         synchronized(routingTable) {
813             val=(List)routingTable.get(groupname);
814             if(val == null) {
815                 val=Collections.synchronizedList(new ArrayList());
816                 routingTable.put(groupname, val);
817             }
818             int index = val.indexOf(e);
819             if (index==-1) {
820                 val.add(e);
821                 return;
822             }
823             // new connection for an existing member
824
((AddressEntry)val.remove(index)).destroy();
825             val.add(e);
826         }
827     }
828
829
830     private void removeEntry(Socket JavaDoc sock) {
831
832         List val;
833         AddressEntry entry;
834         synchronized(routingTable) {
835             for(Enumeration e=routingTable.keys(); e.hasMoreElements();) {
836                 val=(List)routingTable.get(e.nextElement());
837                 for(Iterator i=val.iterator(); i.hasNext();) {
838                     entry=(AddressEntry)i.next();
839                     if(entry.sock == sock) {
840                         entry.destroy();
841                         //Util.print("Removing entry " + entry);
842
i.remove();
843                         return;
844                     }
845                 }
846             }
847         }
848     }
849
850     /**
851      * @return null if not found
852      **/

853     private AddressEntry findAddressEntry(Address addr) {
854         
855         List val;
856         AddressEntry entry;
857         synchronized(routingTable) {
858             for(Enumeration e=routingTable.keys(); e.hasMoreElements();) {
859                 val=(List)routingTable.get(e.nextElement());
860                 for(Iterator i=val.iterator(); i.hasNext();) {
861                     entry=(AddressEntry)i.next();
862                     if(addr.equals(entry.addr)) {
863                         return entry;
864                     }
865                 }
866             }
867             return null;
868         }
869     }
870
871
872
873
874     private void sendToAllMembersInGroup(String JavaDoc groupname, byte[] msg, Address sender) {
875         List val;
876         val=(List)routingTable.get(groupname);
877         if(val == null || val.size() == 0) {
878             return;
879         }
880
881         synchronized(val) {
882             for(Iterator i=val.iterator(); i.hasNext();) {
883                 AddressEntry ae = (AddressEntry)i.next();
884                 if(ae.addr != null && ae.addr.equals(sender)) {
885                     // if(log.isTraceEnabled())
886
// log.trace("dropped message to sender of multicast (" + ae.addr + ")");
887
continue;
888                 }
889                 DataOutputStream dos = ae.output;
890
891                 if (dos!=null) {
892                     // send only to 'connected' members
893
try {
894                         sendToMember(dos, msg);
895                     }
896                     catch(Exception JavaDoc e) {
897                         if(log.isWarnEnabled()) log.warn("cannot send to "+ae.addr+": "+e.getMessage());
898                         ae.destroy(); // this closes the socket
899
i.remove();
900                     }
901                 }
902             }
903         }
904     }
905
906
907     /**
908      * @exception IOException
909      **/

910     private void sendToMember(DataOutputStream out, byte[] msg) throws IOException {
911         if (out==null) {
912             return;
913         }
914
915         synchronized(out) {
916             out.writeInt(msg.length);
917             out.write(msg, 0, msg.length);
918         }
919     }
920
921
922
923     /**
924      * Class used to store Addresses in both routing and gossip tables.
925      * If it is used for routing, sock and output have valid values, otherwise
926      * they're null and only the timestamp counts.
927      **/

928     class AddressEntry {
929         Address addr=null;
930         Socket JavaDoc sock=null;
931         DataOutputStream output=null;
932         long timestamp=0;
933         final SocketThread thread;
934
935         /**
936          * AddressEntry for a 'gossip' membership.
937          **/

938         public AddressEntry(Address addr) {
939             this(addr, null, null, null);
940         }
941
942         public AddressEntry(Address addr, Socket JavaDoc sock, SocketThread thread, DataOutputStream output) {
943             this.addr=addr;
944             this.sock=sock;
945             this.thread = thread;
946             this.output=output;
947             this.timestamp = System.currentTimeMillis();
948         }
949
950         void destroy() {
951             if (thread != null) {
952                 thread.finish();
953             }
954             if(output != null) {
955                 try {
956                     output.close();
957                 }
958                 catch(Exception JavaDoc e) {
959                 }
960                 output=null;
961             }
962             if(sock != null) {
963                 try {
964                     sock.close();
965                 }
966                 catch(Exception JavaDoc e) {
967                 }
968                 sock=null;
969             }
970             timestamp = 0;
971         }
972
973         public void update() {
974             timestamp = System.currentTimeMillis();
975         }
976
977         public boolean equals(Object JavaDoc other) {
978             return addr.equals(((AddressEntry)other).addr);
979         }
980
981         public String JavaDoc toString() {
982             StringBuffer JavaDoc sb = new StringBuffer JavaDoc("addr=");
983             sb.append(addr);
984             if (sock==null) {
985                 sb.append(", timestamp=");
986                 sb.append(timestamp);
987             }
988             else {
989                 sb.append(", sock=");
990                 sb.append(sock);
991             }
992             return sb.toString();
993         }
994     }
995
996
997     private static int threadCounter = 0;
998
999
1000    /** A SocketThread manages one connection to a client. Its main task is message routing. */
1001    class SocketThread extends Thread JavaDoc {
1002        private volatile boolean active = true;
1003        Socket JavaDoc sock=null;
1004        DataInputStream input=null;
1005        Address addr=null;
1006
1007        public SocketThread(Socket JavaDoc sock, DataInputStream ois, Address addr) {
1008            super("SocketThread "+(threadCounter++));
1009            this.sock=sock;
1010            input=ois;
1011            this.addr=addr;
1012        }
1013
1014        void closeSocket() {
1015            try {
1016                if(input != null)
1017                    input.close();
1018                if(sock != null)
1019                    sock.close();
1020            }
1021            catch(Exception JavaDoc e) {
1022            }
1023        }
1024
1025        void finish() {
1026            if(log.isTraceEnabled()) log.trace("terminating the SocketThread for "+sock);
1027            active = false;
1028        }
1029
1030
1031        public void run() {
1032            byte[] buf;
1033            int len;
1034            Address dst_addr=null;
1035            String JavaDoc gname;
1036
1037            while(active) {
1038                try {
1039                    // 1. Group name is first
1040
gname=input.readUTF();
1041
1042                    // 2. Second is the destination address
1043
dst_addr=Util.readAddress(input);
1044
1045                    if (log.isTraceEnabled()) {
1046                        log.trace("group " + gname + ", routing request to " + (dst_addr == null ? "all" : dst_addr.toString()));
1047                    }
1048
1049                    // 3. Then the length of the byte buffer representing the message
1050
len=input.readInt();
1051                    if(len == 0) {
1052                        if(log.isWarnEnabled()) log.warn("received null message");
1053                        continue;
1054                    }
1055
1056                    // 4. Finally the message itself, as a byte buffer
1057
buf=new byte[len];
1058                    input.readFully(buf, 0, buf.length); // message
1059

1060                    // Then route the message to everyone else except me
1061
route(dst_addr, gname, buf, addr);
1062                }
1063                catch(EOFException io_ex) {
1064                    if(log.isTraceEnabled())
1065                        log.trace("client " +sock.getInetAddress().getHostName() + ':' + sock.getPort() +
1066                                  " closed connection; removing it from routing table");
1067                    removeEntry(sock); // will close socket
1068
return;
1069                }
1070                catch(Exception JavaDoc e) {
1071                    if(log.isErrorEnabled()) log.error("exception=" + e);
1072                    break;
1073                }
1074            }
1075            closeSocket();
1076        }
1077
1078    }
1079
1080
1081    public static void main(String JavaDoc[] args) throws Exception JavaDoc {
1082        String JavaDoc arg;
1083        int port=8080;
1084        long expiry = GossipRouter.EXPIRY_TIME;
1085        long timeout = GossipRouter.GOSSIP_REQUEST_TIMEOUT;
1086        long routingTimeout = GossipRouter.ROUTING_CLIENT_REPLY_TIMEOUT;
1087        GossipRouter router=null;
1088        String JavaDoc address=null;
1089
1090        for(int i=0; i < args.length; i++) {
1091            arg=args[i];
1092            if("-help".equals(arg)) {
1093                System.out.println();
1094                System.out.println("GossipRouter [-port <port>] [-bindaddress <address>] [options]");
1095                System.out.println("Options: ");
1096                System.out.println(" -expiry <msecs> - Time until a gossip cache entry expires.");
1097                System.out.println(" -timeout <msecs> - Number of millisecs the router waits to receive");
1098                System.out.println(" a gossip request after connection was established;");
1099                System.out.println(" upon expiration, the router initiates the routing");
1100                System.out.println(" protocol on the connection.");
1101                return;
1102            }
1103            else if("-port".equals(arg)) {
1104                    port=Integer.parseInt(args[++i]);
1105            }
1106            else if("-bindaddress".equals(arg)) {
1107                address=args[++i];
1108            }
1109            else if("-expiry".equals(arg)) {
1110                expiry=Long.parseLong(args[++i]);
1111            }
1112            else if("-timeout".equals(arg)) {
1113                timeout=Long.parseLong(args[++i]);
1114            }
1115            else if("-rtimeout".equals(arg)) {
1116                routingTimeout=Long.parseLong(args[++i]);
1117            }
1118        }
1119        System.out.println("GossipRouter is starting...");
1120
1121        try {
1122            router= new GossipRouter(port, address, expiry, timeout, routingTimeout);
1123            router.start();
1124        }
1125        catch(Exception JavaDoc e) {
1126            System.err.println(e);
1127        }
1128    }
1129
1130
1131}
1132
Popular Tags