KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > MessageDispatcher


1 // $Id: MessageDispatcher.java,v 1.38 2005/04/25 15:36:04 belaban Exp $
2

3 package org.jgroups.blocks;
4
5 import org.apache.commons.logging.Log;
6 import org.apache.commons.logging.LogFactory;
7 import org.jgroups.*;
8 import org.jgroups.stack.Protocol;
9 import org.jgroups.util.*;
10
11 import java.io.Serializable JavaDoc;
12 import java.util.Vector JavaDoc;
13
14
15 /**
16  * Used on top of channel to implement group requests. Client's <code>handle()</code> method is called when request is
17  * received. Is the equivalent of RpcProtocol on the application instead of protocol level.
18  *
19  * @author Bela Ban
20  */

21 public class MessageDispatcher implements RequestHandler {
22     protected Channel channel=null;
23     protected RequestCorrelator corr=null;
24     protected MessageListener msg_listener=null;
25     protected MembershipListener membership_listener=null;
26     protected RequestHandler req_handler=null;
27     protected ProtocolAdapter prot_adapter=null;
28     protected TransportAdapter transport_adapter=null;
29     protected final Vector JavaDoc members=new Vector JavaDoc();
30     protected Address local_addr=null;
31     protected boolean deadlock_detection=false;
32     protected PullPushAdapter adapter=null;
33     protected Serializable JavaDoc id=null;
34     protected final Log log=LogFactory.getLog(getClass());
35
36
37     /**
38      * Process items on the queue concurrently (RequestCorrelator). The default is to wait until the processing of an
39      * item has completed before fetching the next item from the queue. Note that setting this to true may destroy the
40      * properties of a protocol stack, e.g total or causal order may not be guaranteed. Set this to true only if you
41      * know what you're doing !
42      */

43     protected boolean concurrent_processing=false;
44
45
46     public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2) {
47         this.channel=channel;
48         prot_adapter=new ProtocolAdapter();
49         if(channel != null) {
50             local_addr=channel.getLocalAddress();
51             channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
52         }
53         setMessageListener(l);
54         setMembershipListener(l2);
55         if(channel != null) {
56             channel.setUpHandler(prot_adapter);
57         }
58         start();
59     }
60
61
62     public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, boolean deadlock_detection) {
63         this.channel=channel;
64         this.deadlock_detection=deadlock_detection;
65         prot_adapter=new ProtocolAdapter();
66         if(channel != null) {
67             local_addr=channel.getLocalAddress();
68             channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
69         }
70         setMessageListener(l);
71         setMembershipListener(l2);
72         if(channel != null) {
73             channel.setUpHandler(prot_adapter);
74         }
75         start();
76     }
77
78     public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2,
79                              boolean deadlock_detection, boolean concurrent_processing) {
80         this.channel=channel;
81         this.deadlock_detection=deadlock_detection;
82         this.concurrent_processing=concurrent_processing;
83         prot_adapter=new ProtocolAdapter();
84         if(channel != null) {
85             local_addr=channel.getLocalAddress();
86             channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
87         }
88         setMessageListener(l);
89         setMembershipListener(l2);
90         if(channel != null) {
91             channel.setUpHandler(prot_adapter);
92         }
93         start();
94     }
95
96
97     public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler) {
98         this(channel, l, l2);
99         setRequestHandler(req_handler);
100     }
101
102
103     public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler,
104                              boolean deadlock_detection) {
105         this(channel, l, l2);
106         this.deadlock_detection=deadlock_detection;
107         setRequestHandler(req_handler);
108     }
109
110     public MessageDispatcher(Channel channel, MessageListener l, MembershipListener l2, RequestHandler req_handler,
111                              boolean deadlock_detection, boolean concurrent_processing) {
112         this(channel, l, l2);
113         this.deadlock_detection=deadlock_detection;
114         this.concurrent_processing=concurrent_processing;
115         setRequestHandler(req_handler);
116     }
117
118
119     /*
120      * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be
121      * used to register under that id. This is typically used when another building block is already using
122      * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
123      * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
124      * first block created on PullPushAdapter.
125      * @param adapter The PullPushAdapter which to use as underlying transport
126      * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
127      * requests/responses for different building blocks on top of PullPushAdapter.
128      */

