KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > RequestCorrelator


1 // $Id: RequestCorrelator.java,v 1.18 2005/04/25 15:14:18 belaban Exp $
2

3 package org.jgroups.blocks;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.*;
8 import org.jgroups.stack.Protocol;
9 import org.jgroups.util.Scheduler;
10 import org.jgroups.util.SchedulerListener;
11 import org.jgroups.util.Util;
12
13 import java.io.*;
14 import java.util.ArrayList JavaDoc;
15 import java.util.HashMap JavaDoc;
16 import java.util.Iterator JavaDoc;
17 import java.util.List JavaDoc;
18
19
20
21
22 /**
23  * Framework to send requests and receive matching responses (matching on
24  * request ID).
25  * Multiple requests can be sent at a time. Whenever a response is received,
26  * the correct <code>RspCollector</code> is looked up (key = id) and its
27  * method <code>receiveResponse()</code> invoked. A caller may use
28  * <code>done()</code> to signal that no more responses are expected, and that
29  * the corresponding entry may be removed.
30  * <p>
31  * <code>RequestCorrelator</code> can be installed at both client and server
32  * sides, it can also switch roles dynamically, i.e. send a request and at
33  * the same time process an incoming request (when local delivery is enabled,
34  * this is actually the default).
35  * <p>
36  *
37  * @author Bela Ban
38  */

