1 3 package org.jgroups.blocks; 4 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 import org.jgroups.*; 8 import org.jgroups.stack.Protocol; 9 import org.jgroups.util.Scheduler; 10 import org.jgroups.util.SchedulerListener; 11 import org.jgroups.util.Util; 12 13 import java.io.*; 14 import java.util.ArrayList ; 15 import java.util.HashMap ; 16 import java.util.Iterator ; 17 import java.util.List ; 18 19 20 21 22 39 public class RequestCorrelator { 40 41 42 protected Object transport=null; 43 44 45 protected final HashMap requests=new HashMap (); 46 47 49 protected RequestHandler request_handler=null; 50 51 52 protected String name=null; 53 54 55 protected Scheduler scheduler=null; 56 57 58 59 protected Address local_addr=null; 60 61 67 protected java.util.Stack call_stack=null; 68 69 72 protected boolean deadlock_detection=false; 73 74 78 protected CallStackSetter call_stack_setter=null; 79 80 84 protected boolean concurrent_processing=false; 85 86 87 protected boolean started=false; 88 89 protected static final Log log=LogFactory.getLog(RequestCorrelator.class); 90 91 92 107 public RequestCorrelator(String name, Object transport, RequestHandler handler) { 108 this.name = name; 109 this.transport = transport; 110 request_handler = handler; 111 start(); 112 } 113 114 115 public RequestCorrelator(String name, Object transport, RequestHandler handler, Address local_addr) { 116 this.name = name; 117 this.transport = transport; 118 this.local_addr=local_addr; 119 request_handler = handler; 120 start(); 121 } 122 123 124 144 public RequestCorrelator(String name, Object transport, 145 RequestHandler handler, boolean deadlock_detection) { 146 this.deadlock_detection = deadlock_detection; 147 this.name = name; 148 this.transport = transport; 149 request_handler = handler; 150 start(); 151 } 152 153 154 public RequestCorrelator(String name, Object transport, 155 RequestHandler handler, boolean deadlock_detection, boolean concurrent_processing) { 156 this.deadlock_detection = deadlock_detection; 157 this.name = name; 158 this.transport = transport; 159 request_handler = handler; 160 this.concurrent_processing = concurrent_processing; 161 start(); 162 } 163 164 public RequestCorrelator(String name, Object transport, 165 RequestHandler handler, boolean deadlock_detection, Address local_addr) { 166 this.deadlock_detection = deadlock_detection; 167 this.name = name; 168 this.transport = transport; 169 this.local_addr = local_addr; 170 request_handler = handler; 171 start(); 172 } 173 174 public RequestCorrelator(String name, Object transport, RequestHandler handler, 175 boolean deadlock_detection, Address local_addr, boolean concurrent_processing) { 176 this.deadlock_detection = deadlock_detection; 177 this.name = name; 178 this.transport = transport; 179 this.local_addr = local_addr; 180 request_handler = handler; 181 this.concurrent_processing = concurrent_processing; 182 start(); 183 } 184 185 186 187 188 192 public void setDeadlockDetection(boolean flag) { 193 if(deadlock_detection != flag) { deadlock_detection=flag; 195 if(started) { 196 if(deadlock_detection) { 197 startScheduler(); 198 } 199 else { 200 stopScheduler(); 201 } 202 } 203 } 204 } 205 206 207 public void setRequestHandler(RequestHandler handler) { 208 request_handler=handler; 209 start(); 210 } 211 212 213 public void setConcurrentProcessing(boolean concurrent_processing) { 214 this.concurrent_processing=concurrent_processing; 215 } 216 217 218 221 public void sendRequest(long id, Message msg, RspCollector coll) { 222 sendRequest(id, null, msg, coll); 223 } 224 225 226 243 public void sendRequest(long id, List dest_mbrs, Message msg, RspCollector coll) { 244 Header hdr; 245 246 if(transport == null) { 247 if(log.isWarnEnabled()) log.warn("transport is not available !"); 248 return; 249 } 250 251 hdr = new Header(Header.REQ, id, (coll!=null? true:false), name); 258 hdr.dest_mbrs=dest_mbrs; 259 260 if (coll != null) { 261 if(deadlock_detection) { 262 if(local_addr == null) { 263 if(log.isErrorEnabled()) log.error("local address is null !"); 264 return; 265 } 266 java.util.Stack new_call_stack = (call_stack != null? 267 (java.util.Stack )call_stack.clone():new java.util.Stack ()); 268 new_call_stack.push(local_addr); 269 hdr.callStack=new_call_stack; 270 } 271 addEntry(hdr.id, new RequestEntry(coll)); 272 } 273 msg.putHeader(name, hdr); 274 275 try { 276 if(transport instanceof Protocol) 277 ((Protocol)transport).passDown(new Event(Event.MSG, msg)); 278 else if(transport instanceof Transport) 279 ((Transport)transport).send(msg); 280 else 281 if(log.isErrorEnabled()) log.error("transport object has to be either a " + 282 "Transport or a Protocol, however it is a " + transport.getClass()); 283 } 284 catch(Throwable e) { 285 if(log.isWarnEnabled()) log.warn(e.toString()); 286 } 287 } 288 289 290 291 292 293 297 public void done(long id) { 298 removeEntry(id); 299 } 300 301 302 314 public void receive(Event evt) { 315 switch(evt.getType()) { 316 case Event.SUSPECT: receiveSuspect((Address)evt.getArg()); 318 break; 319 case Event.VIEW_CHANGE: receiveView((View)evt.getArg()); 321 break; 322 case Event.SET_LOCAL_ADDRESS: 323 setLocalAddress((Address)evt.getArg()); 324 break; 325 case Event.MSG: 326 if(!receiveMessage((Message)evt.getArg())) 327 return; 328 break; 329 } 330 if(transport instanceof Protocol) 331 ((Protocol)transport).passUp(evt); 332 else 333 if(log.isErrorEnabled()) log.error("we do not pass up messages via Transport"); 334 } 335 336 337 339 public void start() { 340 if(deadlock_detection) { 341 startScheduler(); 342 } 343 started=true; 344 } 345 346 347 void startScheduler() { 348 if(scheduler == null) { 349 scheduler=new Scheduler(); 350 if(deadlock_detection && call_stack_setter == null) { 351 call_stack_setter=new CallStackSetter(); 352 scheduler.setListener(call_stack_setter); 353 } 354 if(concurrent_processing) 355 scheduler.setConcurrentProcessing(concurrent_processing); 356 scheduler.start(); 357 } 358 } 359 360 361 363 public void stop() { 364 stopScheduler(); 365 started=false; 366 } 367 368 void stopScheduler() { 369 if(scheduler != null) { 370 scheduler.stop(); 371 scheduler=null; 372 } 373 } 374 375 376 378 379 380 387 public void receiveSuspect(Address mbr) { 388 RequestEntry entry; 389 ArrayList copy; 390 391 if(mbr == null) return; 392 if(log.isDebugEnabled()) log.debug("suspect=" + mbr); 393 394 synchronized(requests) { 396 copy=new ArrayList (requests.values()); 397 } 398 for(Iterator it=copy.iterator(); it.hasNext();) { 399 entry=(RequestEntry)it.next(); 400 if(entry.coll != null) 401 entry.coll.suspect(mbr); 402 } 403 } 404 405 406 413 public void receiveView(View new_view) { 414 RequestEntry entry; 415 ArrayList copy; 416 417 synchronized(requests) { 419 copy=new ArrayList (requests.values()); 420 } 421 for(Iterator it=copy.iterator(); it.hasNext();) { 422 entry=(RequestEntry)it.next(); 423 if(entry.coll != null) 424 entry.coll.viewChange(new_view); 425 } 426 } 427 428 429 434 public boolean receiveMessage(Message msg) { 435 Object tmpHdr; 436 Header hdr; 437 RspCollector coll; 438 java.util.Stack stack; 439 java.util.List dests; 440 441 tmpHdr=msg.getHeader(name); 447 if(tmpHdr == null || !(tmpHdr instanceof Header)) { 448 return (true); 449 } 450 451 hdr=(Header)tmpHdr; 452 if(hdr.corrName == null || !hdr.corrName.equals(name)) { 453 if(log.isTraceEnabled()) { 454 log.trace("name of request correlator header (" + 455 hdr.corrName + ") is different from ours (" + name + "). Msg not accepted, passed up"); 456 } 457 return (true); 458 } 459 460 dests=hdr.dest_mbrs; 463 if(dests != null && local_addr != null && !dests.contains(local_addr)) { 464 if(log.isTraceEnabled()) { 465 log.trace("discarded request from " + msg.getSrc() + 466 " as we are not part of destination list (local_addr=" + local_addr + ", hdr=" + hdr + ')'); 467 } 468 return false; 469 } 470 471 if(log.isTraceEnabled()) { 472 log.trace("from " + msg.getSrc() + ", header is " + hdr); 473 } 474 475 switch(hdr.type) { 485 case Header.REQ: 486 if(request_handler == null) { 487 if(log.isWarnEnabled()) { 488 log.warn("there is no request handler installed to deliver request !"); 489 } 490 return (false); 491 } 492 493 if(deadlock_detection) { 494 if(scheduler == null) { 495 log.error("deadlock_detection is true, but scheduler is null: this is not supposed to happen" + 496 " (discarding request)"); 497 break; 498 } 499 500 Request req=new Request(msg); 501 stack=hdr.callStack; 502 if(hdr.rsp_expected && stack != null && local_addr != null) { 503 if(stack.contains(local_addr)) { 504 if(log.isTraceEnabled()) 505 log.trace("call stack=" + hdr.callStack + " contains " + local_addr + 506 ": adding request to priority queue"); 507 scheduler.addPrio(req); 508 break; 509 } 510 } 511 scheduler.add(req); 512 break; 513 } 514 515 handleRequest(msg); 516 break; 517 518 case Header.RSP: 519 msg.removeHeader(name); 520 coll=findEntry(hdr.id); 521 if(coll != null) { 522 coll.receiveResponse(msg); 523 } 524 break; 525 526 default: 527 msg.removeHeader(name); 528 if(log.isErrorEnabled()) log.error("header's type is neither REQ nor RSP !"); 529 break; 530 } 531 532 return (false); 533 } 534 535 public Address getLocalAddress() { 536 return local_addr; 537 } 538 539 public void setLocalAddress(Address local_addr) { 540 this.local_addr=local_addr; 541 } 542 543 544 546 550 private void addEntry(long id, RequestEntry entry) { 551 Long id_obj = new Long (id); 552 synchronized(requests) { 553 if(!requests.containsKey(id_obj)) 554 requests.put(id_obj, entry); 555 else 556 if(log.isWarnEnabled()) log.warn("entry " + entry + " for request-id=" + id + " already present !"); 557 } 558 } 559 560 561 566 private void removeEntry(long id) { 567 Long id_obj = new Long (id); 568 569 synchronized(requests) { 573 requests.remove(id_obj); 574 } 575 } 576 577 578 583 private RspCollector findEntry(long id) { 584 Long id_obj = new Long (id); 585 RequestEntry entry; 586 587 synchronized(requests) { 588 entry=(RequestEntry)requests.get(id_obj); 589 } 590 return((entry != null)? entry.coll:null); 591 } 592 593 594 599 private void handleRequest(Message req) { 600 Object retval; 601 byte[] rsp_buf=null; 602 Header hdr, rsp_hdr; 603 Message rsp; 604 605 hdr = (Header)req.removeHeader(name); 612 613 if(log.isTraceEnabled()) 614 log.trace("calling (" + (request_handler != null? request_handler.getClass().getName() : "null") + 615 ") with request " + hdr.id); 616 617 try { 618 retval = request_handler.handle(req); 619 } 620 catch(Throwable t) { 621 if(log.isErrorEnabled()) log.error("error invoking method", t); 622 retval=t; 623 } 624 625 if(!hdr.rsp_expected) return; 627 628 if (transport == null) { 629 if(log.isErrorEnabled()) log.error("failure sending response; no transport available"); 630 return; 631 } 632 633 try { 635 rsp_buf=Util.objectToByteBuffer(retval); } 637 catch(Throwable t) { 638 try { 639 rsp_buf=Util.objectToByteBuffer(t); } 641 catch(Throwable tt) { 642 if(log.isErrorEnabled()) log.error("failed sending response: " + 643 "return value (" + retval + ") is not serializable"); 644 return; 645 } 646 } 647 648 rsp=req.makeReply(); 649 if(rsp_buf != null) 650 rsp.setBuffer(rsp_buf); 651 rsp_hdr=new Header(Header.RSP, hdr.id, false, name); 652 rsp.putHeader(name, rsp_hdr); 653 if(log.isTraceEnabled()) log.trace("sending rsp for " + 654 rsp_hdr.id + " to " + rsp.getDest()); 655 656 try { 657 if(transport instanceof Protocol) 658 ((Protocol)transport).passDown(new Event(Event.MSG, rsp)); 659 else if(transport instanceof Transport) 660 ((Transport)transport).send(rsp); 661 else 662 if(log.isErrorEnabled()) log.error("transport object has to be either a " + 663 "Transport or a Protocol, however it is a " + transport.getClass()); 664 } 665 catch(Throwable e) { 666 if(log.isErrorEnabled()) log.error(throwableToString(e)); 667 } 668 } 669 670 671 674 private String throwableToString(Throwable ex) { 675 StringWriter sw = new StringWriter(); 676 PrintWriter pw = new PrintWriter(sw); 677 ex.printStackTrace(pw); 678 return(sw.toString()); 679 } 680 681 682 684 685 686 687 688 691 private static class RequestEntry { 692 public RspCollector coll = null; 693 694 public RequestEntry(RspCollector coll) { 695 this.coll = coll; 696 } 697 } 698 699 700 701 704 public static class Header extends org.jgroups.Header { 705 public static final int REQ = 0; 706 public static final int RSP = 1; 707 708 709 public int type=REQ; 710 714 public long id=0; 715 716 public boolean rsp_expected=true; 717 718 public String corrName=null; 719 720 721 public java.util.Stack callStack=null; 722 723 724 public java.util.List dest_mbrs=null; 725 726 727 730 public Header() {} 731 732 740 public Header(int type, long id, boolean rsp_expected, String name) { 741 this.type = type; 742 this.id = id; 743 this.rsp_expected = rsp_expected; 744 this.corrName = name; 745 } 746 747 749 public String toString() { 750 StringBuffer ret=new StringBuffer (); 751 ret.append("[Header: name=" + corrName + ", type="); 752 ret.append(type == REQ ? "REQ" : type == RSP ? "RSP" : "<unknown>"); 753 ret.append(", id=" + id); 754 ret.append(", rsp_expected=" + rsp_expected + ']'); 755 if(callStack != null) 756 ret.append(", call stack=" + callStack); 757 if(dest_mbrs != null) 758 ret.append(", dest_mbrs=").append(dest_mbrs); 759 return ret.toString(); 760 } 761 762 763 766 public void writeExternal(ObjectOutput out) throws IOException { 767 out.writeInt(type); 768 out.writeLong(id); 769 out.writeBoolean(rsp_expected); 770 if(corrName != null) { 771 out.writeBoolean(true); 772 out.writeUTF(corrName); 773 } 774 else { 775 out.writeBoolean(false); 776 } 777 out.writeObject(callStack); 778 out.writeObject(dest_mbrs); 779 } 780 781 782 785 public void readExternal(ObjectInput in) 786 throws IOException, ClassNotFoundException { 787 type = in.readInt(); 788 id = in.readLong(); 789 rsp_expected = in.readBoolean(); 790 if(in.readBoolean()) 791 corrName = in.readUTF(); 792 callStack = (java.util.Stack )in.readObject(); 793 dest_mbrs=(java.util.List )in.readObject(); 794 } 795 } 796 797 798 799 800 806 private class CallStackSetter implements SchedulerListener { 807 public void started(Runnable r) { setCallStack(r); } 808 public void stopped(Runnable r) { setCallStack(null); } 809 public void suspended(Runnable r) { setCallStack(null); } 810 public void resumed(Runnable r) { setCallStack(r); } 811 812 void setCallStack(Runnable r) { 813 java.util.Stack new_stack; 814 Message req; 815 Header hdr; 816 Object obj; 817 818 if(r == null) { 819 call_stack=null; 820 return; 821 } 822 823 req=((Request)r).req; 824 if(req == null) 825 return; 826 827 obj=req.getHeader(name); 828 if(obj == null || !(obj instanceof Header)) 829 return; 830 831 hdr=(Header)obj; 832 if(hdr.rsp_expected == false) 833 return; 834 835 new_stack=hdr.callStack; 836 if(new_stack != null) 837 call_stack=(java.util.Stack )new_stack.clone(); 838 } 839 } 840 841 842 846 private class Request implements Runnable { 847 public final Message req; 848 849 public Request(Message req) { this.req=req; } 850 public void run() { handleRequest(req); } 851 852 public String toString() { 853 StringBuffer sb=new StringBuffer (); 854 if(req != null) 855 sb.append("req=" + req + ", headers=" + req.printObjectHeaders()); 856 return sb.toString(); 857 } 858 } 859 860 } 861 | Popular Tags |