129     public MessageDispatcher(PullPushAdapter adapter, Serializable JavaDoc id,
130                              MessageListener l, MembershipListener l2) {
131         this.adapter=adapter;
132         this.id=id;
133         setMembers(((Channel) adapter.getTransport()).getView().getMembers());
134         setMessageListener(l);
135         setMembershipListener(l2);
136         PullPushHandler handler=new PullPushHandler();
137         Transport tp;
138
139         transport_adapter=new TransportAdapter();
140         adapter.addMembershipListener(handler);
141         if(id == null) // no other building block around, let's become the main consumer of this PullPushAdapter
142
{
143             adapter.setListener(handler);
144         }
145         else {
146             adapter.registerListener(id, handler);
147         }
148
149         if((tp=adapter.getTransport()) instanceof Channel) {
150             ((Channel) tp).setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
151             local_addr=((Channel) tp).getLocalAddress();
152         }
153         start();
154     }
155
156
157     /*
158      * Uses a user-provided PullPushAdapter rather than a Channel as transport. If id is non-null, it will be
159      * used to register under that id. This is typically used when another building block is already using
160      * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
161      * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
162      * first block created on PullPushAdapter.
163      * @param adapter The PullPushAdapter which to use as underlying transport
164      * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
165      * requests/responses for different building blocks on top of PullPushAdapter.
166      * @param req_handler The object implementing RequestHandler. It will be called when a request is received
167      */

168     public MessageDispatcher(PullPushAdapter adapter, Serializable JavaDoc id,
169                              MessageListener l, MembershipListener l2,
170                              RequestHandler req_handler) {
171         this.adapter=adapter;
172         this.id=id;
173         setMembers(((Channel) adapter.getTransport()).getView().getMembers());
174         setRequestHandler(req_handler);
175         setMessageListener(l);
176         setMembershipListener(l2);
177         PullPushHandler handler=new PullPushHandler();
178         Transport tp;
179
180         transport_adapter=new TransportAdapter();
181         adapter.addMembershipListener(handler);
182         if(id == null) // no other building block around, let's become the main consumer of this PullPushAdapter
183
{
184             adapter.setListener(handler);
185         }
186         else {
187             adapter.registerListener(id, handler);
188         }
189
190         if((tp=adapter.getTransport()) instanceof Channel) {
191             ((Channel) tp).setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
192             local_addr=((Channel) tp).getLocalAddress(); // fixed bug #800774
193
}
194
195         start();
196     }
197
198
199     public MessageDispatcher(PullPushAdapter adapter, Serializable JavaDoc id,
200                              MessageListener l, MembershipListener l2,
201                              RequestHandler req_handler, boolean concurrent_processing) {
202         this.concurrent_processing=concurrent_processing;
203         this.adapter=adapter;
204         this.id=id;
205         setMembers(((Channel) adapter.getTransport()).getView().getMembers());
206         setRequestHandler(req_handler);
207         setMessageListener(l);
208         setMembershipListener(l2);
209         PullPushHandler handler=new PullPushHandler();
210         Transport tp;
211
212         transport_adapter=new TransportAdapter();
213         adapter.addMembershipListener(handler);
214         if(id == null) // no other building block around, let's become the main consumer of this PullPushAdapter
215
{
216             adapter.setListener(handler);
217         }
218         else {
219             adapter.registerListener(id, handler);
220         }
221
222         if((tp=adapter.getTransport()) instanceof Channel) {
223             ((Channel) tp).setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
224             local_addr=((Channel) tp).getLocalAddress(); // fixed bug #800774
225
}
226
227         start();
228     }
229
230
231     /**
232      * If this dispatcher is using a user-provided PullPushAdapter, then need to set the members from the adapter
233      * initially since viewChange has most likely already been called in PullPushAdapter.
234      */

235     private void setMembers(Vector JavaDoc new_mbrs) {
236         if(new_mbrs != null) {
237             members.removeAllElements();
238             members.addAll(new_mbrs);
239         }
240     }
241
242     public void setDeadlockDetection(boolean flag) {
243         deadlock_detection=flag;
244         if(corr != null)
245             corr.setDeadlockDetection(flag);
246     }
247
248     public void setConcurrentProcessing(boolean flag) {
249         this.concurrent_processing=flag;
250     }
251
252
253     public void start() {
254         if(corr == null) {
255             if(transport_adapter != null) {
256                 corr=new RequestCorrelator("MessageDispatcher", transport_adapter,
257                         this, deadlock_detection, local_addr, concurrent_processing);
258             }
259             else {
260                 corr=new RequestCorrelator("MessageDispatcher", prot_adapter,
261                         this, deadlock_detection, local_addr, concurrent_processing);
262             }
263             corr.start();
264         }
265         if(channel != null) {
266             Vector JavaDoc tmp_mbrs=channel.getView() != null ? channel.getView().getMembers() : null;
267             setMembers(tmp_mbrs);
268         }
269         if(null != prot_adapter) { // null if called from the constructor that uses PullPushAdapter
270
prot_adapter.resume();
271         }
272     }
273
274
275     public void stop() {
276         if(null != prot_adapter) {
277             prot_adapter.suspend();
278         }
279         if(corr != null) {
280             corr.stop();
281             corr=null;
282         }
283     }
284
285
286     public void setMessageListener(MessageListener l) {
287         msg_listener=l;
288     }
289
290     /**
291      * Gives access to the currently configured MessageListener. Returns null if there is no
292      * configured MessageListener.
293      */

