1 package org.jgroups.protocols; 2 3 import org.jgroups.Event; 4 import org.jgroups.stack.Protocol; 5 import org.jgroups.util.TimeScheduler; 6 7 import java.util.Properties ; 8 import java.util.concurrent.ConcurrentHashMap ; 9 import java.util.concurrent.ConcurrentMap ; 10 import java.util.concurrent.Future ; 11 import java.util.concurrent.TimeUnit ; 12 import java.util.concurrent.atomic.AtomicBoolean ; 13 import java.util.concurrent.locks.Condition ; 14 import java.util.concurrent.locks.Lock ; 15 import java.util.concurrent.locks.ReentrantLock ; 16 17 27 28 public class BARRIER extends Protocol { 29 long max_close_time=60000; final Lock lock=new ReentrantLock (); 31 final AtomicBoolean barrier_closed=new AtomicBoolean (false); 32 33 34 Condition barrier_opened=lock.newCondition(); 35 Condition no_msgs_pending=lock.newCondition(); 36 ConcurrentMap <Thread ,Object > in_flight_threads=new ConcurrentHashMap <Thread ,Object >(); 37 Future barrier_opener_future=null; 38 TimeScheduler timer; 39 private static final Object NULL=new Object (); 40 41 42 public String getName() { 43 return "BARRIER"; 44 } 45 46 47 public boolean setProperties(Properties props) { 48 String str; 49 super.setProperties(props); 50 str=props.getProperty("max_close_time"); 51 if(str != null) { 52 max_close_time=Long.parseLong(str); 53 props.remove("max_close_time"); 54 } 55 56 if(!props.isEmpty()) { 57 log.error("these properties are not recognized: " + props); 58 return false; 59 } 60 return true; 61 } 62 63 public boolean isClosed() { 64 return barrier_closed.get(); 65 } 66 67 68 public int getNumberOfInFlightThreads() { 69 return in_flight_threads.size(); 70 } 71 72 public void init() throws Exception { 73 super.init(); 74 timer=stack.timer; 75 } 76 77 public void stop() { 78 super.stop(); 79 openBarrier(); 80 } 81 82 83 public void destroy() { 84 super.destroy(); 85 openBarrier(); 86 } 87 88 public Object down(Event evt) { 89 switch(evt.getType()) { 90 case Event.CLOSE_BARRIER: 91 closeBarrier(); 92 return null; 93 case Event.OPEN_BARRIER: 94 openBarrier(); 95 return null; 96 } 97 return down_prot.down(evt); 98 } 99 100 public Object up(Event evt) { 101 switch(evt.getType()) { 102 case Event.MSG: 103 Thread current_thread=Thread.currentThread(); 104 in_flight_threads.put(current_thread, NULL); 105 if(barrier_closed.get()) { 106 lock.lock(); 107 try { 108 while(barrier_closed.get()) { 109 try { 110 barrier_opened.await(); 111 } 112 catch(InterruptedException e) { 113 } 114 } 115 } 116 finally { 117 lock.unlock(); 118 } 119 } 120 121 try { 122 return up_prot.up(evt); 123 } 124 finally { 125 lock.lock(); 126 try { 127 if(in_flight_threads.remove(current_thread) == NULL && 128 in_flight_threads.isEmpty() && 129 barrier_closed.get()) { 130 no_msgs_pending.signalAll(); 131 } 132 } 133 finally { 134 lock.unlock(); 135 } 136 } 137 case Event.CLOSE_BARRIER: 138 closeBarrier(); 139 return null; 140 case Event.OPEN_BARRIER: 141 openBarrier(); 142 return null; 143 } 144 return up_prot.up(evt); 145 } 146 147 148 private void closeBarrier() { 149 if(!barrier_closed.compareAndSet(false, true)) 150 return; 152 lock.lock(); 153 try { 154 in_flight_threads.remove(Thread.currentThread()); 156 while(!in_flight_threads.isEmpty()) { 157 try { 158 no_msgs_pending.await(); 159 } 160 catch(InterruptedException e) { 161 } 162 } 163 } 164 finally { 165 lock.unlock(); 166 } 167 168 if(log.isTraceEnabled()) 169 log.trace("barrier was closed"); 170 171 if(max_close_time > 0) 172 scheduleBarrierOpener(); 173 } 174 175 private void openBarrier() { 176 lock.lock(); 177 try { 178 if(!barrier_closed.compareAndSet(true, false)) 179 return; barrier_opened.signalAll(); 181 } 182 finally { 183 lock.unlock(); 184 } 185 if(log.isTraceEnabled()) 186 log.trace("barrier was opened"); 187 cancelBarrierOpener(); } 189 190 private void scheduleBarrierOpener() { 191 if(barrier_opener_future == null || barrier_opener_future.isDone()) { 192 barrier_opener_future=timer.schedule(new Runnable () {public void run() {openBarrier();}}, 193 max_close_time, TimeUnit.MILLISECONDS 194 ); 195 } 196 } 197 198 private void cancelBarrierOpener() { 199 if(barrier_opener_future != null) { 200 barrier_opener_future.cancel(true); 201 barrier_opener_future=null; 202 } 203 } 204 } 205 | Popular Tags |