package org.jgroups.stack;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Event;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;

import java.util.Properties;
import java.util.Vector;


/**
 * Daemon thread that removes events from a queue and hands each one to
 * {@link Protocol#up(Event)} of the owning protocol. The loop ends when the
 * queue reports that it is closed.
 */
class UpHandler extends Thread {
    private Queue            mq=null;
    private Protocol         handler=null;
    private ProtocolObserver observer=null;
    protected final Log      log=LogFactory.getLog(this.getClass());


    /**
     * @param mq       queue the events are removed from
     * @param handler  protocol whose <code>up()</code> receives the events; may be null,
     *                 in which case only the thread name differs (run() would fail on dispatch)
     * @param observer optional hook consulted before each dispatch; may be null
     */
    public UpHandler(Queue mq, Protocol handler, ProtocolObserver observer) {
        this.mq=mq;
        this.handler=handler;
        this.observer=observer;
        if(handler != null)
            setName("UpHandler (" + handler.getName() + ')');
        else
            setName("UpHandler");
        setDaemon(true);
    }


    /** Replaces the observer consulted before each dispatch (null removes it). */
    public void setObserver(ProtocolObserver observer) {
        this.observer=observer;
    }


    /**
     * Removes events from the queue until it is closed and passes each one to
     * <code>handler.up()</code>. Any other Throwable raised while handling an event is
     * logged and the loop keeps running.
     */
    public void run() {
        Event evt;
        while(!mq.closed()) {
            try {
                evt=(Event)mq.remove();
                if(evt == null) {
                    if(log.isWarnEnabled()) log.warn("removed null event");
                    continue;
                }

                // An observer veto drops this single event only. The handler thread must
                // keep running (as DownHandler does); returning here would silently stop
                // all further up-event processing for this protocol.
                if(observer != null && !observer.up(evt, mq.size()))
                    continue;

                handler.up(evt);
            }
            catch(QueueClosedException queue_closed) {
                break;
            }
            catch(Throwable e) {
                if(log.isWarnEnabled()) log.warn(getName() + " exception: " + e);
                e.printStackTrace();
            }
        }
    }

}


/**
 * Daemon thread that removes events from a queue and hands each one to
 * {@link Protocol#down(Event)} of the owning protocol. START and STOP events are
 * first offered to {@link Protocol#handleSpecialDownEvent(Event)}.
 */
class DownHandler extends Thread {
    private Queue            mq=null;
    private Protocol         handler=null;
    private ProtocolObserver observer=null;
    protected final Log      log=LogFactory.getLog(this.getClass());


    /**
     * @param mq       queue the events are removed from
     * @param handler  protocol whose <code>down()</code> receives the events; may be null,
     *                 in which case only the thread name differs (run() would fail on dispatch)
     * @param observer optional hook consulted before each dispatch; may be null
     */
    public DownHandler(Queue mq, Protocol handler, ProtocolObserver observer) {
        this.mq=mq;
        this.handler=handler;
        this.observer=observer;
        if(handler != null)
            setName("DownHandler (" + handler.getName() + ')');
        else
            setName("DownHandler");
        setDaemon(true);
    }


    /** Replaces the observer consulted before each dispatch (null removes it). */
    public void setObserver(ProtocolObserver observer) {
        this.observer=observer;
    }


    /**
     * Removes events from the queue until it is closed and passes each one to
     * <code>handler.down()</code>. Events vetoed by the observer, and START/STOP events
     * for which <code>handleSpecialDownEvent()</code> returns false, are skipped. Any other
     * Throwable raised while handling an event is logged and the loop keeps running.
     */
    public void run() {
        Event evt;
        while(!mq.closed()) {
            try {
                evt=(Event)mq.remove();
                if(evt == null) {
                    if(log.isWarnEnabled()) log.warn("removed null event");
                    continue;
                }

                // An observer veto drops this single event only
                if(observer != null && !observer.down(evt, mq.size()))
                    continue;

                int type=evt.getType();
                if(type == Event.START || type == Event.STOP) {
                    if(!handler.handleSpecialDownEvent(evt))
                        continue;
                }
                handler.down(evt);
            }
            catch(QueueClosedException queue_closed) {
                break;
            }
            catch(Throwable e) {
                if(log.isWarnEnabled()) log.warn(getName() + " exception is " + e);
                e.printStackTrace();
            }
        }
    }

}