294     public MessageListener getMessageListener() {
295         return msg_listener;
296     }
297
298     public void setMembershipListener(MembershipListener l) {
299         membership_listener=l;
300     }
301
302     public void setRequestHandler(RequestHandler rh) {
303         req_handler=rh;
304     }
305
306     /**
307      * Offers access to the underlying Channel.
308      * @return a reference to the underlying Channel.
309      */

310     public Channel getChannel() {
311         return channel;
312     }
313
314
315     public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException {
316         if(channel != null) {
317             channel.send(msg);
318         }
319         else
320             if(adapter != null) {
321                 try {
322                     if(id != null) {
323                         adapter.send(id, msg);
324                     }
325                     else {
326                         adapter.send(msg);
327                     }
328                 }
329                 catch(Throwable JavaDoc ex) {
330                     if(log.isErrorEnabled()) {
331                         log.error("exception=" + Util.print(ex));
332                     }
333                 }
334             }
335             else {
336                 if(log.isErrorEnabled()) {
337                     log.error("channel == null");
338                 }
339             }
340     }
341
342
343     /**
344      * Cast a message to all members, and wait for <code>mode</code> responses. The responses are returned in a response
345      * list, where each response is associated with its sender.<p> Uses <code>GroupRequest</code>.
346      *
347      * @param dests The members to which the message is to be sent. If it is null, then the message is sent to all
348      * members
349      * @param msg The message to be sent to n members
350      * @param mode Defined in <code>GroupRequest</code>. The number of responses to wait for: <ol> <li>GET_FIRST:
351      * return the first response received. <li>GET_ALL: wait for all responses (minus the ones from
352      * suspected members) <li>GET_MAJORITY: wait for a majority of all responses (relative to the grp
353      * size) <li>GET_ABS_MAJORITY: wait for majority (absolute, computed once) <li>GET_N: wait for n
354      * responses (may block if n > group size) <li>GET_NONE: wait for no responses, return immediately
355      * (non-blocking) </ol>
356      * @param timeout If 0: wait forever. Otherwise, wait for <code>mode</code> responses <em>or</em> timeout time.
357      * @return RspList A list of responses. Each response is an <code>Object</code> and associated to its sender.
358      */

359     public RspList castMessage(final Vector JavaDoc dests, Message msg, int mode, long timeout) {
360         GroupRequest _req=null;
361         Vector JavaDoc real_dests;
362         Channel tmp;
363
364         // we need to clone because we don't want to modify the original
365
// (we remove ourselves if LOCAL is false, see below) !
366
real_dests=dests != null ? (Vector JavaDoc) dests.clone() : (members != null ? (Vector JavaDoc) members.clone() : null);
367
368         // if local delivery is off, then we should not wait for the message from the local member.
369
// therefore remove it from the membership
370
tmp=channel;
371         if(tmp == null) {
372             if(adapter != null && adapter.getTransport() instanceof Channel) {
373                 tmp=(Channel) adapter.getTransport();
374             }
375         }
376
377         if(tmp != null && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) {
378             if(local_addr == null) {
379                 local_addr=tmp.getLocalAddress();
380             }
381             if(local_addr != null && real_dests != null) {
382                 real_dests.removeElement(local_addr);
383             }
384         }
385
386         // don't even send the message if the destination list is empty
387
if(log.isTraceEnabled())
388             log.trace("real_dests=" + real_dests);
389
390         if(real_dests == null || real_dests.size() == 0) {
391             if(log.isTraceEnabled())
392                 log.trace("destination list is empty, won't send message");
393             return new RspList(); // return empty response list
394
}
395
396         _req=new GroupRequest(msg, corr, real_dests, mode, timeout, 0);
397         _req.execute();
398
399         return _req.getResults();
400     }
401
402
403     /**
404      * Multicast a message request to all members in <code>dests</code> and receive responses via the RspCollector
405      * interface. When done receiving the required number of responses, the caller has to call done(req_id) on the
406      * underlyinh RequestCorrelator, so that the resources allocated to that request can be freed.
407      *
408      * @param dests The list of members from which to receive responses. Null means all members
409      * @param req_id The ID of the request. Used by the underlying RequestCorrelator to correlate responses with
410      * requests
411      * @param msg The request to be sent
412      * @param coll The sender needs to provide this interface to collect responses. Call will return immediately if
413      * this is null
414      */

