package org.jgroups.blocks;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.stack.Protocol;
import org.jgroups.util.*;

import java.io.Serializable;
import java.util.Vector;


/**
 * Sends requests to group members through a {@link RequestCorrelator} and dispatches incoming
 * messages, state transfer events and membership events to the registered listeners.
 * <p>
 * A dispatcher works in one of two setups, chosen by the constructor that is used:
 * <ul>
 * <li>directly on a {@link Channel}: a {@link ProtocolAdapter} is installed as the channel's up
 *     handler and feeds all events to the correlator;</li>
 * <li>on top of a {@link PullPushAdapter}: a {@link PullPushHandler} is registered as listener on the
 *     adapter and a {@link TransportAdapter} is used by the correlator for sending.</li>
 * </ul>
 * Every constructor calls {@link #start()}, so a freshly created dispatcher is already running.
 */
public class MessageDispatcher implements RequestHandler {
    /** Channel used in the channel-based setup; stays null in the PullPushAdapter-based setup. */
    protected Channel channel=null;
    /** Correlator created by {@link #start()} and discarded (set to null) by {@link #stop()}. */
    protected RequestCorrelator corr=null;
    protected MessageListener msg_listener=null;
    protected MembershipListener membership_listener=null;
    /** Handler that {@link #handle(Message)} delegates to; may be null. */
    protected RequestHandler req_handler=null;
    protected ProtocolAdapter prot_adapter=null;
    protected TransportAdapter transport_adapter=null;
    /** Current group members, refreshed on every view change. */
    protected final Vector members=new Vector();
    protected Address local_addr=null;
    protected boolean deadlock_detection=false;
    /** Adapter used in the PullPushAdapter-based setup; stays null in the channel-based setup. */
    protected PullPushAdapter adapter=null;
    /** Identifier under which the handler is registered with {@link #adapter}; may be null. */
    protected Serializable id=null;
    protected final Log log=LogFactory.getLog(getClass());

    /**
     * Passed to the {@link RequestCorrelator} constructor in {@link #start()}. Changing it has no
     * effect on a correlator that already exists.
     */
    protected boolean concurrent_processing=false;


