KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > PullPushAdapter


1 // $Id: PullPushAdapter.java,v 1.13 2005/04/21 14:50:12 belaban Exp $
2

3 package org.jgroups.blocks;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.*;
9 import org.jgroups.util.Util;
10
11 import java.io.IOException JavaDoc;
12 import java.io.ObjectInput JavaDoc;
13 import java.io.ObjectOutput JavaDoc;
14 import java.io.Serializable JavaDoc;
15 import java.util.ArrayList JavaDoc;
16 import java.util.HashMap JavaDoc;
17 import java.util.Iterator JavaDoc;
18 import java.util.List JavaDoc;
19
20
21 /**
22  * Allows a client of <em>Channel</em> to be notified when messages have been received
23  * instead of having to actively poll the channel for new messages. Typically used in the
24  * client role (receive()). As this class does not implement interface
25  * <code>Transport</code>, but <b>uses</b> it for receiving messages, an underlying object
26  * has to be used to send messages (e.g. the channel on which an object of this class relies).<p>
27  * Multiple MembershipListeners can register with the PullPushAdapter; when a view is received, they
28  * will all be notified. There is one main message listener which sends and receives message. In addition,
29  * MessageListeners can register with a certain tag (identifier), and then send messages tagged with this
30  * identifier. When a message with such an identifier is received, the corresponding MessageListener will be
31  * looked up and the message dispatched to it. If no tag is found (default), the main MessageListener will
32  * receive the message.
33  * @author Bela Ban
34  * @version $Revision
35  */

