1 3 package org.jgroups.stack; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.Address; 9 import org.jgroups.util.List; 10 import org.jgroups.util.Util; 11 12 import java.io.DataInputStream ; 13 import java.io.DataOutputStream ; 14 import java.io.IOException ; 15 import java.io.OutputStream ; 16 import java.net.InetAddress ; 17 import java.net.ServerSocket ; 18 import java.net.Socket ; 19 import java.util.Date ; 20 import java.util.Enumeration ; 21 import java.util.Hashtable ; 22 import java.util.Iterator ; 23 24 25 26 27 43 public class Router { 44 final Hashtable groups=new Hashtable (); int port=8080; 46 ServerSocket srv_sock=null; 47 InetAddress bind_address; 48 protected final Log log=LogFactory.getLog(getClass()); 49 50 public static final int GET=-10; 51 public static final int REGISTER=-11; 52 public static final int DUMP=-21; 53 54 55 public Router(int port) throws Exception { 56 this.port=port; 57 srv_sock=new ServerSocket (port, 50); } 59 60 public Router(int port, InetAddress bind_address) throws Exception { 61 this.port=port; 62 this.bind_address=bind_address; 63 srv_sock=new ServerSocket (port, 50, bind_address); } 65 66 67 public void start() { 68 Socket sock; 69 DataInputStream input; 70 DataOutputStream output; 71 Address peer_addr; 72 byte[] buf; 73 int len, type; 74 String gname; 75 Date d; 76 77 if(bind_address == null) bind_address=srv_sock.getInetAddress(); 78 d=new Date (); 79 { 80 if(log.isInfoEnabled()) log.info("Router started at " + d); 81 if(log.isInfoEnabled()) log.info("Listening on port " + port + " bound on address " + bind_address + '\n'); 82 } 83 d=null; 84 85 while(true) { 86 try { 87 sock=srv_sock.accept(); 88 sock.setSoLinger(true, 500); 89 peer_addr=new org.jgroups.stack.IpAddress(sock.getInetAddress(), sock.getPort()); 90 output=new DataOutputStream (sock.getOutputStream()); 91 92 buf=Util.objectToByteBuffer(peer_addr); 94 output.writeInt(buf.length); 95 output.write(buf, 0, buf.length); 96 97 98 103 input=new DataInputStream (sock.getInputStream()); 104 type=input.readInt(); 105 gname=input.readUTF(); 106 107 switch(type) { 108 case Router.GET: 109 processGetRequest(sock, output, gname); break; 111 case Router.DUMP: 112 processDumpRequest(peer_addr, sock, output); break; 114 case Router.REGISTER: 115 Address addr; 116 len=input.readInt(); 117 buf=new byte[len]; 118 input.readFully(buf, 0, buf.length); addr=(Address)Util.objectFromByteBuffer(buf); 120 addEntry(gname, new AddressEntry(addr, sock, output)); 121 new SocketThread(sock, input).start(); 122 break; 123 default: 124 if(log.isErrorEnabled()) log.error("request of type " + type + " not recognized"); 125 continue; 126 } 127 } 128 catch(Exception e) { 129 if(log.isErrorEnabled()) log.error("exception=" + e); 130 continue; 131 } 132 } 133 } 134 135 136 public void stop() { 137 138 } 139 140 143 void processGetRequest(Socket sock, DataOutputStream output, String groupname) { 144 List grpmbrs=(List)groups.get(groupname), ret=null; 145 AddressEntry entry; 146 byte[] buf; 147 148 if(log.isTraceEnabled()) log.trace("groupname=" + groupname + ", result=" + grpmbrs); 149 150 if(grpmbrs != null && grpmbrs.size() > 0) { 151 ret=new List(); 152 for(Enumeration e=grpmbrs.elements(); e.hasMoreElements();) { 153 entry=(AddressEntry)e.nextElement(); 154 ret.add(entry.addr); 155 } 156 } 157 try { 158 if(ret == null || ret.size() == 0) { 159 output.writeInt(0); 160 } 161 else { 162 buf=Util.objectToByteBuffer(ret); 163 output.writeInt(buf.length); 164 output.write(buf, 0, buf.length); 165 } 166 } 167 catch(Exception e) { 168 if(log.isErrorEnabled()) log.error("exception=" + e); 169 } 170 finally { 171 try { 172 if(output != null) 173 output.close(); 174 sock.close(); 175 } 176 catch(Exception e) { 177 } 178 } 179 } 180 181 182 185 void processDumpRequest(Address peerAddress, Socket sock, DataOutputStream output) { 186 187 if(log.isTraceEnabled()) log.trace("request from " + peerAddress); 188 189 StringBuffer sb=new StringBuffer (); 190 synchronized(groups) { 191 if(groups.size() == 0) { 192 sb.append("empty routing table"); 193 } 194 else { 195 for(Iterator i=groups.keySet().iterator(); i.hasNext();) { 196 String gname=(String )i.next(); 197 sb.append("GROUP: '" + gname + "'\n"); 198 List l=(List)groups.get(gname); 199 if(l == null) { 200 sb.append("\tnull list of addresses\n"); 201 } 202 else 203 if(l.size() == 0) { 204 sb.append("\tempty list of addresses\n"); 205 } 206 else { 207 for(Enumeration e=l.elements(); e.hasMoreElements();) { 208 AddressEntry ae=(AddressEntry)e.nextElement(); 209 sb.append('\t'); 210 sb.append(ae.toString()); 211 sb.append('\n'); 212 } 213 } 214 } 215 } 216 } 217 try { 218 output.writeUTF(sb.toString()); 219 } 220 catch(Exception e) { 221 if(log.isErrorEnabled()) log.error("Error sending the answer back to the client: " + e); 222 } 223 finally { 224 try { 225 if(output != null) { 226 output.close(); 227 } 228 } 229 catch(Exception e) { 230 if(log.isErrorEnabled()) log.error("Error closing the output stream: " + e); 231 } 232 try { 233 sock.close(); 234 } 235 catch(Exception e) { 236 if(log.isErrorEnabled()) log.error("Error closing the socket: " + e); 237 } 238 } 239 } 240 241 synchronized void route(Address dest, String dest_group, byte[] msg) { 242 243 if(dest == null) { if(dest_group == null) { 245 if(log.isErrorEnabled()) log.error("both dest address and group are null"); 246 return; 247 } 248 else { 249 sendToAllMembersInGroup(dest_group, msg); 250 } 251 } 252 else { DataOutputStream out=findSocket(dest); 254 if(out != null) 255 sendToMember(out, msg); 256 else 257 if(log.isErrorEnabled()) log.error("routing of message to " + dest + " failed; outstream is null !"); 258 } 259 } 260 261 262 void addEntry(String groupname, AddressEntry e) { 263 List val; 264 AddressEntry old_entry; 265 266 268 if(groupname == null) { 269 if(log.isErrorEnabled()) log.error("groupname was null, not added !"); 270 return; 271 } 272 273 synchronized(groups) { 274 val=(List)groups.get(groupname); 275 276 if(val == null) { 277 val=new List(); 278 groups.put(groupname, val); 279 } 280 if(val.contains(e)) { 281 old_entry=(AddressEntry)val.removeElement(e); 282 if(old_entry != null) 283 old_entry.destroy(); 284 } 285 val.add(e); 286 } 287 } 288 289 290 void removeEntry(Socket sock) { 291 List val; 292 AddressEntry entry; 293 294 synchronized(groups) { 295 for(Enumeration e=groups.keys(); e.hasMoreElements();) { 296 val=(List)groups.get(e.nextElement()); 297 298 for(Enumeration e2=val.elements(); e2.hasMoreElements();) { 299 entry=(AddressEntry)e2.nextElement(); 300 if(entry.sock == sock) { 301 try { 302 entry.sock.close(); 303 } 304 catch(Exception ex) { 305 } 306 val.removeElement(entry); 308 return; 309 } 310 } 311 } 312 } 313 } 314 315 316 void removeEntry(OutputStream out) { 317 List val; 318 AddressEntry entry; 319 320 synchronized(groups) { 321 for(Enumeration e=groups.keys(); e.hasMoreElements();) { 322 val=(List)groups.get(e.nextElement()); 323 324 for(Enumeration e2=val.elements(); e2.hasMoreElements();) { 325 entry=(AddressEntry)e2.nextElement(); 326 if(entry.output == out) { 327 try { 328 if(entry.sock != null) 329 entry.sock.close(); 330 } 331 catch(Exception ex) { 332 } 333 val.removeElement(entry); 335 return; 336 } 337 } 338 } 339 } 340 } 341 342 343 void removeEntry(String groupname, Address addr) { 344 List val; 345 AddressEntry entry; 346 347 348 synchronized(groups) { 349 val=(List)groups.get(groupname); 350 if(val == null || val.size() == 0) 351 return; 352 for(Enumeration e2=val.elements(); e2.hasMoreElements();) { 353 entry=(AddressEntry)e2.nextElement(); 354 if(entry.addr.equals(addr)) { 355 try { 356 if(entry.sock != null) 357 entry.sock.close(); 358 } 359 catch(Exception ex) { 360 } 361 val.removeElement(entry); 363 return; 364 } 365 } 366 } 367 } 368 369 370 DataOutputStream findSocket(Address addr) { 371 List val; 372 AddressEntry entry; 373 374 synchronized(groups) { 375 for(Enumeration e=groups.keys(); e.hasMoreElements();) { 376 val=(List)groups.get(e.nextElement()); 377 for(Enumeration e2=val.elements(); e2.hasMoreElements();) { 378 entry=(AddressEntry)e2.nextElement(); 379 if(addr.equals(entry.addr)) 380 return entry.output; 381 } 382 } 383 return null; 384 } 385 } 386 387 388 void sendToAllMembersInGroup(String groupname, byte[] msg) { 389 List val; 390 391 synchronized(groups) { 392 val=(List)groups.get(groupname); 393 if(val == null || val.size() == 0) 394 return; 395 for(Enumeration e=val.elements(); e.hasMoreElements();) { 396 sendToMember(((AddressEntry)e.nextElement()).output, msg); 397 } 398 } 399 } 400 401 402 void sendToMember(DataOutputStream out, byte[] msg) { 403 try { 404 if(out != null) { 405 out.writeInt(msg.length); 406 out.write(msg, 0, msg.length); 407 } 408 } 409 catch(Exception e) { 410 if(log.isErrorEnabled()) log.error("exception=" + e); 411 removeEntry(out); } 413 } 414 415 416 class AddressEntry { 417 Address addr=null; 418 Socket sock=null; 419 DataOutputStream output=null; 420 421 422 public AddressEntry(Address addr, Socket sock, DataOutputStream output) { 423 this.addr=addr; 424 this.sock=sock; 425 this.output=output; 426 } 427 428 429 void destroy() { 430 if(output != null) { 431 try { 432 output.close(); 433 } 434 catch(Exception e) { 435 } 436 output=null; 437 } 438 if(sock != null) { 439 try { 440 sock.close(); 441 } 442 catch(Exception e) { 443 } 444 sock=null; 445 } 446 } 447 448 public boolean equals(Object other) { 449 return addr.equals(((AddressEntry)other).addr); 450 } 451 452 public String toString() { 453 return "addr=" + addr + ", sock=" + sock; 454 } 455 } 456 457 458 459 class SocketThread extends Thread { 460 Socket sock=null; 461 DataInputStream input=null; 462 463 464 public SocketThread(Socket sock, DataInputStream ois) { 465 this.sock=sock; 466 input=ois; 467 } 468 469 void closeSocket() { 470 try { 471 if(input != null) 472 input.close(); 473 if(sock != null) 474 sock.close(); 475 } 476 catch(Exception e) { 477 } 478 } 479 480 481 public void run() { 482 byte[] buf; 483 int len; 484 Address dst_addr=null; 485 String gname; 486 487 while(true) { 488 try { 489 gname=input.readUTF(); len=input.readInt(); 491 if(len == 0) 492 dst_addr=null; 493 else { 494 buf=new byte[len]; 495 input.readFully(buf, 0, buf.length); dst_addr=(Address)Util.objectFromByteBuffer(buf); 497 } 498 499 len=input.readInt(); 500 if(len == 0) { 501 if(log.isWarnEnabled()) log.warn("received null message"); 502 continue; 503 } 504 buf=new byte[len]; 505 input.readFully(buf, 0, buf.length); route(dst_addr, gname, buf); 507 } 508 catch(IOException io_ex) { 509 510 if(log.isInfoEnabled()) log.info("client " + 511 sock.getInetAddress().getHostName() + ':' + sock.getPort() + 512 " closed connection; removing it from routing table"); 513 removeEntry(sock); return; 515 } 516 catch(Exception e) { 517 if(log.isErrorEnabled()) log.error("exception=" + e); 518 break; 519 } 520 } 521 closeSocket(); 522 } 523 524 } 525 526 527 public static void main(String [] args) throws Exception { 528 String arg; 529 int port=8080; 530 Router router=null; 531 InetAddress address=null; 532 System.out.println("Router is starting..."); 533 for(int i=0; i < args.length; i++) { 534 arg=args[i]; 535 if("-help".equals(arg)) { 536 System.out.println("Router [-port <port>] [-bindaddress <address>]"); 537 return; 538 } 539 else 540 if("-port".equals(arg)) { 541 port=Integer.parseInt(args[++i]); 542 } 543 else 544 if("-bindaddress".equals(arg)) { 545 address=InetAddress.getByName(args[++i]); 546 } 547 548 } 549 550 551 552 try { 553 if(address == null) router=new Router(port); else router=new Router(port, address); 554 router.start(); 555 System.out.println("Router was created at " + new Date ()); 556 System.out.println("Listening on port " + port + " and bound to address " + address); 557 } 558 catch(Exception e) { 559 System.err.println(e); 560 } 561 } 562 563 564 } 565 | Popular Tags |