KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > MessageProtocol


1 // $Id: MessageProtocol.java,v 1.4 2004/09/23 16:29:53 belaban Exp $
2

3 package org.jgroups.stack;
4
5
6 import org.jgroups.*;
7 import org.jgroups.blocks.GroupRequest;
8 import org.jgroups.blocks.RequestCorrelator;
9 import org.jgroups.blocks.RequestHandler;
10 import org.jgroups.util.Rsp;
11 import org.jgroups.util.RspList;
12 import org.jgroups.util.Util;
13
14 import java.util.Vector JavaDoc;
15
16
17
18
19 /**
20  * Based on Protocol, but incorporates RequestCorrelator and GroupRequest: the latter can
21  * be used to mcast messages to all members and receive their reponses.<p>
22  * A protocol based on this template can send messages to all members and receive all, a single,
23  * n, or none responses. Requests directed towards the protocol can be handled by overriding
24  * method <code>Handle</code>.<p>
25  * Requests and responses are in the form of <code>Message</code>s, which would typically need to
26  * contain information pertaining to the request/response, e.g. in the form of objects contained
27  * in the message. To use remote method calls, use <code>RpcProtocol</code> instead.<p>
28  * Typical use of of a <code>MessageProtocol</code> would be when a protocol needs to interact with
29  * its peer protocols at each of the members' protocol stacks. A simple protocol like fragmentation,
30  * which does not need to interact with other instances of fragmentation, may simply subclass
31  * <code>Protocol</code> instead.
32  * @author Bela Ban
33  */

