KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > MERGE3


1 // $Id: MERGE3.java,v 1.5 2004/09/23 16:29:41 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.util.TimeScheduler;
9 import org.jgroups.util.Util;
10
11 import java.io.IOException JavaDoc;
12 import java.io.ObjectInput JavaDoc;
13 import java.io.ObjectOutput JavaDoc;
14 import java.util.*;
15
16
17
18
19 /**
20  * Protocol to discover subgroups, e.g. existing due to a network partition (that healed). Example: group
21  * {p,q,r,s,t,u,v,w} is split into 3 subgroups {p,q}, {r,s,t,u} and {v,w}. This protocol will eventually send
22  * a MERGE event with the coordinators of each subgroup up the stack: {p,r,v}. Note that - depending on the time
23  * of subgroup discovery - there could also be 2 MERGE events, which first join 2 of the subgroups, and then the
24  * resulting group to the last subgroup. The real work of merging the subgroups into one larger group is done
25  * somewhere above this protocol (typically in the GMS protocol).<p>
26  * This protocol works as follows:
27  * <ul>
28  * <li>If coordinator: periodically broadcast a "I'm the coordinator" message. If a coordinator receives such
29  * a message, it immediately initiates a merge by sending up a MERGE event
30  * <p>
31  *
32  * Provides: sends MERGE event with list of coordinators up the stack<br>
33  * @author Bela Ban, Oct 16 2001
34  */

