1 3 package org.jgroups.protocols; 4 5 import org.jgroups.Address; 6 import org.jgroups.Event; 7 import org.jgroups.Message; 8 import org.jgroups.View; 9 import org.jgroups.blocks.GroupRequest; 10 import org.jgroups.blocks.RequestCorrelator; 11 import org.jgroups.blocks.RequestHandler; 12 import org.jgroups.stack.Protocol; 13 import org.jgroups.stack.StateTransferInfo; 14 import org.jgroups.util.Rsp; 15 import org.jgroups.util.RspList; 16 import org.jgroups.util.Util; 17 18 import java.io.Serializable ; 19 import java.util.HashMap ; 20 import java.util.Properties ; 21 import java.util.Vector ; 22 23 24 class StateTransferRequest implements Serializable { 25 static final int MAKE_COPY=1; static final int RETURN_STATE=2; 28 int type=0; 29 final Object arg; 30 31 32 public StateTransferRequest(int type, Object arg) { 33 this.type=type; 34 this.arg=arg; 35 } 36 37 public int getType() { 38 return type; 39 } 40 41 public Object getArg() { 42 return arg; 43 } 44 45 public String toString() { 46 return "[StateTransferRequest: type=" + type2Str(type) + ", arg=" + arg + ']'; 47 } 48 49 static String type2Str(int t) { 50 switch(t) { 51 case MAKE_COPY: 52 return "MAKE_COPY"; 53 case RETURN_STATE: 54 return "RETURN_STATE"; 55 default: 56 return "<unknown>"; 57 } 58 } 59 } 60 61 62 76 public class STATE_TRANSFER extends Protocol implements RequestHandler { 77 Address local_addr=null; 78 final Vector members=new Vector (11); 79 final Message m=null; 80 boolean is_server=false; 81 byte[] cached_state=null; 82 final Object state_xfer_mutex=new Object (); long timeout_get_appl_state=5000; 84 long timeout_return_state=5000; 85 RequestCorrelator corr=null; 86 final Vector observers=new Vector (5); 87 final HashMap map=new HashMap (7); 88 89 90 93 public String getName() { 94 return "STATE_TRANSFER"; 95 } 96 97 98 public void init() throws Exception { 99 map.put("state_transfer", Boolean.TRUE); 100 map.put("protocol_class", getClass().getName()); 101 102 } 103 104 public void start() throws Exception { 105 corr=new RequestCorrelator(getName(), this, this); 106 passUp(new Event(Event.CONFIG, map)); 107 } 108 109 public void stop() { 110 if(corr != null) { 111 corr.stop(); 112 corr=null; 113 } 114 } 115 116 117 public boolean setProperties(Properties props) { 118 String str; 119 120 super.setProperties(props); 121 str=props.getProperty("timeout_get_appl_state"); 124 if(str != null) { 125 timeout_get_appl_state=Long.parseLong(str); 126 props.remove("timeout_get_appl_state"); 127 } 128 129 str=props.getProperty("timeout_return_state"); 132 if(str != null) { 133 timeout_return_state=Long.parseLong(str); 134 props.remove("timeout_return_state"); 135 } 136 137 if(props.size() > 0) { 138 System.err.println("STATE_TRANSFER.setProperties(): the following " + 139 "properties are not recognized:"); 140 props.list(System.out); 141 return false; 142 } 143 return true; 144 } 145 146 147 public Vector requiredUpServices() { 148 Vector ret=new Vector (2); 149 ret.addElement(new Integer (Event.START_QUEUEING)); 150 ret.addElement(new Integer (Event.STOP_QUEUEING)); 151 return ret; 152 } 153 154 155 public void up(Event evt) { 156 switch(evt.getType()) { 157 158 case Event.BECOME_SERVER: 159 is_server=true; 160 break; 161 162 case Event.SET_LOCAL_ADDRESS: 163 local_addr=(Address)evt.getArg(); 164 break; 165 166 case Event.TMP_VIEW: 167 case Event.VIEW_CHANGE: 168 Vector new_members=((View)evt.getArg()).getMembers(); 169 synchronized(members) { 170 members.removeAllElements(); 171 if(new_members != null && new_members.size() > 0) 172 for(int k=0; k < new_members.size(); k++) 173 members.addElement(new_members.elementAt(k)); 174 } 175 break; 176 } 177 178 if(corr != null) 179 corr.receive(evt); else 181 passUp(evt); 182 } 183 184 185 public void down(Event evt) { 186 Object coord, state; 187 Vector event_list=null; 188 StateTransferInfo info; 189 190 191 switch(evt.getType()) { 192 193 case Event.TMP_VIEW: 194 case Event.VIEW_CHANGE: 195 Vector new_members=((View)evt.getArg()).getMembers(); 196 synchronized(members) { 197 members.removeAllElements(); 198 if(new_members != null && new_members.size() > 0) 199 for(int k=0; k < new_members.size(); k++) 200 members.addElement(new_members.elementAt(k)); 201 } 202 break; 203 204 case Event.GET_STATE: info=(StateTransferInfo)evt.getArg(); 206 coord=determineCoordinator(); 207 208 if(coord == null || coord.equals(local_addr)) { 209 if(log.isWarnEnabled()) log.warn("GET_STATE: coordinator is null"); 210 event_list=new Vector (1); 211 event_list.addElement(new Event(Event.GET_STATE_OK, null)); 212 passUp(new Event(Event.STOP_QUEUEING, event_list)); 213 return; } 215 216 sendMakeCopyMessage(); 218 if(info.type == StateTransferInfo.GET_FROM_MANY) 219 state=getStateFromMany(info.targets); 220 else 221 state=getStateFromSingle(info.target); 222 223 224 event_list=new Vector (1); 225 event_list.addElement(new Event(Event.GET_STATE_OK, state)); 226 227 228 passUp(new Event(Event.STOP_QUEUEING, event_list)); 229 return; 231 case Event.GET_APPLSTATE_OK: 232 synchronized(state_xfer_mutex) { 233 cached_state=(byte[])evt.getArg(); 234 state_xfer_mutex.notifyAll(); 235 } 236 return; 238 } 239 240 passDown(evt); } 242 243 244 245 public Object handle(Message msg) { 246 StateTransferRequest req; 247 248 try { 249 req=(StateTransferRequest)msg.getObject(); 250 251 switch(req.getType()) { 252 case StateTransferRequest.MAKE_COPY: 253 makeCopy(req.getArg()); 254 return null; 255 case StateTransferRequest.RETURN_STATE: 256 if(is_server) 257 return cached_state; 258 else { 259 if(log.isWarnEnabled()) log.warn("RETURN_STATE: returning null" + 260 "as I'm not yet an operational state server !"); 261 return null; 262 } 263 default: 264 if(log.isErrorEnabled()) log.error("type " + req.getType() + 265 "is unknown in StateTransferRequest !"); 266 return null; 267 } 268 } 269 catch(Exception e) { 270 if(log.isErrorEnabled()) log.error("exception is " + e); 271 return null; 272 } 273 } 274 275 276 277 278 279 280 281 282 283 byte[] getStateFromSingle(Address target) { 284 Vector dests=new Vector (11); 285 Message msg; 286 StateTransferRequest r=new StateTransferRequest(StateTransferRequest.RETURN_STATE, local_addr); 287 RspList rsp_list; 288 Rsp rsp; 289 Address dest; 290 GroupRequest req; 291 int num_tries=0; 292 293 294 try { 295 msg=new Message(null, null, Util.objectToByteBuffer(r)); 296 } 297 catch(Exception e) { 298 if(log.isErrorEnabled()) log.error("exception=" + e); 299 return null; 300 } 301 302 while(members.size() > 1 && num_tries++ < 3) { dest=target != null? target : determineCoordinator(); 304 if(dest == null) 305 return null; 306 msg.setDest(dest); 307 dests.removeAllElements(); 308 dests.addElement(dest); 309 req=new GroupRequest(msg, corr, dests, GroupRequest.GET_FIRST, timeout_return_state, 0); 310 req.execute(); 311 rsp_list=req.getResults(); 312 for(int i=0; i < rsp_list.size(); i++) { rsp=(Rsp)rsp_list.elementAt(i); 314 if(rsp.wasReceived()) 315 return (byte[])rsp.getValue(); 316 } 317 Util.sleep(1000); 318 } 319 320 return null; 321 } 322 323 324 Vector getStateFromMany(Vector targets) { 325 Vector dests=new Vector (11); 326 Message msg; 327 StateTransferRequest r=new StateTransferRequest(StateTransferRequest.RETURN_STATE, local_addr); 328 RspList rsp_list; 329 GroupRequest req; 330 int i; 331 332 333 if(targets != null) { 334 for(i=0; i < targets.size(); i++) 335 if(!local_addr.equals(targets.elementAt(i))) 336 dests.addElement(targets.elementAt(i)); 337 } 338 else { 339 for(i=0; i < members.size(); i++) 340 if(!local_addr.equals(members.elementAt(i))) 341 dests.addElement(members.elementAt(i)); 342 } 343 344 if(dests.size() == 0) 345 return null; 346 347 msg=new Message(); 348 try { 349 msg.setBuffer(Util.objectToByteBuffer(r)); 350 } 351 catch(Exception e) { 352 } 353 354 req=new GroupRequest(msg, corr, dests, GroupRequest.GET_ALL, timeout_return_state, 0); 355 req.execute(); 356 rsp_list=req.getResults(); 357 return rsp_list.getResults(); 358 } 359 360 361 void sendMakeCopyMessage() { 362 GroupRequest req; 363 Message msg=new Message(); 364 StateTransferRequest r=new StateTransferRequest(StateTransferRequest.MAKE_COPY, local_addr); 365 Vector dests=new Vector (11); 366 367 for(int i=0; i < members.size(); i++) dests.addElement(members.elementAt(i)); 370 371 if(dests.size() == 0) 372 return; 373 374 try { 375 msg.setBuffer(Util.objectToByteBuffer(r)); 376 } 377 catch(Exception e) { 378 } 379 380 req=new GroupRequest(msg, corr, dests, GroupRequest.GET_ALL, timeout_return_state, 0); 381 req.execute(); 382 } 383 384 385 388 Address determineCoordinator() { 389 Address ret=null; 390 if(members != null && members.size() > 1) { 391 for(int i=0; i < members.size(); i++) 392 if(!local_addr.equals(members.elementAt(i))) 393 return (Address)members.elementAt(i); 394 } 395 return ret; 396 } 397 398 399 404 void makeCopy(Object sender) { 405 if(sender.equals(local_addr)) { passUp(new Event(Event.START_QUEUEING)); 407 } 408 else { if(is_server) { synchronized(state_xfer_mutex) { 411 cached_state=null; 412 413 passUp(new Event(Event.GET_APPLSTATE, local_addr)); 414 if(cached_state == null) { 415 try { 416 state_xfer_mutex.wait(timeout_get_appl_state); } 418 catch(Exception e) { 419 } 420 } 421 } 422 } 423 } 424 } 425 426 427 } 428 | Popular Tags |