KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > Router


1 // $Id: Router.java,v 1.5 2004/09/23 16:29:53 belaban Exp $
2

3 package org.jgroups.stack;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.Address;
9 import org.jgroups.util.List;
10 import org.jgroups.util.Util;
11
12 import java.io.DataInputStream JavaDoc;
13 import java.io.DataOutputStream JavaDoc;
14 import java.io.IOException JavaDoc;
15 import java.io.OutputStream JavaDoc;
16 import java.net.InetAddress JavaDoc;
17 import java.net.ServerSocket JavaDoc;
18 import java.net.Socket JavaDoc;
19 import java.util.Date JavaDoc;
20 import java.util.Enumeration JavaDoc;
21 import java.util.Hashtable JavaDoc;
22 import java.util.Iterator JavaDoc;
23
24
25
26
27 /**
28  * Router for TCP based group comunication (using layer TCP instead of UDP). Instead of the TCP
29  * layer sending packets point-to-point to each other member, it sends the packet to the router
30  * which - depending on the target address - multicasts or unicasts it to the group / or single
31  * member.<p>
32  * This class is especially interesting for applets which cannot directly make connections
33  * (neither UDP nor TCP) to a host different from the one they were loaded from. Therefore,
34  * an applet would create a normal channel plus protocol stack, but the bottom layer would have
35  * to be the TCP layer which sends all packets point-to-point (over a TCP connection) to the
36  * router, which in turn forwards them to their end location(s) (also over TCP). A centralized
37  * router would therefore have to be running on the host the applet was loaded from.<p>
38  * An alternative for running JGroups in an applet (IP multicast is not allows in applets as of
39  * 1.2), is to use point-to-point UDP communication via the gossip server. However, then the appplet
40  * has to be signed which involves additional administrative effort on the part of the user.
41  * @author Bela Ban
42  */

