KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > Discovery


1
2 package org.jgroups.protocols;
3
4 import org.jgroups.*;
5 import org.jgroups.stack.Protocol;
6
7 import java.util.*;
8
9
10 /**
11  * The Discovery protocol layer retrieves the initial membership (used by the GMS when started
12  * by sending event FIND_INITIAL_MBRS down the stack). We do this by specific subclasses, e.g. by mcasting PING
13  * requests to an IP MCAST address or, if gossiping is enabled, by contacting the GossipServer.
14  * The responses should allow us to determine the coordinator whom we have to
15  * contact, e.g. in case we want to join the group. When we are a server (after having
16  * received the BECOME_SERVER event), we'll respond to PING requests with a PING
17  * response.<p> The FIND_INITIAL_MBRS event will eventually be answered with a
18  * FIND_INITIAL_MBRS_OK event up the stack.
19  * The following properties are available
20  * <ul>
21  * <li>timeout - the timeout (ms) to wait for the initial members, default is 3000=3 secs
22  * <li>num_initial_members - the minimum number of initial members for a FIND_INITAL_MBRS, default is 2
23  * <li>num_ping_requests - the number of GET_MBRS_REQ messages to be sent (min=1), distributed over timeout ms
24  * </ul>
25  * @author Bela Ban
26  * @version $Id: Discovery.java,v 1.7 2005/04/13 13:03:02 belaban Exp $
27  */