    /**
     * Creates a dispatcher on top of a channel, with deadlock detection and concurrent processing
     * both disabled.
     */
    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2) {
        this(channel, l, l2, false, false);
    }


    /** Creates a dispatcher on top of a channel with concurrent processing disabled. */
    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, boolean deadlock_detection) {
        this(channel, l, l2, deadlock_detection, false);
    }


    /**
     * Creates a dispatcher on top of a channel and starts it.
     *
     * @param channel               the channel to send on and receive from; may be null, in which case
     *                              no up handler is installed
     * @param l                     receives messages and state transfer callbacks; may be null
     * @param l2                    receives view changes, suspicions and block notifications; may be null
     * @param deadlock_detection    handed to the request correlator
     * @param concurrent_processing handed to the request correlator
     */
    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2,
                             boolean deadlock_detection, boolean concurrent_processing) {
        this.channel=channel;
        this.deadlock_detection=deadlock_detection;
        this.concurrent_processing=concurrent_processing;
        prot_adapter=new ProtocolAdapter();
        if(channel != null) {
            local_addr=channel.getLocalAddress();
            channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
        }
        setMessageListener(l);
        setMembershipListener(l2);
        if(channel != null) {
            channel.setUpHandler(prot_adapter);
        }
        start();
    }


    /** Creates a channel-based dispatcher with a request handler and both flags disabled. */
    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler) {
        this(channel, l, l2, req_handler, false, false);
    }


    /** Creates a channel-based dispatcher with a request handler; concurrent processing is disabled. */
    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler,
                             boolean deadlock_detection) {
        this(channel, l, l2, req_handler, deadlock_detection, false);
    }


    /**
     * Creates a channel-based dispatcher with a request handler.
     * <p>
     * The flags are forwarded to the constructor that builds the correlator. Assigning the fields
     * after delegating to the 3-argument constructor (as was done previously) was too late, because
     * the correlator had already been created with both flags set to false.
     */
    public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler,
                             boolean deadlock_detection, boolean concurrent_processing) {
        this(channel, l, l2, deadlock_detection, concurrent_processing);
        setRequestHandler(req_handler);
    }


    /**
     * Creates a dispatcher on top of a PullPushAdapter, without a request handler and with
     * concurrent processing disabled.
     */
    public MessageDispatcher(PullPushAdapter adapter, Serializable id,
                             MessageListener l, MembershipListener l2) {
        this(adapter, id, l, l2, null, false);
    }


    /** Creates a dispatcher on top of a PullPushAdapter with concurrent processing disabled. */
    public MessageDispatcher(PullPushAdapter adapter, Serializable id,
                             MessageListener l, MembershipListener l2,
                             RequestHandler req_handler) {
        this(adapter, id, l, l2, req_handler, false);
    }


    /**
     * Creates a dispatcher on top of a PullPushAdapter and starts it.
     *
     * @param adapter               the adapter to register with and to send through
     * @param id                    identifier to register the listener under; if null the handler is
     *                              installed with {@code adapter.setListener()} instead
     * @param l                     receives messages and state transfer callbacks; may be null
     * @param l2                    receives view changes, suspicions and block notifications; may be null
     * @param req_handler           handler for incoming requests; may be null
     * @param concurrent_processing handed to the request correlator
     */
    public MessageDispatcher(PullPushAdapter adapter, Serializable id,
                             MessageListener l, MembershipListener l2,
                             RequestHandler req_handler, boolean concurrent_processing) {
        this.concurrent_processing=concurrent_processing;
        this.adapter=adapter;
        this.id=id;

        // The transport is only useful to us if it is a channel; anything else is left alone
        // instead of failing with a ClassCastException.
        Transport tp=adapter.getTransport();
        Channel ch=tp instanceof Channel ? (Channel) tp : null;
        if(ch != null) {
            View view=ch.getView(); // null-checked because start() also treats the view as optional
            if(view != null) {
                setMembers(view.getMembers());
            }
        }

        setRequestHandler(req_handler);
        setMessageListener(l);
        setMembershipListener(l2);

        PullPushHandler handler=new PullPushHandler();
        transport_adapter=new TransportAdapter();
        adapter.addMembershipListener(handler);
        if(id == null) {
            adapter.setListener(handler);
        }
        else {
            adapter.registerListener(id, handler);
        }

        if(ch != null) {
            ch.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
            local_addr=ch.getLocalAddress();
        }
        start();
    }


    /**
     * Replaces the contents of {@link #members} with the given list; a null argument is ignored.
     * The swap is done while holding the vector's monitor so that a concurrent
     * {@code members.clone()} never observes the intermediate empty list.
     */
    private void setMembers(Vector new_mbrs) {
        if(new_mbrs != null) {
            synchronized(members) {
                members.removeAllElements();
                members.addAll(new_mbrs);
            }
        }
    }


    /** Stores the flag and forwards it to the correlator if one currently exists. */
    public void setDeadlockDetection(boolean flag) {
        deadlock_detection=flag;
        if(corr != null) {
            corr.setDeadlockDetection(flag);
        }
    }


    /** Stores the flag; it is only read when {@link #start()} creates a new correlator. */
    public void setConcurrentProcessing(boolean flag) {
        this.concurrent_processing=flag;
    }


    /**
     * Creates and starts the request correlator if there is none, refreshes the member list from
     * the channel's current view (if any) and resumes the protocol adapter.
     */
    public void start() {
        if(corr == null) {
            if(transport_adapter != null) {
                corr=new RequestCorrelator("MessageDispatcher", transport_adapter,
                                           this, deadlock_detection, local_addr, concurrent_processing);
            }
            else {
                corr=new RequestCorrelator("MessageDispatcher", prot_adapter,
                                           this, deadlock_detection, local_addr, concurrent_processing);
            }
            corr.start();
        }
        if(channel != null) {
            // Read the view once so the null check and the use refer to the same object.
            View view=channel.getView();
            setMembers(view != null ? view.getMembers() : null);
        }
        if(null != prot_adapter) {
            prot_adapter.resume();
        }
    }


    /** Suspends the protocol adapter, then stops and discards the request correlator. */
    public void stop() {
        if(null != prot_adapter) {
            prot_adapter.suspend();
        }
        if(corr != null) {
            corr.stop();
            corr=null;
        }
    }


    public void setMessageListener(MessageListener l) {
        msg_listener=l;
    }


    /** @return the currently registered message listener, possibly null */
    public MessageListener getMessageListener() {
        return msg_listener;
    }


    public void setMembershipListener(MembershipListener l) {
        membership_listener=l;
    }


    public void setRequestHandler(RequestHandler rh) {
        req_handler=rh;
    }


    /** @return the channel this dispatcher was created with, or null in the PullPushAdapter-based setup */
    public Channel getChannel() {
        return channel;
    }


    /**
     * Sends a message without going through the correlator: over the channel if there is one,
     * otherwise through the adapter (using {@link #id} if it is set). Failures of the adapter are
     * logged and not propagated.
     */
    public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {
        if(channel != null) {
            channel.send(msg);
        }
        else
            if(adapter != null) {
                try {
                    if(id != null) {
                        adapter.send(id, msg);
                    }
                    else {
                        adapter.send(msg);
                    }
                }
                catch(Throwable ex) {
                    // best effort: the adapter path has always logged rather than thrown
                    if(log.isErrorEnabled()) {
                        log.error("exception=" + Util.print(ex));
                    }
                }
            }
            else {
                if(log.isErrorEnabled()) {
                    log.error("channel == null");
                }
            }
    }


    /**
     * Computes the actual receivers of a request: a copy of {@code dests}, or of the current member
     * list if {@code dests} is null, minus the local address when the channel's {@code LOCAL}
     * option is {@code Boolean.FALSE}.
     * <p>
     * Side effect: fills in {@link #local_addr} from the channel if it was still unknown.
     *
     * @return a new vector, never null; the caller's vector is not modified
     */
    private Vector resolveDestinations(Vector dests) {
        Vector real_dests=(Vector) (dests != null ? dests.clone() : members.clone());

        // In the PullPushAdapter-based setup the channel is only reachable through the adapter.
        Channel ch=channel;
        if(ch == null && adapter != null) {
            Transport tp=adapter.getTransport();
            if(tp instanceof Channel) {
                ch=(Channel) tp;
            }
        }

        // Constant-first equals(): an unset option must not cause a NullPointerException.
        if(ch != null && Boolean.FALSE.equals(ch.getOpt(Channel.LOCAL))) {
            if(local_addr == null) {
                local_addr=ch.getLocalAddress();
            }
            if(local_addr != null) {
                real_dests.removeElement(local_addr);
            }
        }
        return real_dests;
    }


    /**
     * Sends a request to a set of members and returns the collected responses.
     *
     * @param dests   the receivers; if null, the current members are used. The vector is copied,
     *                never modified
     * @param msg     the request
     * @param mode    response mode handed to {@link GroupRequest}
     * @param timeout timeout handed to {@link GroupRequest} (presumably milliseconds - TODO confirm)
     * @return the responses; an empty list if there is nobody to send to
     */
    public RspList castMessage(final Vector dests, Message msg, int mode, long timeout) {
        Vector real_dests=resolveDestinations(dests);

        if(log.isTraceEnabled()) {
            log.trace("real_dests=" + real_dests);
        }

        if(real_dests.size() == 0) {
            if(log.isTraceEnabled()) {
                log.trace("destination list is empty, won't send message");
            }
            return new RspList();
        }

        GroupRequest request=new GroupRequest(msg, corr, real_dests, mode, timeout, 0);
        request.execute();
        return request.getResults();
    }


    /**
     * Sends a request to a set of members without waiting; responses are delivered to the given
     * collector by the correlator. Nothing is sent if the message or the collector is null, if the
     * destination list ends up empty, or if the dispatcher has been stopped.
     *
     * @param dests  the receivers; if null, the current members are used
     * @param req_id the request identifier passed to the correlator
     * @param msg    the request
     * @param coll   receives the responses; must not be null
     */
    public void castMessage(final Vector dests, long req_id, Message msg, RspCollector coll) {
        if(msg == null) {
            if(log.isErrorEnabled()) {
                log.error("request is null");
            }
            return;
        }

        if(coll == null) {
            if(log.isErrorEnabled()) {
                log.error("response collector is null (must be non-null)");
            }
            return;
        }

        Vector real_dests=resolveDestinations(dests);
        if(real_dests.size() == 0) {
            if(log.isDebugEnabled()) {
                log.debug("destination list is empty, won't send message");
            }
            return;
        }

        // stop() sets corr to null; work on a local copy so check and use see the same value
        RequestCorrelator correlator=corr;
        if(correlator == null) {
            if(log.isErrorEnabled()) {
                log.error("correlator is null, cannot send request");
            }
            return;
        }
        correlator.sendRequest(req_id, real_dests, msg, coll);
    }


    /** Tells the correlator that the request with the given id is finished; no-op after {@link #stop()}. */
    public void done(long req_id) {
        RequestCorrelator correlator=corr;
        if(correlator != null) {
            correlator.done(req_id);
        }
    }


    /**
     * Sends a request to the single member named by the message's destination and returns its
     * response value.
     *
     * @return the response value; null if the destination is null, if {@code mode} is
     *         {@code GroupRequest.GET_NONE}, or if no response entry was produced
     * @throws SuspectedException if the receiver was suspected
     * @throws TimeoutException   if no response was received
     */
    public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException {
        Object dest=msg.getDest();
        if(dest == null) {
            if(log.isErrorEnabled()) {
                log.error("the message's destination is null, " +
                          "cannot send message");
            }
            return null;
        }

        Vector mbrs=new Vector();
        mbrs.addElement(dest);
        GroupRequest request=new GroupRequest(msg, corr, mbrs, mode, timeout, 0);
        request.execute();

        if(mode == GroupRequest.GET_NONE) {
            return null;
        }

        RspList rsp_list=request.getResults();
        if(rsp_list.size() == 0) {
            if(log.isWarnEnabled()) {
                log.warn(" response list is empty");
            }
            return null;
        }
        if(rsp_list.size() > 1) {
            if(log.isWarnEnabled()) {
                log.warn("response list contains more that 1 response; returning first response !");
            }
        }

        Rsp rsp=(Rsp) rsp_list.elementAt(0);
        if(rsp.wasSuspected()) {
            throw new SuspectedException(dest);
        }
        if(!rsp.wasReceived()) {
            throw new TimeoutException();
        }
        return rsp.getValue();
    }


    /**
     * Callback invoked for an incoming request; delegates to the registered request handler.
     *
     * @return the handler's result, or null if no handler is registered
     */
    public Object handle(Message msg) {
        if(req_handler != null) {
            return req_handler.handle(msg);
        }
        else {
            return null;
        }
    }


    /**
     * Glue between the channel and the request correlator in the channel-based setup. Events coming
     * up from the channel are queued by {@link #up(Event)} and handed to the correlator by a daemon
     * thread; events the correlator does not consume arrive in {@link #passUp(Event)} and are
     * dispatched to the listeners; events going down are forwarded to the channel.
     */
    class ProtocolAdapter extends Protocol implements UpHandler {
        /**
         * The thread draining {@link #upQueue}. Written under this object's monitor but read by the
         * worker loop without it, hence volatile.
         */
        private volatile Thread upProcessingThread=null;
        private final Queue upQueue=new Queue();
        /** Locked while the dispatcher is suspended; the worker waits on it before handling an event. */
        private final ReentrantLatch m_upLatch=new ReentrantLatch(false);


        public String getName() {
            return "MessageDispatcher";
        }

        /** Intentionally empty (presumably to suppress Protocol's own up handler - TODO confirm). */
        public void startUpHandler() {
        }

        /** Intentionally empty (presumably to suppress Protocol's own down handler - TODO confirm). */
        public void startDownHandler() {
        }

        /** Intentionally empty. */
        public void stopInternal() {
        }

        /** Intentionally empty. */
        protected void receiveUpEvent(Event evt) {
        }

        /** Intentionally empty. */
        protected void receiveDownEvent(Event evt) {
        }


        /**
         * Dispatches an event to the message listener or the membership listener, depending on its
         * type. Event types not listed in the switch are ignored.
         */
        public void passUp(Event evt) {
            byte[] tmp_state=null;
            switch(evt.getType()) {
                case Event.MSG:
                    if(msg_listener != null) {
                        msg_listener.receive((Message) evt.getArg());
                    }
                    break;

                case Event.GET_APPLSTATE:
                    if(msg_listener != null) {
                        try {
                            tmp_state=msg_listener.getState();
                        }
                        catch(Throwable t) {
                            this.log.error("failed getting state from message listener (" + msg_listener + ')', t);
                        }
                    }
                    // a state is always returned, null if it could not be obtained
                    channel.returnState(tmp_state);
                    break;

                case Event.GET_STATE_OK:
                    if(msg_listener != null) {
                        try {
                            msg_listener.setState((byte[]) evt.getArg());
                        }
                        catch(ClassCastException cast_ex) {
                            if(this.log.isErrorEnabled()) {
                                this.log.error("received SetStateEvent, but argument " +
                                               evt.getArg() + " is not serializable. Discarding message.");
                            }
                        }
                    }
                    break;

                case Event.VIEW_CHANGE: {
                    View v=(View) evt.getArg();
                    // qualified call: always the dispatcher's atomic member update
                    MessageDispatcher.this.setMembers(v.getMembers());
                    if(membership_listener != null) {
                        membership_listener.viewAccepted(v);
                    }
                    break;
                }

                case Event.SET_LOCAL_ADDRESS:
                    if(log.isTraceEnabled()) {
                        log.trace("setting local_addr (" + local_addr + ") to " + evt.getArg());
                    }
                    local_addr=(Address) evt.getArg();
                    break;

                case Event.SUSPECT:
                    if(membership_listener != null) {
                        membership_listener.suspect((Address) evt.getArg());
                    }
                    break;

                case Event.BLOCK:
                    if(membership_listener != null) {
                        membership_listener.block();
                    }
                    break;
            }
        }


        public void passDown(Event evt) {
            down(evt);
        }


        /** Locks the latch and interrupts the worker thread so that it terminates. */
        synchronized void suspend() {
            m_upLatch.lock();
            if(upProcessingThread != null) {
                Thread t=upProcessingThread;
                // clearing the field first makes the worker's loop condition false
                upProcessingThread=null;
                t.interrupt();
            }
        }

        /** Unlocks the latch and starts a worker thread if none is running. */
        synchronized void resume() {
            m_upLatch.unlock();
            if(upProcessingThread == null) {
                startProcessingThread();
            }
        }


        /**
         * Starts the daemon thread that takes events off the queue and passes them to the
         * correlator. The thread runs for as long as it is the one referenced by
         * {@link #upProcessingThread}.
         */
        private void startProcessingThread() {
            upProcessingThread=new Thread(new Runnable() {
                public void run() {
                    Event event=null;
                    while(Thread.currentThread() == upProcessingThread) {
                        try {
                            event=(Event) upQueue.remove();
                            m_upLatch.passThrough();
                            handleUp(event);
                        }
                        catch(QueueClosedException ex1) {
                            break;
                        }
                        catch(InterruptedException ex2) {
                            // Not re-interrupting on purpose: the loop condition decides whether to go on.
                            // NOTE(review): an event already taken off the queue appears to be dropped
                            // when the interrupt arrives while waiting on the latch - confirm.
                        }
                    }
                }
            });
            upProcessingThread.setDaemon(true);
            upProcessingThread.start();
        }


        /** Called by the channel for every event coming up; only queues the event. */
        public void up(Event evt) {
            try {
                upQueue.add(evt);
            }
            catch(QueueClosedException ex) {
                // the queue is never closed by this class; the event is dropped if it ever is
            }
        }

        /** Hands an event to the correlator, or logs an error if the correlator is gone. */
        private void handleUp(Event evt) {
            if(corr != null) {
                corr.receive(evt);
            }
            else {
                if(this.log.isErrorEnabled()) {
                    this.log.error("correlator is null, but latch is not locked! Event ignored.");
                }
            }
        }

        /** Forwards an event to the channel, or logs an error if there is no channel. */
        public void down(Event evt) {
            if(channel != null) {
                channel.down(evt);
            }
            else
                if(this.log.isErrorEnabled()) {
                    this.log.error("channel == null");
                }
        }
    }


    /**
     * Transport handed to the correlator in the PullPushAdapter-based setup. Sending follows the
     * same rules as {@link MessageDispatcher#send(Message)}; receiving is not supported.
     */
    class TransportAdapter implements Transport {

        public void send(Message msg) throws Exception {
            if(channel != null) {
                channel.send(msg);
            }
            else
                if(adapter != null) {
                    try {
                        if(id != null) {
                            adapter.send(id, msg);
                        }
                        else {
                            adapter.send(msg);
                        }
                    }
                    catch(Throwable ex) {
                        // best effort: logged, not propagated
                        if(log.isErrorEnabled()) {
                            log.error("exception=" + Util.print(ex));
                        }
                    }
                }
                else {
                    if(log.isErrorEnabled()) {
                        log.error("channel == null");
                    }
                }
        }

        /** Always returns null; messages are pushed to {@link PullPushHandler} instead. */
        public Object receive(long timeout) throws Exception {
            return null;
        }
    }


    /**
     * Listener registered with the PullPushAdapter. Every callback is offered to the correlator
     * first (where applicable) and then forwarded to the application's listeners.
     */
    class PullPushHandler implements MessageListener, MembershipListener {

        /** Forwards the message to the message listener unless the correlator consumed it. */
        public void receive(Message msg) {
            boolean pass_up=true;
            if(corr != null) {
                pass_up=corr.receiveMessage(msg);
            }

            if(pass_up && msg_listener != null) {
                msg_listener.receive(msg);
            }
        }

        public byte[] getState() {
            return msg_listener != null ? msg_listener.getState() : null;
        }

        public void setState(byte[] state) {
            if(msg_listener != null) {
                msg_listener.setState(state);
            }
        }

        /** Informs the correlator, updates the member list and notifies the membership listener. */
        public void viewAccepted(View v) {
            if(corr != null) {
                corr.receiveView(v);
            }

            MessageDispatcher.this.setMembers(v.getMembers());

            if(membership_listener != null) {
                membership_listener.viewAccepted(v);
            }
        }

        public void suspect(Address suspected_mbr) {
            if(corr != null) {
                corr.receiveSuspect(suspected_mbr);
            }
            if(membership_listener != null) {
                membership_listener.suspect(suspected_mbr);
            }
        }

        public void block() {
            if(membership_listener != null) {
                membership_listener.block();
            }
        }
    }
}