1 package org.jgroups.protocols; 2 3 import org.jgroups.stack.Protocol; 4 import org.jgroups.Event; 5 import org.apache.commons.logging.Log; 6 import org.apache.commons.logging.LogFactory; 7 8 import java.util.List ; 9 import java.util.LinkedList ; 10 import java.util.Vector ; 11 12 17 public class PingWaiter implements Runnable { 18 Thread t=null; 19 List rsps=new LinkedList (); 20 long timeout=3000; 21 int num_rsps=3; 22 Protocol parent=null; 23 PingSender ping_sender; 24 protected final Log log=LogFactory.getLog(this.getClass()); 25 26 27 public PingWaiter(long timeout, int num_rsps, Protocol parent, PingSender ping_sender) { 28 this.timeout=timeout; 29 this.num_rsps=num_rsps; 30 this.parent=parent; 31 this.ping_sender=ping_sender; 32 } 33 34 35 public synchronized void start() { 36 if(t == null || !t.isAlive()) { 38 t=new Thread (this, "PingWaiter"); 39 t.setDaemon(true); 40 t.start(); 41 } 42 } 43 44 public synchronized void stop() { 45 if(ping_sender != null) 46 ping_sender.stop(); 47 if(t != null) { 48 t=null; 50 synchronized(rsps) { 52 rsps.notifyAll(); 53 } 54 } 55 } 56 57 58 public synchronized boolean isRunning() { 59 return t != null && t.isAlive(); 60 } 61 62 public void addResponse(PingRsp rsp) { 63 if(rsp != null) { 64 synchronized(rsps) { 65 if(rsps.contains(rsp)) 66 rsps.remove(rsp); rsps.add(rsp); 68 rsps.notifyAll(); 69 } 70 } 71 } 72 73 public void clearResponses() { 74 synchronized(rsps) { 75 rsps.clear(); 76 rsps.notifyAll(); 77 } 78 } 79 80 81 public List getResponses() { 82 return rsps; 83 } 84 85 86 87 public void run() { 88 long start_time, time_to_wait; 89 90 91 92 synchronized(rsps) { 93 if(rsps.size() > 0) { 94 if(log.isTraceEnabled()) 95 log.trace("clearing old responses: " + rsps); 96 rsps.clear(); 97 } 98 99 ping_sender.start(); 100 101 start_time=System.currentTimeMillis(); 102 time_to_wait=timeout; 103 104 try { 105 while(rsps.size() < num_rsps && time_to_wait > 0 && t != null && Thread.currentThread().equals(t)) { 106 if(log.isTraceEnabled()) log.trace("waiting for initial members: time_to_wait=" + time_to_wait + 108 ", got " + rsps.size() + " rsps"); 109 110 try { 111 rsps.wait(time_to_wait); 112 } 113 catch(InterruptedException intex) { 114 ; 115 } 116 catch(Exception e) { 117 log.error("got an exception waiting for responses", e); 118 } 119 time_to_wait=timeout - (System.currentTimeMillis() - start_time); 120 } 121 if(log.isDebugEnabled()) 122 log.debug("initial mbrs are " + rsps); 123 } 124 finally { 125 if(ping_sender != null) 127 ping_sender.stop(); 128 129 if(parent != null) 130 parent.passUp(new Event(Event.FIND_INITIAL_MBRS_OK, new Vector (rsps))); 131 } 132 } 133 } 134 } 135 | Popular Tags |