1 package org.jgroups.mux; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jgroups.*; 6 import org.jgroups.TimeoutException; 7 import org.jgroups.protocols.pbcast.FLUSH; 8 import org.jgroups.stack.StateTransferInfo; 9 import org.jgroups.util.FIFOMessageQueue; 10 import org.jgroups.util.Promise; 11 import org.jgroups.util.Util; 12 13 import java.util.*; 14 import java.util.concurrent.*; 15 import java.util.concurrent.atomic.AtomicInteger ; 16 17 25 public class Multiplexer implements UpHandler { 26 27 private final ConcurrentMap<String ,MuxChannel> services=new ConcurrentHashMap<String ,MuxChannel>(); 28 private final JChannel channel; 29 static final Log log=LogFactory.getLog(Multiplexer.class); 30 static final String SEPARATOR="::"; 31 static final short SEPARATOR_LEN=(short)SEPARATOR.length(); 32 static final String NAME="MUX"; 33 34 private boolean flush_present=true; 35 private boolean blocked=false; 36 37 38 private ExecutorService thread_pool; 39 40 42 private FIFOMessageQueue<String ,Runnable > fifo_queue=new FIFOMessageQueue<String ,Runnable >(); 43 44 45 46 View view=null; 47 48 Address local_addr=null; 49 50 51 private final Map<String ,Boolean > state_transfer_listeners=new HashMap<String ,Boolean >(); 52 53 54 private final Map<String ,List<Address>> service_state=new HashMap<String ,List<Address>>(); 55 56 57 private final Promise service_state_promise=new Promise(); 58 59 61 private final Map<Address, Set<String >> service_responses=new HashMap<Address, Set<String >>(); 62 63 private long SERVICES_RSP_TIMEOUT=10000; 64 65 public Multiplexer() { 66 this.channel=null; 67 flush_present=isFlushPresent(); 68 69 if(Global.getPropertyAsBoolean(Global.MUX_ENABLED, true)){ 71 thread_pool=createThreadPool(); 72 } 73 } 74 75 public Multiplexer(JChannel channel) { 76 this.channel=channel; 77 this.channel.setUpHandler(this); 78 this.channel.setOpt(Channel.BLOCK, Boolean.TRUE); flush_present=isFlushPresent(); 80 81 if(Global.getPropertyAsBoolean(Global.MUX_ENABLED, true)){ 83 thread_pool=createThreadPool(); 84 } 85 } 86 87 91 public Set getApplicationIds() { 92 return services != null? Collections.unmodifiableSet(services.keySet()) : null; 93 } 94 95 public Set<String > getServiceIds() { 96 return services != null? Collections.unmodifiableSet(services.keySet()) : null; 97 } 98 99 100 public long getServicesResponseTimeout() { 101 return SERVICES_RSP_TIMEOUT; 102 } 103 104 public void setServicesResponseTimeout(long services_rsp_timeout) { 105 this.SERVICES_RSP_TIMEOUT=services_rsp_timeout; 106 } 107 108 113 public View getServiceView(String service_id) { 114 List hosts=service_state.get(service_id); 115 if(hosts == null) return null; 116 return generateServiceView(hosts); 117 } 118 119 public boolean stateTransferListenersPresent() { 120 return state_transfer_listeners != null && !state_transfer_listeners.isEmpty(); 121 } 122 123 public synchronized void registerForStateTransfer(String appl_id, String substate_id) { 124 String key=appl_id; 125 if(substate_id != null && substate_id.length() > 0) 126 key+=SEPARATOR + substate_id; 127 state_transfer_listeners.put(key, Boolean.FALSE); 128 } 129 130 public synchronized boolean getState(Address target, String id, long timeout) throws ChannelNotConnectedException, ChannelClosedException { 131 if(state_transfer_listeners.isEmpty()) 132 return false; 133 134 for(Iterator<Map.Entry<String ,Boolean >> it=state_transfer_listeners.entrySet().iterator(); it.hasNext();) { 135 Map.Entry<String , Boolean > entry = it.next(); 136 String key=entry.getKey(); 137 int index=key.indexOf(SEPARATOR); 138 boolean match; 139 if(index > -1) { 140 String tmp=key.substring(0, index); 141 match=id.equals(tmp); 142 } 143 else { 144 match=id.equals(key); 145 } 146 if(match) { 147 entry.setValue(Boolean.TRUE); 148 break; 149 } 150 } 151 152 Collection values=state_transfer_listeners.values(); 153 boolean all_true=Util.all(values, Boolean.TRUE); 154 if(!all_true) 155 return true; 157 boolean rc=false; 158 Set<String > keys=new HashSet<String >(state_transfer_listeners.keySet()); 159 rc=fetchServiceStates(target, keys, timeout); 160 state_transfer_listeners.clear(); 161 return rc; 162 } 163 164 protected static ThreadPoolExecutor createThreadPool() { 165 int min_threads=1, max_threads=4; 166 long keep_alive=30000; 167 168 ThreadFactory factory=new ThreadFactory() { 169 AtomicInteger num = new AtomicInteger (1); 170 171 ThreadGroup mux_threads=new ThreadGroup (Util.getGlobalThreadGroup(), "MultiplexerThreads"); 172 173 public Thread newThread(Runnable command) { 174 Thread ret=new Thread (mux_threads, command, "Multiplexer-" + num.incrementAndGet()); 175 ret.setDaemon(true); 176 return ret; 177 } 178 }; 179 180 min_threads=Global.getPropertyAsInteger(Global.MUX_MIN_THREADS, min_threads); 181 max_threads=Global.getPropertyAsInteger(Global.MUX_MAX_THREADS, max_threads); 182 keep_alive=Global.getPropertyAsLong(Global.MUX_KEEPALIVE, keep_alive); 183 184 return new ThreadPoolExecutor(min_threads, max_threads, keep_alive, TimeUnit.MILLISECONDS, 185 new SynchronousQueue<Runnable >(), factory, 186 new ThreadPoolExecutor.CallerRunsPolicy()); 187 } 188 189 protected void shutdownThreadPool() { 190 if(thread_pool != null && !thread_pool.isShutdown()) { 191 thread_pool.shutdownNow(); 192 try { 193 thread_pool.awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS); 194 } 195 catch(InterruptedException e) { 196 } 197 } 198 } 199 200 204 private boolean fetchServiceStates(Address target, Set<String > keys, long timeout) throws ChannelClosedException, ChannelNotConnectedException { 205 boolean rc, all_rcs=true; 206 for(String stateId: keys) { 207 rc=channel.getState(target, stateId, timeout); 208 if(!rc) 209 all_rcs=false; 210 } 211 return all_rcs; 212 } 213 214 215 218 public void fetchServiceInformation() throws Exception { 219 while(true) { 220 Address coord=getCoordinator(), local_address=channel != null? channel.getLocalAddress() : null; 221 boolean is_coord=coord != null && local_address != null && local_address.equals(coord); 222 if(is_coord) { 223 if(log.isTraceEnabled()) 224 log.trace("I'm coordinator, will not fetch service state information"); 225 break; 226 } 227 228 ServiceInfo si=new ServiceInfo(ServiceInfo.STATE_REQ, null, null, null); 229 MuxHeader hdr=new MuxHeader(si); 230 Message state_req=new Message(coord, null, null); 231 state_req.putHeader(NAME, hdr); 232 service_state_promise.reset(); 233 channel.send(state_req); 234 235 try { 236 byte[] state=(byte[])service_state_promise.getResultWithTimeout(2000); 237 if(state != null) { 238 Map<String ,List<Address>> new_state=(Map<String ,List<Address>>)Util.objectFromByteBuffer(state); 239 synchronized(service_state) { 240 service_state.clear(); 241 service_state.putAll(new_state); 242 } 243 if(log.isTraceEnabled()) 244 log.trace("service state was set successfully (" + service_state.size() + " entries)"); 245 } 246 else { 247 if(log.isWarnEnabled()) 248 log.warn("received service state was null"); 249 } 250 break; 251 } 252 catch(TimeoutException e) { 253 if(log.isTraceEnabled()) 254 log.trace("timed out waiting for service state from " + coord + ", retrying"); 255 } 256 } 257 } 258 259 260 261 public void sendServiceUpMessage(String service, Address host,boolean bypassFlush) throws Exception { 262 sendServiceMessage(ServiceInfo.SERVICE_UP, service, host,bypassFlush, null); 263 if(local_addr != null && host != null && local_addr.equals(host)) 264 handleServiceUp(service, host, false); 265 } 266 267 268 public void sendServiceDownMessage(String service, Address host,boolean bypassFlush) throws Exception { 269 sendServiceMessage(ServiceInfo.SERVICE_DOWN, service, host,bypassFlush, null); 270 if(local_addr != null && host != null && local_addr.equals(host)) 271 handleServiceDown(service, host, false); 272 } 273 274 275 276 277 282 public Object up(final Event evt) { 283 switch(evt.getType()) { 284 case Event.MSG: 285 final Message msg=(Message)evt.getArg(); 286 final MuxHeader hdr=(MuxHeader)msg.getHeader(NAME); 287 if(hdr == null) { 288 log.error("MuxHeader not present - discarding message " + msg); 289 return null; 290 } 291 292 Address sender=msg.getSrc(); 293 if(hdr.info != null) { try { 295 handleServiceStateRequest(hdr.info, sender); 296 } 297 catch(Exception e) { 298 if(log.isErrorEnabled()) 299 log.error("failure in handling service state request", e); 300 } 301 break; 302 } 303 304 MuxChannel mux_ch=services.get(hdr.id); 305 if(mux_ch == null) { 306 log.warn("service " + hdr.id + " not currently running, discarding message " + msg); 307 return null; 308 } 309 return passToMuxChannel(mux_ch, evt, fifo_queue, sender, hdr.id, false); 311 case Event.VIEW_CHANGE: 312 Vector old_members=view != null? view.getMembers() : null; 313 view=(View)evt.getArg(); 314 Vector<Address> new_members=view != null? view.getMembers() : null; 315 Vector<Address> left_members=Util.determineLeftMembers(old_members, new_members); 316 317 if(view instanceof MergeView) { 318 final MergeView temp_merge_view=(MergeView)view.clone(); 319 if(log.isTraceEnabled()) 320 log.trace("received a MergeView: " + temp_merge_view + ", adjusting the service view"); 321 try { 322 Thread merge_handler=new Thread () { 323 public void run() { 324 try { 325 handleMergeView(temp_merge_view); 326 } 327 catch(Exception e) { 328 if(log.isErrorEnabled()) 329 log.error("problems handling merge view", e); 330 } 331 } 332 }; 333 merge_handler.setName("merge handler view_change"); 334 merge_handler.setDaemon(false); 335 merge_handler.start(); 336 } 337 catch(Exception e) { 338 if(log.isErrorEnabled()) 339 log.error("failed handling merge view", e); 340 } 341 } 342 else { synchronized(service_responses) { 344 service_responses.clear(); 345 } 346 } 347 if(!left_members.isEmpty()) 348 adjustServiceViews(left_members); 349 break; 350 351 case Event.SUSPECT: 352 Address suspected_mbr=(Address)evt.getArg(); 353 354 synchronized(service_responses) { 355 service_responses.put(suspected_mbr, null); 356 service_responses.notifyAll(); 357 } 358 passToAllMuxChannels(evt); 359 break; 360 361 case Event.GET_APPLSTATE: 362 case Event.STATE_TRANSFER_OUTPUTSTREAM: 363 return handleStateRequest(evt); 364 365 case Event.GET_STATE_OK: 366 case Event.STATE_TRANSFER_INPUTSTREAM: 367 handleStateResponse(evt); 368 break; 369 370 case Event.SET_LOCAL_ADDRESS: 371 local_addr=(Address)evt.getArg(); 372 passToAllMuxChannels(evt); 373 break; 374 375 case Event.BLOCK: 376 blocked=true; 377 if(!services.isEmpty()) { 378 passToAllMuxChannels(evt, true, true); } 380 waitUntilThreadPoolHasNoRunningTasks(1000); 381 return null; 382 383 case Event.UNBLOCK: if(blocked) 385 blocked=false; 386 passToAllMuxChannels(evt); 387 break; 388 389 default: 390 passToAllMuxChannels(evt); 391 break; 392 } 393 return null; 394 } 395 396 private int waitUntilThreadPoolHasNoRunningTasks(long timeout) { 397 int num_threads=0; 398 long end_time=System.currentTimeMillis() + timeout; 399 400 while(fifo_queue != null && (num_threads=fifo_queue.size()) > 0 && System.currentTimeMillis() < end_time) { 401 Util.sleep(100); 402 } 403 return num_threads; 404 } 405 406 407 public Channel createMuxChannel(JChannelFactory f, String id, String stack_name) throws Exception { 408 MuxChannel ch; 409 if(services.containsKey(id)) 410 throw new Exception ("service ID \"" + id + "\" is already registered, cannot register duplicate ID"); 411 ch=new MuxChannel(f, channel, id, stack_name, this); 412 services.put(id, ch); 413 return ch; 414 } 415 416 417 418 419 private void passToAllMuxChannels(Event evt) { 420 passToAllMuxChannels(evt, false, true); 421 } 422 423 424 private void passToAllMuxChannels(Event evt, boolean block, boolean bypass_thread_pool) { 425 String service_name; 426 MuxChannel ch; 427 for(Map.Entry<String ,MuxChannel> entry: services.entrySet()) { 428 service_name=entry.getKey(); 429 ch=entry.getValue(); 430 passToMuxChannel(ch, evt, fifo_queue, null, service_name, block, bypass_thread_pool); 432 } 433 } 434 435 public MuxChannel remove(String id) { 436 return services.remove(id); 437 } 438 439 440 441 442 public void disconnect() { 443 boolean all_disconnected=true; 444 for(MuxChannel mux_ch: services.values()) { 445 if(mux_ch.isConnected()) { 446 all_disconnected=false; 447 break; 448 } 449 } 450 if(all_disconnected) { 451 if(log.isTraceEnabled()) { 452 log.trace("disconnecting underlying JChannel as all MuxChannels are disconnected"); 453 } 454 channel.disconnect(); 455 } 456 } 457 458 459 public void unregister(String appl_id) { 460 services.remove(appl_id); 461 } 462 463 public boolean close() { 464 boolean all_closed=true; 465 for(MuxChannel mux_ch: services.values()) { 466 if(mux_ch.isOpen()) { 467 all_closed=false; 468 break; 469 } 470 } 471 if(all_closed) { 472 if(log.isTraceEnabled()) { 473 log.trace("closing underlying JChannel as all MuxChannels are closed"); 474 } 475 channel.close(); 476 services.clear(); 477 shutdownThreadPool(); 478 } 479 return all_closed; 480 } 481 482 public void closeAll() { 483 for(MuxChannel mux_ch: services.values()) { 484 mux_ch.setConnected(false); 485 mux_ch.setClosed(true); 486 mux_ch.closeMessageQueue(true); 487 } 488 shutdownThreadPool(); 489 } 490 491 public boolean shutdown() { 492 boolean all_closed=true; 493 for(MuxChannel mux_ch: services.values()) { 494 if(mux_ch.isOpen()) { 495 all_closed=false; 496 break; 497 } 498 } 499 if(all_closed) { 500 if(log.isTraceEnabled()) { 501 log.trace("shutting down underlying JChannel as all MuxChannels are closed"); 502 } 503 channel.shutdown(); 504 services.clear(); 505 shutdownThreadPool(); 506 } 507 return all_closed; 508 } 509 510 511 private boolean isFlushPresent() { 512 return channel.getProtocolStack().findProtocol("FLUSH") != null; 513 } 514 515 516 private void sendServiceState() throws Exception { 517 byte[] data=Util.objectToByteBuffer(new HashSet<String >(services.keySet())); 518 sendServiceMessage(ServiceInfo.LIST_SERVICES_RSP, null, channel.getLocalAddress(), true, data); 519 } 520 521 522 private Address getLocalAddress() { 523 if(local_addr != null) 524 return local_addr; 525 if(channel != null) 526 local_addr=channel.getLocalAddress(); 527 return local_addr; 528 } 529 530 private Address getCoordinator() { 531 if(channel != null) { 532 View v=channel.getView(); 533 if(v != null) { 534 Vector members=v.getMembers(); 535 if(members != null && !members.isEmpty()) { 536 return (Address)members.firstElement(); 537 } 538 } 539 } 540 return null; 541 } 542 543 554 public Address getStateProvider(Address preferredTarget, String service_id) { 555 Address result = null; 556 List hosts=service_state.get(service_id); 557 if(hosts != null && !hosts.isEmpty()){ 558 if(hosts.contains(preferredTarget)){ 559 result = preferredTarget; 560 } 561 else{ 562 result = (Address)hosts.get(0); 563 } 564 } 565 return result; 566 } 567 568 private void sendServiceMessage(byte type, String service, Address host,boolean bypassFlush, byte[] payload) throws Exception { 569 if(host == null) 570 host=getLocalAddress(); 571 if(host == null) { 572 if(log.isWarnEnabled()) { 573 log.warn("local_addr is null, cannot send ServiceInfo." + ServiceInfo.typeToString(type) + " message"); 574 } 575 return; 576 } 577 578 ServiceInfo si=new ServiceInfo(type, service, host, payload); 579 MuxHeader hdr=new MuxHeader(si); 580 Message service_msg=new Message(); 581 service_msg.putHeader(NAME, hdr); 582 if(bypassFlush && flush_present) 583 service_msg.putHeader(FLUSH.NAME, new FLUSH.FlushHeader(FLUSH.FlushHeader.FLUSH_BYPASS)); 584 585 channel.send(service_msg); 586 } 587 588 589 590 private Object handleStateRequest(Event evt) { 591 StateTransferInfo info=(StateTransferInfo)evt.getArg(); 592 String id=info.state_id; 593 String original_id=id; 594 Address requester=info.target; 596 try { 597 int index=id.indexOf(SEPARATOR); 598 if(index > -1) { 599 info.state_id=id.substring(index + SEPARATOR_LEN); 600 id=id.substring(0, index); } 602 else { 603 info.state_id=null; 604 } 605 606 MuxChannel mux_ch=services.get(id); 607 if(mux_ch == null) 608 throw new IllegalArgumentException ("didn't find service with ID=" + id + " to fetch state from"); 609 610 StateTransferInfo ret=(StateTransferInfo)passToMuxChannel(mux_ch, evt, fifo_queue, requester, id, true); 612 if(ret != null) 613 ret.state_id=original_id; 614 return ret; 615 } 616 catch(Throwable ex) { 617 if(log.isErrorEnabled()) 618 log.error("failed returning the application state, will return null", ex); 619 return new StateTransferInfo(null, original_id, 0L, null); 620 } 621 } 622 623 624 625 private void handleStateResponse(Event evt) { 626 StateTransferInfo info=(StateTransferInfo)evt.getArg(); 627 MuxChannel mux_ch; 628 Address state_sender=info.target; 629 630 String appl_id, substate_id, tmp; 631 tmp=info.state_id; 632 633 if(tmp == null) { 634 if(log.isTraceEnabled()) 635 log.trace("state is null, not passing up: " + info); 636 return; 637 } 638 639 int index=tmp.indexOf(SEPARATOR); 640 if(index > -1) { 641 appl_id=tmp.substring(0, index); 642 substate_id=tmp.substring(index+SEPARATOR_LEN); 643 } 644 else { 645 appl_id=tmp; 646 substate_id=null; 647 } 648 649 mux_ch=services.get(appl_id); 650 if(mux_ch == null) { 651 log.error("didn't find service with ID=" + appl_id + " to fetch state from"); 652 } 653 else { 654 StateTransferInfo tmp_info=info.copy(); 655 tmp_info.state_id=substate_id; 656 Event tmpEvt=new Event(evt.getType(), tmp_info); 657 passToMuxChannel(mux_ch, tmpEvt, fifo_queue, state_sender, appl_id, false); 658 } 659 } 660 661 private void handleServiceStateRequest(ServiceInfo info, Address sender) throws Exception { 662 switch(info.type) { 663 case ServiceInfo.STATE_REQ: 664 byte[] state; 665 synchronized(service_state) { 666 state=Util.objectToByteBuffer(service_state); 667 } 668 ServiceInfo si=new ServiceInfo(ServiceInfo.STATE_RSP, null, null, state); 669 MuxHeader hdr=new MuxHeader(si); 670 Message state_rsp=new Message(sender); 671 state_rsp.putHeader(NAME, hdr); 672 channel.send(state_rsp); 673 break; 674 case ServiceInfo.STATE_RSP: 675 service_state_promise.setResult(info.state); 676 break; 677 case ServiceInfo.SERVICE_UP: 678 handleServiceUp(info.service, info.host, true); 679 break; 680 case ServiceInfo.SERVICE_DOWN: 681 handleServiceDown(info.service, info.host, true); 682 break; 683 case ServiceInfo.LIST_SERVICES_RSP: 684 handleServicesRsp(sender, info.state); 685 break; 686 default: 687 if(log.isErrorEnabled()) 688 log.error("service request type " + info.type + " not known"); 689 break; 690 } 691 } 692 693 private void handleServicesRsp(Address sender, byte[] state) throws Exception { 694 Set<String > s=(Set<String >) Util.objectFromByteBuffer(state); 695 696 synchronized(service_responses) { 697 Set<String > tmp=service_responses.get(sender); 698 if(tmp == null) 699 tmp=new HashSet<String >(); 700 tmp.addAll(s); 701 702 service_responses.put(sender, tmp); 703 if(log.isTraceEnabled()) 704 log.trace("received service response: " + sender + "(" + s.toString() + ")"); 705 service_responses.notifyAll(); 706 } 707 } 708 709 710 private void handleServiceDown(String service, Address host, boolean received) { 711 List<Address> hosts, hosts_copy; 712 boolean removed=false; 713 714 if(received && host != null && local_addr != null && local_addr.equals(host)) { 716 return; 717 } 718 719 synchronized(service_state) { 720 hosts=service_state.get(service); 721 if(hosts == null) 722 return; 723 removed=hosts.remove(host); 724 hosts_copy=new ArrayList<Address>(hosts); } 726 727 if(removed) { 728 View service_view=generateServiceView(hosts_copy); 729 if(service_view != null) { 730 MuxChannel ch=services.get(service); 731 if(ch != null) { 732 Event view_evt=new Event(Event.VIEW_CHANGE, service_view); 733 if(ch.isConnected()) 735 passToMuxChannel(ch, view_evt, fifo_queue, null, service, false); 736 } 737 else { 738 if(log.isTraceEnabled()) 739 log.trace("service " + service + " not found, cannot dispatch service view " + service_view); 740 } 741 } 742 } 743 744 Address local_address=getLocalAddress(); 745 if(local_address != null && host != null && host.equals(local_address)) 746 unregister(service); 747 } 748 749 750 private void handleServiceUp(String service, Address host, boolean received) { 751 List<Address> hosts, hosts_copy; 752 boolean added=false; 753 754 755 756 757 if(received && host != null && local_addr != null && local_addr.equals(host)) { 759 return; 760 } 761 762 synchronized(service_state) { 763 hosts=service_state.get(service); 764 if(hosts == null) { 765 hosts=new ArrayList<Address>(); 766 service_state.put(service, hosts); 767 } 768 if(!hosts.contains(host)) { 769 hosts.add(host); 770 added=true; 771 } 772 hosts_copy=new ArrayList<Address>(hosts); } 774 775 if(added) { 776 View service_view=generateServiceView(hosts_copy); 777 if(service_view != null) { 778 MuxChannel ch=services.get(service); 779 if(ch != null) { 780 Event view_evt=new Event(Event.VIEW_CHANGE, service_view); 781 passToMuxChannel(ch, view_evt, fifo_queue, null, service, false); 783 } 784 else { 785 if(log.isTraceEnabled()) 786 log.trace("service " + service + " not found, cannot dispatch service view " + service_view); } 787 } 788 } 789 } 790 791 792 797 private void handleMergeView(MergeView view) throws Exception { 798 long time_to_wait=SERVICES_RSP_TIMEOUT, start; 799 int num_members=view.size(); Map<Address, Set<String >> copy=null; 801 802 sendServiceState(); 803 804 synchronized(service_responses) { 805 start=System.currentTimeMillis(); 806 try { 807 while(time_to_wait > 0 && numResponses(service_responses) < num_members) { 808 service_responses.wait(time_to_wait); 809 time_to_wait-=System.currentTimeMillis() - start; 810 } 811 copy=new HashMap<Address, Set<String >>(service_responses); 812 } 813 catch(Exception ex) { 814 if(log.isErrorEnabled()) 815 log.error("failed fetching a list of services from other members in the cluster, cannot handle merge view " + view, ex); 816 } 817 } 818 819 if(log.isTraceEnabled()) 820 log.trace("merging service state, my service_state: " + service_state + ", received responses: " + copy); 821 822 mergeServiceState(view, copy); 824 service_responses.clear(); 825 } 826 827 private static int numResponses(Map m) { 828 int num=0; 829 Collection values=m.values(); 830 for(Iterator it=values.iterator(); it.hasNext();) { 831 if(it.next() != null) 832 num++; 833 } 834 835 return num; 836 } 837 838 839 private void mergeServiceState(MergeView view, Map<Address, Set<String >> copy) { 840 Set<String > modified_services=new HashSet<String >(); 841 synchronized(service_state) { 842 for(Iterator <Map.Entry<Address, Set<String >>> it=copy.entrySet().iterator(); it.hasNext();) { 843 Map.Entry<Address, Set<String >> entry = it.next(); 844 Address host=entry.getKey(); 845 Set<String > service_list=entry.getValue(); 846 if(service_list == null) 847 continue; 848 849 for(String service:service_list) { 850 List<Address> my_services=service_state.get(service); 851 if(my_services == null) { 852 my_services=new ArrayList<Address>(); 853 service_state.put(service, my_services); 854 } 855 856 boolean was_modified=my_services.add(host); 857 if(was_modified) { 858 modified_services.add(service); 859 } 860 } 861 } 862 } 863 864 for(String service:modified_services) { 866 MuxChannel ch=services.get(service); 867 List<Address> hosts=service_state.get(service); 868 Vector<Address> membersCopy = new Vector<Address>(view.getMembers()); 869 membersCopy.retainAll(hosts); 870 MergeView v=new MergeView(view.getVid(), membersCopy, view.getSubgroups()); 871 passToMuxChannel(ch, new Event(Event.VIEW_CHANGE, v), fifo_queue, null, service, false); 873 } 874 } 875 876 private void adjustServiceViews(Vector left_members) { 877 if(left_members != null) 878 for(int i=0; i < left_members.size(); i++) { 879 try { 880 adjustServiceView((Address)left_members.elementAt(i)); 881 } 882 catch(Throwable t) { 883 if(log.isErrorEnabled()) 884 log.error("failed adjusting service views", t); 885 } 886 } 887 } 888 889 private void adjustServiceView(Address host) { 890 891 synchronized(service_state) { 892 for(Iterator<Map.Entry<String ,List<Address>>> it=service_state.entrySet().iterator(); it.hasNext();) { 893 Map.Entry<String , List<Address>> entry = it.next(); 894 String service=entry.getKey(); 895 List<Address> hosts = entry.getValue(); 896 if(hosts == null) 897 continue; 898 899 if(hosts.remove(host)) { 900 View service_view=generateServiceView(new ArrayList<Address>(hosts)); 902 if(service_view != null) { 903 MuxChannel ch=services.get(service); 904 if(ch != null) { 905 Event view_evt=new Event(Event.VIEW_CHANGE, service_view); 906 if(ch.isConnected()) 908 passToMuxChannel(ch, view_evt, fifo_queue, null, service, false); 909 } 910 else { 911 if(log.isTraceEnabled()) 912 log.trace("service " + service + " not found, cannot dispatch service view " + service_view); 913 } 914 } 915 } 916 Address local_address=getLocalAddress(); 917 if(local_address != null && host != null && host.equals(local_address)) 918 unregister(service); 919 } 920 } 921 } 922 923 924 930 private View generateServiceView(List hosts) { 931 Vector<Address> members=new Vector<Address>(view.getMembers()); 932 members.retainAll(hosts); 933 return new View(view.getVid(), members); 934 } 935 936 private Object passToMuxChannel(MuxChannel ch, Event evt, final FIFOMessageQueue<String ,Runnable > queue, 937 final Address sender, final String dest, boolean block) { 938 return passToMuxChannel(ch, evt, queue, sender, dest, block, false); 939 } 940 941 942 943 private Object passToMuxChannel(MuxChannel ch, Event evt, final FIFOMessageQueue<String ,Runnable > queue, 944 final Address sender, final String dest, boolean block, boolean bypass_thread_pool) { 945 if(thread_pool == null || bypass_thread_pool) { 946 return ch.up(evt); 947 } 948 949 Task task=new Task(ch, evt, queue, sender, dest, block); 950 ExecuteTask execute_task=new ExecuteTask(fifo_queue); 952 try { 953 fifo_queue.put(sender, dest, task); 954 thread_pool.execute(execute_task); 955 if(block) { 956 try { 957 return task.exchanger.exchange(null); 958 } 959 catch(InterruptedException e) { 960 Thread.currentThread().interrupt(); 961 } 962 } 963 } 964 catch(InterruptedException e) { 965 Thread.currentThread().interrupt(); 966 } 967 return null; 968 } 969 970 public void addServiceIfNotPresent(String id, MuxChannel ch) { 971 services.putIfAbsent(id, ch); 972 } 973 974 975 private static class Task implements Runnable { 976 Exchanger<Object > exchanger; 977 MuxChannel channel; 978 Event evt; 979 FIFOMessageQueue<String ,Runnable > queue; 980 Address sender; 981 String dest; 982 983 Task(MuxChannel channel, Event evt, FIFOMessageQueue<String ,Runnable > queue, Address sender, String dest, boolean result_expected) { 984 this.channel=channel; 985 this.evt=evt; 986 this.queue=queue; 987 this.sender=sender; 988 this.dest=dest; 989 if(result_expected) 990 exchanger=new Exchanger<Object >(); 991 } 992 993 public void run() { 994 Object retval; 995 try { 996 retval=channel.up(evt); 997 if(exchanger != null) 998 exchanger.exchange(retval); 999 } 1000 catch(InterruptedException e) { 1001 Thread.currentThread().interrupt(); } 1003 finally { 1004 queue.done(sender, dest); 1005 } 1006 } 1007 } 1008 1009 1010 private static class ExecuteTask implements Runnable { 1011 FIFOMessageQueue<String ,Runnable > queue; 1012 1013 public ExecuteTask(FIFOMessageQueue<String ,Runnable > queue) { 1014 this.queue=queue; 1015 } 1016 1017 public void run() { 1018 try { 1019 Runnable task=queue.take(); 1020 task.run(); 1021 } 1022 catch(InterruptedException e) { 1023 } 1024 } 1025 } 1026 1027 1028} 1029 | Popular Tags |