/**
 * Base class for one layer of a protocol stack. A protocol holds a reference to the
 * protocol above ({@link #up_prot}) and below ({@link #down_prot}) and exchanges
 * {@link Event}s with them through {@link #passUp(Event)} and {@link #passDown(Event)}.
 * <p>
 * Incoming events are either handled on the caller's thread or, once
 * {@link #startUpHandler()} / {@link #startDownHandler()} have created the handler threads,
 * queued and processed by an {@link UpHandler} / {@link DownHandler}. Subclasses typically
 * override {@link #up(Event)} and {@link #down(Event)}; the defaults simply forward the event.
 */
public abstract class Protocol {
    protected final Properties props=new Properties();
    protected Protocol         up_prot=null, down_prot=null;
    protected ProtocolStack    stack=null;
    protected final Queue      up_queue=new Queue();
    protected final Queue      down_queue=new Queue();
    protected UpHandler        up_handler=null;
    /** Priority for the up handler thread; only applied when >= 0. */
    protected int              up_thread_prio=-1;
    protected DownHandler      down_handler=null;
    /** Priority for the down handler thread; only applied when >= 0. */
    protected int              down_thread_prio=-1;
    protected ProtocolObserver observer=null;
    /** Time (passed to Thread.join(long), i.e. milliseconds) to wait for a handler thread to die. */
    private final static long  THREAD_JOIN_TIMEOUT=1000;
    /** Whether startDownHandler() creates a down handler thread. */
    protected boolean          down_thread=true;
    /** Whether startUpHandler() creates an up handler thread. */
    protected boolean          up_thread=true;
    protected final Log        log=LogFactory.getLog(this.getClass());


    /**
     * Copies the given properties into this protocol's own property set.
     *
     * @param props properties to add; ignored when null
     * @return always true in this default implementation
     */
    public boolean setProperties(Properties props) {
        if(props != null)
            this.props.putAll(props);
        return true;
    }


    /**
     * Reads the thread-related properties (<code>down_thread</code>,
     * <code>down_thread_prio</code>, <code>up_thread</code>, <code>up_thread_prio</code>),
     * removes them from <code>props</code> and then delegates to
     * {@link #setProperties(Properties)} with what is left. Note that the argument is modified.
     *
     * @param props the properties to apply; must not be null
     * @return the result of <code>setProperties(props)</code>
     * @throws NumberFormatException if a priority value is not a valid integer
     */
    public boolean setPropertiesInternal(Properties props) {
        String str;
        this.props.putAll(props);

        str=props.getProperty("down_thread");
        if(str != null) {
            down_thread=Boolean.valueOf(str).booleanValue();
            props.remove("down_thread");
        }

        str=props.getProperty("down_thread_prio");
        if(str != null) {
            down_thread_prio=Integer.parseInt(str);
            props.remove("down_thread_prio");
        }

        str=props.getProperty("up_thread");
        if(str != null) {
            up_thread=Boolean.valueOf(str).booleanValue();
            props.remove("up_thread");
        }

        str=props.getProperty("up_thread_prio");
        if(str != null) {
            up_thread_prio=Integer.parseInt(str);
            props.remove("up_thread_prio");
        }

        return setProperties(props);
    }


    /** Returns this protocol's live property set (not a copy). */
    public Properties getProperties() {
        return props;
    }


    /**
     * Installs the observer on this protocol and on any handler threads that already exist.
     *
     * @param observer the observer to install, or null to remove the current one
     */
    public void setObserver(ProtocolObserver observer) {
        this.observer=observer;
        // Guard: a null observer means "remove"; there is nothing to register this protocol with
        if(observer != null)
            observer.setProtocol(this);
        if(up_handler != null)
            up_handler.setObserver(observer);
        if(down_handler != null)
            down_handler.setObserver(observer);
    }


    /**
     * Initialization hook; empty by default.
     *
     * @throws Exception if a subclass fails to initialize
     */
    public void init() throws Exception {
    }


    /**
     * Start hook, invoked by {@link #handleSpecialDownEvent(Event)} when a START event
     * arrives; empty by default.
     *
     * @throws Exception if a subclass fails to start; reported upwards in a START_OK event
     */
    public void start() throws Exception {
    }


    /**
     * Stop hook, invoked by {@link #handleSpecialDownEvent(Event)} when a STOP event
     * arrives; empty by default.
     */
    public void stop() {
    }


    /** Cleanup hook; empty by default. */
    public void destroy() {
    }


