1 package org.jgroups.protocols; 2 3 4 import org.jgroups.*; 5 import org.jgroups.Channel; 6 import org.jgroups.annotations.GuardedBy; 7 import org.jgroups.stack.IpAddress; 8 import org.jgroups.stack.Protocol; 9 import org.jgroups.util.*; 10 import org.jgroups.util.Queue; 11 12 import java.io.DataInputStream ; 13 import java.io.IOException ; 14 import java.io.DataOutputStream ; 15 import java.net.*; 16 import java.text.NumberFormat ; 17 import java.util.*; 18 import java.util.List ; 19 import java.util.concurrent.*; 20 import java.util.concurrent.locks.ReentrantLock ; 21 22 23 49 public abstract class TP extends Protocol { 50 51 52 protected Address local_addr=null; 53 54 55 protected String channel_name=null; 56 57 58 protected InetAddress bind_addr=null; 59 60 61 boolean use_local_host=false; 62 63 64 boolean receive_on_all_interfaces=false; 65 66 71 List <NetworkInterface> receive_interfaces=null; 72 73 75 boolean send_on_all_interfaces=false; 76 77 82 List <NetworkInterface> send_interfaces=null; 83 84 85 86 int bind_port=0; 87 int port_range=1; 89 90 final protected Vector<Address> members=new Vector<Address>(11); 91 92 protected View view=null; 93 94 95 final ExposedByteArrayInputStream in_stream=new ExposedByteArrayInputStream(new byte[]{'0'}); 96 final DataInputStream dis=new DataInputStream (in_stream); 97 98 99 103 boolean loopback=false; 104 105 106 108 protected boolean discard_incompatible_packets=false; 109 110 114 boolean use_incoming_packet_handler=true; 115 116 117 Queue incoming_packet_queue=null; 118 119 121 IncomingPacketHandler incoming_packet_handler=null; 122 123 124 125 Queue incoming_msg_queue=null; 126 127 IncomingMessageHandler incoming_msg_handler; 128 129 130 boolean use_concurrent_stack=true; 131 ThreadGroup pool_thread_group=new ThreadGroup (Util.getGlobalThreadGroup(), "Thread Pools"); 132 133 139 protected ThreadNamingPattern thread_naming_pattern=new ThreadNamingPattern("cl"); 140 141 142 143 144 Executor oob_thread_pool; 145 
// ---------------------------------------------------------------------------
// OOB thread pool configuration (consumed by start() and setProperties())
// ---------------------------------------------------------------------------

/** When true, start() builds a ThreadPoolExecutor for OOB messages; otherwise a DirectExecutor is used. */
boolean oob_thread_pool_enabled=true;
/** Core pool size passed to createThreadPool() for the OOB pool. */
int oob_thread_pool_min_threads=2;
/** Maximum pool size passed to createThreadPool() for the OOB pool. */
int oob_thread_pool_max_threads=10;
/** Keep-alive time of idle OOB pool threads, in milliseconds (createThreadPool() uses TimeUnit.MILLISECONDS). */
long oob_thread_pool_keep_alive_time=30000;
/** Number of OOB packets counted by receive() when the concurrent stack is used. */
long num_oob_msgs_received=0;
/** Work queue of the OOB pool; stays null until start() runs with the OOB pool enabled. */
BlockingQueue<Runnable> oob_thread_pool_queue=null;
/** When true the OOB pool gets a bounded LinkedBlockingQueue, otherwise a SynchronousQueue. */
boolean oob_thread_pool_queue_enabled=true;
/** Capacity of the bounded OOB queue. */
int oob_thread_pool_queue_max_size=500;
/**
 * Rejection policy name for the OOB pool. setProperties() accepts "run", "abort", "discard" and
 * "discardoldest"; createThreadPool() maps any other value (including this default) to CallerRunsPolicy.
 */
String oob_thread_pool_rejection_policy="Run";

// ---------------------------------------------------------------------------
// Regular ("incoming") thread pool configuration - mirrors the OOB settings
// ---------------------------------------------------------------------------

/** Executor that processes non-OOB packets; assigned in start(). */
Executor thread_pool;
boolean thread_pool_enabled=true;
int thread_pool_min_threads=2;
int thread_pool_max_threads=10;
/** Keep-alive time of idle pool threads, in milliseconds. */
long thread_pool_keep_alive_time=30000;
/** Number of non-OOB packets counted by receive() when the concurrent stack is used. */
long num_incoming_msgs_received=0;
/** Work queue of the regular pool; stays null until start() runs with the pool enabled. */
BlockingQueue<Runnable> thread_pool_queue=null;
boolean thread_pool_queue_enabled=true;
int thread_pool_queue_max_size=500;
/** Rejection policy name for the regular pool; same accepted values as the OOB policy. */
String thread_pool_rejection_policy="Run";

/** Opaque payload taken from the "additional_data" entry of a CONFIG event (see handleConfigEvent()). */
byte[] additional_data=null;

/** Maximum bundle size; the current value is also the upper bound enforced by setProperties(). */
int max_bundle_size=65535;

/** Bundle timeout; setProperties() rejects values <= 0. */
long max_bundle_timeout=20;

/** When true, start() creates a Bundler and send() routes non-OOB messages through it. */
boolean enable_bundling=false;

private Bundler bundler=null;

/** Taken from the protocol stack in start(). */
protected TimeScheduler timer=null;

/** Created and started in start() when diagnostics are enabled; stopped and cleared in stop(). */
private DiagnosticsHandler diag_handler=null;
boolean enable_diagnostics=true;
// NOTE(review): presumably the address/port the DiagnosticsHandler listens on - the handler is not
// visible from here, confirm against its implementation.
String diagnostics_addr="224.0.0.75";
int diagnostics_port=7500;

/** Transport header carrying the channel name; created on CONNECT and attached to outgoing messages in down(). */
TpHeader header;
/** Key under which the transport header is stored in / looked up from a message. */
final String name=getName();

// Bits of the flags byte that follows the version short in a marshalled packet.
static final byte LIST      = 1;
static final byte MULTICAST = 2;
static final byte OOB       = 4;

// Traffic counters, updated only while 'stats' is enabled (see doSend() / handleIncomingMessage()).
long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0;

static NumberFormat f;
/** Initial capacity of the output buffer allocated per message in send(). */
private static final int INITIAL_BUFSIZE=1024;

static {
    // plain numbers: no grouping separators, at most 2 fraction digits
    f=NumberFormat.getNumberInstance();
    f.setGroupingUsed(false);
    f.setMaximumFractionDigits(2);
}


/** Creates an unconfigured transport; configuration happens through setProperties(). */
protected TP() {
}

/** Returns the protocol name followed by the local address. */
@Override
public String toString() {
    return name + "(local address: " + local_addr + ')';
}

/** Resets all message/byte counters, including the per-pool packet counters, to 0. */
public void resetStats() {
    num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=0;
    num_oob_msgs_received=num_incoming_msgs_received=0;
}

public long getNumMessagesSent()     {return num_msgs_sent;}
public long getNumMessagesReceived() {return num_msgs_received;}
public long getNumBytesSent()        {return num_bytes_sent;}
public long getNumBytesReceived()    {return num_bytes_received;}

/** Returns the bind address as a string, or the literal "null" when none is set. */
public String getBindAddress() {return bind_addr != null? bind_addr.toString() : "null";}

/**
 * Resolves and stores the bind address.
 * @throws UnknownHostException if the given host name cannot be resolved
 */
public void setBindAddress(String bind_addr) throws UnknownHostException {
    this.bind_addr=InetAddress.getByName(bind_addr);
}

/** Legacy name for {@link #isReceiveOnAllInterfaces()}; both read the same flag. */
public boolean getBindToAllInterfaces() {return receive_on_all_interfaces;}
/** Legacy setter; writes the receive_on_all_interfaces flag. */
public void setBindToAllInterfaces(boolean flag) {this.receive_on_all_interfaces=flag;}

public boolean isReceiveOnAllInterfaces() {return receive_on_all_interfaces;}
/** Returns the internal list of receive interfaces (not a copy); may be null. */
public java.util.List getReceiveInterfaces() {return receive_interfaces;}
public boolean isSendOnAllInterfaces() {return send_on_all_interfaces;}
/** Returns the internal list of send interfaces (not a copy); may be null. */
public java.util.List getSendInterfaces() {return send_interfaces;}
public boolean isDiscardIncompatiblePackets() {return discard_incompatible_packets;}
public void setDiscardIncompatiblePackets(boolean flag) {discard_incompatible_packets=flag;}
public boolean isEnableBundling() {return enable_bundling;}
public void setEnableBundling(boolean flag) {enable_bundling=flag;}
public int getMaxBundleSize() {return max_bundle_size;}
public void setMaxBundleSize(int size) {max_bundle_size=size;}
public long getMaxBundleTimeout() {return max_bundle_timeout;}
public void setMaxBundleTimeout(long timeout) {max_bundle_timeout=timeout;}
public Address getLocalAddress() {return local_addr;}
public String getChannelName() {return channel_name;}
public boolean isLoopback() {return loopback;}
public void setLoopback(boolean b) {loopback=b;}
public boolean isUseIncomingPacketHandler() {return use_incoming_packet_handler;}


// ---------------------------------------------------------------------------
// OOB thread pool management. All pool accessors report 0 / do nothing when the
// executor is not a ThreadPoolExecutor (e.g. a DirectExecutor or not started yet).
// ---------------------------------------------------------------------------

public int getOOBMinPoolSize() {
    return oob_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)oob_thread_pool).getCorePoolSize() : 0;
}

