KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > STATE_TRANSFER


1 // $Id: STATE_TRANSFER.java,v 1.11 2005/04/03 07:23:10 wenbo Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.Address;
6 import org.jgroups.Event;
7 import org.jgroups.Message;
8 import org.jgroups.View;
9 import org.jgroups.blocks.GroupRequest;
10 import org.jgroups.blocks.RequestCorrelator;
11 import org.jgroups.blocks.RequestHandler;
12 import org.jgroups.stack.Protocol;
13 import org.jgroups.stack.StateTransferInfo;
14 import org.jgroups.util.Rsp;
15 import org.jgroups.util.RspList;
16 import org.jgroups.util.Util;
17
18 import java.io.Serializable JavaDoc;
19 import java.util.HashMap JavaDoc;
20 import java.util.Properties JavaDoc;
21 import java.util.Vector JavaDoc;
22
23
24 class StateTransferRequest implements Serializable JavaDoc {
25     static final int MAKE_COPY=1; // arg = originator of request
26
static final int RETURN_STATE=2; // arg = orginator of request
27

28     int type=0;
29     final Object JavaDoc arg;
30
31
32     public StateTransferRequest(int type, Object JavaDoc arg) {
33         this.type=type;
34         this.arg=arg;
35     }
36
37     public int getType() {
38         return type;
39     }
40
41     public Object JavaDoc getArg() {
42         return arg;
43     }
44
45     public String JavaDoc toString() {
46         return "[StateTransferRequest: type=" + type2Str(type) + ", arg=" + arg + ']';
47     }
48
49     static String JavaDoc type2Str(int t) {
50         switch(t) {
51             case MAKE_COPY:
52                 return "MAKE_COPY";
53             case RETURN_STATE:
54                 return "RETURN_STATE";
55             default:
56                 return "<unknown>";
57         }
58     }
59 }
60
61
62 /**
63  * State transfer layer. Upon receiving a GET_STATE event from JChannel, a MAKE_COPY message is
64  * sent to all members. When the originator receives MAKE_COPY, it queues all messages to the
65  * channel.
66  * When another member receives the message, it asks the JChannel to provide it with a copy of
67  * the current state (GetStateEvent is received by application, returnState() sends state down the
68  * stack). Then the current layer sends a unicast RETURN_STATE message to the coordinator, which
69  * returns the cached copy.
70  * When the state is received by the originator, the GET_STATE sender is unblocked with a
71  * GET_STATE_OK event up the stack (unless it already timed out).<p>
72  * Requires QUEUE layer on top.
73  *
74  * @author Bela Ban
75  */