    /** Returns the queue that holds events travelling up into this protocol. */
    public Queue getUpQueue() {
        return up_queue;
    }

    /** Returns the queue that holds events travelling down into this protocol. */
    public Queue getDownQueue() {
        return down_queue;
    }


    /** Services required from the layers above; this default implementation returns null. */
    public Vector requiredUpServices() {
        return null;
    }


    /** Services required from the layers below; this default implementation returns null. */
    public Vector requiredDownServices() {
        return null;
    }


    /** Services provided to the layers above; this default implementation returns null. */
    public Vector providedUpServices() {
        return null;
    }


    /** Services provided to the layers below; this default implementation returns null. */
    public Vector providedDownServices() {
        return null;
    }


    /** Returns the name of this protocol; used in thread names and log messages. */
    public abstract String getName();

    public Protocol getUpProtocol() {
        return up_prot;
    }

    public Protocol getDownProtocol() {
        return down_prot;
    }

    public void setUpProtocol(Protocol up_prot) {
        this.up_prot=up_prot;
    }

    public void setDownProtocol(Protocol down_prot) {
        this.down_prot=down_prot;
    }

    public void setProtocolStack(ProtocolStack stack) {
        this.stack=stack;
    }


    /**
     * Creates and starts the up handler thread if <code>up_thread</code> is set and no
     * handler exists yet. A failure to apply <code>up_thread_prio</code> is logged and
     * does not prevent the thread from starting.
     */
    public void startUpHandler() {
        if(up_thread) {
            if(up_handler == null) {
                up_handler=new UpHandler(up_queue, this, observer);
                if(up_thread_prio >= 0) {
                    try {
                        up_handler.setPriority(up_thread_prio);
                    }
                    catch(Throwable t) {
                        if(log.isErrorEnabled()) log.error("priority " + up_thread_prio +
                                " could not be set for thread: " + Util.getStackTrace(t));
                    }
                }
                up_handler.start();
            }
        }
    }


    /**
     * Creates and starts the down handler thread if <code>down_thread</code> is set and no
     * handler exists yet. A failure to apply <code>down_thread_prio</code> is logged and
     * does not prevent the thread from starting.
     */
    public void startDownHandler() {
        if(down_thread) {
            if(down_handler == null) {
                down_handler=new DownHandler(down_queue, this, observer);
                if(down_thread_prio >= 0) {
                    try {
                        down_handler.setPriority(down_thread_prio);
                    }
                    catch(Throwable t) {
                        if(log.isErrorEnabled()) log.error("priority " + down_thread_prio +
                                " could not be set for thread: " + Util.getStackTrace(t));
                    }
                }
                down_handler.start();
            }
        }
    }


    /**
     * Closes both queues and shuts down both handler threads, up first, then down.
     * Each live handler is given THREAD_JOIN_TIMEOUT to finish, is then interrupted and
     * given the same time again; a handler that is still alive after that is reported
     * in the error log. Both handler references are cleared in any case.
     * <p>
     * If the calling thread is interrupted while waiting, the shutdown still runs to
     * completion and the caller's interrupt status is restored before returning.
     */
    public void stopInternal() {
        boolean interrupted=false;

        // NOTE(review): the meaning of the boolean argument is defined by Queue.close(), not visible here
        up_queue.close(false);
        if(terminateHandler(up_handler, "up_handler"))
            interrupted=true;
        up_handler=null;

        down_queue.close(false);
        if(terminateHandler(down_handler, "down_handler"))
            interrupted=true;
        down_handler=null;

        // Restore only at the very end so that the second handler's graceful join
        // is not cut short by an interrupt received while waiting for the first one
        if(interrupted)
            Thread.currentThread().interrupt();
    }


    /**
     * Waits for a handler thread to die, interrupting it if it does not do so in time.
     *
     * @param handler_thread the thread to terminate; null or dead threads are ignored
     * @param label          name used in the error message ("up_handler" / "down_handler")
     * @return true if the calling thread was interrupted while waiting
     */
    private boolean terminateHandler(Thread handler_thread, String label) {
        if(handler_thread == null || !handler_thread.isAlive())
            return false;

        boolean interrupted=false;
        try {
            handler_thread.join(THREAD_JOIN_TIMEOUT);
        }
        catch(InterruptedException ex) {
            interrupted=true;
        }

        if(handler_thread.isAlive()) {
            handler_thread.interrupt(); // still running: ask it to stop and wait once more
            try {
                handler_thread.join(THREAD_JOIN_TIMEOUT);
            }
            catch(InterruptedException ex) {
                interrupted=true;
            }
            if(handler_thread.isAlive())
                if(log.isErrorEnabled()) log.error(label + " thread for " + getName() +
                        " was interrupted (in order to be terminated), but is still alive");
        }
        return interrupted;
    }