public void setOOBMinPoolSize(int size) {
    if(oob_thread_pool instanceof ThreadPoolExecutor)
        ((ThreadPoolExecutor)oob_thread_pool).setCorePoolSize(size);
}

public int getOOBMaxPoolSize() {
    return oob_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)oob_thread_pool).getMaximumPoolSize() : 0;
}

public void setOOBMaxPoolSize(int size) {
    if(oob_thread_pool instanceof ThreadPoolExecutor)
        ((ThreadPoolExecutor)oob_thread_pool).setMaximumPoolSize(size);
}

public int getOOBPoolSize() {
    return oob_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)oob_thread_pool).getPoolSize() : 0;
}

/** Returns the OOB pool keep-alive time in milliseconds. */
public long getOOBKeepAliveTime() {
    return oob_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)oob_thread_pool).getKeepAliveTime(TimeUnit.MILLISECONDS) : 0;
}

/** Sets the OOB pool keep-alive time in milliseconds. */
public void setOOBKeepAliveTime(long time) {
    if(oob_thread_pool instanceof ThreadPoolExecutor)
        ((ThreadPoolExecutor)oob_thread_pool).setKeepAliveTime(time, TimeUnit.MILLISECONDS);
}

public long getOOBMessages() {
    return num_oob_msgs_received;
}

/**
 * Returns the number of tasks waiting in the OOB queue, or 0 when no queue exists
 * (the queue is only created by start() when the OOB pool is enabled).
 */
public int getOOBQueueSize() {
    BlockingQueue<Runnable> queue=oob_thread_pool_queue;
    return queue != null? queue.size() : 0;
}

public int getOOBMaxQueueSize() {
    return oob_thread_pool_queue_max_size;
}


// ---------------------------------------------------------------------------
// Regular ("incoming") thread pool management
// ---------------------------------------------------------------------------

public int getIncomingMinPoolSize() {
    return thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)thread_pool).getCorePoolSize() : 0;
}

public void setIncomingMinPoolSize(int size) {
    if(thread_pool instanceof ThreadPoolExecutor)
        ((ThreadPoolExecutor)thread_pool).setCorePoolSize(size);
}

public int getIncomingMaxPoolSize() {
    return thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)thread_pool).getMaximumPoolSize() : 0;
}

public void setIncomingMaxPoolSize(int size) {
    if(thread_pool instanceof ThreadPoolExecutor)
        ((ThreadPoolExecutor)thread_pool).setMaximumPoolSize(size);
}

public int getIncomingPoolSize() {
    return thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)thread_pool).getPoolSize() : 0;
}

/** Returns the regular pool keep-alive time in milliseconds. */
public long getIncomingKeepAliveTime() {
    return thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)thread_pool).getKeepAliveTime(TimeUnit.MILLISECONDS) : 0;
}

/** Sets the regular pool keep-alive time in milliseconds. */
public void setIncomingKeepAliveTime(long time) {
    if(thread_pool instanceof ThreadPoolExecutor)
        ((ThreadPoolExecutor)thread_pool).setKeepAliveTime(time, TimeUnit.MILLISECONDS);
}

public long getIncomingMessages() {
    return num_incoming_msgs_received;
}

/**
 * Returns the number of tasks waiting in the regular pool's queue, or 0 when no queue exists
 * (the queue is only created by start() when the pool is enabled).
 */
public int getIncomingQueueSize() {
    BlockingQueue<Runnable> queue=thread_pool_queue;
    return queue != null? queue.size() : 0;
}

public int getIncomingMaxQueueSize() {
    return thread_pool_queue_max_size;
}


/**
 * Adds the four traffic counters to the statistics reported by the superclass.
 * @return the superclass map (or a fresh map if the superclass returned null) with
 *         num_msgs_sent, num_msgs_received, num_bytes_sent and num_bytes_received added
 */
@Override
public Map<String,Object> dumpStats() {
    Map<String,Object> retval=super.dumpStats();
    if(retval == null)
        retval=new HashMap<String,Object>();
    retval.put("num_msgs_sent", Long.valueOf(num_msgs_sent));
    retval.put("num_msgs_received", Long.valueOf(num_msgs_received));
    retval.put("num_bytes_sent", Long.valueOf(num_bytes_sent));
    retval.put("num_bytes_received", Long.valueOf(num_bytes_received));
    return retval;
}


/** Sends data[offset, offset+length) to all members; invoked by doSend() for multicast destinations. */
public abstract void sendToAllMembers(byte[] data, int offset, int length) throws Exception;

/** Sends data[offset, offset+length) to a single member; invoked by doSend() for unicast destinations. */
public abstract void sendToSingleMember(Address dest, byte[] data, int offset, int length) throws Exception;

/** Transport-specific information appended to the diagnostics report built by _getInfo(). */
public abstract String getInfo();

/** Hook invoked by readMessage() right after a single message has been unmarshalled. */
public abstract void postUnmarshalling(Message msg, Address dest, Address src, boolean multicast);