76 public class STATE_TRANSFER extends Protocol implements RequestHandler {
77     Address local_addr=null;
78     final Vector JavaDoc members=new Vector JavaDoc(11);
79     final Message m=null;
80     boolean is_server=false;
81     byte[] cached_state=null;
82     final Object JavaDoc state_xfer_mutex=new Object JavaDoc(); // get state from appl (via channel).
83
long timeout_get_appl_state=5000;
84     long timeout_return_state=5000;
85     RequestCorrelator corr=null;
86     final Vector JavaDoc observers=new Vector JavaDoc(5);
87     final HashMap JavaDoc map=new HashMap JavaDoc(7);
88
89
90     /**
91      * All protocol names have to be unique !
92      */

93     public String JavaDoc getName() {
94         return "STATE_TRANSFER";
95     }
96
97
98     public void init() throws Exception JavaDoc {
99         map.put("state_transfer", Boolean.TRUE);
100         map.put("protocol_class", getClass().getName());
101
102     }
103
104     public void start() throws Exception JavaDoc {
105         corr=new RequestCorrelator(getName(), this, this);
106         passUp(new Event(Event.CONFIG, map));
107     }
108
109     public void stop() {
110         if(corr != null) {
111             corr.stop();
112             corr=null;
113         }
114     }
115
116
117     public boolean setProperties(Properties JavaDoc props) {
118         String JavaDoc str;
119
120         super.setProperties(props);
121         // Milliseconds to wait for application to provide requested state, events are
122
// STATE_TRANSFER up and STATE_TRANSFER_OK down
123
str=props.getProperty("timeout_get_appl_state");
124         if(str != null) {
125             timeout_get_appl_state=Long.parseLong(str);
126             props.remove("timeout_get_appl_state");
127         }
128
129         // Milliseconds to wait for 1 or all members to return its/their state. 0 means wait
130
// forever. States are retrieved using GroupRequest/RequestCorrelator
131
str=props.getProperty("timeout_return_state");
132         if(str != null) {
133             timeout_return_state=Long.parseLong(str);
134             props.remove("timeout_return_state");
135         }
136
137         if(props.size() > 0) {
138             System.err.println("STATE_TRANSFER.setProperties(): the following " +
139                     "properties are not recognized:");
140             props.list(System.out);
141             return false;
142         }
143         return true;
144     }
145
146
147     public Vector JavaDoc requiredUpServices() {
148         Vector JavaDoc ret=new Vector JavaDoc(2);
149         ret.addElement(new Integer JavaDoc(Event.START_QUEUEING));
150         ret.addElement(new Integer JavaDoc(Event.STOP_QUEUEING));
151         return ret;
152     }
153
154
155     public void up(Event evt) {
156         switch(evt.getType()) {
157
158             case Event.BECOME_SERVER:
159                 is_server=true;
160                 break;
161
162             case Event.SET_LOCAL_ADDRESS:
163                 local_addr=(Address)evt.getArg();
164                 break;
165
166             case Event.TMP_VIEW:
167             case Event.VIEW_CHANGE:
168                 Vector JavaDoc new_members=((View)evt.getArg()).getMembers();
169                 synchronized(members) {
170                     members.removeAllElements();
171                     if(new_members != null && new_members.size() > 0)
172                         for(int k=0; k < new_members.size(); k++)
173                             members.addElement(new_members.elementAt(k));
174                 }
175                 break;
176         }
177
178         if(corr != null)
179             corr.receive(evt); // will consume or pass up, depending on header
180
else
181             passUp(evt);
182     }
183
184
185     public void down(Event evt) {
186         Object JavaDoc coord, state;
187         Vector JavaDoc event_list=null;
188         StateTransferInfo info;
189
190
191         switch(evt.getType()) {
192
193             case Event.TMP_VIEW:
194             case Event.VIEW_CHANGE:
195                 Vector JavaDoc new_members=((View)evt.getArg()).getMembers();
196                 synchronized(members) {
197                     members.removeAllElements();
198                     if(new_members != null && new_members.size() > 0)
199                         for(int k=0; k < new_members.size(); k++)
200                             members.addElement(new_members.elementAt(k));
201                 }
202                 break;
203
204             case Event.GET_STATE: // generated by JChannel.getState()
205
info=(StateTransferInfo)evt.getArg();
206                 coord=determineCoordinator();
207
208                 if(coord == null || coord.equals(local_addr)) {
209                     if(log.isWarnEnabled()) log.warn("GET_STATE: coordinator is null");
210                     event_list=new Vector JavaDoc(1);
211                     event_list.addElement(new Event(Event.GET_STATE_OK, null));
212                     passUp(new Event(Event.STOP_QUEUEING, event_list));
213                     return; // don't pass down any further !
214
}
215
216                 sendMakeCopyMessage(); // multicast MAKE_COPY to all members (including me)
217

218                 if(info.type == StateTransferInfo.GET_FROM_MANY)
219                     state=getStateFromMany(info.targets);
220                 else
221                     state=getStateFromSingle(info.target);
222
223                 /* Pass up the state to the application layer (insert into JChannel's event queue */
224                 event_list=new Vector JavaDoc(1);
225                 event_list.addElement(new Event(Event.GET_STATE_OK, state));
226
227                 /* Now stop queueing */
228                 passUp(new Event(Event.STOP_QUEUEING, event_list));
229                 return; // don't pass down any further !
230

231             case Event.GET_APPLSTATE_OK:
232                 synchronized(state_xfer_mutex) {
233                     cached_state=(byte[])evt.getArg();
234                     state_xfer_mutex.notifyAll();
235                 }
236                 return; // don't pass down any further !
237

238         }
239
240         passDown(evt); // pass on to the layer below us
241
}
242
243
244     /* ---------------------- Interface RequestHandler -------------------------- */
245     public Object JavaDoc handle(Message msg) {
246         StateTransferRequest req;
247
248         try {
249             req=(StateTransferRequest)msg.getObject();
250
251             switch(req.getType()) {
252                 case StateTransferRequest.MAKE_COPY:
253                     makeCopy(req.getArg());
254                     return null;
255                 case StateTransferRequest.RETURN_STATE:
256                     if(is_server)
257                         return cached_state;
258                     else {
259                         if(log.isWarnEnabled()) log.warn("RETURN_STATE: returning null" +
260                                 "as I'm not yet an operational state server !");
261                         return null;
262                     }
263                 default:
264                     if(log.isErrorEnabled()) log.error("type " + req.getType() +
265                             "is unknown in StateTransferRequest !");
266                     return null;
267             }
268         }
269         catch(Exception JavaDoc e) {
270             if(log.isErrorEnabled()) log.error("exception is " + e);
271             return null;
272         }
273     }
274     /* ------------------- End of Interface RequestHandler ---------------------- */
275
276
277
278
279
280
281
282
283     byte[] getStateFromSingle(Address target) {
284         Vector JavaDoc dests=new Vector JavaDoc(11);
285         Message msg;
286         StateTransferRequest r=new StateTransferRequest(StateTransferRequest.RETURN_STATE, local_addr);
287         RspList rsp_list;
288         Rsp rsp;
289         Address dest;
290         GroupRequest req;
291         int num_tries=0;
292
293
294         try {
295             msg=new Message(null, null, Util.objectToByteBuffer(r));
296         }
297         catch(Exception JavaDoc e) {
298             if(log.isErrorEnabled()) log.error("exception=" + e);
299             return null;
300         }
301
302         while(members.size() > 1 && num_tries++ < 3) { // excluding myself
303
dest=target != null? target : determineCoordinator();
304             if(dest == null)
305                 return null;
306             msg.setDest(dest);
307             dests.removeAllElements();
308             dests.addElement(dest);
309             req=new GroupRequest(msg, corr, dests, GroupRequest.GET_FIRST, timeout_return_state, 0);
310             req.execute();
311             rsp_list=req.getResults();
312             for(int i=0; i < rsp_list.size(); i++) { // get the first non-suspected result
313
rsp=(Rsp)rsp_list.elementAt(i);
314                 if(rsp.wasReceived())
315                     return (byte[])rsp.getValue();
316             }
317             Util.sleep(1000);
318         }
319
320         return null;
321     }
322
323
324     Vector JavaDoc getStateFromMany(Vector JavaDoc targets) {
325         Vector JavaDoc dests=new Vector JavaDoc(11);
326         Message msg;
327         StateTransferRequest r=new StateTransferRequest(StateTransferRequest.RETURN_STATE, local_addr);
328         RspList rsp_list;
329         GroupRequest req;
330         int i;
331
332
333         if(targets != null) {
334             for(i=0; i < targets.size(); i++)
335                 if(!local_addr.equals(targets.elementAt(i)))
336                     dests.addElement(targets.elementAt(i));
337         }
338         else {
339             for(i=0; i < members.size(); i++)
340                 if(!local_addr.equals(members.elementAt(i)))
341                     dests.addElement(members.elementAt(i));
342         }
343
344         if(dests.size() == 0)
345             return null;
346
347         msg=new Message();
348         try {
349             msg.setBuffer(Util.objectToByteBuffer(r));
350         }
351         catch(Exception JavaDoc e) {
352         }
353
354         req=new GroupRequest(msg, corr, dests, GroupRequest.GET_ALL, timeout_return_state, 0);
355         req.execute();
356         rsp_list=req.getResults();
357         return rsp_list.getResults();
358     }
359
360
361     void sendMakeCopyMessage() {
362         GroupRequest req;
363         Message msg=new Message();
364         StateTransferRequest r=new StateTransferRequest(StateTransferRequest.MAKE_COPY, local_addr);
365         Vector JavaDoc dests=new Vector JavaDoc(11);
366
367         for(int i=0; i < members.size(); i++) // don't add myself twice in dests !
368
// if(!local_addr.equals(members.elementAt(i))) - this will prevent the local delivery
369
dests.addElement(members.elementAt(i));
370
371         if(dests.size() == 0)
372             return;
373
374         try {
375             msg.setBuffer(Util.objectToByteBuffer(r));
376         }
377         catch(Exception JavaDoc e) {
378         }
379
380         req=new GroupRequest(msg, corr, dests, GroupRequest.GET_ALL, timeout_return_state, 0);
381         req.execute();
382     }
383
384
385     /**
386      * Return the first element of members which is not me. Otherwise return null.
387      */

388     Address determineCoordinator() {
389         Address ret=null;
390         if(members != null && members.size() > 1) {
391             for(int i=0; i < members.size(); i++)
392                 if(!local_addr.equals(members.elementAt(i)))
393                     return (Address)members.elementAt(i);
394         }
395         return ret;
396     }
397
398
399     /**
400      * If server, ask application to send us a copy of its state (STATE_TRANSFER up,
401      * STATE_TRANSFER down). If client, start queueing events. Queuing will be stopped when
402      * state has been retrieved (or not) from single or all member(s).
403      */

404     void makeCopy(Object JavaDoc sender) {
405         if(sender.equals(local_addr)) { // was sent by us, has to start queueing
406
passUp(new Event(Event.START_QUEUEING));
407         }
408         else { // only retrieve state from appl when not in client state anymore
409
if(is_server) { // get state from application and store it locally
410
synchronized(state_xfer_mutex) {
411                     cached_state=null;
412
413                     passUp(new Event(Event.GET_APPLSTATE, local_addr));
414                     if(cached_state == null) {
415                         try {
416                             state_xfer_mutex.wait(timeout_get_appl_state); // wait for STATE_TRANSFER_OK
417
}
418                         catch(Exception JavaDoc e) {
419                         }
420                     }
421                 }
422             }
423         }
424     }
425
426
427 }
428
Popular Tags