28 public abstract class Discovery extends Protocol {
29     final Vector members=new Vector(11);
30     final Set members_set=new HashSet(11); // copy of the members vector for fast random access
31
Address local_addr=null;
32     String JavaDoc group_addr=null;
33     long timeout=3000;
34     int num_initial_members=2;
35     boolean is_server=false;
36     PingWaiter ping_waiter;
37
38
39     /** Number of GET_MBRS_REQ messages to be sent (min=1), distributed over timeout ms */
40     int num_ping_requests=2;
41
42
43     public abstract String JavaDoc getName();
44
45     /** Called after local_addr was set */
46     public void localAddressSet(Address addr) {
47         ;
48     }
49
50     public abstract void sendGetMembersRequest();
51
52
53     /** Called when CONNECT_OK has been received */
54     public void handleConnectOK() {
55         ;
56     }
57
58     public void handleDisconnect() {
59         ;
60     }
61
62     public void handleConnect() {
63         ;
64     }
65
66
67     public Vector providedUpServices() {
68         Vector ret=new Vector(1);
69         ret.addElement(new Integer JavaDoc(Event.FIND_INITIAL_MBRS));
70         return ret;
71     }
72
73     /**
74      * sets the properties of the PING protocol.
75      * The following properties are available
76      * property: timeout - the timeout (ms) to wait for the initial members, default is 3000=3 secs
77      * property: num_initial_members - the minimum number of initial members for a FIND_INITAL_MBRS, default is 2
78      * @param props - a property set
79      * @return returns true if all properties were parsed properly
80      * returns false if there are unrecnogized properties in the property set
81      */

82     public boolean setProperties(Properties props) {
83         String JavaDoc str;
84
85         super.setProperties(props);
86         str=props.getProperty("timeout"); // max time to wait for initial members
87
if(str != null) {
88             timeout=Long.parseLong(str);
89             if(timeout <= 0) {
90                 if(log.isErrorEnabled()) log.error("timeout must be > 0");
91                 return false;
92             }
93             props.remove("timeout");
94         }
95
96         str=props.getProperty("num_initial_members"); // wait for at most n members
97
if(str != null) {
98             num_initial_members=Integer.parseInt(str);
99             props.remove("num_initial_members");
100         }
101
102         str=props.getProperty("num_ping_requests"); // number of GET_MBRS_REQ messages
103
if(str != null) {
104             num_ping_requests=Integer.parseInt(str);
105             props.remove("num_ping_requests");
106             if(num_ping_requests < 1)
107                 num_ping_requests=1;
108         }
109
110         if(props.size() > 0) {
111             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
112             for(Enumeration e=props.propertyNames(); e.hasMoreElements();) {
113                 sb.append(e.nextElement().toString());
114                 if(e.hasMoreElements()) {
115                     sb.append(", ");
116                 }
117             }
118             if(log.isErrorEnabled()) log.error("The following properties are not recognized: " + sb);
119             return false;
120         }
121         return true;
122     }
123
124     public void start() throws Exception JavaDoc {
125         super.start();
126         PingSender ping_sender=new PingSender(timeout, num_ping_requests, this);
127         if(ping_waiter == null)
128             ping_waiter=new PingWaiter(timeout, num_initial_members, this, ping_sender);
129     }
130
131     public void stop() {
132         is_server=false;
133         if(ping_waiter != null)
134             ping_waiter.stop();
135     }
136
137
138     /**
139      * An event was received from the layer below. Usually the current layer will want to examine
140      * the event type and - depending on its type - perform some computation
141      * (e.g. removing headers from a MSG event type, or updating the internal membership list
142      * when receiving a VIEW_CHANGE event).
143      * Finally the event is either a) discarded, or b) an event is sent down
144      * the stack using <code>PassDown</code> or c) the event (or another event) is sent up
145      * the stack using <code>PassUp</code>.
146      * <p/>
147      * For the PING protocol, the Up operation does the following things.
148      * 1. If the event is a Event.MSG then PING will inspect the message header.
149      * If the header is null, PING simply passes up the event
150      * If the header is PingHeader.GET_MBRS_REQ then the PING protocol
151      * will PassDown a PingRequest message
152      * If the header is PingHeader.GET_MBRS_RSP we will add the message to the initial members
153      * vector and wake up any waiting threads.
154      * 2. If the event is Event.SET_LOCAL_ADDR we will simple set the local address of this protocol
155      * 3. For all other messages we simple pass it up to the protocol above
156      *
157      * @param evt - the event that has been sent from the layer below
158      */

159
160     public void up(Event evt) {
161         Message msg, rsp_msg;
162         Object JavaDoc obj;
163         PingHeader hdr, rsp_hdr;
164         PingRsp rsp;
165         Address coord;
166
167         switch(evt.getType()) {
168
169         case Event.MSG:
170             msg=(Message)evt.getArg();
171             obj=msg.getHeader(getName());
172             if(obj == null || !(obj instanceof PingHeader)) {
173                 passUp(evt);
174                 return;
175             }
176             hdr=(PingHeader)msg.removeHeader(getName());
177
178             switch(hdr.type) {
179
180             case PingHeader.GET_MBRS_REQ: // return Rsp(local_addr, coord)
181
if(local_addr != null && msg.getSrc() != null && local_addr.equals(msg.getSrc())) {
182                     if(log.isTraceEnabled())
183                         log.trace("discarded my own discovery request");
184                     return;
185                 }
186                 synchronized(members) {
187                     coord=members.size() > 0 ? (Address)members.firstElement() : local_addr;
188                 }
189
190                 PingRsp ping_rsp=new PingRsp(local_addr, coord, is_server);
191                 rsp_msg=new Message(msg.getSrc(), null, null);
192                 rsp_hdr=new PingHeader(PingHeader.GET_MBRS_RSP, ping_rsp);
193                 rsp_msg.putHeader(getName(), rsp_hdr);
194                 if(log.isTraceEnabled())
195                     log.trace("received GET_MBRS_REQ from " + msg.getSrc() + ", sending response " + rsp_hdr);
196                 passDown(new Event(Event.MSG, rsp_msg));
197                 return;
198
199             case PingHeader.GET_MBRS_RSP: // add response to vector and notify waiting thread
200
rsp=hdr.arg;
201
202                 if(log.isTraceEnabled())
203                     log.trace("received GET_MBRS_RSP, rsp=" + rsp);
204                 ping_waiter.addResponse(rsp);
205                 return;
206
207             default:
208                 if(log.isWarnEnabled()) log.warn("got PING header with unknown type (" + hdr.type + ')');
209                 return;
210             }
211
212
213         case Event.SET_LOCAL_ADDRESS:
214             passUp(evt);
215             local_addr=(Address)evt.getArg();
216             localAddressSet(local_addr);
217             break;
218
219         case Event.CONNECT_OK:
220             handleConnectOK();
221             passUp(evt);
222             break;
223
224         default:
225             passUp(evt); // Pass up to the layer above us
226
break;
227         }
228     }
229
230
231     /**
232      * An event is to be sent down the stack. The layer may want to examine its type and perform
233      * some action on it, depending on the event's type. If the event is a message MSG, then
234      * the layer may need to add a header to it (or do nothing at all) before sending it down
235      * the stack using <code>PassDown</code>. In case of a GET_ADDRESS event (which tries to
236      * retrieve the stack's address from one of the bottom layers), the layer may need to send
237      * a new response event back up the stack using <code>passUp()</code>.
238      * The PING protocol is interested in several different down events,
239      * Event.FIND_INITIAL_MBRS - sent by the GMS layer and expecting a GET_MBRS_OK
240      * Event.TMP_VIEW and Event.VIEW_CHANGE - a view change event
241      * Event.BECOME_SERVER - called after client has joined and is fully working group member
242      * Event.CONNECT, Event.DISCONNECT.
243      */

244     public void down(Event evt) {
245
246         switch(evt.getType()) {
247
248         case Event.FIND_INITIAL_MBRS: // sent by GMS layer, pass up a GET_MBRS_OK event
249
// sends the GET_MBRS_REQ to all members, waits 'timeout' ms or until 'num_initial_members' have been retrieved
250
ping_waiter.start();
251             break;
252
253         case Event.TMP_VIEW:
254         case Event.VIEW_CHANGE:
255             Vector tmp;
256             if((tmp=((View)evt.getArg()).getMembers()) != null) {
257                 synchronized(members) {
258                     members.clear();
259                     members.addAll(tmp);
260                     members_set.clear();
261                     members_set.addAll(tmp);
262                 }
263             }
264             passDown(evt);
265             break;
266
267         case Event.BECOME_SERVER: // called after client has joined and is fully working group member
268
passDown(evt);
269             is_server=true;
270             break;
271
272         case Event.CONNECT:
273             group_addr=(String JavaDoc)evt.getArg();
274             passDown(evt);
275             handleConnect();
276             break;
277
278         case Event.DISCONNECT:
279             handleDisconnect();
280             passDown(evt);
281             break;
282
283         default:
284             passDown(evt); // Pass on to the layer below us
285
break;
286         }
287     }
288
289
290
291     /* -------------------------- Private methods ---------------------------- */
292
293
294     protected View makeView(Vector mbrs) {
295         Address coord=null;
296         long id=0;
297         ViewId view_id=new ViewId(local_addr);
298
299         coord=view_id.getCoordAddress();
300         id=view_id.getId();
301
302         return new View(coord, id, mbrs);
303     }
304
305
306
307 }
308
Popular Tags