43 public class Router {
44     final Hashtable JavaDoc groups=new Hashtable JavaDoc(); // groupname - vector of AddressEntry's
45
int port=8080;
46     ServerSocket JavaDoc srv_sock=null;
47     InetAddress JavaDoc bind_address;
48     protected final Log log=LogFactory.getLog(getClass());
49
50     public static final int GET=-10;
51     public static final int REGISTER=-11;
52     public static final int DUMP=-21;
53
54
55     public Router(int port) throws Exception JavaDoc {
56         this.port=port;
57         srv_sock=new ServerSocket JavaDoc(port, 50); // backlog of 50 connections
58
}
59
60     public Router(int port, InetAddress JavaDoc bind_address) throws Exception JavaDoc {
61         this.port=port;
62         this.bind_address=bind_address;
63         srv_sock=new ServerSocket JavaDoc(port, 50, bind_address); // backlog of 50 connections
64
}
65
66
67     public void start() {
68         Socket JavaDoc sock;
69         DataInputStream JavaDoc input;
70         DataOutputStream JavaDoc output;
71         Address peer_addr;
72         byte[] buf;
73         int len, type;
74         String JavaDoc gname;
75         Date JavaDoc d;
76
77         if(bind_address == null) bind_address=srv_sock.getInetAddress();
78         d=new Date JavaDoc();
79          {
80             if(log.isInfoEnabled()) log.info("Router started at " + d);
81             if(log.isInfoEnabled()) log.info("Listening on port " + port + " bound on address " + bind_address + '\n');
82         }
83         d=null;
84
85         while(true) {
86             try {
87                 sock=srv_sock.accept();
88                 sock.setSoLinger(true, 500);
89                 peer_addr=new org.jgroups.stack.IpAddress(sock.getInetAddress(), sock.getPort());
90                 output=new DataOutputStream JavaDoc(sock.getOutputStream());
91
92                 // return the address of the peer so it can set it
93
buf=Util.objectToByteBuffer(peer_addr);
94                 output.writeInt(buf.length);
95                 output.write(buf, 0, buf.length);
96
97
98                 // We can have 2 kinds of messages at this point: GET requests or REGISTER requests.
99
// GET requests are processed right here, REGISTRATION requests cause the spawning of
100
// a separate thread handling it (long running thread as it will do the message routing
101
// on behalf of that client for the duration of the client's lifetime).
102

103                 input=new DataInputStream JavaDoc(sock.getInputStream());
104                 type=input.readInt();
105                 gname=input.readUTF();
106
107                 switch(type) {
108                     case Router.GET:
109                         processGetRequest(sock, output, gname); // closes sock after processing
110
break;
111                     case Router.DUMP:
112                         processDumpRequest(peer_addr, sock, output); // closes sock after processing
113
break;
114                     case Router.REGISTER:
115                         Address addr;
116                         len=input.readInt();
117                         buf=new byte[len];
118                         input.readFully(buf, 0, buf.length); // read Address
119
addr=(Address)Util.objectFromByteBuffer(buf);
120                         addEntry(gname, new AddressEntry(addr, sock, output));
121                         new SocketThread(sock, input).start();
122                         break;
123                     default:
124                         if(log.isErrorEnabled()) log.error("request of type " + type + " not recognized");
125                         continue;
126                 }
127             }
128             catch(Exception JavaDoc e) {
129                 if(log.isErrorEnabled()) log.error("exception=" + e);
130                 continue;
131             }
132         }
133     }
134
135
136     public void stop() {
137
138     }
139
140     /**
141      Gets the members of group 'groupname'. Returns them as a List of Addresses.
142      */

143     void processGetRequest(Socket JavaDoc sock, DataOutputStream JavaDoc output, String JavaDoc groupname) {
144         List grpmbrs=(List)groups.get(groupname), ret=null;
145         AddressEntry entry;
146         byte[] buf;
147
148         if(log.isTraceEnabled()) log.trace("groupname=" + groupname + ", result=" + grpmbrs);
149
150         if(grpmbrs != null && grpmbrs.size() > 0) {
151             ret=new List();
152             for(Enumeration JavaDoc e=grpmbrs.elements(); e.hasMoreElements();) {
153                 entry=(AddressEntry)e.nextElement();
154                 ret.add(entry.addr);
155             }
156         }
157         try {
158             if(ret == null || ret.size() == 0) {
159                 output.writeInt(0);
160             }
161             else {
162                 buf=Util.objectToByteBuffer(ret);
163                 output.writeInt(buf.length);
164                 output.write(buf, 0, buf.length);
165             }
166         }
167         catch(Exception JavaDoc e) {
168             if(log.isErrorEnabled()) log.error("exception=" + e);
169         }
170         finally {
171             try {
172                 if(output != null)
173                     output.close();
174                 sock.close();
175             }
176             catch(Exception JavaDoc e) {
177             }
178         }
179     }
180
181
182     /**
183      * Dumps the routing table on the wire, as String.
184      **/

185     void processDumpRequest(Address peerAddress, Socket JavaDoc sock, DataOutputStream JavaDoc output) {
186
187         if(log.isTraceEnabled()) log.trace("request from " + peerAddress);
188
189         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
190         synchronized(groups) {
191             if(groups.size() == 0) {
192                 sb.append("empty routing table");
193             }
194             else {
195                 for(Iterator JavaDoc i=groups.keySet().iterator(); i.hasNext();) {
196                     String JavaDoc gname=(String JavaDoc)i.next();
197                     sb.append("GROUP: '" + gname + "'\n");
198                     List l=(List)groups.get(gname);
199                     if(l == null) {
200                         sb.append("\tnull list of addresses\n");
201                     }
202                     else
203                         if(l.size() == 0) {
204                             sb.append("\tempty list of addresses\n");
205                         }
206                         else {
207                             for(Enumeration JavaDoc e=l.elements(); e.hasMoreElements();) {
208                                 AddressEntry ae=(AddressEntry)e.nextElement();
209                                 sb.append('\t');
210                                 sb.append(ae.toString());
211                                 sb.append('\n');
212                             }
213                         }
214                 }
215             }
216         }
217         try {
218             output.writeUTF(sb.toString());
219         }
220         catch(Exception JavaDoc e) {
221             if(log.isErrorEnabled()) log.error("Error sending the answer back to the client: " + e);
222         }
223         finally {
224             try {
225                 if(output != null) {
226                     output.close();
227                 }
228             }
229             catch(Exception JavaDoc e) {
230                 if(log.isErrorEnabled()) log.error("Error closing the output stream: " + e);
231             }
232             try {
233                 sock.close();
234             }
235             catch(Exception JavaDoc e) {
236                 if(log.isErrorEnabled()) log.error("Error closing the socket: " + e);
237             }
238         }
239     }
240
241     synchronized void route(Address dest, String JavaDoc dest_group, byte[] msg) {
242
243         if(dest == null) { // send to all members in group dest.getChannelName()
244
if(dest_group == null) {
245                 if(log.isErrorEnabled()) log.error("both dest address and group are null");
246                 return;
247             }
248             else {
249                 sendToAllMembersInGroup(dest_group, msg);
250             }
251         }
252         else { // send to destination address
253
DataOutputStream JavaDoc out=findSocket(dest);
254             if(out != null)
255                 sendToMember(out, msg);
256             else
257                 if(log.isErrorEnabled()) log.error("routing of message to " + dest + " failed; outstream is null !");
258         }
259     }
260
261
262     void addEntry(String JavaDoc groupname, AddressEntry e) {
263         List val;
264         AddressEntry old_entry;
265
266         // Util.print("addEntry(" + groupname + ", " + e + ")");
267

268         if(groupname == null) {
269             if(log.isErrorEnabled()) log.error("groupname was null, not added !");
270             return;
271         }
272
273         synchronized(groups) {
274             val=(List)groups.get(groupname);
275
276             if(val == null) {
277                 val=new List();
278                 groups.put(groupname, val);
279             }
280             if(val.contains(e)) {
281                 old_entry=(AddressEntry)val.removeElement(e);
282                 if(old_entry != null)
283                     old_entry.destroy();
284             }
285             val.add(e);
286         }
287     }
288
289
290     void removeEntry(Socket JavaDoc sock) {
291         List val;
292         AddressEntry entry;
293
294         synchronized(groups) {
295             for(Enumeration JavaDoc e=groups.keys(); e.hasMoreElements();) {
296                 val=(List)groups.get(e.nextElement());
297
298                 for(Enumeration JavaDoc e2=val.elements(); e2.hasMoreElements();) {
299                     entry=(AddressEntry)e2.nextElement();
300                     if(entry.sock == sock) {
301                         try {
302                             entry.sock.close();
303                         }
304                         catch(Exception JavaDoc ex) {
305                         }
306                         //Util.print("Removing entry " + entry);
307
val.removeElement(entry);
308                         return;
309                     }
310                 }
311             }
312         }
313     }
314
315
316     void removeEntry(OutputStream JavaDoc out) {
317         List val;
318         AddressEntry entry;
319
320         synchronized(groups) {
321             for(Enumeration JavaDoc e=groups.keys(); e.hasMoreElements();) {
322                 val=(List)groups.get(e.nextElement());
323
324                 for(Enumeration JavaDoc e2=val.elements(); e2.hasMoreElements();) {
325                     entry=(AddressEntry)e2.nextElement();
326                     if(entry.output == out) {
327                         try {
328                             if(entry.sock != null)
329                                 entry.sock.close();
330                         }
331                         catch(Exception JavaDoc ex) {
332                         }
333                         //Util.print("Removing entry " + entry);
334
val.removeElement(entry);
335                         return;
336                     }
337                 }
338             }
339         }
340     }
341
342
343     void removeEntry(String JavaDoc groupname, Address addr) {
344         List val;
345         AddressEntry entry;
346
347
348         synchronized(groups) {
349             val=(List)groups.get(groupname);
350             if(val == null || val.size() == 0)
351                 return;
352             for(Enumeration JavaDoc e2=val.elements(); e2.hasMoreElements();) {
353                 entry=(AddressEntry)e2.nextElement();
354                 if(entry.addr.equals(addr)) {
355                     try {
356                         if(entry.sock != null)
357                             entry.sock.close();
358                     }
359                     catch(Exception JavaDoc ex) {
360                     }
361                     //Util.print("Removing entry " + entry);
362
val.removeElement(entry);
363                     return;
364                 }
365             }
366         }
367     }
368
369
370     DataOutputStream JavaDoc findSocket(Address addr) {
371         List val;
372         AddressEntry entry;
373
374         synchronized(groups) {
375             for(Enumeration JavaDoc e=groups.keys(); e.hasMoreElements();) {
376                 val=(List)groups.get(e.nextElement());
377                 for(Enumeration JavaDoc e2=val.elements(); e2.hasMoreElements();) {
378                     entry=(AddressEntry)e2.nextElement();
379                     if(addr.equals(entry.addr))
380                         return entry.output;
381                 }
382             }
383             return null;
384         }
385     }
386
387
388     void sendToAllMembersInGroup(String JavaDoc groupname, byte[] msg) {
389         List val;
390
391         synchronized(groups) {
392             val=(List)groups.get(groupname);
393             if(val == null || val.size() == 0)
394                 return;
395             for(Enumeration JavaDoc e=val.elements(); e.hasMoreElements();) {
396                 sendToMember(((AddressEntry)e.nextElement()).output, msg);
397             }
398         }
399     }
400
401
402     void sendToMember(DataOutputStream JavaDoc out, byte[] msg) {
403         try {
404             if(out != null) {
405                 out.writeInt(msg.length);
406                 out.write(msg, 0, msg.length);
407             }
408         }
409         catch(Exception JavaDoc e) {
410             if(log.isErrorEnabled()) log.error("exception=" + e);
411             removeEntry(out); // closes socket
412
}
413     }
414
415
416     class AddressEntry {
417         Address addr=null;
418         Socket JavaDoc sock=null;
419         DataOutputStream JavaDoc output=null;
420
421
422         public AddressEntry(Address addr, Socket JavaDoc sock, DataOutputStream JavaDoc output) {
423             this.addr=addr;
424             this.sock=sock;
425             this.output=output;
426         }
427
428
429         void destroy() {
430             if(output != null) {
431                 try {
432                     output.close();
433                 }
434                 catch(Exception JavaDoc e) {
435                 }
436                 output=null;
437             }
438             if(sock != null) {
439                 try {
440                     sock.close();
441                 }
442                 catch(Exception JavaDoc e) {
443                 }
444                 sock=null;
445             }
446         }
447
448         public boolean equals(Object JavaDoc other) {
449             return addr.equals(((AddressEntry)other).addr);
450         }
451
452         public String JavaDoc toString() {
453             return "addr=" + addr + ", sock=" + sock;
454         }
455     }
456
457
458     /** A SocketThread manages one connection to a client. Its main task is message routing. */
459     class SocketThread extends Thread JavaDoc {
460         Socket JavaDoc sock=null;
461         DataInputStream JavaDoc input=null;
462
463
464         public SocketThread(Socket JavaDoc sock, DataInputStream JavaDoc ois) {
465             this.sock=sock;
466             input=ois;
467         }
468
469         void closeSocket() {
470             try {
471                 if(input != null)
472                     input.close();
473                 if(sock != null)
474                     sock.close();
475             }
476             catch(Exception JavaDoc e) {
477             }
478         }
479
480
481         public void run() {
482             byte[] buf;
483             int len;
484             Address dst_addr=null;
485             String JavaDoc gname;
486
487             while(true) {
488                 try {
489                     gname=input.readUTF(); // group name
490
len=input.readInt();
491                     if(len == 0)
492                         dst_addr=null;
493                     else {
494                         buf=new byte[len];
495                         input.readFully(buf, 0, buf.length); // dest address
496
dst_addr=(Address)Util.objectFromByteBuffer(buf);
497                     }
498
499                     len=input.readInt();
500                     if(len == 0) {
501                         if(log.isWarnEnabled()) log.warn("received null message");
502                         continue;
503                     }
504                     buf=new byte[len];
505                     input.readFully(buf, 0, buf.length); // message
506
route(dst_addr, gname, buf);
507                 }
508                 catch(IOException JavaDoc io_ex) {
509
510                         if(log.isInfoEnabled()) log.info("client " +
511                                                                 sock.getInetAddress().getHostName() + ':' + sock.getPort() +
512                                                                 " closed connection; removing it from routing table");
513                     removeEntry(sock); // will close socket
514
return;
515                 }
516                 catch(Exception JavaDoc e) {
517                     if(log.isErrorEnabled()) log.error("exception=" + e);
518                     break;
519                 }
520             }
521             closeSocket();
522         }
523
524     }
525
526
527     public static void main(String JavaDoc[] args) throws Exception JavaDoc {
528         String JavaDoc arg;
529         int port=8080;
530         Router router=null;
531         InetAddress JavaDoc address=null;
532         System.out.println("Router is starting...");
533         for(int i=0; i < args.length; i++) {
534             arg=args[i];
535             if("-help".equals(arg)) {
536                 System.out.println("Router [-port <port>] [-bindaddress <address>]");
537                 return;
538             }
539             else
540                 if("-port".equals(arg)) {
541                     port=Integer.parseInt(args[++i]);
542                 }
543                 else
544                     if("-bindaddress".equals(arg)) {
545                         address=InetAddress.getByName(args[++i]);
546                     }
547
548         }
549
550
551
552         try {
553             if(address == null) router=new Router(port); else router=new Router(port, address);
554             router.start();
555             System.out.println("Router was created at " + new Date JavaDoc());
556             System.out.println("Listening on port " + port + " and bound to address " + address);
557         }
558         catch(Exception JavaDoc e) {
559             System.err.println(e);
560         }
561     }
562
563
564 }
565
Popular Tags