    /**
     * Entry point for events arriving from the protocol below. Without an up handler
     * thread the event is processed on the caller's thread (observer check, then
     * {@link #up(Event)}); otherwise it is appended to the up queue.
     *
     * @param evt the event to process
     */
    protected void receiveUpEvent(Event evt) {
        if(up_handler == null) {
            // Observer veto discards the event
            if(observer != null && !observer.up(evt, up_queue.size()))
                return;
            up(evt);
            return;
        }
        try {
            up_queue.add(evt);
        }
        catch(Exception e) {
            if(log.isWarnEnabled()) log.warn("exception: " + e);
        }
    }


    /**
     * Entry point for events arriving from the protocol above. Without a down handler
     * thread the event is processed on the caller's thread (observer check, START/STOP
     * handling, then {@link #down(Event)}); otherwise it is appended to the down queue.
     *
     * @param evt the event to process
     */
    protected void receiveDownEvent(Event evt) {
        if(down_handler == null) {
            // Observer veto discards the event
            if(observer != null && !observer.down(evt, down_queue.size()))
                return;
            int type=evt.getType();
            if(type == Event.START || type == Event.STOP) {
                if(!handleSpecialDownEvent(evt))
                    return;
            }
            down(evt);
            return;
        }
        try {
            down_queue.add(evt);
        }
        catch(Exception e) {
            if(log.isWarnEnabled()) log.warn("exception: " + e);
        }
    }


    /**
     * Hands the event to the protocol above, unless the observer vetoes it. Logs an
     * error when there is no upper protocol.
     *
     * @param evt the event to pass up
     */
    public void passUp(Event evt) {
        if(observer != null && !observer.passUp(evt))
            return;

        if(up_prot != null) {
            up_prot.receiveUpEvent(evt);
        }
        else
            if(log.isErrorEnabled()) log.error("no upper layer available");
    }


    /**
     * Hands the event to the protocol below, unless the observer vetoes it. Logs an
     * error when there is no lower protocol.
     *
     * @param evt the event to pass down
     */
    public void passDown(Event evt) {
        if(observer != null && !observer.passDown(evt))
            return;

        if(down_prot != null) {
            down_prot.receiveDownEvent(evt);
        }
        else
            if(log.isErrorEnabled()) log.error("no lower layer available");
    }


    /**
     * Handles an event travelling up the stack. The default implementation forwards it
     * unchanged via {@link #passUp(Event)}.
     *
     * @param evt the event received from below
     */
    public void up(Event evt) {
        passUp(evt);
    }


    /**
     * Handles an event travelling down the stack. The default implementation forwards it
     * unchanged via {@link #passDown(Event)}.
     *
     * @param evt the event received from above
     */
    public void down(Event evt) {
        passDown(evt);
    }


    /**
     * Handles START and STOP events before they reach {@link #down(Event)}: calls
     * {@link #start()} or {@link #stop()} and, when this is the bottom-most protocol
     * (no <code>down_prot</code>), answers with a START_OK / STOP_OK event passed up.
     * A failing <code>start()</code> is reported upwards as a START_OK event that
     * carries an Exception.
     *
     * @param evt the event to inspect
     * @return true if the caller should go on and pass the event to <code>down()</code>,
     *         false if the event has been fully handled here
     */
    protected boolean handleSpecialDownEvent(Event evt) {
        switch(evt.getType()) {
            case Event.START:
                try {
                    start();

                    if(down_prot == null) {
                        // Bottom of the stack: nobody below will answer, so reply here
                        passUp(new Event(Event.START_OK, Boolean.TRUE));
                        return false;
                    }
                    else
                        return true;
                }
                catch(Exception e) {
                    passUp(new Event(Event.START_OK, new Exception("exception caused by " + getName() + ".start(): " + e)));
                    return false;
                }
            case Event.STOP:
                stop();
                if(down_prot == null) {
                    // Bottom of the stack: nobody below will answer, so reply here
                    passUp(new Event(Event.STOP_OK, Boolean.TRUE));
                    return false;
                }
                else
                    return true;
            default:
                return true;
        }
    }
}