KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > NotificationBus


// $Id: NotificationBus.java,v 1.8 2004/09/23 16:29:11 belaban Exp $


package org.jgroups.blocks;


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;

import java.io.Serializable;
import java.util.Vector;
14
15
16 /**
17  * Class for dissemination of notifications. Producers can send notifications to all registered consumers.
18  * Provides hooks to implement shared group state (cache).
19  * @author Bela Ban
20  */

21 public class NotificationBus implements MessageListener, MembershipListener {
22     final Vector JavaDoc members=new Vector JavaDoc();
23     JChannel channel=null;
24     Address local_addr=null;
25     PullPushAdapter ad=null;
26     Consumer consumer=null; // only a single consumer allowed
27
String JavaDoc bus_name="notification_bus";
28     final Promise get_cache_promise=new Promise();
29     final Object JavaDoc cache_mutex=new Object JavaDoc();
30
31     protected final Log log=LogFactory.getLog(getClass());
32
33
34     String JavaDoc props=null;
35
36
37     public interface Consumer {
38         void handleNotification(Serializable JavaDoc n);
39
40         /** Called on the coordinator to obtains its cache */
41         Serializable JavaDoc getCache();
42
43         void memberJoined(Address mbr);
44
45         void memberLeft(Address mbr);
46     }
47
48
49     public NotificationBus() throws Exception JavaDoc {
50         this(null, null);
51     }
52
53
54     public NotificationBus(String JavaDoc bus_name) throws Exception JavaDoc {
55         this(bus_name, null);
56     }
57
58
59     public NotificationBus(String JavaDoc bus_name, String JavaDoc properties) throws Exception JavaDoc {
60         if(bus_name != null) this.bus_name=bus_name;
61         if(properties != null) props=properties;
62         channel=new JChannel(props);
63     }
64
65
66     public void setConsumer(Consumer c) {
67         consumer=c;
68     }
69
70
71     public Address getLocalAddress() {
72         if(local_addr != null) return local_addr;
73         if(channel != null)
74             local_addr=channel.getLocalAddress();
75         return local_addr;
76     }
77
78
79     /**
80      Returns a reference to the real membership: don't modify. If you need to modify, make a copy first !
81      */

82     public Vector JavaDoc getMembership() {
83         return members;
84     }
85
86
87     /** Used to operate on the underlying channel directly, e.g. perform operations that are not
88      provided using only NotificationBus. Should be used sparingly */

89     public Channel getChannel() {
90         return channel;
91     }
92
93
94     public boolean isCoordinator() {
95         Object JavaDoc first_mbr=null;
96
97         synchronized(members) {
98             first_mbr=members.size() > 0 ? members.elementAt(0) : null;
99             if(first_mbr == null)
100                 return true;
101         }
102         if(getLocalAddress() != null)
103             return getLocalAddress().equals(first_mbr);
104         return false;
105     }
106
107
108     public void start() throws Exception JavaDoc {
109         channel.connect(bus_name);
110         ad=new PullPushAdapter(channel, this, this);
111     }
112
113
114     public void stop() {
115         if(ad != null) {
116             ad.stop();
117             ad=null;
118         }
119         if(channel != null) {
120             channel.close(); // disconnects from channel and closes it
121
channel=null;
122         }
123     }
124
125
126     /** Pack the argument in a Info, serialize that one into the message buffer and send the message */
127     public void sendNotification(Serializable JavaDoc n) {
128         Message msg=null;
129         byte[] data=null;
130         Info info;
131
132         try {
133             if(n == null) return;
134             info=new Info(Info.NOTIFICATION, n);
135             data=Util.objectToByteBuffer(info);
136             msg=new Message(null, null, data);
137             if(channel == null) {
138                 if(log.isErrorEnabled()) log.error("channel is null. " +
139                                                                   " Won't send notification");
140                 return;
141             }
142             channel.send(msg);
143         }
144         catch(Throwable JavaDoc ex) {
145
146                 if(log.isErrorEnabled()) log.error("exception is " + ex);
147         }
148     }
149
150
151     /**
152      Determines the coordinator and asks it for its cache. If there is no coordinator (because we are first member),
153      null will be returned. Used only internally by NotificationBus.
154      @param timeout Max number of msecs until the call returns
155      @param max_tries Max number of attempts to fetch the cache from the coordinator
156      */

157     public Serializable JavaDoc getCacheFromCoordinator(long timeout, int max_tries) {
158         return getCacheFromMember(null, timeout, max_tries);
159     }
160
161
162     /**
163      Determines the coordinator and asks it for its cache. If there is no coordinator (because we are first member),
164      null will be returned. Used only internally by NotificationBus.
165      @param mbr The address of the member from which to fetch the state. If null, the current coordinator
166      will be asked for the state
167      @param timeout Max number of msecs until the call returns - if timeout elapses
168      null will be returned
169      @param max_tries Max number of attempts to fetch the cache from the coordinator (will be set to 1 if < 1)
170      */

171     public Serializable JavaDoc getCacheFromMember(Address mbr, long timeout, int max_tries) {
172         Serializable JavaDoc cache=null;
173         int num_tries=0;
174         Info info=new Info(Info.GET_CACHE_REQ);
175         Message msg;
176         Address dst=mbr; // member from which to fetch the cache
177

178         long start, stop; // +++ remove
179

180
181         if(max_tries < 1) max_tries=1;
182
183         get_cache_promise.reset();
184         while(num_tries <= max_tries) {
185             if(mbr == null) { // mbr == null means get cache from coordinator
186
dst=determineCoordinator();
187                 if(dst == null || dst.equals(getLocalAddress())) { // we are the first member --> empty cache
188
if(log.isInfoEnabled()) log.info("[" + getLocalAddress() +
189                                                      "] no coordinator found --> first member (cache is empty)");
190                     return null;
191                 }
192             }
193
194             // +++ remove
195
if(log.isInfoEnabled()) log.info("[" + getLocalAddress() + "] dst=" + dst +
196                                              ", timeout=" + timeout + ", max_tries=" + max_tries + ", num_tries=" + num_tries);
197
198             info=new Info(Info.GET_CACHE_REQ);
199             msg=new Message(dst, null, info);
200             channel.down(new Event(Event.MSG, msg));
201
202             start=System.currentTimeMillis();
203             cache=(Serializable JavaDoc) get_cache_promise.getResult(timeout);
204             stop=System.currentTimeMillis();
205             if(cache != null) {
206                 if(log.isInfoEnabled()) log.info("got cache from " +
207                                                  dst + ": cache is valid (waited " + (stop - start) + " msecs on get_cache_promise)");
208                 return cache;
209             }
210             else {
211                 if(log.isErrorEnabled()) log.error("received null cache; retrying (waited " +
212                                                    (stop - start) + " msecs on get_cache_promise)");
213             }
214
215             Util.sleep(500);
216             ++num_tries;
217         }
218         if(cache == null)
219             if(log.isErrorEnabled()) log.error("[" + getLocalAddress() +
220                                                "] cache is null (num_tries=" + num_tries + ')');
221         return cache;
222     }
223
224
225     /**
226      Don't multicast this to all members, just apply it to local consumers.
227      */

228     public void notifyConsumer(Serializable JavaDoc n) {
229         if(consumer != null && n != null)
230             consumer.handleNotification(n);
231     }
232
233
234     /* -------------------------------- Interface MessageListener -------------------------------- */
235     public void receive(Message msg) {
236         Info info=null;
237         Object JavaDoc obj;
238
239         if(msg == null || msg.getLength() == 0) return;
240         try {
241             obj=msg.getObject();
242             if(!(obj instanceof Info)) {
243
244                     if(log.isErrorEnabled()) log.error("expected an instance of Info (received " +
245                                                              obj.getClass().getName() + ')');
246                 return;
247             }
248             info=(Info) obj;
249             switch(info.type) {
250                 case Info.NOTIFICATION:
251                     notifyConsumer(info.data);
252                     break;
253
254                 case Info.GET_CACHE_REQ:
255                     handleCacheRequest(msg.getSrc());
256                     break;
257
258                 case Info.GET_CACHE_RSP:
259                     // +++ remove
260
if(log.isDebugEnabled()) log.debug("[GET_CACHE_RSP] cache was received from " + msg.getSrc());
261                     get_cache_promise.setResult(info.data);
262                     break;
263
264                 default:
265                     if(log.isErrorEnabled()) log.error("type " + info.type + " unknown");
266                     break;
267             }
268         }
269         catch(Throwable JavaDoc ex) {
270
271                 if(log.isErrorEnabled()) log.error("exception=" + ex);
272         }
273     }
274
275     public byte[] getState() {
276         return null;
277     }
278
279     public void setState(byte[] state) {
280     }
281
282     /* ----------------------------- End of Interface MessageListener ---------------------------- */
283
284
285
286
287     /* ------------------------------- Interface MembershipListener ------------------------------ */
288
289     public synchronized void viewAccepted(View new_view) {
290         Vector JavaDoc joined_mbrs, left_mbrs, tmp;
291         Object JavaDoc tmp_mbr;
292
293         if(new_view == null) return;
294         tmp=new_view.getMembers();
295
296         synchronized(members) {
297             // get new members
298
joined_mbrs=new Vector JavaDoc();
299             for(int i=0; i < tmp.size(); i++) {
300                 tmp_mbr=tmp.elementAt(i);
301                 if(!members.contains(tmp_mbr))
302                     joined_mbrs.addElement(tmp_mbr);
303             }
304
305             // get members that left
306
left_mbrs=new Vector JavaDoc();
307             for(int i=0; i < members.size(); i++) {
308                 tmp_mbr=members.elementAt(i);
309                 if(!tmp.contains(tmp_mbr))
310                     left_mbrs.addElement(tmp_mbr);
311             }
312
313             // adjust our own membership
314
members.removeAllElements();
315             members.addAll(tmp);
316         }
317
318         if(consumer != null) {
319             if(joined_mbrs.size() > 0)
320                 for(int i=0; i < joined_mbrs.size(); i++)
321                     consumer.memberJoined((Address) joined_mbrs.elementAt(i));
322             if(left_mbrs.size() > 0)
323                 for(int i=0; i < left_mbrs.size(); i++)
324                     consumer.memberLeft((Address) left_mbrs.elementAt(i));
325         }
326     }
327
328
329     public void suspect(Address suspected_mbr) {
330     }
331
332     public void block() {
333     }
334
335
336     /* ----------------------------- End of Interface MembershipListener ------------------------- */
337
338
339
340
341
342
343
344     /* ------------------------------------- Private Methods ------------------------------------- */
345
346     Address determineCoordinator() {
347         Vector JavaDoc v=channel != null ? channel.getView().getMembers() : null;
348         return v != null ? (Address) v.elementAt(0) : null;
349     }
350
351
352     void handleCacheRequest(Address sender) {
353         Serializable JavaDoc cache=null;
354         Message msg;
355         Info info;
356
357         if(sender == null) {
358             // +++ remove
359
//
360
if(log.isErrorEnabled()) log.error("sender is null");
361             return;
362         }
363
364         synchronized(cache_mutex) {
365             cache=getCache(); // get the cache from the consumer
366
info=new Info(Info.GET_CACHE_RSP, cache);
367             msg=new Message(sender, null, info);
368             if(log.isInfoEnabled()) log.info("[" + getLocalAddress() + "] returning cache to " + sender);
369             channel.down(new Event(Event.MSG, msg));
370         }
371     }
372
373     public Serializable JavaDoc getCache() {
374         return consumer != null ? consumer.getCache() : null;
375     }
376
377
378
379     /* --------------------------------- End of Private Methods ---------------------------------- */
380
381
382
383
384
385     private static class Info implements Serializable JavaDoc {
386         public final static int NOTIFICATION=1;
387         public final static int GET_CACHE_REQ=2;
388         public final static int GET_CACHE_RSP=3;
389
390
391         int type=0;
392         Serializable JavaDoc data=null; // if type == NOTIFICATION data is notification, if type == GET_CACHE_RSP, data is cache
393

394
395         public Info(int type) {
396             this.type=type;
397         }
398
399         public Info(int type, Serializable JavaDoc data) {
400             this.type=type;
401             this.data=data;
402         }
403
404
405         public String JavaDoc toString() {
406             StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
407             sb.append("type= ");
408             if(type == NOTIFICATION)
409                 sb.append("NOTIFICATION");
410             else if(type == GET_CACHE_REQ)
411                 sb.append("GET_CACHE_REQ");
412             else if(type == GET_CACHE_RSP)
413                 sb.append("GET_CACHE_RSP");
414             else
415                 sb.append("<unknown>");
416             if(data != null) {
417                 if(type == NOTIFICATION)
418                     sb.append(", notification=" + data);
419                 else if(type == GET_CACHE_RSP) sb.append(", cache=" + data);
420             }
421             return sb.toString();
422         }
423     }
424
425
426 }
427
428
429
430
Popular Tags