39 public class RequestCorrelator {
40
41     /** The protocol layer to use to pass up/down messages. Can be either a Protocol or a Transport */
42     protected Object JavaDoc transport=null;
43
44     /** The table of pending requests (keys=Long (request IDs), values=<tt>RequestEntry</tt>) */
45     protected final HashMap JavaDoc requests=new HashMap JavaDoc();
46
47     /** The handler for the incoming requests. It is called from inside the
48      * dispatcher thread */

49     protected RequestHandler request_handler=null;
50
51     /** makes the instance unique (together with IDs) */
52     protected String JavaDoc name=null;
53
54     /** The dispatching thread pool */
55     protected Scheduler scheduler=null;
56
57
58     /** The address of this group member */
59     protected Address local_addr=null;
60
61     /**
62      * This field is used only if deadlock detection is enabled.
63      * In case of nested synchronous requests, it holds a list of the
64      * addreses of the senders with the address at the bottom being the
65      * address of the first caller
66      */

67     protected java.util.Stack JavaDoc call_stack=null;
68
69     /** Whether or not to perform deadlock detection for synchronous (potentially recursive) group method invocations.
70      * If on, we use a scheduler (handling a priority queue), otherwise we don't and call handleRequest() directly.
71      */

72     protected boolean deadlock_detection=false;
73
74     /**
75      * This field is used only if deadlock detection is enabled.
76      * It sets the calling stack to the currently running request
77      */

78     protected CallStackSetter call_stack_setter=null;
79
80     /** Process items on the queue concurrently (Scheduler). The default is to wait until the processing of an item
81      * has completed before fetching the next item from the queue. Note that setting this to true
82      * may destroy the properties of a protocol stack, e.g total or causal order may not be
83      * guaranteed. Set this to true only if you know what you're doing ! */

84     protected boolean concurrent_processing=false;
85
86
87     protected boolean started=false;
88
89     protected static final Log log=LogFactory.getLog(RequestCorrelator.class);
90
91
92     /**
93      * Constructor. Uses transport to send messages. If <code>handler</code>
94      * is not null, all incoming requests will be dispatched to it (via
95      * <code>handle(Message)</code>).
96      *
97      * @param name Used to differentiate between different RequestCorrelators
98      * (e.g. in different protocol layers). Has to be unique if multiple
99      * request correlators are used.
100      *
101      * @param transport Used to send/pass up requests. Can be either a Transport (only send() will be
102      * used then), or a Protocol (passUp()/passDown() will be used)
103      *
104      * @param handler Request handler. Method <code>handle(Message)</code>
105      * will be called when a request is received.
106      */

107     public RequestCorrelator(String JavaDoc name, Object JavaDoc transport, RequestHandler handler) {
108         this.name = name;
109         this.transport = transport;
110         request_handler = handler;
111         start();
112     }
113
114
115     public RequestCorrelator(String JavaDoc name, Object JavaDoc transport, RequestHandler handler, Address local_addr) {
116         this.name = name;
117         this.transport = transport;
118         this.local_addr=local_addr;
119         request_handler = handler;
120         start();
121     }
122
123
124     /**
125      * Constructor. Uses transport to send messages. If <code>handler</code>
126      * is not null, all incoming requests will be dispatched to it (via
127      * <code>handle(Message)</code>).
128      *
129      * @param name Used to differentiate between different RequestCorrelators
130      * (e.g. in different protocol layers). Has to be unique if multiple
131      * request correlators are used.
132      *
133      * @param transport Used to send/pass up requests. Can be either a Transport (only send() will be
134      * used then), or a Protocol (passUp()/passDown() will be used)
135      *
136      * @param handler Request handler. Method <code>handle(Message)</code>
137      * will be called when a request is received.
138      *
139      * @param deadlock_detection When enabled (true) recursive synchronous
140      * message calls will be detected and processed with higher priority in
141      * order to solve deadlocks. Slows down processing a little bit when
142      * enabled due to runtime checks involved.
143      */

144     public RequestCorrelator(String JavaDoc name, Object JavaDoc transport,
145                              RequestHandler handler, boolean deadlock_detection) {
146         this.deadlock_detection = deadlock_detection;
147         this.name = name;
148         this.transport = transport;
149         request_handler = handler;
150         start();
151     }
152
153
154     public RequestCorrelator(String JavaDoc name, Object JavaDoc transport,
155                              RequestHandler handler, boolean deadlock_detection, boolean concurrent_processing) {
156         this.deadlock_detection = deadlock_detection;
157         this.name = name;
158         this.transport = transport;
159         request_handler = handler;
160         this.concurrent_processing = concurrent_processing;
161         start();
162     }
163
164     public RequestCorrelator(String JavaDoc name, Object JavaDoc transport,
165                              RequestHandler handler, boolean deadlock_detection, Address local_addr) {
166         this.deadlock_detection = deadlock_detection;
167         this.name = name;
168         this.transport = transport;
169         this.local_addr = local_addr;
170         request_handler = handler;
171         start();
172     }
173
174     public RequestCorrelator(String JavaDoc name, Object JavaDoc transport, RequestHandler handler,
175                              boolean deadlock_detection, Address local_addr, boolean concurrent_processing) {
176         this.deadlock_detection = deadlock_detection;
177         this.name = name;
178         this.transport = transport;
179         this.local_addr = local_addr;
180         request_handler = handler;
181         this.concurrent_processing = concurrent_processing;
182         start();
183     }
184
185
186
187
188     /**
189      * Switch the deadlock detection mechanism on/off
190      * @param flag the deadlock detection flag
191      */

192     public void setDeadlockDetection(boolean flag) {
193         if(deadlock_detection != flag) { // only set it if different
194
deadlock_detection=flag;
195             if(started) {
196                 if(deadlock_detection) {
197                     startScheduler();
198                 }
199                 else {
200                     stopScheduler();
201                 }
202             }
203         }
204     }
205
206
207     public void setRequestHandler(RequestHandler handler) {
208         request_handler=handler;
209         start();
210     }
211
212
213     public void setConcurrentProcessing(boolean concurrent_processing) {
214         this.concurrent_processing=concurrent_processing;
215     }
216
217
218     /**
219      * Helper method for {@link #sendRequest(long,List,Message,RspCollector)}.
220      */

221     public void sendRequest(long id, Message msg, RspCollector coll) {
222         sendRequest(id, null, msg, coll);
223     }
224
225
226     /**
227      * Send a request to a group. If no response collector is given, no
228      * responses are expected (making the call asynchronous).
229      *
230      * @param id The request ID. Must be unique for this JVM (e.g. current
231      * time in millisecs)
232      * @param dest_mbrs The list of members who should receive the call. Usually a group RPC
233      * is sent via multicast, but a receiver drops the request if its own address
234      * is not in this list. Will not be used if it is null.
235      * @param msg The request to be sent. The body of the message carries
236      * the request data
237      *
238      * @param coll A response collector (usually the object that invokes
239      * this method). Its methods <code>receiveResponse()</code> and
240      * <code>suspect()</code> will be invoked when a message has been received
241      * or a member is suspected, respectively.
242      */

243     public void sendRequest(long id, List JavaDoc dest_mbrs, Message msg, RspCollector coll) {
244         Header hdr;
245
246         if(transport == null) {
247             if(log.isWarnEnabled()) log.warn("transport is not available !");
248             return;
249         }
250
251         // i. Create the request correlator header and add it to the
252
// msg
253
// ii. If a reply is expected (sync call / 'coll != null'), add a
254
// coresponding entry in the pending requests table
255
// iii. If deadlock detection is enabled, set/update the call stack
256
// iv. Pass the msg down to the protocol layer below
257
hdr = new Header(Header.REQ, id, (coll!=null? true:false), name);
258         hdr.dest_mbrs=dest_mbrs;
259
260         if (coll != null) {
261             if(deadlock_detection) {
262                 if(local_addr == null) {
263                     if(log.isErrorEnabled()) log.error("local address is null !");
264                     return;
265                 }
266                 java.util.Stack JavaDoc new_call_stack = (call_stack != null?
267                                                   (java.util.Stack JavaDoc)call_stack.clone():new java.util.Stack JavaDoc());
268                 new_call_stack.push(local_addr);
269                 hdr.callStack=new_call_stack;
270             }
271             addEntry(hdr.id, new RequestEntry(coll));
272         }
273         msg.putHeader(name, hdr);
274
275         try {
276             if(transport instanceof Protocol)
277                 ((Protocol)transport).passDown(new Event(Event.MSG, msg));
278             else if(transport instanceof Transport)
279                 ((Transport)transport).send(msg);
280             else
281                 if(log.isErrorEnabled()) log.error("transport object has to be either a " +
282                             "Transport or a Protocol, however it is a " + transport.getClass());
283         }
284         catch(Throwable JavaDoc e) {
285             if(log.isWarnEnabled()) log.warn(e.toString());
286         }
287     }
288
289
290
291
292
293     /**
294      * Used to signal that a certain request may be garbage collected as
295      * all responses have been received.
296      */

297     public void done(long id) {
298         removeEntry(id);
299     }
300
301
302     /**
303      * <b>Callback</b>.
304      * <p>
305      * Called by the protocol below when a message has been received. The
306      * algorithm should test whether the message is destined for us and,
307      * if not, pass it up to the next layer. Otherwise, it should remove
308      * the header and check whether the message is a request or response.
309      * In the first case, the message will be delivered to the request
310      * handler registered (calling its <code>handle()</code> method), in the
311      * second case, the corresponding response collector is looked up and
312      * the message delivered.
313      */

314     public void receive(Event evt) {
315         switch(evt.getType()) {
316         case Event.SUSPECT: // don't wait for responses from faulty members
317
receiveSuspect((Address)evt.getArg());
318             break;
319         case Event.VIEW_CHANGE: // adjust number of responses to wait for
320
receiveView((View)evt.getArg());
321             break;
322         case Event.SET_LOCAL_ADDRESS:
323             setLocalAddress((Address)evt.getArg());
324             break;
325         case Event.MSG:
326             if(!receiveMessage((Message)evt.getArg()))
327                 return;
328             break;
329         }
330         if(transport instanceof Protocol)
331             ((Protocol)transport).passUp(evt);
332         else
333             if(log.isErrorEnabled()) log.error("we do not pass up messages via Transport");
334     }
335
336
337     /**
338      */

339     public void start() {
340         if(deadlock_detection) {
341             startScheduler();
342         }
343         started=true;
344     }
345
346
347     void startScheduler() {
348         if(scheduler == null) {
349             scheduler=new Scheduler();
350             if(deadlock_detection && call_stack_setter == null) {
351                 call_stack_setter=new CallStackSetter();
352                 scheduler.setListener(call_stack_setter);
353             }
354             if(concurrent_processing)
355                 scheduler.setConcurrentProcessing(concurrent_processing);
356             scheduler.start();
357         }
358     }
359
360
361     /**
362      */

363     public void stop() {
364         stopScheduler();
365         started=false;
366     }
367
368     void stopScheduler() {
369         if(scheduler != null) {
370             scheduler.stop();
371             scheduler=null;
372         }
373     }
374
375
376     // .......................................................................
377

378
379
380     /**
381      * <tt>Event.SUSPECT</tt> event received from a layer below
382      * <p>
383      * All response collectors currently registered will
384      * be notified that <code>mbr</code> may have crashed, so they won't
385      * wait for its response.
386      */

387     public void receiveSuspect(Address mbr) {
388         RequestEntry entry;
389         ArrayList JavaDoc copy;
390
391         if(mbr == null) return;
392         if(log.isDebugEnabled()) log.debug("suspect=" + mbr);
393
394         // copy so we don't run into bug #761804 - Bela June 27 2003
395
synchronized(requests) {
396             copy=new ArrayList JavaDoc(requests.values());
397         }
398         for(Iterator JavaDoc it=copy.iterator(); it.hasNext();) {
399             entry=(RequestEntry)it.next();
400             if(entry.coll != null)
401                 entry.coll.suspect(mbr);
402         }
403     }
404
405
406     /**
407      * <tt>Event.VIEW_CHANGE</tt> event received from a layer below
408      * <p>
409      * Mark all responses from members that are not in new_view as
410      * NOT_RECEIVED.
411      *
412      */

413     public void receiveView(View new_view) {
414         RequestEntry entry;
415         ArrayList JavaDoc copy;
416
417         // copy so we don't run into bug #761804 - Bela June 27 2003
418
synchronized(requests) {
419             copy=new ArrayList JavaDoc(requests.values());
420         }
421         for(Iterator JavaDoc it=copy.iterator(); it.hasNext();) {
422             entry=(RequestEntry)it.next();
423             if(entry.coll != null)
424                 entry.coll.viewChange(new_view);
425         }
426     }
427
428
429     /**
430      * Handles a message coming from a layer below
431      *
432      * @return true if the event should be forwarded further up, otherwise false (message was consumed)
433      */

434     public boolean receiveMessage(Message msg) {
435         Object JavaDoc tmpHdr;
436         Header hdr;
437         RspCollector coll;
438         java.util.Stack JavaDoc stack;
439         java.util.List JavaDoc dests;
440
441         // i. If header is not an instance of request correlator header, ignore
442
//
443
// ii. Check whether the message was sent by a request correlator with
444
// the same name (there may be multiple request correlators in the same
445
// protocol stack...)
446
tmpHdr=msg.getHeader(name);
447         if(tmpHdr == null || !(tmpHdr instanceof Header)) {
448             return (true);
449         }
450
451         hdr=(Header)tmpHdr;
452         if(hdr.corrName == null || !hdr.corrName.equals(name)) {
453             if(log.isTraceEnabled()) {
454                 log.trace("name of request correlator header (" +
455                           hdr.corrName + ") is different from ours (" + name + "). Msg not accepted, passed up");
456             }
457             return (true);
458         }
459
460         // If the header contains a destination list, and we are not part of it, then we discard the
461
// request (was addressed to other members)
462
dests=hdr.dest_mbrs;
463         if(dests != null && local_addr != null && !dests.contains(local_addr)) {
464             if(log.isTraceEnabled()) {
465                 log.trace("discarded request from " + msg.getSrc() +
466                           " as we are not part of destination list (local_addr=" + local_addr + ", hdr=" + hdr + ')');
467             }
468             return false;
469         }
470
471         if(log.isTraceEnabled()) {
472             log.trace("from " + msg.getSrc() + ", header is " + hdr);
473         }
474
475         // [Header.REQ]:
476
// i. If there is no request handler, discard
477
// ii. Check whether priority: if synchronous and call stack contains
478
// address that equals local address -> add priority request. Else
479
// add normal request.
480
//
481
// [Header.RSP]:
482
// Remove the msg request correlator header and notify the associated
483
// <tt>RspCollector</tt> that a reply has been received
484
switch(hdr.type) {
485             case Header.REQ:
486                 if(request_handler == null) {
487                     if(log.isWarnEnabled()) {
488                         log.warn("there is no request handler installed to deliver request !");
489                     }
490                     return (false);
491                 }
492
493                 if(deadlock_detection) {
494                     if(scheduler == null) {
495                         log.error("deadlock_detection is true, but scheduler is null: this is not supposed to happen" +
496                                   " (discarding request)");
497                         break;
498                     }
499
500                     Request req=new Request(msg);
501                     stack=hdr.callStack;
502                     if(hdr.rsp_expected && stack != null && local_addr != null) {
503                         if(stack.contains(local_addr)) {
504                             if(log.isTraceEnabled())
505                                 log.trace("call stack=" + hdr.callStack + " contains " + local_addr +
506                                           ": adding request to priority queue");
507                             scheduler.addPrio(req);
508                             break;
509                         }
510                     }
511                     scheduler.add(req);
512                     break;
513                 }
514
515                 handleRequest(msg);
516                 break;
517
518             case Header.RSP:
519                 msg.removeHeader(name);
520                 coll=findEntry(hdr.id);
521                 if(coll != null) {
522                     coll.receiveResponse(msg);
523                 }
524                 break;
525
526             default:
527                 msg.removeHeader(name);
528                 if(log.isErrorEnabled()) log.error("header's type is neither REQ nor RSP !");
529                 break;
530         }
531
532         return (false);
533     }
534
535     public Address getLocalAddress() {
536         return local_addr;
537     }
538
539     public void setLocalAddress(Address local_addr) {
540         this.local_addr=local_addr;
541     }
542
543
544     // .......................................................................
545

546     /**
547      * Add an association of:<br>
548      * ID -> <tt>RspCollector</tt>
549      */

550     private void addEntry(long id, RequestEntry entry) {
551         Long JavaDoc id_obj = new Long JavaDoc(id);
552         synchronized(requests) {
553             if(!requests.containsKey(id_obj))
554                 requests.put(id_obj, entry);
555             else
556                 if(log.isWarnEnabled()) log.warn("entry " + entry + " for request-id=" + id + " already present !");
557         }
558     }
559
560
561     /**
562      * Remove the request entry associated with the given ID
563      *
564      * @param id the id of the <tt>RequestEntry</tt> to remove
565      */

566     private void removeEntry(long id) {
567         Long JavaDoc id_obj = new Long JavaDoc(id);
568
569         // changed by bela Feb 28 2003 (bug fix for 690606)
570
// changed back to use synchronization by bela June 27 2003 (bug fix for #761804),
571
// we can do this because we now copy for iteration (viewChange() and suspect())
572
synchronized(requests) {
573             requests.remove(id_obj);
574         }
575     }
576
577
578     /**
579      * @param id the ID of the corresponding <tt>RspCollector</tt>
580      *
581      * @return the <tt>RspCollector</tt> associated with the given ID
582      */

583     private RspCollector findEntry(long id) {
584         Long JavaDoc id_obj = new Long JavaDoc(id);
585         RequestEntry entry;
586
587         synchronized(requests) {
588             entry=(RequestEntry)requests.get(id_obj);
589         }
590         return((entry != null)? entry.coll:null);
591     }
592
593
594     /**
595      * Handle a request msg for this correlator
596      *
597      * @param req the request msg
598      */

599     private void handleRequest(Message req) {
600         Object JavaDoc retval;
601         byte[] rsp_buf=null;
602         Header hdr, rsp_hdr;
603         Message rsp;
604
605         // i. Remove the request correlator header from the msg and pass it to
606
// the registered handler
607
//
608
// ii. If a reply is expected, pack the return value from the request
609
// handler to a reply msg and send it back. The reply msg has the same
610
// ID as the request and the name of the sender request correlator
611
hdr = (Header)req.removeHeader(name);
612
613         if(log.isTraceEnabled())
614             log.trace("calling (" + (request_handler != null? request_handler.getClass().getName() : "null") +
615                     ") with request " + hdr.id);
616
617         try {
618             retval = request_handler.handle(req);
619         }
620         catch(Throwable JavaDoc t) {
621             if(log.isErrorEnabled()) log.error("error invoking method", t);
622             retval=t;
623         }
624
625         if(!hdr.rsp_expected) // asynchronous call, we don't need to send a response; terminate call here
626
return;
627
628         if (transport == null) {
629             if(log.isErrorEnabled()) log.error("failure sending response; no transport available");
630             return;
631         }
632
633         // changed (bela Feb 20 2004): catch exception and return exception
634
try {
635             rsp_buf=Util.objectToByteBuffer(retval); // retval could be an exception, or a real value
636
}
637         catch(Throwable JavaDoc t) {
638             try {
639                 rsp_buf=Util.objectToByteBuffer(t); // this call should succeed (all exceptions are serializable)
640
}
641             catch(Throwable JavaDoc tt) {
642                 if(log.isErrorEnabled()) log.error("failed sending response: " +
643                         "return value (" + retval + ") is not serializable");
644                 return;
645             }
646         }
647
648         rsp=req.makeReply();
649         if(rsp_buf != null)
650             rsp.setBuffer(rsp_buf);
651         rsp_hdr=new Header(Header.RSP, hdr.id, false, name);
652         rsp.putHeader(name, rsp_hdr);
653         if(log.isTraceEnabled()) log.trace("sending rsp for " +
654                 rsp_hdr.id + " to " + rsp.getDest());
655
656         try {
657             if(transport instanceof Protocol)
658                 ((Protocol)transport).passDown(new Event(Event.MSG, rsp));
659             else if(transport instanceof Transport)
660                 ((Transport)transport).send(rsp);
661             else
662                 if(log.isErrorEnabled()) log.error("transport object has to be either a " +
663                             "Transport or a Protocol, however it is a " + transport.getClass());
664         }
665         catch(Throwable JavaDoc e) {
666             if(log.isErrorEnabled()) log.error(throwableToString(e));
667         }
668     }
669
670
671     /**
672      * Convert exception stack trace to string
673      */

674     private String JavaDoc throwableToString(Throwable JavaDoc ex) {
675         StringWriter sw = new StringWriter();
676         PrintWriter pw = new PrintWriter(sw);
677         ex.printStackTrace(pw);
678         return(sw.toString());
679     }
680
681
682     // .......................................................................
683

684
685
686
687
688     /**
689      * Associates an ID with an <tt>RspCollector</tt>
690      */

691     private static class RequestEntry {
692         public RspCollector coll = null;
693
694         public RequestEntry(RspCollector coll) {
695             this.coll = coll;
696         }
697     }
698
699
700
701     /**
702      * The header for <tt>RequestCorrelator</tt> messages
703      */

704     public static class Header extends org.jgroups.Header {
705         public static final int REQ = 0;
706         public static final int RSP = 1;
707
708         /** Type of header: request or reply */
709         public int type=REQ;
710         /**
711          * The id of this request to distinguish among other requests from
712          * the same <tt>RequestCorrelator</tt>
713          */

714         public long id=0;
715         /** msg is synchronous if true */
716         public boolean rsp_expected=true;
717         /** The unique name of the associated <tt>RequestCorrelator</tt> */
718         public String JavaDoc corrName=null;
719
720         /** Contains senders (e.g. P --> Q --> R) */
721         public java.util.Stack JavaDoc callStack=null;
722
723         /** Contains a list of members who should receive the request (others will drop). Ignored if null */
724         public java.util.List JavaDoc dest_mbrs=null;
725
726
727         /**
728          * Used for externalization
729          */

730         public Header() {}
731
732         /**
733          * @param type type of header (<tt>REQ</tt>/<tt>RSP</tt>)
734          * @param id id of this header relative to ids of other requests
735          * originating from the same correlator
736          * @param rsp_expected whether it's a sync or async request
737          * @param name the name of the <tt>RequestCorrelator</tt> from which
738          * this header originates
739          */

740         public Header(int type, long id, boolean rsp_expected, String JavaDoc name) {
741             this.type = type;
742             this.id = id;
743             this.rsp_expected = rsp_expected;
744             this.corrName = name;
745         }
746
747         /**
748          */

749         public String JavaDoc toString() {
750             StringBuffer JavaDoc ret=new StringBuffer JavaDoc();
751             ret.append("[Header: name=" + corrName + ", type=");
752             ret.append(type == REQ ? "REQ" : type == RSP ? "RSP" : "<unknown>");
753             ret.append(", id=" + id);
754             ret.append(", rsp_expected=" + rsp_expected + ']');
755             if(callStack != null)
756                 ret.append(", call stack=" + callStack);
757             if(dest_mbrs != null)
758                 ret.append(", dest_mbrs=").append(dest_mbrs);
759             return ret.toString();
760         }
761
762
763         /**
764          * Write out the header to the given stream
765          */

766         public void writeExternal(ObjectOutput out) throws IOException {
767             out.writeInt(type);
768             out.writeLong(id);
769             out.writeBoolean(rsp_expected);
770             if(corrName != null) {
771                 out.writeBoolean(true);
772                 out.writeUTF(corrName);
773             }
774             else {
775                 out.writeBoolean(false);
776             }
777             out.writeObject(callStack);
778             out.writeObject(dest_mbrs);
779         }
780
781
782         /**
783          * Read the header from the given stream
784          */

785         public void readExternal(ObjectInput in)
786             throws IOException, ClassNotFoundException JavaDoc {
787             type = in.readInt();
788             id = in.readLong();
789             rsp_expected = in.readBoolean();
790             if(in.readBoolean())
791                 corrName = in.readUTF();
792             callStack = (java.util.Stack JavaDoc)in.readObject();
793             dest_mbrs=(java.util.List JavaDoc)in.readObject();
794         }
795     }
796
797
798
799
800     /**
801      * Listens for scheduler events and sets the current call chain (stack)
802      * whenever a thread is started, or a suspended thread resumed. Does
803      * this only for synchronous requests (<code>Runnable</code> is actually
804      * a <code>Request</code>).
805      */

806     private class CallStackSetter implements SchedulerListener {
807         public void started(Runnable JavaDoc r) { setCallStack(r); }
808         public void stopped(Runnable JavaDoc r) { setCallStack(null); }
809         public void suspended(Runnable JavaDoc r) { setCallStack(null); }
810         public void resumed(Runnable JavaDoc r) { setCallStack(r); }
811
812         void setCallStack(Runnable JavaDoc r) {
813             java.util.Stack JavaDoc new_stack;
814             Message req;
815             Header hdr;
816             Object JavaDoc obj;
817
818             if(r == null) {
819                 call_stack=null;
820                 return;
821             }
822
823             req=((Request)r).req;
824             if(req == null)
825                 return;
826
827             obj=req.getHeader(name);
828             if(obj == null || !(obj instanceof Header))
829                 return;
830
831             hdr=(Header)obj;
832             if(hdr.rsp_expected == false)
833                 return;
834
835             new_stack=hdr.callStack;
836             if(new_stack != null)
837                 call_stack=(java.util.Stack JavaDoc)new_stack.clone();
838         }
839     }
840
841
842     /**
843      * The runnable for an incoming request which is submitted to the
844      * dispatcher
845      */

846     private class Request implements Runnable JavaDoc {
847         public final Message req;
848
849         public Request(Message req) { this.req=req; }
850         public void run() { handleRequest(req); }
851
852         public String JavaDoc toString() {
853             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
854             if(req != null)
855                 sb.append("req=" + req + ", headers=" + req.printObjectHeaders());
856             return sb.toString();
857         }
858     }
859
860 }
861
Popular Tags