415     public void castMessage(final Vector JavaDoc dests, long req_id, Message msg, RspCollector coll) {
416         Vector JavaDoc real_dests;
417         Channel tmp;
418
419         if(msg == null) {
420             if(log.isErrorEnabled())
421                 log.error("request is null");
422             return;
423         }
424
425         if(coll == null) {
426             if(log.isErrorEnabled())
427                 log.error("response collector is null (must be non-null)");
428             return;
429         }
430             
431         // we need to clone because we don't want to modify the original
432
// (we remove ourselves if LOCAL is false, see below) !
433
real_dests=dests != null ? (Vector JavaDoc) dests.clone() : (Vector JavaDoc) members.clone();
434
435         // if local delivery is off, then we should not wait for the message from the local member.
436
// therefore remove it from the membership
437
tmp=channel;
438         if(tmp == null) {
439             if(adapter != null && adapter.getTransport() instanceof Channel) {
440                 tmp=(Channel) adapter.getTransport();
441             }
442         }
443
444         if(tmp != null && tmp.getOpt(Channel.LOCAL).equals(Boolean.FALSE)) {
445             if(local_addr == null) {
446                 local_addr=tmp.getLocalAddress();
447             }
448             if(local_addr != null) {
449                 real_dests.removeElement(local_addr);
450             }
451         }
452         
453         // don't even send the message if the destination list is empty
454
if(real_dests.size() == 0) {
455             if(log.isDebugEnabled())
456                 log.debug("destination list is empty, won't send message");
457             return;
458         }
459
460         corr.sendRequest(req_id, real_dests, msg, coll);
461     }
462
463
464     public void done(long req_id) {
465         corr.done(req_id);
466     }
467
468
469     /**
470      * Sends a message to a single member (destination = msg.dest) and returns the response. The message's destination
471      * must be non-zero !
472      */

473     public Object JavaDoc sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException {
474         Vector JavaDoc mbrs=new Vector JavaDoc();
475         RspList rsp_list=null;
476         Object JavaDoc dest=msg.getDest();
477         Rsp rsp;
478         GroupRequest _req=null;
479
480         if(dest == null) {
481             if(log.isErrorEnabled())
482                 log.error("the message's destination is null, " +
483                         "cannot send message");
484             return null;
485         }
486
487         mbrs.addElement(dest); // dummy membership (of destination address)
488

489         _req=new GroupRequest(msg, corr, mbrs, mode, timeout, 0);
490         _req.execute();
491
492         if(mode == GroupRequest.GET_NONE) {
493             return null;
494         }
495
496         rsp_list=_req.getResults();
497
498         if(rsp_list.size() == 0) {
499             if(log.isWarnEnabled())
500                 log.warn(" response list is empty");
501             return null;
502         }
503         if(rsp_list.size() > 1) {
504             if(log.isWarnEnabled())
505                 log.warn("response list contains more that 1 response; returning first response !");
506         }
507         rsp=(Rsp) rsp_list.elementAt(0);
508         if(rsp.wasSuspected()) {
509             throw new SuspectedException(dest);
510         }
511         if(!rsp.wasReceived()) {
512             throw new TimeoutException();
513         }
514         return rsp.getValue();
515     }
516
517
518 // public void channelConnected(Channel channel) {
519
// if(channel != null) {
520
// Address new_local_addr=channel.getLocalAddress();
521
// if(new_local_addr != null) {
522
// this.local_addr=new_local_addr;
523
//
524
// if(log.isInfoEnabled()) log.info("MessageDispatcher.channelConnected()", "new local address is " + this.local_addr);
525
// }
526
// }
527
// }
528
//
529
// public void channelDisconnected(Channel channel) {
530
// }
531
//
532
// public void channelClosed(Channel channel) {
533
// }
534
//
535
// public void channelShunned() {
536
// }
537
//
538
// public void channelReconnected(Address addr) {
539
// if(channel != null) {
540
// Address new_local_addr=channel.getLocalAddress();
541
// if(new_local_addr != null) {
542
// this.local_addr=new_local_addr;
543
//
544
// if(log.isInfoEnabled()) log.info("MessageDispatcher.channelReconnected()", "new local address is " + this.local_addr);
545
// }
546
// }
547
// }
548

