KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > MERGE2


1 // $Id: MERGE2.java,v 1.17 2005/04/20 20:25:47 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.Address;
7 import org.jgroups.Event;
8 import org.jgroups.View;
9 import org.jgroups.stack.Protocol;
10 import org.jgroups.util.Promise;
11 import org.jgroups.util.Util;
12
13 import java.util.Properties JavaDoc;
14 import java.util.Vector JavaDoc;
15
16
17
18
19 /**
20  * Protocol to discover subgroups, e.g. existing due to a network partition (that healed). Example: group
21  * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send
22  * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time
23  * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the
24  * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done
25  * somewhere above this protocol (typically in the GMS protocol).<p>
26  * This protocol works as follows:
27  * <ul>
28  * <li>If coordinator: periodically retrieve the initial membership (using the FIND_INITIAL_MBRS event provided e.g.
29  * by PING or TCPPING protocols. This list contains {coord,addr} pairs.
30  * <li>If there is more than 1 coordinator:
31  * <ol>
32  * <li>Get all coordinators
33  * <li>Create a MERGE event with the list of coordinators as argument
34  * <li>Send the event up the stack
35  * </ol>
36  * </ul>
37  *
38  * <p>
39  *
40  * Requires: FIND_INITIAL_MBRS event from below<br>
41  * Provides: sends MERGE event with list of coordinators up the stack<br>
42  * @author Bela Ban, Oct 16 2001
43  */