36 public class PullPushAdapter implements Runnable JavaDoc, ChannelListener {
37     protected Transport transport=null;
38     protected MessageListener listener=null; // main message receiver
39
protected final List JavaDoc membership_listeners=new ArrayList JavaDoc();
40     protected Thread JavaDoc receiver_thread=null;
41     protected final HashMap JavaDoc listeners=new HashMap JavaDoc(); // keys=identifier (Serializable), values=MessageListeners
42
protected final Log log=LogFactory.getLog(getClass());
43     static final String JavaDoc PULL_HEADER="PULL_HEADER";
44
45
46     public PullPushAdapter(Transport transport) {
47         this.transport=transport;
48         start();
49     }
50
51     public PullPushAdapter(Transport transport, MessageListener l) {
52         this.transport=transport;
53         setListener(l);
54         start();
55     }
56
57
58     public PullPushAdapter(Transport transport, MembershipListener ml) {
59         this.transport=transport;
60         addMembershipListener(ml);
61         start();
62     }
63
64
65     public PullPushAdapter(Transport transport, MessageListener l, MembershipListener ml) {
66         this.transport=transport;
67         setListener(l);
68         addMembershipListener(ml);
69         start();
70     }
71
72
73     public Transport getTransport() {
74         return transport;
75     }
76
77
78     public void start() {
79         if(receiver_thread == null || !receiver_thread.isAlive()) {
80             receiver_thread=new Thread JavaDoc(this, "PullPushAdapterThread");
81             receiver_thread.setDaemon(true);
82             receiver_thread.start();
83         }
84         if(transport instanceof JChannel)
85             ((JChannel)transport).setChannelListener(this);
86     }
87
88     public void stop() {
89         Thread JavaDoc tmp=null;
90         if(receiver_thread != null && receiver_thread.isAlive()) {
91             tmp=receiver_thread;
92             receiver_thread=null;
93             tmp.interrupt();
94             try {
95                 tmp.join(1000);
96             }
97             catch(Exception JavaDoc ex) {
98             }
99         }
100         receiver_thread=null;
101     }
102
103     /**
104      * Sends a message to the group - listeners to this identifier will receive the messages
105      * @param identifier the key that the proper listeners are listenting on
106      * @param msg the Message to be sent
107      * @see #registerListener
108      */

109     public void send(Serializable JavaDoc identifier, Message msg) throws Exception JavaDoc {
110         if(msg == null) {
111             if(log.isErrorEnabled()) log.error("msg is null");
112             return;
113         }
114         if(identifier == null)
115             transport.send(msg);
116         else {
117             msg.putHeader(PULL_HEADER, new PullHeader(identifier));
118             transport.send(msg);
119         }
120     }
121
122     /**
123      * sends a message with no identifier , listener member will get this message on the other group members
124      * @param msg the Message to be sent
125      * @throws Exception
126      */

127     public void send(Message msg) throws Exception JavaDoc {
128         send(null, msg);
129     }
130
131
132     public void setListener(MessageListener l) {
133         listener=l;
134     }
135
136
137     
138     /**
139      * sets a listener to messages with a given identifier messages sent with this identifier in there header will be routed to this listener
140      * <b>note: there could be only one listener for one identifier, if you want to register a different listener to an already registered identifier then unregister first</b>
141      * @param identifier - messages sent on the group with this object will be receive by this listener
142      * @param l - the listener that will get the message
143      */

144     public void registerListener(Serializable JavaDoc identifier, MessageListener l) {
145         if(l == null || identifier == null) {
146             if(log.isErrorEnabled()) log.error("message listener or identifier is null");
147             return;
148         }
149         if(listeners.containsKey(identifier)) {
150             if(log.isErrorEnabled()) log.error("listener with identifier=" + identifier +
151                     " already exists, choose a different identifier or unregister current listener");
152             // we do not want to overwrite the listener
153
return;
154         }
155         listeners.put(identifier, l);
156     }
157     
158     /**
159      * removes a listener to a given identifier from the listeners map
160      * @param identifier - the key to whom we do not want to listen any more
161      */

162     public void unregisterListener(Serializable JavaDoc identifier) {
163         listeners.remove(identifier);
164     }
165
166
167     /** @deprecated Use {@link #addMembershipListener} */
168     public void setMembershipListener(MembershipListener ml) {
169         addMembershipListener(ml);
170     }
171
172     public void addMembershipListener(MembershipListener l) {
173         if(l != null && !membership_listeners.contains(l))
174             membership_listeners.add(l);
175     }
176
177     public void removeMembershipListener(MembershipListener l) {
178         if(l != null && membership_listeners.contains(l))
179             membership_listeners.remove(l);
180     }
181
182
183     /**
184      * Reentrant run(): message reception is serialized, then the listener is notified of the
185      * message reception
186      */

187     public void run() {
188         Object JavaDoc obj;
189
190         while(receiver_thread != null && Thread.currentThread().equals(receiver_thread)) {
191             try {
192                 obj=transport.receive(0);
193                 if(obj == null)
194                     continue;
195
196                 if(obj instanceof Message) {
197                     handleMessage((Message)obj);
198                 }
199                 else if(obj instanceof GetStateEvent) {
200                     byte[] retval=null;
201                     if(listener != null) {
202                         try {
203                             retval=listener.getState();
204                         }
205                         catch(Throwable JavaDoc t) {
206                             log.error("getState() from application failed, will return empty state", t);
207                         }
208                     }
209                     else {
210                         log.warn("no listener registered, returning empty state");
211                     }
212
213                     if(transport instanceof Channel) {
214                         ((Channel)transport).returnState(retval);
215                     }
216                     else {
217                         if(log.isErrorEnabled())
218                             log.error("underlying transport is not a Channel, but a " +
219                                     transport.getClass().getName() + ": cannot return state using returnState()");
220                         continue;
221                     }
222                 }
223                 else if(obj instanceof SetStateEvent) {
224                     if(listener != null) {
225                         try {
226                             listener.setState(((SetStateEvent)obj).getArg());
227                         }
228                         catch(ClassCastException JavaDoc cast_ex) {
229                             if(log.isErrorEnabled()) log.error("received SetStateEvent, but argument " +
230                                     ((SetStateEvent)obj).getArg() + " is not serializable ! Discarding message.");
231                             continue;
232                         }
233                     }
234                 }
235                 else if(obj instanceof View) {
236                     notifyViewChange((View)obj);
237                 }
238                 else if(obj instanceof SuspectEvent) {
239                     notifySuspect((Address)((SuspectEvent)obj).getMember());
240                 }
241                 else if(obj instanceof BlockEvent) {
242                     notifyBlock();
243                 }
244             }
245             catch(ChannelNotConnectedException conn) {
246                 Address local_addr=((Channel)transport).getLocalAddress();
247                 if(log.isWarnEnabled()) log.warn('[' + (local_addr == null ? "<null>" : local_addr.toString()) +
248                         "] channel not connected, exception is " + conn);
249                 Util.sleep(1000);
250                 receiver_thread=null;
251                 break;
252             }
253             catch(ChannelClosedException closed_ex) {
254                 Address local_addr=((Channel)transport).getLocalAddress();
255                 if(log.isWarnEnabled()) log.warn('[' + (local_addr == null ? "<null>" : local_addr.toString()) +
256                         "] channel closed, exception is " + closed_ex);
257                 // Util.sleep(1000);
258
receiver_thread=null;
259                 break;
260             }
261             catch(Throwable JavaDoc e) {
262             }
263         }
264     }
265
266
267     /**
268      * Check whether the message has an identifier. If yes, lookup the MessageListener associated with the
269      * given identifier in the hashtable and dispatch to it. Otherwise just use the main (default) message
270      * listener
271      */

272     protected void handleMessage(Message msg) {
273         PullHeader hdr=(PullHeader)msg.getHeader(PULL_HEADER);
274         Serializable JavaDoc identifier;
275         MessageListener l;
276
277         if(hdr != null && (identifier=hdr.getIdentifier()) != null) {
278             l=(MessageListener)listeners.get(identifier);
279             if(l == null) {
280                 if(log.isErrorEnabled()) log.error("received a messages tagged with identifier=" +
281                         identifier + ", but there is no registration for that identifier. Will drop message");
282             }
283             else
284                 l.receive(msg);
285         }
286         else {
287             if(listener != null)
288                 listener.receive(msg);
289         }
290     }
291
292
293     protected void notifyViewChange(View v) {
294         MembershipListener l;
295
296         if(v == null) return;
297         for(Iterator JavaDoc it=membership_listeners.iterator(); it.hasNext();) {
298             l=(MembershipListener)it.next();
299             try {
300                 l.viewAccepted(v);
301             }
302             catch(Throwable JavaDoc ex) {
303                 if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex);
304             }
305         }
306     }
307
308     protected void notifySuspect(Address suspected_mbr) {
309         MembershipListener l;
310
311         if(suspected_mbr == null) return;
312         for(Iterator JavaDoc it=membership_listeners.iterator(); it.hasNext();) {
313             l=(MembershipListener)it.next();
314             try {
315                 l.suspect(suspected_mbr);
316             }
317             catch(Throwable JavaDoc ex) {
318                 if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex);
319             }
320         }
321     }
322
323     protected void notifyBlock() {
324         MembershipListener l;
325
326         for(Iterator JavaDoc it=membership_listeners.iterator(); it.hasNext();) {
327             l=(MembershipListener)it.next();
328             try {
329                 l.block();
330             }
331             catch(Throwable JavaDoc ex) {
332                 if(log.isErrorEnabled()) log.error("exception notifying " + l + ": " + ex);
333             }
334         }
335     }
336
337     public void channelConnected(Channel channel) {
338         if(log.isTraceEnabled())
339             log.trace("channel is connected");
340     }
341
342     public void channelDisconnected(Channel channel) {
343         if(log.isTraceEnabled())
344             log.trace("channel is disconnected");
345     }
346
347     public void channelClosed(Channel channel) {
348     }
349
350     public void channelShunned() {
351         if(log.isTraceEnabled())
352             log.trace("channel is shunned");
353     }
354
355     public void channelReconnected(Address addr) {
356         start();
357     }
358
359
360
361
362     public static final class PullHeader extends Header {
363         Serializable JavaDoc identifier=null;
364
365         public PullHeader() {
366             ; // used by externalization
367
}
368
369         public PullHeader(Serializable JavaDoc identifier) {
370             this.identifier=identifier;
371         }
372
373         public Serializable JavaDoc getIdentifier() {
374             return identifier;
375         }
376
377         public long size() {
378             if(identifier == null)
379                 return 12;
380             else
381                 return 64;
382         }
383
384
385         public String JavaDoc toString() {
386             return "PullHeader";
387         }
388
389         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
390             out.writeObject(identifier);
391         }
392
393         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
394             identifier=(Serializable JavaDoc)in.readObject();
395         }
396     }
397
398
399     /**
400      * @return Returns the listener.
401      */

402     public MessageListener getListener() {
403         return listener;
404     }
405 }
406
Popular Tags