549
550     /* ------------------------ RequestHandler Interface ---------------------- */
551     public Object JavaDoc handle(Message msg) {
552         if(req_handler != null) {
553             return req_handler.handle(msg);
554         }
555         else {
556             return null;
557         }
558     }
559     /* -------------------- End of RequestHandler Interface ------------------- */
560
561
562
563
564
565
566     class ProtocolAdapter extends Protocol implements UpHandler {
567         private Thread JavaDoc upProcessingThread=null;
568         private final Queue upQueue=new Queue();
569         private final ReentrantLatch m_upLatch=new ReentrantLatch(false);
570
571
572
573         /* ------------------------- Protocol Interface --------------------------- */
574
575         public String JavaDoc getName() {
576             return "MessageDispatcher";
577         }
578
579         public void startUpHandler() {
580             // do nothing, DON'T REMOVE !!!!
581
}
582
583         public void startDownHandler() {
584             // do nothing, DON'T REMOVE !!!!
585
}
586
587
588         public void stopInternal() {
589             // do nothing, DON'T REMOVE !!!!
590
}
591
592         protected void receiveUpEvent(Event evt) {
593         }
594
595         protected void receiveDownEvent(Event evt) {
596         }
597
598         /**
599          * Called by request correlator when message was not generated by it. We handle it and call the message
600          * listener's corresponding methods
601          */

602         public void passUp(Event evt) {
603             byte[] tmp_state=null;
604             switch(evt.getType()) {
605                 case Event.MSG:
606                     if(msg_listener != null) {
607                         msg_listener.receive((Message) evt.getArg());
608                     }
609                     break;
610
611                 case Event.GET_APPLSTATE: // reply with GET_APPLSTATE_OK
612
if(msg_listener != null) {
613                         try {
614                             tmp_state=msg_listener.getState();
615                         }
616                         catch(Throwable JavaDoc t) {
617                             this.log.error("failed getting state from message listener (" + msg_listener + ')', t);
618                         }
619                     }
620                     channel.returnState(tmp_state);
621                     break;
622
623                 case Event.GET_STATE_OK:
624                     if(msg_listener != null) {
625                         try {
626                             msg_listener.setState((byte[]) evt.getArg());
627                         }
628                         catch(ClassCastException JavaDoc cast_ex) {
629                             if(this.log.isErrorEnabled())
630                                 this.log.error("received SetStateEvent, but argument " +
631                                         evt.getArg() + " is not serializable. Discarding message.");
632                         }
633                     }
634                     break;
635
636                 case Event.VIEW_CHANGE:
637                     View v=(View) evt.getArg();
638                     Vector JavaDoc new_mbrs=v.getMembers();
639
640                     if(new_mbrs != null) {
641                         members.removeAllElements();
642                         members.addAll(new_mbrs);
643                     }
644
645                     if(membership_listener != null) {
646                         membership_listener.viewAccepted(v);
647                     }
648                     break;
649
650                 case Event.SET_LOCAL_ADDRESS:
651                     if(log.isTraceEnabled())
652                         log.trace("setting local_addr (" + local_addr + ") to " + evt.getArg());
653                     local_addr=(Address)evt.getArg();
654                     break;
655
656                 case Event.SUSPECT:
657                     if(membership_listener != null) {
658                         membership_listener.suspect((Address) evt.getArg());
659                     }
660                     break;
661
662                 case Event.BLOCK:
663                     if(membership_listener != null) {
664                         membership_listener.block();
665                     }
666                     break;
667             }
668         }
669
670
671         public void passDown(Event evt) {
672             down(evt);
673         }
674
675
676
677         synchronized void suspend() {
678             m_upLatch.lock();
679             if(upProcessingThread != null) {
680                 Thread JavaDoc t=upProcessingThread;
681                 upProcessingThread=null;
682                 t.interrupt();
683             }
684         }
685
686         synchronized void resume() {
687             m_upLatch.unlock();
688             if(upProcessingThread == null) {
689                 startProcessingThread();
690             }
691         }
692
693
694         private void startProcessingThread() {
695             upProcessingThread=new Thread JavaDoc(new Runnable JavaDoc() {
696                 public void run() {
697                     Event event=null;
698                     // while(upProcessingThread != null) {
699
while(Thread.currentThread() == upProcessingThread) { // changed, see bug 998920
700
try {
701                             event=(Event)upQueue.remove();
702                             m_upLatch.passThrough();
703                             handleUp(event);
704                         }
705                         catch(QueueClosedException ex1) {
706                             break;
707                         }
708                         catch(InterruptedException JavaDoc ex2) {
709                             //this is ok, the 'interrupted' flag is cleared
710
}
711                     }
712                 }
713             });
714             // upProcessingThread.setName("MessageDispatcher thread " + upProcessingThread.hashCode());
715
upProcessingThread.setDaemon(true);
716             upProcessingThread.start();
717         }
718
719
720         /**
721          * Called by channel (we registered before) when event is received. This is the UpHandler interface.
722          */

723         public void up(Event evt) {
724             try {
725                 upQueue.add(evt);
726             }
727             catch(QueueClosedException ex) {
728                 // this is ok
729
}
730         }
731
732         private void handleUp(Event evt) {
733             if(corr != null) {
734                 corr.receive(evt);
735             }
736             else {
737                 if(this.log.isErrorEnabled()) { //Something is seriously wrong, correlator should not be null since latch is not locked!
738
this.log.error("correlator is null, but latch is not locked! Event ignored.");
739                 }
740             }
741         }
742
743         public void down(Event evt) {
744             if(channel != null) {
745                 channel.down(evt);
746             }
747             else
748                 if(this.log.isErrorEnabled()) {
749                     this.log.error("channel == null");
750                 }
751         }
752         /* ----------------------- End of Protocol Interface ------------------------ */
753
754     }
755
756
757     class TransportAdapter implements Transport {
758
759         public void send(Message msg) throws Exception JavaDoc {
760             if(channel != null) {
761                 channel.send(msg);
762             }
763             else
764                 if(adapter != null) {
765                     try {
766                         if(id != null) {
767                             adapter.send(id, msg);
768                         }
769                         else {
770                             adapter.send(msg);
771                         }
772                     }
773                     catch(Throwable JavaDoc ex) {
774                         if(log.isErrorEnabled()) {
775                             log.error("exception=" + Util.print(ex));
776                         }
777                     }
778                 }
779                 else {
780                     if(log.isErrorEnabled()) {
781                         log.error("channel == null");
782                     }
783                 }
784         }
785
786         public Object JavaDoc receive(long timeout) throws Exception JavaDoc {
787             // @todo: implement
788
return null;
789         }
790     }
791
792
793     class PullPushHandler implements MessageListener, MembershipListener {
794
795
796         /* ------------------------- MessageListener interface ---------------------- */
797         public void receive(Message msg) {
798             boolean pass_up=true;
799             if(corr != null) {
800                 pass_up=corr.receiveMessage(msg);
801             }
802
803             if(pass_up) { // pass on to MessageListener
804
if(msg_listener != null) {
805                     msg_listener.receive(msg);
806                 }
807             }
808         }
809
810         public byte[] getState() {
811             return msg_listener != null ? msg_listener.getState() : null;
812         }
813
814         public void setState(byte[] state) {
815             if(msg_listener != null) {
816                 msg_listener.setState(state);
817             }
818         }
819         /* --------------------- End of MessageListener interface ------------------- */
820
821
822         /* ------------------------ MembershipListener interface -------------------- */
823         public void viewAccepted(View v) {
824             if(corr != null) {
825                 corr.receiveView(v);
826             }
827
828             Vector JavaDoc new_mbrs=v.getMembers();
829             if(new_mbrs != null) {
830                 members.removeAllElements();
831                 for(int i=0; i < new_mbrs.size(); i++) {
832                     members.addElement(new_mbrs.elementAt(i));
833                 }
834             }
835
836             if(membership_listener != null) {
837                 membership_listener.viewAccepted(v);
838             }
839         }
840
841         public void suspect(Address suspected_mbr) {
842             if(corr != null) {
843                 corr.receiveSuspect(suspected_mbr);
844             }
845             if(membership_listener != null) {
846                 membership_listener.suspect(suspected_mbr);
847             }
848         }
849
850         public void block() {
851             if(membership_listener != null) {
852                 membership_listener.block();
853             }
854         }
855         /* --------------------- End of MembershipListener interface ---------------- */
856
857
858
859         // @todo: receive SET_LOCAL_ADDR event and call corr.setLocalAddress(addr)
860

861     }
862
863
864 }
865
Popular Tags