KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > BARRIER


1 package org.jgroups.protocols;
2
3 import org.jgroups.Event;
4 import org.jgroups.stack.Protocol;
5 import org.jgroups.util.TimeScheduler;
6
7 import java.util.Properties JavaDoc;
8 import java.util.concurrent.ConcurrentHashMap JavaDoc;
9 import java.util.concurrent.ConcurrentMap JavaDoc;
10 import java.util.concurrent.Future JavaDoc;
11 import java.util.concurrent.TimeUnit JavaDoc;
12 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
13 import java.util.concurrent.locks.Condition JavaDoc;
14 import java.util.concurrent.locks.Lock JavaDoc;
15 import java.util.concurrent.locks.ReentrantLock JavaDoc;
16
17 /**
18  * All messages up the stack have to go through a barrier (read lock, RL). By default, the barrier is open.
19  * When a CLOSE_BARRIER event is received, we close the barrier by acquiring a write lock (WL). This succeeds when all
20  * previous messages have completed (by releasing their RLs). Thus, when we have acquired the WL, we know that there
21  * are no pending messages processed.<br/>
22  * When an OPEN_BARRIER event is received, we simply open the barrier again and let all messages pass in the up
23  * direction. This is done by releasing the WL.
24  * @author Bela Ban
25  * @version $Id: BARRIER.java,v 1.6 2007/06/11 08:14:39 belaban Exp $
26  */

27
28 public class BARRIER extends Protocol {
29     long max_close_time=60000; // how long can the barrier stay closed (in ms) ? 0 means forever
30
final Lock JavaDoc lock=new ReentrantLock JavaDoc();
31     final AtomicBoolean JavaDoc barrier_closed=new AtomicBoolean JavaDoc(false);
32
33     /** signals to waiting threads that the barrier is open again */
34     Condition JavaDoc barrier_opened=lock.newCondition();
35     Condition JavaDoc no_msgs_pending=lock.newCondition();
36     ConcurrentMap JavaDoc<Thread JavaDoc,Object JavaDoc> in_flight_threads=new ConcurrentHashMap JavaDoc<Thread JavaDoc,Object JavaDoc>();
37     Future JavaDoc barrier_opener_future=null;
38     TimeScheduler timer;
39     private static final Object JavaDoc NULL=new Object JavaDoc();
40
41
42     public String JavaDoc getName() {
43         return "BARRIER";
44     }
45
46
47     public boolean setProperties(Properties JavaDoc props) {
48         String JavaDoc str;
49         super.setProperties(props);
50         str=props.getProperty("max_close_time");
51         if(str != null) {
52             max_close_time=Long.parseLong(str);
53             props.remove("max_close_time");
54         }
55
56         if(!props.isEmpty()) {
57             log.error("these properties are not recognized: " + props);
58             return false;
59         }
60         return true;
61     }
62
63     public boolean isClosed() {
64         return barrier_closed.get();
65     }
66
67
68     public int getNumberOfInFlightThreads() {
69         return in_flight_threads.size();
70     }
71
72     public void init() throws Exception JavaDoc {
73         super.init();
74         timer=stack.timer;
75     }
76
77     public void stop() {
78         super.stop();
79         openBarrier();
80     }
81
82
83     public void destroy() {
84         super.destroy();
85         openBarrier();
86     }
87
88     public Object JavaDoc down(Event evt) {
89         switch(evt.getType()) {
90             case Event.CLOSE_BARRIER:
91                 closeBarrier();
92                 return null;
93             case Event.OPEN_BARRIER:
94                 openBarrier();
95                 return null;
96         }
97         return down_prot.down(evt);
98     }
99
100     public Object JavaDoc up(Event evt) {
101         switch(evt.getType()) {
102             case Event.MSG:
103                 Thread JavaDoc current_thread=Thread.currentThread();
104                 in_flight_threads.put(current_thread, NULL);
105                 if(barrier_closed.get()) {
106                     lock.lock();
107                     try {
108                         while(barrier_closed.get()) {
109                             try {
110                                 barrier_opened.await();
111                             }
112                             catch(InterruptedException JavaDoc e) {
113                             }
114                         }
115                     }
116                     finally {
117                         lock.unlock();
118                     }
119                 }
120
121                 try {
122                     return up_prot.up(evt);
123                 }
124                 finally {
125                     lock.lock();
126                     try {
127                         if(in_flight_threads.remove(current_thread) == NULL &&
128                                 in_flight_threads.isEmpty() &&
129                                 barrier_closed.get()) {
130                             no_msgs_pending.signalAll();
131                         }
132                     }
133                     finally {
134                         lock.unlock();
135                     }
136                 }
137             case Event.CLOSE_BARRIER:
138                 closeBarrier();
139                 return null;
140             case Event.OPEN_BARRIER:
141                 openBarrier();
142                 return null;
143         }
144         return up_prot.up(evt);
145     }
146
147
148     private void closeBarrier() {
149         if(!barrier_closed.compareAndSet(false, true))
150             return; // barrier was already closed
151

152         lock.lock();
153         try {
154             // wait until all pending (= in-progress) msgs have returned
155
in_flight_threads.remove(Thread.currentThread());
156             while(!in_flight_threads.isEmpty()) {
157                 try {
158                     no_msgs_pending.await();
159                 }
160                 catch(InterruptedException JavaDoc e) {
161                 }
162             }
163         }
164         finally {
165             lock.unlock();
166         }
167
168         if(log.isTraceEnabled())
169             log.trace("barrier was closed");
170
171         if(max_close_time > 0)
172             scheduleBarrierOpener();
173     }
174
175     private void openBarrier() {
176         lock.lock();
177         try {
178             if(!barrier_closed.compareAndSet(true, false))
179                 return; // barrier was already open
180
barrier_opened.signalAll();
181         }
182         finally {
183             lock.unlock();
184         }
185         if(log.isTraceEnabled())
186             log.trace("barrier was opened");
187         cancelBarrierOpener(); // cancels if running
188
}
189
190     private void scheduleBarrierOpener() {
191         if(barrier_opener_future == null || barrier_opener_future.isDone()) {
192             barrier_opener_future=timer.schedule(new Runnable JavaDoc() {public void run() {openBarrier();}},
193                                                  max_close_time, TimeUnit.MILLISECONDS
194             );
195         }
196     }
197
198     private void cancelBarrierOpener() {
199         if(barrier_opener_future != null) {
200             barrier_opener_future.cancel(true);
201             barrier_opener_future=null;
202         }
203     }
204 }
205
Popular Tags