KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > pbcast > STATE_TRANSFER


1 // $Id: STATE_TRANSFER.java,v 1.17 2005/04/20 20:25:45 belaban Exp $
2

3 package org.jgroups.protocols.pbcast;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.Protocol;
7 import org.jgroups.stack.StateTransferInfo;
8 import org.jgroups.util.List;
9
10 import java.io.IOException JavaDoc;
11 import java.io.ObjectInput JavaDoc;
12 import java.io.ObjectOutput JavaDoc;
13 import java.util.Enumeration JavaDoc;
14 import java.util.HashMap JavaDoc;
15 import java.util.Properties JavaDoc;
16 import java.util.Vector JavaDoc;
17
18
19 /**
20  * New STATE_TRANSFER protocol based on PBCAST. Compared to the one in ./protocols, it doesn't
21  * need a QUEUE layer above it. A state request is sent to a chosen member (coordinator if
22  * null). That member makes a copy D of its current digest and asks the application for a copy of
23  * its current state S. Then the member returns both S and D to the requester. The requester
24  * first sets its digest to D and then returns the state to the application.
25  * @author Bela Ban
26  */

27 public class STATE_TRANSFER extends Protocol {
28     Address local_addr=null;
29     final Vector JavaDoc members=new Vector JavaDoc();
30     long state_id=1; // used to differentiate between state transfers (not currently used)
31
final List state_requesters=new List(); // requesters of state (usually just 1, could be more)
32
Digest digest=null;
33     final HashMap JavaDoc map=new HashMap JavaDoc(); // to store configuration information
34
long start, stop; // to measure state transfer time
35
final static String JavaDoc name="STATE_TRANSFER";
36
37
38     /** All protocol names have to be unique ! */
39     public String JavaDoc getName() {
40         return name;
41     }
42
43
44     public Vector JavaDoc requiredDownServices() {
45         Vector JavaDoc retval=new Vector JavaDoc();
46         retval.addElement(new Integer JavaDoc(Event.GET_DIGEST_STATE));
47         retval.addElement(new Integer JavaDoc(Event.SET_DIGEST));
48         return retval;
49     }
50
51
52     public void init() throws Exception JavaDoc {
53         map.put("state_transfer", Boolean.TRUE);
54         map.put("protocol_class", getClass().getName());
55     }
56
57
58     public void start() throws Exception JavaDoc {
59         passUp(new Event(Event.CONFIG, map));
60     }
61
62
63     public void up(Event evt) {
64         Message msg;
65         StateHeader hdr;
66
67         switch(evt.getType()) {
68
69         case Event.BECOME_SERVER:
70             break;
71
72         case Event.SET_LOCAL_ADDRESS:
73             local_addr=(Address)evt.getArg();
74             break;
75
76         case Event.TMP_VIEW:
77         case Event.VIEW_CHANGE:
78             Vector JavaDoc new_members=((View)evt.getArg()).getMembers();
79             synchronized(members) {
80                 members.removeAllElements();
81                 members.addAll(new_members);
82             }
83             break;
84
85         case Event.GET_DIGEST_STATE_OK:
86             synchronized(state_requesters) {
87                 if(digest != null) {
88                     if(log.isWarnEnabled())
89                         log.warn("GET_DIGEST_STATE_OK: existing digest is not null, overwriting it !");
90                 }
91                 digest=(Digest)evt.getArg();
92                 if(log.isDebugEnabled())
93                     log.debug("GET_DIGEST_STATE_OK: digest is " + digest + "\npassUp(GET_APPLSTATE)");
94                 passUp(new Event(Event.GET_APPLSTATE));
95             }
96             return;
97
98         case Event.MSG:
99             msg=(Message)evt.getArg();
100             if(!(msg.getHeader(name) instanceof StateHeader))
101                 break;
102
103             hdr=(StateHeader)msg.removeHeader(name);
104             switch(hdr.type) {
105             case StateHeader.STATE_REQ:
106                 handleStateReq(hdr.sender, hdr.id);
107                 break;
108             case StateHeader.STATE_RSP:
109                 handleStateRsp(hdr.sender, hdr.my_digest, msg.getBuffer());
110                 break;
111             default:
112                 if(log.isErrorEnabled()) log.error("type " + hdr.type + " not known in StateHeader");
113                 break;
114             }
115             return;
116         }
117         passUp(evt);
118     }
119
120
121     public void down(Event evt) {
122         byte[] state;
123         Address target, requester;
124         StateTransferInfo info;
125         StateHeader hdr;
126         Message state_req, state_rsp;
127
128         switch(evt.getType()) {
129
130             case Event.TMP_VIEW:
131             case Event.VIEW_CHANGE:
132                 Vector JavaDoc new_members=((View)evt.getArg()).getMembers();
133                 synchronized(members) {
134                     members.removeAllElements();
135                     members.addAll(new_members);
136                 }
137                 break;
138
139             // generated by JChannel.getState(). currently, getting the state from more than 1 mbr is not implemented
140
case Event.GET_STATE:
141                 info=(StateTransferInfo)evt.getArg();
142                 if(info.type != StateTransferInfo.GET_FROM_SINGLE) {
143                     if(log.isWarnEnabled()) log.warn("[GET_STATE] (info=" + info + "): getting the state from " +
144                             "all members is not currently supported by pbcast.STATE_TRANSFER, will use " +
145                             "coordinator to fetch state instead");
146                 }
147                 if(info.target == null) {
148                     target=determineCoordinator();
149                 }
150                 else {
151                     target=info.target;
152                     if(target.equals(local_addr)) {
153                         if(log.isErrorEnabled()) log.error("GET_STATE: cannot fetch state from myself !");
154                         target=null;
155                     }
156                 }
157                 if(target == null) {
158                     if(log.isDebugEnabled()) log.debug("GET_STATE: first member (no state)");
159                     passUp(new Event(Event.GET_STATE_OK, null));
160                 }
161                 else {
162                     state_req=new Message(target, null, null);
163                     state_req.putHeader(name, new StateHeader(StateHeader.STATE_REQ, local_addr, state_id++, null));
164                     if(log.isDebugEnabled()) log.debug("GET_STATE: asking " + target + " for state");
165
166                     // suspend sending and handling of mesage garbage collection gossip messages,
167
// fixes bugs #943480 and #938584). Wake up when state has been received
168
if(log.isDebugEnabled())
169                         log.debug("passing down a SUSPEND_STABLE event");
170                     passDown(new Event(Event.SUSPEND_STABLE, new Long JavaDoc(info.timeout)));
171
172                     start=System.currentTimeMillis();
173                     passDown(new Event(Event.MSG, state_req));
174                 }
175                 return; // don't pass down any further !
176

177             case Event.GET_APPLSTATE_OK:
178                 state=(byte[])evt.getArg();
179                 synchronized(state_requesters) {
180                     if(state_requesters.size() == 0) {
181                         if(log.isWarnEnabled())
182                             log.warn("GET_APPLSTATE_OK: received application state, but there are no requesters !");
183                         return;
184                     }
185                     if(digest == null)
186                         if(log.isWarnEnabled()) log.warn("GET_APPLSTATE_OK: received application state, " +
187                                 "but there is no digest !");
188                     else
189                         digest=digest.copy();
190                     for(Enumeration JavaDoc e=state_requesters.elements(); e.hasMoreElements();) {
191                         requester=(Address)e.nextElement();
192                         state_rsp=new Message(requester, null, state); // put the state into state_rsp.buffer
193
hdr=new StateHeader(StateHeader.STATE_RSP, local_addr, 0, digest);
194                         state_rsp.putHeader(name, hdr);
195                         passDown(new Event(Event.MSG, state_rsp));
196                     }
197                     digest=null;
198                     state_requesters.removeAll();
199                 }
200                 return; // don't pass down any further !
201
}
202
203         passDown(evt); // pass on to the layer below us
204
}
205
206
207     public boolean setProperties(Properties JavaDoc props) {
208         super.setProperties(props);
209
210         if(props.size() > 0) {
211             System.err.println("STATE_TRANSFER.setProperties(): the following " +
212                     "properties are not recognized:");
213             props.list(System.out);
214             return false;
215         }
216         return true;
217     }
218
219
220
221
222
223
224     /* --------------------------- Private Methods -------------------------------- */
225
226
227     /** Return the first element of members which is not me. Otherwise return null. */
228     Address determineCoordinator() {
229         Address ret=null;
230         if(members != null && members.size() > 1) {
231             for(int i=0; i < members.size(); i++)
232                 if(!local_addr.equals(members.elementAt(i)))
233                     return (Address)members.elementAt(i);
234         }
235         return ret;
236     }
237
238
239     /**
240      * If a state transfer is in progress, we don't need to send a GET_APPLSTATE event to the application, but
241      * instead we just add the sender to the requester list so it will receive the same state when done. If not,
242      * we add the sender to the requester list and send a GET_APPLSTATE event up.
243      */

244     void handleStateReq(Object JavaDoc sender, long state_id) {
245         if(sender == null) {
246             if(log.isErrorEnabled()) log.error("sender is null !");
247             return;
248         }
249
250         synchronized(state_requesters) {
251             if(state_requesters.size() > 0) { // state transfer is in progress, digest was requested
252
state_requesters.add(sender);
253             }
254             else {
255                 state_requesters.add(sender);
256                 digest=null;
257                 if(log.isDebugEnabled()) log.debug("passing down GET_DIGEST_STATE");
258                 passDown(new Event(Event.GET_DIGEST_STATE));
259             }
260         }
261     }
262
263
264     /** Set the digest and the send the state up to the application */
265     void handleStateRsp(Object JavaDoc sender, Digest digest, byte[] state) {
266         if(digest == null) {
267             if(log.isWarnEnabled())
268                 log.warn("digest received from " + sender + " is null, skipping setting digest !");
269         }
270         else
271             passDown(new Event(Event.SET_DIGEST, digest)); // set the digest (e.g. in NAKACK)
272
stop=System.currentTimeMillis();
273
274         // resume sending and handling of mesage garbage collection gossip messages,
275
// fixes bugs #943480 and #938584). Wakes up a previously suspended message garbage
276
// collection protocol (e.g. STABLE)
277
if(log.isDebugEnabled())
278             log.debug("passing down a RESUME_STABLE event");
279         passDown(new Event(Event.RESUME_STABLE));
280
281         if(state == null) {
282             if(log.isWarnEnabled())
283                 log.warn("state received from " + sender + " is null, will return null state to application");
284         }
285         else
286             log.debug("received state, size=" + state.length + " bytes. Time=" + (stop-start) + " milliseconds");
287         passUp(new Event(Event.GET_STATE_OK, state));
288     }
289
290
291     /* ------------------------ End of Private Methods ------------------------------ */
292
293
294
295     /**
296      * Wraps data for a state request/response. Note that for a state response the actual state will <em>not</em
297      * be stored in the header itself, but in the message's buffer.
298      *
299      */

300     public static class StateHeader extends Header {
301         static final int STATE_REQ=1;
302         static final int STATE_RSP=2;
303
304         Address sender=null; // sender of state STATE_REQ or STATE_RSP
305
long id=0; // state transfer ID (to separate multiple state transfers at the same time)
306
int type=0;
307         Digest my_digest=null; // digest of sender (if type is STATE_RSP)
308

309
310         public StateHeader() {
311         } // for externalization
312

313
314         public StateHeader(int type, Address sender, long id, Digest digest) {
315             this.type=type;
316             this.sender=sender;
317             this.id=id;
318             this.my_digest=digest;
319         }
320
321         public int getType() {
322             return type;
323         }
324
325         public Digest getDigest() {
326             return my_digest;
327         }
328
329
330         public boolean equals(Object JavaDoc o) {
331             StateHeader other=null;
332
333             if(sender != null && o != null) {
334                 if(!(o instanceof StateHeader))
335                     return false;
336                 other=(StateHeader)o;
337                 return sender.equals(other.sender) && id == other.id;
338             }
339             return false;
340         }
341
342
343         public int hashCode() {
344             if(sender != null)
345                 return sender.hashCode() + (int)id;
346             else
347                 return (int)id;
348         }
349
350
351         public String JavaDoc toString() {
352             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
353             sb.append("[StateHeader: type=" + type2Str(type));
354             if(sender != null) sb.append(", sender=" + sender + " id=#" + id);
355             if(my_digest != null) sb.append(", digest=" + my_digest);
356             return sb.toString();
357         }
358
359
360         static String JavaDoc type2Str(int t) {
361             switch(t) {
362                 case STATE_REQ:
363                     return "STATE_REQ";
364                 case STATE_RSP:
365                     return "STATE_RSP";
366                 default:
367                     return "<unknown>";
368             }
369         }
370
371
372         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
373             out.writeObject(sender);
374             out.writeLong(id);
375             out.writeInt(type);
376             out.writeObject(my_digest);
377         }
378
379
380         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
381             sender=(Address)in.readObject();
382             id=in.readLong();
383             type=in.readInt();
384             my_digest=(Digest)in.readObject();
385         }
386
387     }
388
389
390 }
391
Popular Tags