/** Hook invoked by readMessageList() for each message of a list, before its source address is set. */
public abstract void postUnmarshallingList(Message msg, Address dest, boolean multicast);
418 419 420 private StringBuffer _getInfo() { 421 StringBuffer sb=new StringBuffer (); 422 sb.append(local_addr).append(" (").append(channel_name).append(") ").append("\n"); 423 sb.append("local_addr=").append(local_addr).append("\n"); 424 sb.append("group_name=").append(channel_name).append("\n"); 425 sb.append("version=").append(Version.description).append(", cvs=\"").append(Version.cvs).append("\"\n"); 426 sb.append("view: ").append(view).append('\n'); 427 sb.append(getInfo()); 428 return sb; 429 } 430 431 432 private void handleDiagnosticProbe(SocketAddress sender, DatagramSocket sock, String request) { 433 try { 434 StringTokenizer tok=new StringTokenizer(request); 435 String req=tok.nextToken(); 436 StringBuffer info=new StringBuffer ("n/a"); 437 if(req.trim().toLowerCase().startsWith("query")) { 438 ArrayList<String > l=new ArrayList<String >(tok.countTokens()); 439 while(tok.hasMoreTokens()) 440 l.add(tok.nextToken().trim().toLowerCase()); 441 442 info=_getInfo(); 443 444 if(l.contains("jmx")) { 445 Channel ch=stack.getChannel(); 446 if(ch != null) { 447 Map m=ch.dumpStats(); 448 StringBuffer sb=new StringBuffer (); 449 sb.append("stats:\n"); 450 for(Iterator it=m.entrySet().iterator(); it.hasNext();) { 451 sb.append(it.next()).append("\n"); 452 } 453 info.append(sb); 454 } 455 } 456 if(l.contains("props")) { 457 String p=stack.printProtocolSpecAsXML(); 458 info.append("\nprops:\n").append(p); 459 } 460 } 461 462 463 byte[] diag_rsp=info.toString().getBytes(); 464 if(log.isDebugEnabled()) 465 log.debug("sending diag response to " + sender); 466 sendResponse(sock, sender, diag_rsp); 467 } 468 catch(Throwable t) { 469 if(log.isErrorEnabled()) 470 log.error("failed sending diag rsp to " + sender, t); 471 } 472 } 473 474 private static void sendResponse(DatagramSocket sock, SocketAddress sender, byte[] buf) throws IOException { 475 DatagramPacket p=new DatagramPacket(buf, 0, buf.length, sender); 476 sock.send(p); 477 } 478 479 480 481 482 483 484 485 486 public 
void init() throws Exception { 487 super.init(); 488 if(bind_addr != null) { 489 Map<String ,Object > m=new HashMap<String ,Object >(1); 490 m.put("bind_addr", bind_addr); 491 up_prot.up(new Event(Event.CONFIG, m)); 492 } 493 } 494 495 496 497 500 public void start() throws Exception { 501 timer=stack.timer; 502 if(timer == null) 503 throw new Exception ("timer is null"); 504 505 if(enable_diagnostics) { 506 diag_handler=new DiagnosticsHandler(); 507 diag_handler.start(); 508 } 509 510 if(use_incoming_packet_handler && !use_concurrent_stack) { 511 incoming_packet_queue=new Queue(); 512 incoming_packet_handler=new IncomingPacketHandler(); 513 incoming_packet_handler.start(); 514 } 515 516 517 if(oob_thread_pool_enabled) { if(oob_thread_pool_queue_enabled) 520 oob_thread_pool_queue=new LinkedBlockingQueue<Runnable >(oob_thread_pool_queue_max_size); 521 else 522 oob_thread_pool_queue=new SynchronousQueue<Runnable >(); 523 oob_thread_pool=createThreadPool(oob_thread_pool_min_threads, oob_thread_pool_max_threads, oob_thread_pool_keep_alive_time, 524 oob_thread_pool_rejection_policy, oob_thread_pool_queue, "OOB", "OOB Thread"); 525 } 526 else { oob_thread_pool=new DirectExecutor(); 528 } 529 530 531 if(thread_pool_enabled) { if(thread_pool_queue_enabled) 534 thread_pool_queue=new LinkedBlockingQueue<Runnable >(thread_pool_queue_max_size); 535 else 536 thread_pool_queue=new SynchronousQueue<Runnable >(); 537 thread_pool=createThreadPool(thread_pool_min_threads, thread_pool_max_threads, thread_pool_keep_alive_time, 538 thread_pool_rejection_policy, thread_pool_queue, "Incoming", "Incoming Thread"); 539 } 540 else { thread_pool=new DirectExecutor(); 542 } 543 544 545 if(loopback && !use_concurrent_stack) { 546 incoming_msg_queue=new Queue(); 547 incoming_msg_handler=new IncomingMessageHandler(); 548 incoming_msg_handler.start(); 549 } 550 551 if(enable_bundling) { 552 bundler=new Bundler(); 553 } 554 555 up_prot.up(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); 556 } 557 
558 559 public void stop() { 560 if(diag_handler != null) { 561 diag_handler.stop(); 562 diag_handler=null; 563 } 564 565 if(incoming_packet_handler != null) 567 incoming_packet_handler.stop(); 568 569 570 if(incoming_msg_handler != null) 572 incoming_msg_handler.stop(); 573 574 576 if(oob_thread_pool instanceof ThreadPoolExecutor) { 577 shutdownThreadPool((ThreadPoolExecutor)oob_thread_pool); 578 } 579 580 if(thread_pool instanceof ThreadPoolExecutor) { 581 shutdownThreadPool((ThreadPoolExecutor)thread_pool); 582 } 583 } 584 585 586 587 588 594 public boolean setProperties(Properties props) { 595 super.setProperties(props); 596 597 boolean ignore_systemprops=Util.isBindAddressPropertyIgnored(); 598 String str=Util.getProperty(new String []{Global.BIND_ADDR, Global.BIND_ADDR_OLD}, props, "bind_addr", 599 ignore_systemprops, null); 600 601 if(str != null) { 602 try { 603 bind_addr=InetAddress.getByName(str); 604 } 605 catch(UnknownHostException unknown) { 606 if(log.isFatalEnabled()) log.fatal("(bind_addr): host " + str + " not known"); 607 return false; 608 } 609 props.remove("bind_addr"); 610 } 611 612 str=props.getProperty("use_local_host"); 613 if(str != null) { 614 use_local_host=Boolean.parseBoolean(str); 615 props.remove("use_local_host"); 616 } 617 618 str=props.getProperty("bind_to_all_interfaces"); 619 if(str != null) { 620 receive_on_all_interfaces=Boolean.parseBoolean(str); 621 props.remove("bind_to_all_interfaces"); 622 log.warn("bind_to_all_interfaces has been deprecated; use receive_on_all_interfaces instead"); 623 } 624 625 str=props.getProperty("receive_on_all_interfaces"); 626 if(str != null) { 627 receive_on_all_interfaces=Boolean.parseBoolean(str); 628 props.remove("receive_on_all_interfaces"); 629 } 630 631 str=props.getProperty("receive_interfaces"); 632 if(str != null) { 633 try { 634 receive_interfaces=Util.parseInterfaceList(str); 635 props.remove("receive_interfaces"); 636 } 637 catch(Exception e) { 638 log.error("error determining 
interfaces (" + str + ")", e); 639 return false; 640 } 641 } 642 643 str=props.getProperty("send_on_all_interfaces"); 644 if(str != null) { 645 send_on_all_interfaces=Boolean.parseBoolean(str); 646 props.remove("send_on_all_interfaces"); 647 } 648 649 str=props.getProperty("send_interfaces"); 650 if(str != null) { 651 try { 652 send_interfaces=Util.parseInterfaceList(str); 653 props.remove("send_interfaces"); 654 } 655 catch(Exception e) { 656 log.error("error determining interfaces (" + str + ")", e); 657 return false; 658 } 659 } 660 661 str=props.getProperty("bind_port"); 662 if(str != null) { 663 bind_port=Integer.parseInt(str); 664 props.remove("bind_port"); 665 } 666 667 str=props.getProperty("port_range"); 668 if(str != null) { 669 port_range=Integer.parseInt(str); 670 props.remove("port_range"); 671 } 672 673 str=props.getProperty("loopback"); 674 if(str != null) { 675 loopback=Boolean.valueOf(str).booleanValue(); 676 props.remove("loopback"); 677 } 678 679 str=props.getProperty("discard_incompatible_packets"); 680 if(str != null) { 681 discard_incompatible_packets=Boolean.valueOf(str).booleanValue(); 682 props.remove("discard_incompatible_packets"); 683 } 684 685 str=props.getProperty("use_packet_handler"); 687 if(str != null) { 688 use_incoming_packet_handler=Boolean.valueOf(str).booleanValue(); 689 props.remove("use_packet_handler"); 690 if(log.isWarnEnabled()) log.warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead"); 691 } 692 693 str=props.getProperty("use_incoming_packet_handler"); 694 if(str != null) { 695 use_incoming_packet_handler=Boolean.valueOf(str).booleanValue(); 696 props.remove("use_incoming_packet_handler"); 697 } 698 699 str=props.getProperty("use_concurrent_stack"); 700 if(str != null) { 701 use_concurrent_stack=Boolean.valueOf(str).booleanValue(); 702 props.remove("use_concurrent_stack"); 703 } 704 705 str=props.getProperty("thread_naming_pattern"); 706 if(str != null) { 707 thread_naming_pattern=new 
ThreadNamingPattern(str); 708 props.remove("thread_naming_pattern"); 709 } 710 711 str=props.getProperty("oob_thread_pool.enabled"); 713 if(str != null) { 714 oob_thread_pool_enabled=Boolean.valueOf(str).booleanValue(); 715 props.remove("oob_thread_pool.enabled"); 716 } 717 718 str=props.getProperty("oob_thread_pool.min_threads"); 719 if(str != null) { 720 oob_thread_pool_min_threads=Integer.valueOf(str).intValue(); 721 props.remove("oob_thread_pool.min_threads"); 722 } 723 724 str=props.getProperty("oob_thread_pool.max_threads"); 725 if(str != null) { 726 oob_thread_pool_max_threads=Integer.valueOf(str).intValue(); 727 props.remove("oob_thread_pool.max_threads"); 728 } 729 730 str=props.getProperty("oob_thread_pool.keep_alive_time"); 731 if(str != null) { 732 oob_thread_pool_keep_alive_time=Long.valueOf(str).longValue(); 733 props.remove("oob_thread_pool.keep_alive_time"); 734 } 735 736 str=props.getProperty("oob_thread_pool.queue_enabled"); 737 if(str != null) { 738 oob_thread_pool_queue_enabled=Boolean.valueOf(str).booleanValue(); 739 props.remove("oob_thread_pool.queue_enabled"); 740 } 741 742 str=props.getProperty("oob_thread_pool.queue_max_size"); 743 if(str != null) { 744 oob_thread_pool_queue_max_size=Integer.valueOf(str).intValue(); 745 props.remove("oob_thread_pool.queue_max_size"); 746 } 747 748 str=props.getProperty("oob_thread_pool.rejection_policy"); 749 if(str != null) { 750 str=str.toLowerCase().trim(); 751 oob_thread_pool_rejection_policy=str; 752 if(!(str.equals("run") || str.equals("abort")|| str.equals("discard")|| str.equals("discardoldest"))) { 753 log.error("rejection policy of " + str + " is unknown"); 754 return false; 755 } 756 props.remove("oob_thread_pool.rejection_policy"); 757 } 758 759 760 761 762 str=props.getProperty("thread_pool.enabled"); 764 if(str != null) { 765 thread_pool_enabled=Boolean.valueOf(str).booleanValue(); 766 props.remove("thread_pool.enabled"); 767 } 768 769 str=props.getProperty("thread_pool.min_threads"); 770 
if(str != null) { 771 thread_pool_min_threads=Integer.valueOf(str).intValue(); 772 props.remove("thread_pool.min_threads"); 773 } 774 775 str=props.getProperty("thread_pool.max_threads"); 776 if(str != null) { 777 thread_pool_max_threads=Integer.valueOf(str).intValue(); 778 props.remove("thread_pool.max_threads"); 779 } 780 781 str=props.getProperty("thread_pool.keep_alive_time"); 782 if(str != null) { 783 thread_pool_keep_alive_time=Long.valueOf(str).longValue(); 784 props.remove("thread_pool.keep_alive_time"); 785 } 786 787 str=props.getProperty("thread_pool.queue_enabled"); 788 if(str != null) { 789 thread_pool_queue_enabled=Boolean.valueOf(str).booleanValue(); 790 props.remove("thread_pool.queue_enabled"); 791 } 792 793 str=props.getProperty("thread_pool.queue_max_size"); 794 if(str != null) { 795 thread_pool_queue_max_size=Integer.valueOf(str).intValue(); 796 props.remove("thread_pool.queue_max_size"); 797 } 798 799 str=props.getProperty("thread_pool.rejection_policy"); 800 if(str != null) { 801 str=str.toLowerCase().trim(); 802 thread_pool_rejection_policy=str; 803 if(!(str.equals("run") || str.equals("abort")|| str.equals("discard")|| str.equals("discardoldest"))) { 804 log.error("rejection policy of " + str + " is unknown"); 805 return false; 806 } 807 props.remove("thread_pool.rejection_policy"); 808 } 809 810 811 str=props.getProperty("use_outgoing_packet_handler"); 812 if(str != null) { 813 log.warn("Attribute \"use_outgoing_packet_handler\" has been deprecated and is ignored"); 814 props.remove("use_outgoing_packet_handler"); 815 } 816 817 str=props.getProperty("outgoing_queue_max_size"); 818 if(str != null) { 819 log.warn("Attribute \"use_outgoing_queue_max_size\" has been deprecated and is ignored"); 820 props.remove("outgoing_queue_max_size"); 821 } 822 823 str=props.getProperty("max_bundle_size"); 824 if(str != null) { 825 int bundle_size=Integer.parseInt(str); 826 if(bundle_size > max_bundle_size) { 827 if(log.isErrorEnabled()) 
log.error("max_bundle_size (" + bundle_size + 828 ") is greater than largest TP fragmentation size (" + max_bundle_size + ')'); 829 return false; 830 } 831 if(bundle_size <= 0) { 832 if(log.isErrorEnabled()) log.error("max_bundle_size (" + bundle_size + ") is <= 0"); 833 return false; 834 } 835 max_bundle_size=bundle_size; 836 props.remove("max_bundle_size"); 837 } 838 839 str=props.getProperty("max_bundle_timeout"); 840 if(str != null) { 841 max_bundle_timeout=Long.parseLong(str); 842 if(max_bundle_timeout <= 0) { 843 if(log.isErrorEnabled()) log.error("max_bundle_timeout of " + max_bundle_timeout + " is invalid"); 844 return false; 845 } 846 props.remove("max_bundle_timeout"); 847 } 848 849 str=props.getProperty("enable_bundling"); 850 if(str != null) { 851 enable_bundling=Boolean.valueOf(str).booleanValue(); 852 props.remove("enable_bundling"); 853 } 854 855 str=props.getProperty("enable_diagnostics"); 856 if(str != null) { 857 enable_diagnostics=Boolean.valueOf(str).booleanValue(); 858 props.remove("enable_diagnostics"); 859 } 860 861 str=props.getProperty("diagnostics_addr"); 862 if(str != null) { 863 diagnostics_addr=str; 864 props.remove("diagnostics_addr"); 865 } 866 867 str=props.getProperty("diagnostics_port"); 868 if(str != null) { 869 diagnostics_port=Integer.parseInt(str); 870 props.remove("diagnostics_port"); 871 } 872 873 return true; 874 } 875 876 877 878 879 883 public Object up(Event evt) { 884 switch(evt.getType()) { 885 case Event.CONFIG: 886 up_prot.up(evt); 887 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); 888 handleConfigEvent((HashMap)evt.getArg()); 889 return null; 890 } 891 return up_prot.up(evt); 892 } 893 894 900 public Object down(Event evt) { 901 if(evt.getType() != Event.MSG) { return handleDownEvent(evt); 903 } 904 905 Message msg=(Message)evt.getArg(); 906 if(header != null) { 907 msg.putHeader(name, header); 910 } 911 912 setSourceAddress(msg); if(log.isTraceEnabled()) { 914 log.trace("sending msg to 
" + msg.getDest() + ", SRC=" + msg.getSrc() + ", headers are " + msg.printHeaders()); 915 } 916 917 Address dest=msg.getDest(); 921 boolean multicast=dest == null || dest.isMulticastAddress(); 922 if(loopback && (multicast || dest.equals(local_addr))) { 923 924 final Message copy=msg.copy(); 927 if(log.isTraceEnabled()) log.trace(new StringBuffer ("looping back message ").append(copy)); 928 930 Executor pool=msg.isFlagSet(Message.OOB)? oob_thread_pool : thread_pool; 932 pool.execute(new Runnable () { 933 public void run() { 934 up_prot.up(new Event(Event.MSG, copy)); 935 } 936 }); 937 938 if(!multicast) 939 return null; 940 } 941 942 try { 943 send(msg, dest, multicast); 944 } 945 catch(InterruptedException interruptedEx) { 946 Thread.currentThread().interrupt(); } 948 catch(Throwable e) { 949 if(log.isErrorEnabled()) { 950 String dst=msg.getDest() == null? "null" : msg.getDest().toString(); 951 log.error("failed sending message to " + dst + " (" + msg.size() + " bytes)", e); 952 } 953 } 954 return null; 955 } 956 957 958 959 960 961 962 963 964 965 966 972 private void setSourceAddress(Message msg) { 973 if(msg.getSrc() == null) 974 msg.setSrc(local_addr); 975 } 976 977 987 protected final void receive(Address dest, Address sender, byte[] data, int offset, int length) { 988 if(data == null) return; 989 990 if(log.isTraceEnabled()){ 991 boolean mcast=dest == null || dest.isMulticastAddress(); 992 StringBuilder sb=new StringBuilder ("received ("); 993 sb.append(mcast? 
"mcast) " : "ucast) ").append(length).append(" bytes from ").append(sender); 994 log.trace(sb.toString()); 995 } 996 997 try { 998 boolean oob=false; 1000 byte oob_flag=data[Global.SHORT_SIZE]; if((oob_flag & OOB) == OOB) 1002 oob=true; 1003 1004 if(use_concurrent_stack) { 1005 if(oob) { 1006 num_oob_msgs_received++; 1007 dispatchToThreadPool(oob_thread_pool, dest, sender, data, offset, length); 1008 } 1009 else { 1010 num_incoming_msgs_received++; 1011 dispatchToThreadPool(thread_pool, dest, sender, data, offset, length); 1012 } 1013 } 1014 else { 1015 if(use_incoming_packet_handler) { 1016 byte[] tmp=new byte[length]; 1017 System.arraycopy(data, offset, tmp, 0, length); 1018 incoming_packet_queue.add(new IncomingPacket(dest, sender, tmp, 0, length)); 1019 } 1020 else 1021 handleIncomingPacket(dest, sender, data, offset, length); 1022 } 1023 } 1024 catch(Throwable t) { 1025 if(log.isErrorEnabled()) 1026 log.error(new StringBuffer ("failed handling data from ").append(sender), t); 1027 } 1028 } 1029 1030 1031 1032 private void dispatchToThreadPool(Executor pool, Address dest, Address sender, byte[] data, int offset, int length) { 1033 if(pool instanceof DirectExecutor) { 1034 pool.execute(new IncomingPacket(dest, sender, data, offset, length)); 1036 } 1037 else { 1038 byte[] tmp=new byte[length]; 1039 System.arraycopy(data, offset, tmp, 0, length); 1040 pool.execute(new IncomingPacket(dest, sender, tmp, 0, length)); 1041 } 1042 } 1043 1044 1045 1051 private void handleIncomingPacket(Address dest, Address sender, byte[] data, int offset, int length) { 1052 Message msg=null; 1053 short version=0; 1054 boolean is_message_list, multicast; 1055 byte flags; 1056 List <Message> msgs; 1057 1058 try { 1059 synchronized(in_stream) { 1060 in_stream.setData(data, offset, length); 1061 try { 1062 version=dis.readShort(); 1063 } 1064 catch(IOException ex) { 1065 if(discard_incompatible_packets) 1066 return; 1067 throw ex; 1068 } 1069 if(Version.isBinaryCompatible(version) == 
false) { 1070 if(log.isWarnEnabled()) { 1071 StringBuffer sb=new StringBuffer (); 1072 sb.append("packet from ").append(sender).append(" has different version (").append(Version.print(version)); 1073 sb.append(") from ours (").append(Version.printVersion()).append("). "); 1074 if(discard_incompatible_packets) 1075 sb.append("Packet is discarded"); 1076 else 1077 sb.append("This may cause problems"); 1078 log.warn(sb); 1079 } 1080 if(discard_incompatible_packets) 1081 return; 1082 } 1083 1084 flags=dis.readByte(); 1085 is_message_list=(flags & LIST) == LIST; 1086 multicast=(flags & MULTICAST) == MULTICAST; 1087 1088 if(is_message_list) 1089 msgs=readMessageList(dis, dest, multicast); 1090 else { 1091 msg=readMessage(dis, dest, sender, multicast); 1092 msgs=new LinkedList<Message>(); 1093 msgs.add(msg); 1094 } 1095 } 1096 1097 Address src; 1098 for(Iterator it=msgs.iterator(); it.hasNext();) { 1099 msg=(Message)it.next(); 1100 src=msg.getSrc(); 1101 if(loopback) { 1102 if(multicast && src != null && local_addr.equals(src)) { it.remove(); 1104 } 1105 } 1106 else 1107 handleIncomingMessage(msg); 1108 } 1109 if(incoming_msg_queue != null && !msgs.isEmpty()) 1110 incoming_msg_queue.addAll(msgs); 1111 } 1112 catch(Throwable t) { 1113 if(log.isErrorEnabled()) 1114 log.error("failed unmarshalling message", t); 1115 } 1116 } 1117 1118 1119 1120 private void handleIncomingMessage(Message msg) { 1121 Event evt; 1122 TpHeader hdr; 1123 1124 if(stats) { 1125 num_msgs_received++; 1126 num_bytes_received+=msg.getLength(); 1127 } 1128 1129 evt=new Event(Event.MSG, msg); 1130 if(log.isTraceEnabled()) { 1131 StringBuffer sb=new StringBuffer ("message is ").append(msg).append(", headers are ").append(msg.printHeaders()); 1132 log.trace(sb); 1133 } 1134 1135 hdr=(TpHeader)msg.getHeader(name); if(hdr != null) { 1137 1138 1139 String ch_name=hdr.channel_name; 1140 1141 if(channel_name != null && !channel_name.equals(ch_name)) { 1144 if(log.isWarnEnabled()) 1145 log.warn(new StringBuffer 
("discarded message from different group \"").append(ch_name). 1146 append("\" (our group is \"").append(channel_name).append("\"). Sender was ").append(msg.getSrc())); 1147 return; 1148 } 1149 } 1150 else { 1151 if(log.isTraceEnabled()) 1152 log.trace(new StringBuffer ("message does not have a transport header, msg is ").append(msg). 1153 append(", headers are ").append(msg.printHeaders()).append(", will be discarded")); 1154 return; 1155 } 1156 up_prot.up(evt); 1157 } 1158 1159 1160 1161 private void send(Message msg, Address dest, boolean multicast) throws Exception { 1162 1163 if(enable_bundling && !msg.isFlagSet(Message.OOB)) { 1165 bundler.send(msg, dest); 1166 return; 1167 } 1168 1169 ExposedByteArrayOutputStream out_stream=null; 1170 ExposedDataOutputStream dos=null; 1171 Buffer buf; 1172 out_stream=new ExposedByteArrayOutputStream(INITIAL_BUFSIZE); 1173 dos=new ExposedDataOutputStream(out_stream); 1174 writeMessage(msg, dos, multicast); 1175 buf=new Buffer(out_stream.getRawBuffer(), 0, out_stream.size()); 1176 doSend(buf, dest, multicast); 1177 } 1178 1179 1180 private void doSend(Buffer buf, Address dest, boolean multicast) throws Exception { 1181 if(stats) { 1182 num_msgs_sent++; 1183 num_bytes_sent+=buf.getLength(); 1184 } 1185 if(multicast) { 1186 sendToAllMembers(buf.getBuf(), buf.getOffset(), buf.getLength()); 1187 } 1188 else { 1189 sendToSingleMember(dest, buf.getBuf(), buf.getOffset(), buf.getLength()); 1190 } 1191 } 1192 1193 1194 1195 1201 private static void writeMessage(Message msg, DataOutputStream dos, boolean multicast) throws Exception { 1202 byte flags=0; 1203 dos.writeShort(Version.version); if(multicast) 1205 flags+=MULTICAST; 1206 if(msg.isFlagSet(Message.OOB)) 1207 flags+=OOB; 1208 dos.writeByte(flags); 1209 msg.writeTo(dos); 1210 } 1211 1212 private Message readMessage(DataInputStream instream, Address dest, Address sender, boolean multicast) throws Exception { 1213 Message msg=new Message(false); msg.readFrom(instream); 1215 
postUnmarshalling(msg, dest, sender, multicast); return msg; 1217 } 1218 1219 1220 1221 private static void writeMessageList(List <Message> msgs, DataOutputStream dos, boolean multicast) throws Exception { 1222 Address src; 1223 byte flags=0; 1224 int len=msgs != null? msgs.size() : 0; 1225 boolean src_written=false; 1226 1227 dos.writeShort(Version.version); 1228 flags+=LIST; 1229 if(multicast) 1230 flags+=MULTICAST; 1231 dos.writeByte(flags); 1232 dos.writeInt(len); 1233 if(msgs != null) { 1234 for(Message msg: msgs) { 1235 src=msg.getSrc(); 1236 if(!src_written) { 1237 Util.writeAddress(src, dos); 1238 src_written=true; 1239 } 1240 msg.writeTo(dos); 1241 } 1242 } 1243 } 1244 1245 private List <Message> readMessageList(DataInputStream instream, Address dest, boolean multicast) throws Exception { 1246 List <Message> list=new LinkedList<Message>(); 1247 int len; 1248 Message msg; 1249 Address src; 1250 1251 len=instream.readInt(); 1252 src=Util.readAddress(instream); 1253 for(int i=0; i < len; i++) { 1254 msg=new Message(false); msg.readFrom(instream); 1256 postUnmarshallingList(msg, dest, multicast); 1257 msg.setSrc(src); 1258 list.add(msg); 1259 } 1260 return list; 1261 } 1262 1263 1264 1265 protected Object handleDownEvent(Event evt) { 1266 switch(evt.getType()) { 1267 1268 case Event.TMP_VIEW: 1269 case Event.VIEW_CHANGE: 1270 synchronized(members) { 1271 view=(View)evt.getArg(); 1272 members.clear(); 1273 Vector<Address> tmpvec=view.getMembers(); 1274 members.addAll(tmpvec); 1275 } 1276 break; 1277 1278 case Event.CONNECT: 1279 channel_name=(String )evt.getArg(); 1280 header=new TpHeader(channel_name); 1281 setThreadNames(); 1282 return null; 1283 1284 case Event.DISCONNECT: 1285 unsetThreadNames(); 1286 break; 1287 1288 case Event.CONFIG: 1289 if(log.isDebugEnabled()) log.debug("received CONFIG event: " + evt.getArg()); 1290 handleConfigEvent((HashMap)evt.getArg()); 1291 break; 1292 } 1293 return null; 1294 } 1295 1296 1297 1298 1299 protected void 
setThreadNames() { 1300 if(thread_naming_pattern != null){ 1301 if(incoming_packet_handler != null) 1302 thread_naming_pattern.renameThread(IncomingPacketHandler.THREAD_NAME, incoming_packet_handler.getThread()); 1303 if(incoming_msg_handler != null) 1304 thread_naming_pattern.renameThread(IncomingMessageHandler.THREAD_NAME, incoming_msg_handler.getThread()); 1305 if(diag_handler != null) 1306 thread_naming_pattern.renameThread(DiagnosticsHandler.THREAD_NAME, diag_handler.getThread()); 1307 } 1308 } 1309 1310 1311 protected void unsetThreadNames() { 1312 if(incoming_packet_handler != null && incoming_packet_handler.getThread() != null) 1313 incoming_packet_handler.getThread().setName(IncomingPacketHandler.THREAD_NAME); 1314 if(incoming_msg_handler != null && incoming_msg_handler.getThread() != null) 1315 incoming_msg_handler.getThread().setName(IncomingMessageHandler.THREAD_NAME); 1316 if(diag_handler != null && diag_handler.getThread() != null) 1317 diag_handler.getThread().setName(DiagnosticsHandler.THREAD_NAME); 1318 } 1319 1320 protected void handleConfigEvent(HashMap map) { 1321 if(map == null) return; 1322 if(map.containsKey("additional_data")) { 1323 additional_data=(byte[])map.get("additional_data"); 1324 if(local_addr instanceof IpAddress) 1325 ((IpAddress)local_addr).setAdditionalData(additional_data); 1326 } 1327 } 1328 1329 1330 protected Executor createThreadPool(int min_threads, int max_threads, long keep_alive_time, String rejection_policy, 1331 BlockingQueue<Runnable > queue, final String thread_group_name, final String thread_name) { 1332 ThreadPoolExecutor pool=null; 1333 pool=new ThreadPoolExecutor(min_threads, max_threads, keep_alive_time, TimeUnit.MILLISECONDS, queue); 1334 1335 pool.setThreadFactory(new ThreadFactory() { 1336 ThreadGroup unmarshaller_threads=new ThreadGroup (pool_thread_group, thread_group_name); 1337 public Thread newThread(Runnable command) { 1338 Thread retval=new Thread (unmarshaller_threads, command, thread_name); 1339 
if(thread_naming_pattern != null) 1340 thread_naming_pattern.renameThread(thread_name, retval); 1341 return retval; 1342 } 1343 }); 1344 1345 if(rejection_policy != null) { 1346 if(rejection_policy.equals("abort")) 1347 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 1348 else if(rejection_policy.equals("discard")) 1349 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); 1350 else if(rejection_policy.equals("discardoldest")) 1351 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); 1352 else 1353 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 1354 } 1355 1356 return pool; 1357 } 1358 1359 1360 private static void shutdownThreadPool(ThreadPoolExecutor thread_pool) { 1361 thread_pool.shutdownNow(); 1362 try { 1363 thread_pool.awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS); 1364 } 1365 catch(InterruptedException e) { 1366 } 1367 } 1368 1369 1370 1371 1372 1373 1374 1375 1376 class IncomingPacket implements Runnable { 1377 Address dest=null; 1378 Address sender=null; 1379 byte[] buf; 1380 int offset, length; 1381 1382 IncomingPacket(Address dest, Address sender, byte[] buf, int offset, int length) { 1383 this.dest=dest; 1384 this.sender=sender; 1385 this.buf=buf; 1386 this.offset=offset; 1387 this.length=length; 1388 } 1389 1390 1391 1392 public void run() { 1393 short version=0; 1394 boolean is_message_list, multicast; 1395 byte flags; 1396 ExposedByteArrayInputStream in_stream=null; 1397 DataInputStream dis=null; 1398 1399 try { 1400 in_stream=new ExposedByteArrayInputStream(buf, offset, length); 1401 dis=new DataInputStream (in_stream); 1402 try { 1403 version=dis.readShort(); 1404 } 1405 catch(IOException ex) { 1406 if(discard_incompatible_packets) 1407 return; 1408 throw ex; 1409 } 1410 if(Version.isBinaryCompatible(version) == false) { 1411 if(log.isWarnEnabled()) { 1412 StringBuffer sb=new StringBuffer (); 1413 sb.append("packet 
from ").append(sender).append(" has different version (").append(Version.print(version)); 1414 sb.append(") from ours (").append(Version.printVersion()).append("). "); 1415 if(discard_incompatible_packets) 1416 sb.append("Packet is discarded"); 1417 else 1418 sb.append("This may cause problems"); 1419 log.warn(sb); 1420 } 1421 if(discard_incompatible_packets) 1422 return; 1423 } 1424 1425 flags=dis.readByte(); 1426 is_message_list=(flags & LIST) == LIST; 1427 multicast=(flags & MULTICAST) == MULTICAST; 1428 1429 if(is_message_list) { List <Message> msgs=readMessageList(dis, dest, multicast); 1431 for(Message msg: msgs) { 1432 if(msg.isFlagSet(Message.OOB)) { 1433 log.warn("bundled message should not be marked as OOB"); 1434 } 1435 handleMyMessage(msg, multicast); 1436 } 1437 } 1438 else { 1439 Message msg=readMessage(dis, dest, sender, multicast); 1440 handleMyMessage(msg, multicast); 1441 } 1442 } 1443 catch(Throwable t) { 1444 if(log.isErrorEnabled()) 1445 log.error("failed handling incoming message", t); 1446 } 1447 } 1448 1449 1450 private void handleMyMessage(Message msg, boolean multicast) { 1451 if(stats) { 1452 num_msgs_received++; 1453 num_bytes_received+=msg.getLength(); 1454 } 1455 1456 Address SRC=msg.getSrc(); 1457 if(loopback && multicast && src != null && local_addr.equals(src)) { 1458 return; } 1460 1461 TpHeader hdr=(TpHeader)msg.getHeader(name); if(hdr != null) { 1463 String ch_name=hdr.channel_name; 1464 1465 if(channel_name != null && !channel_name.equals(ch_name)) { 1467 if(log.isWarnEnabled()) 1468 log.warn(new StringBuffer ("discarded message from different group \"").append(ch_name). 1469 append("\" (our group is \"").append(channel_name).append("\"). Sender was ").append(msg.getSrc())); 1470 return; 1471 } 1472 } 1473 else { 1474 if(log.isTraceEnabled()) 1475 log.trace(new StringBuffer ("message does not have a transport header, msg is ").append(msg). 
1476 append(", headers are ").append(msg.printHeaders()).append(", will be discarded")); 1477 return; 1478 } 1479 1480 Event evt=new Event(Event.MSG, msg); 1481 if(log.isTraceEnabled()) { 1482 StringBuffer sb=new StringBuffer ("message is ").append(msg).append(", headers are ").append(msg.printHeaders()); 1483 log.trace(sb); 1484 } 1485 1486 up_prot.up(evt); 1487 } 1488 1489 1490 } 1491 1492 1493 1494 1495 1496 1500 class IncomingPacketHandler implements Runnable { 1501 1502 public static final String THREAD_NAME="IncomingPacketHandler"; 1503 Thread t=null; 1504 1505 Thread getThread(){ 1506 return t; 1507 } 1508 1509 void start() { 1510 if(t == null || !t.isAlive()) { 1511 t=new Thread (Util.getGlobalThreadGroup(), this, THREAD_NAME); 1512 t.setDaemon(true); 1513 t.start(); 1514 } 1515 } 1516 1517 void stop() { 1518 incoming_packet_queue.close(true); if(t != null) { 1520 try { 1521 t.join(Global.THREAD_SHUTDOWN_WAIT_TIME); 1522 } 1523 catch(InterruptedException e) { 1524 Thread.currentThread().interrupt(); } 1526 } 1527 } 1528 1529 public void run() { 1530 IncomingPacket entry; 1531 while(!incoming_packet_queue.closed() && Thread.currentThread().equals(t)) { 1532 try { 1533 entry=(IncomingPacket)incoming_packet_queue.remove(); 1534 handleIncomingPacket(entry.dest, entry.sender, entry.buf, entry.offset, entry.length); 1535 } 1536 catch(QueueClosedException closed_ex) { 1537 break; 1538 } 1539 catch(Throwable ex) { 1540 if(log.isErrorEnabled()) 1541 log.error("error processing incoming packet", ex); 1542 } 1543 } 1544 if(log.isTraceEnabled()) log.trace("incoming packet handler terminating"); 1545 } 1546 } 1547 1548 1549 class IncomingMessageHandler implements Runnable { 1550 1551 public static final String THREAD_NAME = "IncomingMessageHandler"; 1552 Thread t; 1553 1554 Thread getThread(){ 1555 return t; 1556 } 1557 1558 public void start() { 1559 if(t == null || !t.isAlive()) { 1560 t=new Thread (Util.getGlobalThreadGroup(), this, THREAD_NAME); 1561 
t.setDaemon(true); 1562 t.start(); 1563 } 1564 } 1565 1566 1567 public void stop() { 1568 incoming_msg_queue.close(true); 1569 if(t != null) { 1570 try { 1571 t.join(Global.THREAD_SHUTDOWN_WAIT_TIME); 1572 } 1573 catch(InterruptedException e) { 1574 Thread.currentThread().interrupt(); } 1576 } 1577 } 1578 1579 public void run() { 1580 Message msg; 1581 while(!incoming_msg_queue.closed() && Thread.currentThread().equals(t)) { 1582 try { 1583 msg=(Message)incoming_msg_queue.remove(); 1584 handleIncomingMessage(msg); 1585 } 1586 catch(QueueClosedException closed_ex) { 1587 break; 1588 } 1589 catch(Throwable ex) { 1590 if(log.isErrorEnabled()) 1591 log.error("error processing incoming message", ex); 1592 } 1593 } 1594 if(log.isTraceEnabled()) log.trace("incoming message handler terminating"); 1595 } 1596 } 1597 1598 1599 1600 1601 private class Bundler { 1602 static final int MIN_NUMBER_OF_BUNDLING_TASKS=2; 1603 1604 final Map<Address,List <Message>> msgs=new HashMap<Address,List <Message>>(36); 1605 @GuardedBy("lock") 1606 long count=0; int num_msgs=0; 1608 @GuardedBy("lock") 1609 int num_bundling_tasks=0; 1610 long last_bundle_time; 1611 final ReentrantLock lock=new ReentrantLock (); 1612 1613 1614 private void send(Message msg, Address dest) throws Exception { 1615 long length=msg.size(); 1616 checkLength(length); 1617 Map<Address,List <Message>> bundled_msgs=null; 1618 1619 lock.lock(); 1620 try { 1621 if(count + length >= max_bundle_size) { 1622 bundled_msgs=removeBundledMessages(); 1623 } 1624 addMessage(msg, dest); 1625 count+=length; 1626 if(num_bundling_tasks < MIN_NUMBER_OF_BUNDLING_TASKS) { 1627 num_bundling_tasks++; 1628 timer.schedule(new BundlingTimer(), max_bundle_timeout, TimeUnit.MILLISECONDS); 1629 } 1630 } 1631 finally { 1632 lock.unlock(); 1633 } 1634 1635 if(bundled_msgs != null) { 1636 sendBundledMessages(bundled_msgs); 1637 } 1638 } 1639 1640 1641 private void addMessage(Message msg, Address dest) { if(msgs.isEmpty()) 1643 
last_bundle_time=System.currentTimeMillis(); 1644 List <Message> tmp=msgs.get(dest); 1645 if(tmp == null) { 1646 tmp=new LinkedList<Message>(); 1647 msgs.put(dest, tmp); 1648 } 1649 tmp.add(msg); 1650 num_msgs++; 1651 } 1652 1653 1654 1655 private Map<Address,List <Message>> removeBundledMessages() { 1656 if(msgs.isEmpty()) 1657 return null; 1658 Map<Address,List <Message>> copy=new HashMap<Address,List <Message>>(msgs); 1659 if(log.isTraceEnabled()) { 1660 long stop=System.currentTimeMillis(); 1661 double percentage=100.0 / max_bundle_size * count; 1662 StringBuilder sb=new StringBuilder ("sending ").append(num_msgs).append(" msgs ("); 1663 num_msgs=0; 1664 sb.append(count).append(" bytes (" + f.format(percentage) + "% of max_bundle_size)"); 1665 if(last_bundle_time > 0) { 1666 sb.append(", collected in ").append(stop-last_bundle_time).append("ms) "); 1667 } 1668 sb.append(" to ").append(copy.size()).append(" destination(s)"); 1669 if(copy.size() > 1) sb.append(" (dests=").append(copy.keySet()).append(")"); 1670 log.trace(sb); 1671 } 1672 msgs.clear(); 1673 count=0; 1674 return copy; 1675 } 1676 1677 1678 1683 private void sendBundledMessages(Map<Address,List <Message>> msgs) { 1684 boolean multicast; 1685 Buffer buffer; 1686 Map.Entry<Address,List <Message>> entry; 1687 Address dst; 1688 ExposedByteArrayOutputStream out_stream=new ExposedByteArrayOutputStream(INITIAL_BUFSIZE); 1689 ExposedDataOutputStream dos=new ExposedDataOutputStream(out_stream); 1690 boolean first=true; 1691 1692 for(Iterator<Map.Entry<Address,List <Message>>> it=msgs.entrySet().iterator(); it.hasNext();) { 1693 entry=it.next(); 1694 List <Message> list=entry.getValue(); 1695 if(list.isEmpty()) 1696 continue; 1697 dst=entry.getKey(); 1698 multicast=dst == null || dst.isMulticastAddress(); 1699 try { 1700 if(first) { 1701 first=false; 1702 } 1703 else { 1704 out_stream.reset(); 1705 dos.reset(); 1706 } 1707 writeMessageList(list, dos, multicast); buffer=new Buffer(out_stream.getRawBuffer(), 0, 
out_stream.size()); 1709 doSend(buffer, dst, multicast); 1710 } 1711 catch(Throwable e) { 1712 if(log.isErrorEnabled()) log.error("exception sending msg: " + e.toString(), e.getCause()); 1713 } 1714 } 1715 } 1716 1717 1718 1719 private void checkLength(long len) throws Exception { 1720 if(len > max_bundle_size) 1721 throw new Exception ("message size (" + len + ") is greater than max bundling size (" + max_bundle_size + 1722 "). Set the fragmentation/bundle size in FRAG and TP correctly"); 1723 } 1724 1725 1726 private class BundlingTimer implements Runnable { 1727 1728 public void run() { 1729 Map<Address, List <Message>> msgs=null; 1730 boolean unlocked=false; 1731 1732 lock.lock(); 1733 try { 1734 msgs=removeBundledMessages(); 1735 if(msgs != null) { 1736 lock.unlock(); 1737 unlocked=true; 1738 sendBundledMessages(msgs); 1739 } 1740 } 1741 finally { 1742 if(unlocked) 1743 lock.lock(); 1744 num_bundling_tasks--; 1745 lock.unlock(); 1746 } 1747 } 1748 } 1749 } 1750 1751 1752 private class DiagnosticsHandler implements Runnable { 1753 public static final String THREAD_NAME = "DiagnosticsHandler"; 1754 Thread thread=null; 1755 MulticastSocket diag_sock=null; 1756 1757 DiagnosticsHandler() { 1758 } 1759 1760 Thread getThread(){ 1761 return thread; 1762 } 1763 1764 void start() throws IOException { 1765 diag_sock=new MulticastSocket(diagnostics_port); 1766 java.util.List interfaces=Util.getAllAvailableInterfaces(); 1767 bindToInterfaces(interfaces, diag_sock); 1768 1769 if(thread == null || !thread.isAlive()) { 1770 thread=new Thread (Util.getGlobalThreadGroup(), this, THREAD_NAME); 1771 thread.setDaemon(true); 1772 thread.start(); 1773 } 1774 } 1775 1776 void stop() { 1777 if(diag_sock != null) 1778 diag_sock.close(); 1779 if(thread != null){ 1780 try{ 1781 thread.join(Global.THREAD_SHUTDOWN_WAIT_TIME); 1782 } 1783 catch(InterruptedException e){ 1784 Thread.currentThread().interrupt(); } 1786 } 1787 } 1788 1789 public void run() { 1790 byte[] buf=new byte[1500]; 
DatagramPacket packet; 1792 while(!diag_sock.isClosed() && Thread.currentThread().equals(thread)) { 1793 packet=new DatagramPacket(buf, 0, buf.length); 1794 try { 1795 diag_sock.receive(packet); 1796 handleDiagnosticProbe(packet.getSocketAddress(), diag_sock, 1797 new String (packet.getData(), packet.getOffset(), packet.getLength())); 1798 } 1799 catch(IOException e) { 1800 } 1801 } 1802 } 1803 1804 private void bindToInterfaces(java.util.List interfaces, MulticastSocket s) { 1805 SocketAddress group_addr=new InetSocketAddress(diagnostics_addr, diagnostics_port); 1806 for(Iterator it=interfaces.iterator(); it.hasNext();) { 1807 NetworkInterface i=(NetworkInterface)it.next(); 1808 try { 1809 if (i.getInetAddresses().hasMoreElements()) { s.joinGroup(group_addr, i); 1811 if(log.isTraceEnabled()) 1812 log.trace("joined " + group_addr + " on " + i.getName()); 1813 } 1814 } 1815 catch(IOException e) { 1816 log.warn("failed to join " + group_addr + " on " + i.getName() + ": " + e); 1817 } 1818 } 1819 } 1820 } 1821 1822 public class ThreadNamingPattern { 1823 final boolean includeClusterName; 1824 final boolean includeLocalAddress; 1825 1826 public ThreadNamingPattern(String pattern) { 1827 includeClusterName=pattern.contains("c"); 1828 includeLocalAddress=pattern.contains("l"); 1829 } 1830 1831 public boolean isIncludeLocalAddress() { 1832 return includeLocalAddress; 1833 } 1834 1835 public boolean isIncludeClusterName() { 1836 return includeClusterName; 1837 } 1838 1839 1840 protected String renameThread(String base_name, Thread runner) { 1841 String oldName = null; 1842 if(runner!=null){ 1843 oldName=runner.getName(); 1844 1845 StringBuilder threadName=new StringBuilder (); 1846 threadName.append(base_name); 1847 1848 if(isIncludeClusterName()) { 1849 if(threadName.length() > 0) 1850 threadName.append(','); 1851 threadName.append(getChannelName()); 1852 } 1853 if(isIncludeLocalAddress()) { 1854 if(threadName.length() > 0) 1855 threadName.append(','); 1856 
threadName.append(getLocalAddress()); 1857 } 1858 1859 runner.setName(threadName.toString()); 1860 } 1861 return oldName; 1862 } 1863 1864 } 1865 1866 1867} 1868 | Popular Tags |