35 public class MERGE3 extends Protocol {
36     Address local_addr=null;
37     long min_interval=5000; // minimum time between executions of the FindSubgroups task
38
long max_interval=20000; // maximum time between executions of the FindSubgroups task
39
boolean is_coord=false;
40     final Vector mbrs=new Vector();
41     TimeScheduler timer=null;
42     CoordinatorAnnouncer announcer_task=null;
43     final Set announcements=Collections.synchronizedSet(new HashSet());
44
45     /** Use a new thread to send the MERGE event up the stack */
46     boolean use_separate_thread=false;
47
48
49
50
51     public String JavaDoc getName() {
52         return "MERGE3";
53     }
54
55
56     public boolean setProperties(Properties props) {
57         String JavaDoc str;
58
59         super.setProperties(props);
60         str=props.getProperty("min_interval");
61         if(str != null) {
62             min_interval=Long.parseLong(str);
63             props.remove("min_interval");
64         }
65
66         str=props.getProperty("max_interval");
67         if(str != null) {
68             max_interval=Long.parseLong(str);
69             props.remove("max_interval");
70         }
71
72         if(min_interval <= 0 || max_interval <= 0) {
73             if(log.isErrorEnabled()) log.error("min_interval and max_interval have to be > 0");
74             return false;
75         }
76         if(max_interval <= min_interval) {
77             if(log.isErrorEnabled()) log.error("max_interval has to be greater than min_interval");
78             return false;
79         }
80
81         str=props.getProperty("use_separate_thread");
82         if(str != null) {
83             use_separate_thread=Boolean.valueOf(str).booleanValue();
84             props.remove("use_separate_thread");
85         }
86
87         if(props.size() > 0) {
88             System.err.println("MERGE2.setProperties(): the following properties are not recognized:");
89             props.list(System.out);
90             return false;
91         }
92         return true;
93     }
94
95     public void init() throws Exception JavaDoc {
96         timer=stack.timer;
97     }
98
99
100     /**
101      * DON'T REMOVE ! This prevents the up-handler thread to be created, which is not needed in the protocol.
102      */

103     public void startUpHandler() {
104         ;
105     }
106
107
108     /**
109      * DON'T REMOVE ! This prevents the down-handler thread to be created, which is not needed in the protocol.
110      */

111     public void startDownHandler() {
112         ;
113     }
114
115
116     public void up(Event evt) {
117         switch(evt.getType()) {
118
119             case Event.MSG:
120                 Message msg=(Message)evt.getArg();
121                 CoordAnnouncement hdr=(CoordAnnouncement)msg.removeHeader(getName());
122                 if(hdr != null) {
123                     if(hdr.coord_addr != null && is_coord) {
124                         boolean contains;
125                         contains=announcements.contains(hdr.coord_addr);
126                         announcements.add(hdr.coord_addr);
127                         if(log.isDebugEnabled()) {
128                             if(contains)
129                                 log.debug("discarded duplicate announcement: " + hdr.coord_addr +
130                                           ", announcements=" + announcements);
131                             else
132                                 log.debug("received announcement: " + hdr.coord_addr + ", announcements=" + announcements);
133                         }
134
135                         if(announcements.size() > 1 && is_coord) {
136                             processAnnouncements();
137                         }
138                     }
139                 }
140                 else
141                     passUp(evt);
142                 break;
143
144             case Event.SET_LOCAL_ADDRESS:
145                 local_addr=(Address)evt.getArg();
146                 passUp(evt);
147                 break;
148
149             default:
150                 passUp(evt); // Pass up to the layer above us
151
break;
152         }
153     }
154
155
156     public void down(Event evt) {
157         Vector tmp=null;
158         Address coord;
159
160         switch(evt.getType()) {
161
162             case Event.VIEW_CHANGE:
163                 passDown(evt);
164                 tmp=((View)evt.getArg()).getMembers();
165                 mbrs.clear();
166                 mbrs.addAll(tmp);
167                 coord=(Address)mbrs.elementAt(0);
168                 if(coord.equals(local_addr)) {
169                     if(is_coord == false) {
170                         is_coord=true;
171                         startCoordAnnouncerTask();
172                     }
173                 }
174                 else {
175                     if(is_coord == true) {
176                         is_coord=false;
177                         stopCoordAnnouncerTask();
178                     }
179                 }
180                 break;
181
182             default:
183                 passDown(evt); // Pass on to the layer below us
184
break;
185         }
186     }
187
188
189     void startCoordAnnouncerTask() {
190         if(announcer_task == null) {
191             announcements.add(local_addr);
192             announcer_task=new CoordinatorAnnouncer();
193             timer.add(announcer_task);
194             if(log.isDebugEnabled())
195                 log.debug("coordinator announcement task started, announcements=" + announcements);
196         }
197     }
198
199     void stopCoordAnnouncerTask() {
200         if(announcer_task != null) {
201             announcer_task.stop();
202             announcer_task=null;
203             announcements.clear();
204             if(log.isDebugEnabled())
205                 log.debug("coordinator announcement task stopped");
206         }
207     }
208
209
210
211     /**
212      * Returns a random value within [min_interval - max_interval]
213      */

214     long computeInterval() {
215         long retval=min_interval + Util.random(max_interval - min_interval);
216         return retval;
217     }
218
219
220
221     void sendCoordinatorAnnouncement(Address coord) {
222         Message coord_announcement=new Message(); // multicast to all
223
CoordAnnouncement hdr=new CoordAnnouncement(coord);
224         coord_announcement.putHeader(getName(), hdr);
225         passDown(new Event(Event.MSG, coord_announcement));
226     }
227
228     void processAnnouncements() {
229         if(announcements.size() > 1) {
230             Vector coords=new Vector(announcements); // create a clone
231
if(coords.size() > 1) {
232                 if(log.isDebugEnabled())
233                     log.debug("passing up MERGE event, coords=" + coords);
234                 final Event evt=new Event(Event.MERGE, coords);
235                 if(use_separate_thread) {
236                     Thread JavaDoc merge_notifier=new Thread JavaDoc() {
237                         public void run() {
238                             passUp(evt);
239                         }
240                     };
241                     merge_notifier.setDaemon(true);
242                     merge_notifier.setName("merge notifier thread");
243                 }
244                 else {
245                     passUp(evt);
246                 }
247             }
248             announcements.clear();
249         }
250     }
251
252
253     class CoordinatorAnnouncer implements TimeScheduler.Task {
254         boolean cancelled=false;
255
256         public void start() {
257             cancelled=false;
258         }
259
260         public void stop() {
261             cancelled=true;
262         }
263
264         public boolean cancelled() {
265             return cancelled;
266         }
267
268         public long nextInterval() {
269             return computeInterval();
270         }
271
272         public void run() {
273             if(is_coord)
274                 sendCoordinatorAnnouncement(local_addr);
275         }
276     }
277
278
279
280     public static class CoordAnnouncement extends Header {
281         Address coord_addr=null;
282
283         public CoordAnnouncement() {
284         }
285
286         public CoordAnnouncement(Address coord) {
287             this.coord_addr=coord;
288         }
289
290         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
291             coord_addr=(Address)in.readObject();
292         }
293
294         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
295             out.writeObject(coord_addr);
296         }
297     }
298
299 }
300
Popular Tags