1 3 package org.jgroups.protocols.pbcast; 4 5 import org.jgroups.*; 6 import org.jgroups.stack.Protocol; 7 import org.jgroups.stack.StateTransferInfo; 8 import org.jgroups.util.List; 9 10 import java.io.IOException ; 11 import java.io.ObjectInput ; 12 import java.io.ObjectOutput ; 13 import java.util.Enumeration ; 14 import java.util.HashMap ; 15 import java.util.Properties ; 16 import java.util.Vector ; 17 18 19 27 public class STATE_TRANSFER extends Protocol { 28 Address local_addr=null; 29 final Vector members=new Vector (); 30 long state_id=1; final List state_requesters=new List(); Digest digest=null; 33 final HashMap map=new HashMap (); long start, stop; final static String name="STATE_TRANSFER"; 36 37 38 39 public String getName() { 40 return name; 41 } 42 43 44 public Vector requiredDownServices() { 45 Vector retval=new Vector (); 46 retval.addElement(new Integer (Event.GET_DIGEST_STATE)); 47 retval.addElement(new Integer (Event.SET_DIGEST)); 48 return retval; 49 } 50 51 52 public void init() throws Exception { 53 map.put("state_transfer", Boolean.TRUE); 54 map.put("protocol_class", getClass().getName()); 55 } 56 57 58 public void start() throws Exception { 59 passUp(new Event(Event.CONFIG, map)); 60 } 61 62 63 public void up(Event evt) { 64 Message msg; 65 StateHeader hdr; 66 67 switch(evt.getType()) { 68 69 case Event.BECOME_SERVER: 70 break; 71 72 case Event.SET_LOCAL_ADDRESS: 73 local_addr=(Address)evt.getArg(); 74 break; 75 76 case Event.TMP_VIEW: 77 case Event.VIEW_CHANGE: 78 Vector new_members=((View)evt.getArg()).getMembers(); 79 synchronized(members) { 80 members.removeAllElements(); 81 members.addAll(new_members); 82 } 83 break; 84 85 case Event.GET_DIGEST_STATE_OK: 86 synchronized(state_requesters) { 87 if(digest != null) { 88 if(log.isWarnEnabled()) 89 log.warn("GET_DIGEST_STATE_OK: existing digest is not null, overwriting it !"); 90 } 91 digest=(Digest)evt.getArg(); 92 if(log.isDebugEnabled()) 93 log.debug("GET_DIGEST_STATE_OK: digest is " + digest + "\npassUp(GET_APPLSTATE)"); 94 passUp(new Event(Event.GET_APPLSTATE)); 95 } 96 return; 97 98 case Event.MSG: 99 msg=(Message)evt.getArg(); 100 if(!(msg.getHeader(name) instanceof StateHeader)) 101 break; 102 103 hdr=(StateHeader)msg.removeHeader(name); 104 switch(hdr.type) { 105 case StateHeader.STATE_REQ: 106 handleStateReq(hdr.sender, hdr.id); 107 break; 108 case StateHeader.STATE_RSP: 109 handleStateRsp(hdr.sender, hdr.my_digest, msg.getBuffer()); 110 break; 111 default: 112 if(log.isErrorEnabled()) log.error("type " + hdr.type + " not known in StateHeader"); 113 break; 114 } 115 return; 116 } 117 passUp(evt); 118 } 119 120 121 public void down(Event evt) { 122 byte[] state; 123 Address target, requester; 124 StateTransferInfo info; 125 StateHeader hdr; 126 Message state_req, state_rsp; 127 128 switch(evt.getType()) { 129 130 case Event.TMP_VIEW: 131 case Event.VIEW_CHANGE: 132 Vector new_members=((View)evt.getArg()).getMembers(); 133 synchronized(members) { 134 members.removeAllElements(); 135 members.addAll(new_members); 136 } 137 break; 138 139 case Event.GET_STATE: 141 info=(StateTransferInfo)evt.getArg(); 142 if(info.type != StateTransferInfo.GET_FROM_SINGLE) { 143 if(log.isWarnEnabled()) log.warn("[GET_STATE] (info=" + info + "): getting the state from " + 144 "all members is not currently supported by pbcast.STATE_TRANSFER, will use " + 145 "coordinator to fetch state instead"); 146 } 147 if(info.target == null) { 148 target=determineCoordinator(); 149 } 150 else { 151 target=info.target; 152 if(target.equals(local_addr)) { 153 if(log.isErrorEnabled()) log.error("GET_STATE: cannot fetch state from myself !"); 154 target=null; 155 } 156 } 157 if(target == null) { 158 if(log.isDebugEnabled()) log.debug("GET_STATE: first member (no state)"); 159 passUp(new Event(Event.GET_STATE_OK, null)); 160 } 161 else { 162 state_req=new Message(target, null, null); 163 state_req.putHeader(name, new StateHeader(StateHeader.STATE_REQ, local_addr, state_id++, null)); 164 if(log.isDebugEnabled()) log.debug("GET_STATE: asking " + target + " for state"); 165 166 if(log.isDebugEnabled()) 169 log.debug("passing down a SUSPEND_STABLE event"); 170 passDown(new Event(Event.SUSPEND_STABLE, new Long (info.timeout))); 171 172 start=System.currentTimeMillis(); 173 passDown(new Event(Event.MSG, state_req)); 174 } 175 return; 177 case Event.GET_APPLSTATE_OK: 178 state=(byte[])evt.getArg(); 179 synchronized(state_requesters) { 180 if(state_requesters.size() == 0) { 181 if(log.isWarnEnabled()) 182 log.warn("GET_APPLSTATE_OK: received application state, but there are no requesters !"); 183 return; 184 } 185 if(digest == null) 186 if(log.isWarnEnabled()) log.warn("GET_APPLSTATE_OK: received application state, " + 187 "but there is no digest !"); 188 else 189 digest=digest.copy(); 190 for(Enumeration e=state_requesters.elements(); e.hasMoreElements();) { 191 requester=(Address)e.nextElement(); 192 state_rsp=new Message(requester, null, state); hdr=new StateHeader(StateHeader.STATE_RSP, local_addr, 0, digest); 194 state_rsp.putHeader(name, hdr); 195 passDown(new Event(Event.MSG, state_rsp)); 196 } 197 digest=null; 198 state_requesters.removeAll(); 199 } 200 return; } 202 203 passDown(evt); } 205 206 207 public boolean setProperties(Properties props) { 208 super.setProperties(props); 209 210 if(props.size() > 0) { 211 System.err.println("STATE_TRANSFER.setProperties(): the following " + 212 "properties are not recognized:"); 213 props.list(System.out); 214 return false; 215 } 216 return true; 217 } 218 219 220 221 222 223 224 225 226 227 228 Address determineCoordinator() { 229 Address ret=null; 230 if(members != null && members.size() > 1) { 231 for(int i=0; i < members.size(); i++) 232 if(!local_addr.equals(members.elementAt(i))) 233 return (Address)members.elementAt(i); 234 } 235 return ret; 236 } 237 238 239 244 void handleStateReq(Object sender, long state_id) { 245 if(sender == null) { 246 if(log.isErrorEnabled()) log.error("sender is null !"); 247 return; 248 } 249 250 synchronized(state_requesters) { 251 if(state_requesters.size() > 0) { state_requesters.add(sender); 253 } 254 else { 255 state_requesters.add(sender); 256 digest=null; 257 if(log.isDebugEnabled()) log.debug("passing down GET_DIGEST_STATE"); 258 passDown(new Event(Event.GET_DIGEST_STATE)); 259 } 260 } 261 } 262 263 264 265 void handleStateRsp(Object sender, Digest digest, byte[] state) { 266 if(digest == null) { 267 if(log.isWarnEnabled()) 268 log.warn("digest received from " + sender + " is null, skipping setting digest !"); 269 } 270 else 271 passDown(new Event(Event.SET_DIGEST, digest)); stop=System.currentTimeMillis(); 273 274 if(log.isDebugEnabled()) 278 log.debug("passing down a RESUME_STABLE event"); 279 passDown(new Event(Event.RESUME_STABLE)); 280 281 if(state == null) { 282 if(log.isWarnEnabled()) 283 log.warn("state received from " + sender + " is null, will return null state to application"); 284 } 285 else 286 log.debug("received state, size=" + state.length + " bytes. Time=" + (stop-start) + " milliseconds"); 287 passUp(new Event(Event.GET_STATE_OK, state)); 288 } 289 290 291 292 293 294 295 300 public static class StateHeader extends Header { 301 static final int STATE_REQ=1; 302 static final int STATE_RSP=2; 303 304 Address sender=null; long id=0; int type=0; 307 Digest my_digest=null; 309 310 public StateHeader() { 311 } 313 314 public StateHeader(int type, Address sender, long id, Digest digest) { 315 this.type=type; 316 this.sender=sender; 317 this.id=id; 318 this.my_digest=digest; 319 } 320 321 public int getType() { 322 return type; 323 } 324 325 public Digest getDigest() { 326 return my_digest; 327 } 328 329 330 public boolean equals(Object o) { 331 StateHeader other=null; 332 333 if(sender != null && o != null) { 334 if(!(o instanceof StateHeader)) 335 return false; 336 other=(StateHeader)o; 337 return sender.equals(other.sender) && id == other.id; 338 } 339 return false; 340 } 341 342 343 public int hashCode() { 344 if(sender != null) 345 return sender.hashCode() + (int)id; 346 else 347 return (int)id; 348 } 349 350 351 public String toString() { 352 StringBuffer sb=new StringBuffer (); 353 sb.append("[StateHeader: type=" + type2Str(type)); 354 if(sender != null) sb.append(", sender=" + sender + " id=#" + id); 355 if(my_digest != null) sb.append(", digest=" + my_digest); 356 return sb.toString(); 357 } 358 359 360 static String type2Str(int t) { 361 switch(t) { 362 case STATE_REQ: 363 return "STATE_REQ"; 364 case STATE_RSP: 365 return "STATE_RSP"; 366 default: 367 return "<unknown>"; 368 } 369 } 370 371 372 public void writeExternal(ObjectOutput out) throws IOException { 373 out.writeObject(sender); 374 out.writeLong(id); 375 out.writeInt(type); 376 out.writeObject(my_digest); 377 } 378 379 380 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 381 sender=(Address)in.readObject(); 382 id=in.readLong(); 383 type=in.readInt(); 384 my_digest=(Digest)in.readObject(); 385 } 386 387 } 388 389 390 } 391 | Popular Tags |