44 public class MERGE2 extends Protocol {
45     Address local_addr=null;
46     FindSubgroups task=null; // task periodically executing as long as we are coordinator
47
private final Object JavaDoc task_lock=new Object JavaDoc();
48     long min_interval=5000; // minimum time between executions of the FindSubgroups task
49
long max_interval=20000; // maximum time between executions of the FindSubgroups task
50
boolean is_coord=false;
51     final Promise find_promise=new Promise(); // to synchronize FindSubgroups.findInitialMembers() on
52

53     /** Use a new thread to send the MERGE event up the stack */
54     boolean use_separate_thread=false;
55
56
57     public String JavaDoc getName() {
58         return "MERGE2";
59     }
60
61
62     public boolean setProperties(Properties JavaDoc props) {
63         String JavaDoc str;
64
65         super.setProperties(props);
66         str=props.getProperty("min_interval");
67         if(str != null) {
68             min_interval=Long.parseLong(str);
69             props.remove("min_interval");
70         }
71
72         str=props.getProperty("max_interval");
73         if(str != null) {
74             max_interval=Long.parseLong(str);
75             props.remove("max_interval");
76         }
77
78         if(min_interval <= 0 || max_interval <= 0) {
79             if(log.isErrorEnabled()) log.error("min_interval and max_interval have to be > 0");
80             return false;
81         }
82         if(max_interval <= min_interval) {
83             if(log.isErrorEnabled()) log.error("max_interval has to be greater than min_interval");
84             return false;
85         }
86
87         str=props.getProperty("use_separate_thread");
88         if(str != null) {
89             use_separate_thread=Boolean.valueOf(str).booleanValue();
90             props.remove("use_separate_thread");
91         }
92
93         if(props.size() > 0) {
94             System.err.println("MERGE2.setProperties(): the following properties are not recognized:");
95             props.list(System.out);
96             return false;
97         }
98         return true;
99     }
100
101
102     public Vector JavaDoc requiredDownServices() {
103         Vector JavaDoc retval=new Vector JavaDoc(1);
104         retval.addElement(new Integer JavaDoc(Event.FIND_INITIAL_MBRS));
105         return retval;
106     }
107
108
109     public void stop() {
110         is_coord=false;
111         stopTask();
112     }
113
114
115     /**
116      * DON'T REMOVE ! This prevents the up-handler thread to be created, which is not needed in the protocol.
117      */

118     public void startUpHandler() {
119         ;
120     }
121
122
123     /**
124      * DON'T REMOVE ! This prevents the down-handler thread to be created, which is not needed in the protocol.
125      */

126     public void startDownHandler() {
127         ;
128     }
129
130
131     public void up(Event evt) {
132         switch(evt.getType()) {
133
134             case Event.SET_LOCAL_ADDRESS:
135                 local_addr=(Address)evt.getArg();
136                 passUp(evt);
137                 break;
138
139             case Event.FIND_INITIAL_MBRS_OK:
140                 find_promise.setResult(evt.getArg());
141                 passUp(evt); // could be needed by GMS
142
break;
143
144             default:
145                 passUp(evt); // Pass up to the layer above us
146
break;
147         }
148     }
149
150
151     public void down(Event evt) {
152         Vector JavaDoc mbrs=null;
153         Address coord;
154
155         switch(evt.getType()) {
156
157             case Event.VIEW_CHANGE:
158                 passDown(evt);
159                 mbrs=((View)evt.getArg()).getMembers();
160                 if(mbrs == null || mbrs.size() == 0 || local_addr == null) {
161                     stopTask();
162                     break;
163                 }
164                 coord=(Address)mbrs.elementAt(0);
165                 if(coord.equals(local_addr)) {
166                     is_coord=true;
167                     startTask(); // start task if we became coordinator (doesn't start if already running)
168
}
169                 else {
170                     // if we were coordinator, but are no longer, stop task. this happens e.g. when we merge and someone
171
// else becomes the new coordinator of the merged group
172
if(is_coord) {
173                         is_coord=false;
174                     }
175                     stopTask();
176                 }
177                 break;
178
179             default:
180                 passDown(evt); // Pass on to the layer below us
181
break;
182         }
183     }
184
185
186     /* -------------------------------------- Private Methods --------------------------------------- */
187     void startTask() {
188         synchronized(task_lock) {
189             if(task == null)
190                 task=new FindSubgroups();
191             task.start();
192         }
193     }
194
195     void stopTask() {
196         synchronized(task_lock) {
197             if(task != null) {
198                 task.stop(); // will cause timer to remove task from execution schedule
199
task=null;
200             }
201         }
202     }
203     /* ---------------------------------- End of Private Methods ------------------------------------ */
204
205
206
207
208     /**
209      * Task periodically executing (if role is coordinator). Gets the initial membership and determines
210      * whether there are subgroups (multiple coordinators for the same group). If yes, it sends a MERGE event
211      * with the list of the coordinators up the stack
212      */

213     private class FindSubgroups implements Runnable JavaDoc {
214         Thread JavaDoc thread=null;
215
216
217         public void start() {
218             if(thread == null || !thread.isAlive()) {
219                 thread=new Thread JavaDoc(this, "MERGE2.FindSubgroups thread");
220                 thread.setDaemon(true);
221                 thread.start();
222             }
223         }
224
225
226         public void stop() {
227             if(thread != null) {
228                 Thread JavaDoc tmp=thread;
229                 thread=null;
230                 tmp.interrupt(); // wakes up sleeping thread
231
find_promise.reset();
232             }
233             thread=null;
234         }
235
236
237         public void run() {
238             long interval;
239             Vector JavaDoc coords=null;
240             Vector JavaDoc initial_mbrs;
241
242             if(log.isDebugEnabled()) log.debug("merge task started as I'm the coordinator");
243             while(thread != null && Thread.currentThread().equals(thread)) {
244                 interval=computeInterval();
245                 Util.sleep(interval);
246                 if(thread == null) break;
247                 initial_mbrs=findInitialMembers();
248                 if(thread == null) break;
249                 if(log.isDebugEnabled()) log.debug("initial_mbrs=" + initial_mbrs);
250                 coords=detectMultipleCoordinators(initial_mbrs);
251                 if(coords != null && coords.size() > 1) {
252                     if(log.isDebugEnabled())
253                         log.debug("found multiple coordinators: " + coords + "; sending up MERGE event");
254                     final Event evt=new Event(Event.MERGE, coords);
255                     if(use_separate_thread) {
256                         Thread JavaDoc merge_notifier=new Thread JavaDoc() {
257                             public void run() {
258                                 passUp(evt);
259                             }
260                         };
261                         merge_notifier.setDaemon(true);
262                         merge_notifier.setName("merge notifier thread");
263                         merge_notifier.start();
264                     }
265                     else {
266                         passUp(evt);
267                     }
268                 }
269                 else {
270                     if(log.isTraceEnabled())
271                         log.trace("didn't find multiple coordinators in " + initial_mbrs + ", no need for merge");
272                 }
273             }
274             if(log.isTraceEnabled())
275                 log.trace("MERGE2.FindSubgroups thread terminated");
276         }
277
278
279         /**
280          * Returns a random value within [min_interval - max_interval]
281          */

282         long computeInterval() {
283             long retval=min_interval + Util.random(max_interval - min_interval);
284             return retval;
285         }
286
287
288         /**
289          * Returns a list of PingRsp pairs.
290          */

291         Vector JavaDoc findInitialMembers() {
292             PingRsp tmp=new PingRsp(local_addr, local_addr, true);
293             find_promise.reset();
294             passDown(Event.FIND_INITIAL_MBRS_EVT);
295             Vector JavaDoc retval=(Vector JavaDoc)find_promise.getResult(0); // wait indefinitely until response is received
296
if(retval != null && is_coord && local_addr != null && !retval.contains(tmp))
297                 retval.add(tmp);
298             return retval;
299         }
300
301
302         /**
303          * Finds out if there is more than 1 coordinator in the initial_mbrs vector (contains PingRsp elements).
304          * @param initial_mbrs A list of PingRsp pairs
305          * @return Vector A list of the coordinators (Addresses) found. Will contain just 1 element for a correct
306          * membership, and more than 1 for multiple coordinators
307          */

308         Vector JavaDoc detectMultipleCoordinators(Vector JavaDoc initial_mbrs) {
309             Vector JavaDoc ret=new Vector JavaDoc(11);
310             PingRsp rsp;
311             Address coord;
312
313             if(initial_mbrs == null) return null;
314             for(int i=0; i < initial_mbrs.size(); i++) {
315                 rsp=(PingRsp)initial_mbrs.elementAt(i);
316                 if(!rsp.is_server)
317                     continue;
318                 coord=rsp.getCoordAddress();
319                 if(!ret.contains(coord))
320                     ret.addElement(coord);
321             }
322
323             return ret;
324         }
325
326     }
327
328 }
329
Popular Tags