34 public abstract class MessageProtocol extends Protocol implements RequestHandler {
35     protected RequestCorrelator _corr=null;
36     protected final Vector JavaDoc members=new Vector JavaDoc();
37
38
39     public void start() throws Exception JavaDoc {
40         if(_corr == null)
41             _corr=new RequestCorrelator(getName(), this, this);
42     }
43
44     public void stop() {
45         if(_corr != null) {
46             _corr.stop();
47             _corr=null;
48         }
49     }
50
51
52     /**
53      Cast a message to all members, and wait for <code>mode</code> responses. The responses are
54      returned in a response list, where each response is associated with its sender.<p>
55      Uses <code>GroupRequest</code>.
56      @param dests The members from which responses are expected. If it is null, replies from all members
57      are expected. The request itself is multicast to all members.
58      @param msg The message to be sent to n members
59      @param mode Defined in <code>GroupRequest</code>. The number of responses to wait for:
60      <ol>
61      <li>GET_FIRST: return the first response received.
62      <li>GET_ALL: wait for all responses (minus the ones from suspected members)
63      <li>GET_MAJORITY: wait for a majority of all responses (relative to the grp size)
64      <li>GET_ABS_MAJORITY: wait for majority (absolute, computed once)
65      <li>GET_N: wait for n responses (may block if n > group size)
66      <li>GET_NONE: wait for no responses, return immediately (non-blocking)
67      </ol>
68      @param timeout If 0: wait forever. Otherwise, wait for <code>mode</code> responses
69      <em>or</em> timeout time.
70      @return RspList A list of responses. Each response is an <code>Object</code> and associated
71      to its sender.
72      */

73     public RspList castMessage(Vector JavaDoc dests, Message msg, int mode, long timeout) {
74         GroupRequest _req=null;
75         Vector JavaDoc real_dests=dests != null? (Vector JavaDoc)dests.clone() : (Vector JavaDoc)members.clone();
76
77         // This marks message as sent by us ! (used in up()
78
// msg.addHeader(new MsgProtHeader(getName())); ++ already done by RequestCorrelator
79

80         _req=new GroupRequest(msg, _corr, real_dests, mode, timeout, 0);
81         _req.execute();
82
83         return _req.getResults();
84     }
85
86
87     /**
88      Sends a message to a single member (destination = msg.dest) and returns the response.
89      The message's destination must be non-zero !
90      */

91     public Object JavaDoc sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException {
92         Vector JavaDoc mbrs=new Vector JavaDoc();
93         RspList rsp_list=null;
94         Object JavaDoc dest=msg.getDest();
95         Rsp rsp;
96         GroupRequest _req=null;
97
98         if(dest == null) {
99             System.out.println("MessageProtocol.sendMessage(): the message's destination is null ! " +
100                                "Cannot send message !");
101             return null;
102         }
103
104
105         mbrs.addElement(dest); // dummy membership (of destination address)
106

107
108         _req=new GroupRequest(msg, _corr, mbrs, mode, timeout, 0);
109         _req.execute();
110
111         if(mode == GroupRequest.GET_NONE)
112             return null;
113
114
115         rsp_list=_req.getResults();
116
117         if(rsp_list.size() == 0) {
118             if(log.isErrorEnabled()) log.error("response list is empty");
119             return null;
120         }
121         if(rsp_list.size() > 1)
122             if(log.isErrorEnabled()) log.error("response list contains " +
123                                                          "more that 1 response; returning first response");
124         rsp=(Rsp)rsp_list.elementAt(0);
125         if(rsp.wasSuspected())
126             throw new SuspectedException(dest);
127         if(!rsp.wasReceived())
128             throw new TimeoutException();
129         return rsp.getValue();
130     }
131
132
133     /**
134      Processes a request destined for this layer. The return value is sent as response.
135      */

136     public Object JavaDoc handle(Message req) {
137         System.out.println("MessageProtocol.handle(): this method should be overridden !");
138         return null;
139     }
140
141
142     /**
143      * Handle an event coming from the layer above
144      */

145     public final void up(Event evt) {
146         Message msg;
147         Object JavaDoc hdr;
148
149         switch(evt.getType()) {
150             case Event.VIEW_CHANGE:
151                 updateView((View)evt.getArg());
152                 break;
153             default:
154                 if(!handleUpEvent(evt)) return;
155
156                 if(evt.getType() == Event.MSG) {
157                     msg=(Message)evt.getArg();
158                     hdr=msg.getHeader(getName());
159                     if(!(hdr instanceof RequestCorrelator.Header))
160                         break;
161                 }
162                 // [[[ TODO
163
// RequestCorrelator.receive() is currently calling passUp()
164
// itself. Only _this_ method should call passUp()!
165
// So return instead of breaking until fixed (igeorg)
166
// ]]] TODO
167
if(_corr != null) {
168                     _corr.receive(evt);
169                     return;
170                 }
171                 else
172                     if(log.isWarnEnabled()) log.warn("Request correlator is null, evt=" + Util.printEvent(evt));
173
174                 break;
175         }
176
177         passUp(evt);
178     }
179
180
181     /**
182      * This message is not originated by this layer, therefore we can just
183      * pass it down without having to go through the request correlator.
184      * We do this ONLY for messages !
185      */

186     public final void down(Event evt) {
187         switch(evt.getType()) {
188             case Event.VIEW_CHANGE:
189                 updateView((View)evt.getArg());
190                 if(!handleDownEvent(evt)) return;
191                 break;
192             case Event.MSG:
193                 if(!handleDownEvent(evt)) return;
194                 break;
195             default:
196                 if(!handleDownEvent(evt)) return;
197                 break;
198         }
199
200         passDown(evt);
201     }
202
203
204     protected void updateView(View new_view) {
205         Vector JavaDoc new_mbrs=new_view.getMembers();
206         if(new_mbrs != null) {
207             synchronized(members) {
208                 members.removeAllElements();
209                 members.addAll(new_mbrs);
210             }
211         }
212     }
213
214
215     /**
216      Handle up event. Return false if it should not be passed up the stack.
217      */

218     protected boolean handleUpEvent(Event evt) {
219         // override in subclasses
220
return true;
221     }
222
223     /**
224      Handle down event. Return false if it should not be passed down the stack.
225      */

226     protected boolean handleDownEvent(Event evt) {
227         // override in subclasses
228
return true;
229     }
230
231
232 }
233
Popular Tags