KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > MERGE


1 // $Id: MERGE.java,v 1.7 2004/12/31 14:10:38 belaban Exp $
2

3 package org.jgroups.protocols;
4
5
6 import org.jgroups.*;
7 import org.jgroups.stack.Protocol;
8 import org.jgroups.stack.RouterStub;
9 import org.jgroups.util.List;
10 import org.jgroups.util.Util;
11
12 import java.io.IOException JavaDoc;
13 import java.io.ObjectInput JavaDoc;
14 import java.io.ObjectOutput JavaDoc;
15 import java.util.Enumeration JavaDoc;
16 import java.util.Properties JavaDoc;
17 import java.util.Vector JavaDoc;
18
19
20 /**
21  * Simple and stupid MERGE protocol (does not take into account state transfer).
22  * Periodically mcasts a HELLO message with its own address. When a HELLO message is
23  * received from a member that has the same group (UDP discards all messages with a group
24  * name different that our own), but is not currently in the group, a MERGE event is sent
25  * up the stack. The protocol starts working upon receiving a View in which it is the coordinator.
26  *
27  * @author Gianluca Collot, Jan 2001
28  */

29 public class MERGE extends Protocol implements Runnable JavaDoc {
30     Vector JavaDoc members=new Vector JavaDoc();
31     Address local_addr=null;
32     String JavaDoc group_addr=null;
33     final String JavaDoc groupname=null;
34     Thread JavaDoc hello_thread=null; // thread that periodically mcasts HELLO messages
35
long timeout=5000; // timeout between mcasting of HELLO messages
36

37     String JavaDoc router_host=null;
38     int router_port=0;
39
40     RouterStub client=null;
41     boolean is_server=false;
42     boolean is_coord=false;
43     boolean merging=false;
44
45
46     public String JavaDoc getName() {
47         return "MERGE";
48     }
49
50
51     public boolean setProperties(Properties JavaDoc props) {
52         String JavaDoc str;
53
54         super.setProperties(props);
55         str=props.getProperty("timeout"); // max time to wait for initial members
56
if(str != null) {
57             timeout=Long.parseLong(str);
58             props.remove("timeout");
59         }
60
61         str=props.getProperty("router_host"); // host to send gossip queries (if gossip enabled)
62
if(str != null) {
63             router_host=str;
64             props.remove("router_host");
65         }
66
67         str=props.getProperty("router_port");
68         if(str != null) {
69             router_port=Integer.parseInt(str);
70             props.remove("router_port");
71         }
72
73         if(router_host != null && router_port != 0)
74             client=new RouterStub(router_host, router_port);
75
76         if(props.size() > 0) {
77             System.err.println("MERGE.setProperties(): the following properties are not recognized:");
78             props.list(System.out);
79             return false;
80         }
81         return true;
82     }
83
84
85     public void start() throws Exception JavaDoc {
86         if(hello_thread == null) {
87             hello_thread=new Thread JavaDoc(this, "MERGE Thread");
88             hello_thread.setDaemon(true);
89             hello_thread.start();
90         }
91     }
92
93
94     public void stop() {
95         Thread JavaDoc tmp=null;
96         if(hello_thread != null && hello_thread.isAlive()) {
97             tmp=hello_thread;
98             hello_thread=null;
99             tmp.interrupt();
100             try {
101                 tmp.join(1000);
102             }
103             catch(Exception JavaDoc ex) {
104             }
105         }
106         hello_thread=null;
107     }
108
109
110     public void up(Event evt) {
111         Message msg;
112         Object JavaDoc obj;
113         MergeHeader hdr;
114         Address sender;
115         boolean contains;
116         Vector JavaDoc tmp;
117
118
119         switch(evt.getType()) {
120
121             case Event.MSG:
122                 msg=(Message)evt.getArg();
123                 obj=msg.getHeader(getName());
124                 if(obj == null || !(obj instanceof MergeHeader)) {
125                     passUp(evt);
126                     return;
127                 }
128                 hdr=(MergeHeader)msg.removeHeader(getName());
129
130                 switch(hdr.type) {
131
132                     case MergeHeader.HELLO: // if coord: handle, else: discard
133
if(!is_server || !is_coord) {
134                             return;
135                         }
136                         if(merging) {
137                             return;
138                         }
139                         sender=msg.getSrc();
140                         if((sender != null) && (members.size() >= 0)) {
141                             synchronized(members) {
142                                 contains=members.contains(sender);
143                             }
144                             //merge only with lower addresses :prevents cycles and ensures that the new coordinator is correct.
145
if(!contains && sender.compareTo(local_addr) < 0) {
146                                 if(log.isInfoEnabled())
147                                     log.info("membership " + members +
148                                             " does not contain " + sender + "; merging it");
149                                 tmp=new Vector JavaDoc();
150                                 tmp.addElement(sender);
151                                 merging=true;
152                                 passUp(new Event(Event.MERGE, tmp));
153                             }
154                         }
155                         return;
156
157                     default:
158                         if(log.isErrorEnabled()) log.error("got MERGE hdr with unknown type (" + hdr.type + ')');
159                         return;
160                 }
161
162             case Event.SET_LOCAL_ADDRESS:
163                 local_addr=(Address)evt.getArg();
164                 passUp(evt);
165                 break;
166
167             default:
168                 passUp(evt); // Pass up to the layer above us
169
break;
170         }
171     }
172
173
174     public void down(Event evt) {
175
176         switch(evt.getType()) {
177
178             case Event.TMP_VIEW:
179                 passDown(evt);
180                 break;
181
182             case Event.MERGE_DENIED:
183                 merging=false;
184                 passDown(evt);
185                 break;
186
187             case Event.VIEW_CHANGE:
188                 merging=false;
189                 members=((View)evt.getArg()).getMembers();
190                 if((members == null) || (members.size() == 0)) {
191                     if(log.isFatalEnabled()) log.fatal("received VIEW_CHANGE with null or empty vector");
192                     System.exit(6);
193                 }
194                 if(members.elementAt(0).equals(local_addr))
195                     is_coord=true;
196                 else
197                     is_coord=false;
198                 passDown(evt);
199                 if(is_coord) {
200                     if(log.isInfoEnabled()) log.info("start sending Hellos");
201                     try {
202                         start();
203                     }
204                     catch(Exception JavaDoc ex) {
205                         if(log.isWarnEnabled()) log.warn("exception calling start(): " + ex);
206                     }
207                 }
208                 else {
209                     if(log.isInfoEnabled()) log.info("stop sending Hellos");
210                     stop();
211                 }
212                 break;
213
214             case Event.BECOME_SERVER: // called after client has join and is fully working group member
215
passDown(evt);
216                 try {
217                     start();
218                     is_server=true;
219                 }
220                 catch(Exception JavaDoc ex) {
221                     if(log.isWarnEnabled()) log.warn("exception calling start(): " + ex);
222                 }
223                 break;
224
225             case Event.CONNECT:
226                 group_addr=(String JavaDoc)evt.getArg();
227                 passDown(evt);
228                 break;
229
230             case Event.DISCONNECT:
231                 if(local_addr != null && evt.getArg() != null && local_addr.equals(evt.getArg()))
232                     stop();
233                 passDown(evt);
234                 break;
235
236             default:
237                 passDown(evt); // Pass on to the layer below us
238
break;
239         }
240     }
241
242
243     /**
244      * If IP multicast: periodically mcast a HELLO message
245      * If gossiping: periodically retrieve the membership. Any members not part of our
246      * own membership are merged (passing MERGE event up).
247      */

248     public void run() {
249         Message hello_msg;
250         MergeHeader hdr;
251         List rsps;
252         Vector JavaDoc members_to_merge=new Vector JavaDoc(), tmp;
253         Object JavaDoc mbr;
254
255
256         try {
257             Thread.sleep(3000);
258         } /// initial sleep; no premature merging
259
catch(Exception JavaDoc e) {
260         }
261
262
263         while(hello_thread != null) {
264             Util.sleep(timeout);
265             if(hello_thread == null) break;
266
267             if(client == null) { // plain IP MCAST
268
hello_msg=new Message(null, null, null);
269                 hdr=new MergeHeader(MergeHeader.HELLO);
270                 hello_msg.putHeader(getName(), hdr);
271                 passDown(new Event(Event.MSG, hello_msg));
272             }
273             else { // gossiping; contact Router
274
rsps=client.get(group_addr);
275
276                 synchronized(members) {
277                     members_to_merge.removeAllElements();
278
279                     for(Enumeration JavaDoc e=rsps.elements(); e.hasMoreElements();) {
280                         mbr=e.nextElement();
281                         if(!members.contains(mbr)) {
282
283                             if(log.isInfoEnabled())
284                                 log.info("membership " + members +
285                                         " does not contain " + mbr + "; merging it");
286
287                             members_to_merge.addElement(mbr);
288                         }
289                     }
290                     if(members_to_merge.size() > 0) {
291                         Membership new_membership=new Membership(members_to_merge);
292                         new_membership.sort();
293                         Address coord=(Address)new_membership.elementAt(0);
294                         tmp=new Vector JavaDoc();
295                         tmp.addElement(coord);
296                         if(coord.compareTo(local_addr) < 0)
297                             passUp(new Event(Event.MERGE, tmp));
298                     }
299                 }
300             }
301         }
302     }
303
304
305
306
307
308
309     /* -------------------------- Private methods ---------------------------- */
310
311
312     public static class MergeHeader extends Header {
313         public static final int HELLO=1; // arg = null
314

315         public int type=0;
316
317         public MergeHeader() {
318         } // used for externalization
319

320         public MergeHeader(int type) {
321             this.type=type;
322         }
323
324         public String JavaDoc toString() {
325             return "[MERGE: type=" + type2Str(type) + ']';
326         }
327
328         String JavaDoc type2Str(int t) {
329             switch(t) {
330                 case HELLO:
331                     return "HELLO";
332                 default:
333                     return "<unkown type (" + t + ")>";
334             }
335         }
336
337         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
338             out.writeInt(type);
339         }
340
341
342         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
343             type=in.readInt();
344         }
345     }
346
347 }
348
Popular Tags