1 3 4 5 package org.jgroups.protocols; 6 7 8 import org.jgroups.Address; 9 import org.jgroups.Event; 10 import org.jgroups.View; 11 import org.jgroups.blocks.GroupRequest; 12 import org.jgroups.blocks.MethodCall; 13 import org.jgroups.stack.RpcProtocol; 14 import org.jgroups.util.List; 15 import org.jgroups.util.Rsp; 16 import org.jgroups.util.RspList; 17 import org.jgroups.util.Util; 18 19 import java.util.Enumeration ; 20 import java.util.Properties ; 21 import java.util.Vector ; 22 23 24 25 26 55 public class FLUSH extends RpcProtocol { 56 final Vector mbrs=new Vector (); 57 boolean is_server=false; 58 final Object block_mutex=new Object (); 59 long block_timeout=5000; 60 Address local_addr=null; 61 boolean blocked=false; final Object digest_mutex=new Object (); 63 long digest_timeout=2000; 65 final Object highest_delivered_mutex=new Object (); 66 long[] highest_delivered_msgs; 67 68 Digest digest=null; 69 70 final Object get_msgs_mutex=new Object (); 71 final long get_msgs_timeout=4000; 72 List get_msgs=null; 73 74 75 76 public String getName() {return "FLUSH";} 77 78 79 public Vector providedUpServices() { 80 Vector retval=new Vector (); 81 retval.addElement(new Integer (Event.FLUSH)); 82 return retval; 83 } 84 85 public Vector requiredDownServices() { 86 Vector retval=new Vector (); 87 retval.addElement(new Integer (Event.GET_MSGS_RECEIVED)); retval.addElement(new Integer (Event.GET_MSG_DIGEST)); retval.addElement(new Integer (Event.GET_MSGS)); return retval; 91 } 92 93 94 public void start() throws Exception { 95 super.start(); 96 if(_corr != null) { 97 _corr.setDeadlockDetection(true); 98 } 99 else 100 throw new Exception ("FLUSH.start(): cannot set deadlock detection in corr, as it is null !"); 101 } 102 103 104 111 private FlushRsp flush(Vector dests) { 112 RspList rsp_list; 113 FlushRsp retval=new FlushRsp(); 114 Digest digest; 115 long[] min, max; 116 long[] lower[]; 117 List unstable_msgs=new List(); 118 boolean get_lower_msgs=false; 119 120 highest_delivered_msgs=new long[members.size()]; 121 min=new long[members.size()]; 122 max=new long[members.size()]; 123 124 125 127 getHighestDeliveredSeqnos(); 128 129 for(int i=0; i < highest_delivered_msgs.length; i++) 130 min[i]=max[i]=highest_delivered_msgs[i]; 131 132 133 135 if(log.isInfoEnabled()) log.info("calling handleFlush(" + dests + ')'); 136 passDown(new Event(Event.SWITCH_OUT_OF_BAND)); MethodCall call = new MethodCall("handleFlush", new Object [] {dests, highest_delivered_msgs.clone()}, 138 new String [] {Vector .class.getName(), long[].class.getName()}); 139 rsp_list=callRemoteMethods(dests, call, GroupRequest.GET_ALL, 0); 140 if(log.isInfoEnabled()) log.info("flush done"); 141 142 143 146 for(int i=0; i < rsp_list.size(); i++) { 147 Rsp rsp=(Rsp)rsp_list.elementAt(i); 148 if(rsp.wasReceived()) { 149 digest=(Digest)rsp.getValue(); 150 if(digest != null) { 151 for(int j=0; j < digest.highest_seqnos.length && j < min.length; j++) { 152 min[j]=Math.min(min[j], digest.highest_seqnos[j]); 153 max[j]=Math.max(max[j], digest.highest_seqnos[j]); 154 } 155 if(digest.msgs.size() > 0) { 156 for(Enumeration e=digest.msgs.elements(); e.hasMoreElements();) 157 unstable_msgs.add(e.nextElement()); 158 } 159 } 160 } 161 } 163 164 165 168 lower=new long[min.length][]; 171 for(int i=0; i < min.length; i++) { 172 if(min[i] < highest_delivered_msgs[i]) { lower[i]=new long[2]; 174 lower[i][0]=min[i]; lower[i][1]=highest_delivered_msgs[i]; get_lower_msgs=true; 177 } 178 } 179 if(get_lower_msgs) { 180 get_msgs=null; 181 synchronized(get_msgs_mutex) { 182 passDown(new Event(Event.GET_MSGS, lower)); 183 try { 184 get_msgs_mutex.wait(get_msgs_timeout); 185 } 186 catch(Exception e) {} 187 } 188 if(get_msgs != null) { 189 for(Enumeration e=get_msgs.elements(); e.hasMoreElements();) 190 unstable_msgs.add(e.nextElement()); 191 } 192 } 193 retval.unstable_msgs=unstable_msgs.getContents(); 194 if(rsp_list.numSuspectedMembers() > 0) { 195 retval.result=false; 196 retval.failed_mbrs=rsp_list.getSuspectedMembers(); 197 } 198 199 return retval; 200 } 201 202 203 204 205 206 220 public synchronized Digest handleFlush(Vector flush_dests, long[] highest_seqnos) { 221 digest=null; 222 223 if(log.isInfoEnabled()) log.info("flush_dests=" + flush_dests + 224 " , highest_seqnos=" + Util.array2String(highest_seqnos)); 225 226 if(!is_server) return digest; 228 229 if(flush_dests == null) { 230 if(log.isWarnEnabled()) log.warn("flush dest is null, ignoring flush !"); 231 return digest; 232 } 233 234 if(flush_dests.size() == 0) { 235 if(log.isWarnEnabled()) log.warn("flush dest is empty, ignoring flush !"); 236 return digest; 237 } 238 239 if(!flush_dests.contains(local_addr)) { 240 241 if(log.isWarnEnabled()) log.warn("am not in the flush dests, ignoring flush"); 242 return digest; 243 } 244 245 if(!blocked) { 247 blocked=true; 248 synchronized(block_mutex) { 249 passUp(new Event(Event.BLOCK)); 250 try {block_mutex.wait(block_timeout);} 251 catch(Exception e) {} 252 } 253 } 254 255 getMessageDigest(highest_seqnos); 257 if(log.isInfoEnabled()) log.info("returning digest : " + digest); 258 return digest; 259 } 260 261 262 263 264 265 266 267 void getHighestDeliveredSeqnos() { 268 synchronized(highest_delivered_mutex) { 269 passDown(new Event(Event.GET_MSGS_RECEIVED)); 270 try { 271 highest_delivered_mutex.wait(4000); 272 } 273 catch(Exception e) { 274 if(log.isDebugEnabled()) log.debug("exception is " + e); 275 } 276 } 277 } 278 279 280 281 282 283 284 void getMessageDigest(long[] highest_seqnos) { 285 synchronized(digest_mutex) { 286 passDown(new Event(Event.GET_MSG_DIGEST, highest_seqnos)); 287 try { 288 digest_mutex.wait(digest_timeout); 289 } 290 catch(Exception e) {} 291 } 292 } 293 294 295 296 297 298 299 300 306 public boolean handleUpEvent(Event evt) { 307 switch(evt.getType()) { 308 309 case Event.SET_LOCAL_ADDRESS: 310 local_addr=(Address)evt.getArg(); 311 break; 312 313 case Event.GET_MSG_DIGEST_OK: 314 synchronized(digest_mutex) { 315 digest=(Digest)evt.getArg(); 316 digest_mutex.notifyAll(); 317 } 318 return false; 320 case Event.GET_MSGS_RECEIVED_OK: 321 long[] tmp=(long[])evt.getArg(); 322 if(tmp != null) 323 System.arraycopy(tmp, 0, highest_delivered_msgs, 0, tmp.length); 324 synchronized(highest_delivered_mutex) { 325 highest_delivered_mutex.notifyAll(); 326 } 327 return false; 329 case Event.GET_MSGS_OK: 330 synchronized(get_msgs_mutex) { 331 get_msgs=(List)evt.getArg(); 332 get_msgs_mutex.notifyAll(); 333 } 334 break; 335 336 } 337 return true; 338 } 339 340 341 347 public boolean handleDownEvent(Event evt) { 348 Vector dests; 349 FlushRsp rsp; 350 351 switch(evt.getType()) { 352 case Event.FLUSH: 353 dests=(Vector )evt.getArg(); 354 if(dests == null) dests=new Vector (); 355 rsp=flush(dests); 356 passUp(new Event(Event.FLUSH_OK, rsp)); 357 return false; 359 case Event.BECOME_SERVER: 360 is_server=true; 361 break; 362 363 case Event.VIEW_CHANGE: 364 blocked=false; 365 366 Vector tmp=((View)evt.getArg()).getMembers(); 367 if(tmp != null) { 368 mbrs.removeAllElements(); 369 for(int i=0; i < tmp.size(); i++) 370 mbrs.addElement(tmp.elementAt(i)); 371 } 372 break; 373 } 374 return true; 375 } 376 377 378 379 380 381 391 public void receiveDownEvent(Event evt) { 392 if(evt.getType() == Event.BLOCK_OK) { synchronized(down_queue) { 394 Event event; 395 try { 396 while(down_queue.size() > 0) { 397 event=(Event)down_queue.remove(10); down(event); 399 } 400 } 401 catch(Exception e) {} 402 } 403 404 synchronized(block_mutex) { 405 block_mutex.notifyAll(); 406 } 407 return; 408 } 409 super.receiveDownEvent(evt); 410 } 411 412 413 414 public boolean setProperties(Properties props) {super.setProperties(props); 415 String str; 416 417 str=props.getProperty("block_timeout"); 418 if(str != null) { 419 block_timeout=Long.parseLong(str); 420 props.remove("block_timeout"); 421 } 422 423 str=props.getProperty("digest_timeout"); 424 if(str != null) { 425 digest_timeout=Long.parseLong(str); 426 props.remove("digest_timeout"); 427 } 428 429 if(props.size() > 0) { 430 System.err.println("EXAMPLE.setProperties(): these properties are not recognized:"); 431 props.list(System.out); 432 return false; 433 } 434 return true; 435 } 436 437 438 439 } 440 441 | Popular Tags |