KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > blocks > RpcDispatcher


1 // $Id: RpcDispatcher.java,v 1.15 2005/01/20 02:01:12 ovidiuf Exp $
2

3 package org.jgroups.blocks;
4
5
6 import org.jgroups.util.RspList;
7 import org.jgroups.util.Util;
8 import org.jgroups.ChannelListener;
9 import org.jgroups.Channel;
10 import org.jgroups.MessageListener;
11 import org.jgroups.MembershipListener;
12 import org.jgroups.Transport;
13 import org.jgroups.Message;
14 import org.jgroups.TimeoutException;
15 import org.jgroups.SuspectedException;
16 import org.jgroups.Address;
17
18 import java.io.Serializable JavaDoc;
19 import java.util.Vector JavaDoc;
20 import java.util.List JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.Iterator JavaDoc;
23
24
25
26
27 /**
28  * Dispatches and receives remote group method calls. Is the equivalent of RpcProtocol
29  * on the application rather than protocol level.
30  * @author Bela Ban
31  */

32 public class RpcDispatcher extends MessageDispatcher implements ChannelListener {
33     protected Object JavaDoc server_obj=null;
34     protected Marshaller marshaller=null;
35     protected List additionalChannelListeners=null;
36
37
38     public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object JavaDoc server_obj) {
39         super(channel, l, l2);
40         channel.setChannelListener(this);
41         this.server_obj=server_obj;
42         additionalChannelListeners = new ArrayList JavaDoc();
43     }
44
45
46     public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object JavaDoc server_obj,
47                          boolean deadlock_detection) {
48         super(channel, l, l2, deadlock_detection);
49         channel.setChannelListener(this);
50         this.server_obj=server_obj;
51         additionalChannelListeners = new ArrayList JavaDoc();
52     }
53
54     public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object JavaDoc server_obj,
55                          boolean deadlock_detection, boolean concurrent_processing) {
56         super(channel, l, l2, deadlock_detection, concurrent_processing);
57         channel.setChannelListener(this);
58         this.server_obj=server_obj;
59         additionalChannelListeners = new ArrayList JavaDoc();
60     }
61
62
63
64     public RpcDispatcher(PullPushAdapter adapter, Serializable JavaDoc id,
65                          MessageListener l, MembershipListener l2, Object JavaDoc server_obj) {
66         super(adapter, id, l, l2);
67
68         // Fixes bug #804956
69
// channel.setChannelListener(this);
70
if(this.adapter != null) {
71             Transport t=this.adapter.getTransport();
72             if(t != null && t instanceof Channel) {
73                 ((Channel)t).setChannelListener(this);
74             }
75         }
76
77         this.server_obj=server_obj;
78         additionalChannelListeners = new ArrayList JavaDoc();
79     }
80
81
82     public interface Marshaller {
83         byte[] objectToByteBuffer(Object JavaDoc obj) throws Exception JavaDoc;
84         Object JavaDoc objectFromByteBuffer(byte[] buf) throws Exception JavaDoc;
85     }
86
87
88     public String JavaDoc getName() {return "RpcDispatcher";}
89
90     public void setMarshaller(Marshaller m) {this.marshaller=m;}
91
92     public Marshaller getMarshaller() {return marshaller;}
93
94     public Object JavaDoc getServerObject() {return server_obj;}
95
96
97     public RspList castMessage(Vector JavaDoc dests, Message msg, int mode, long timeout) {
98         if(log.isErrorEnabled()) log.error("this method should not be used with " +
99                     "RpcDispatcher, but MessageDispatcher. Returning null");
100         return null;
101     }
102
103     public Object JavaDoc sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException {
104         if(log.isErrorEnabled()) log.error("this method should not be used with " +
105                     "RpcDispatcher, but MessageDispatcher. Returning null");
106         return null;
107     }
108
109
110
111
112
113     public RspList callRemoteMethods(Vector JavaDoc dests, String JavaDoc method_name, Object JavaDoc[] args,
114                                      Class JavaDoc[] types, int mode, long timeout) {
115         MethodCall method_call=new MethodCall(method_name, args, types);
116         return callRemoteMethods(dests, method_call, mode, timeout);
117     }
118
119     public RspList callRemoteMethods(Vector JavaDoc dests, String JavaDoc method_name, Object JavaDoc[] args,
120                                      String JavaDoc[] signature, int mode, long timeout) {
121         MethodCall method_call=new MethodCall(method_name, args, signature);
122         return callRemoteMethods(dests, method_call, mode, timeout);
123     }
124
125
126     public RspList callRemoteMethods(Vector JavaDoc dests, MethodCall method_call, int mode, long timeout) {
127         byte[] buf=null;
128         Message msg=null;
129         RspList retval=null;
130
131         if(log.isTraceEnabled())
132             log.trace("dests=" + dests + ", method_call=" + method_call + ", mode=" + mode + ", timeout=" + timeout);
133
134         if(dests != null && dests.size() == 0) {
135             // don't send if dest list is empty
136
if(log.isTraceEnabled())
137                 log.trace("destination list is non-null and empty: no need to send message");
138             return new RspList();
139         }
140
141         try {
142             buf=marshaller != null? marshaller.objectToByteBuffer(method_call) : Util.objectToByteBuffer(method_call);
143         }
144         catch(Exception JavaDoc e) {
145             if(log.isErrorEnabled()) log.error("exception=" + e);
146             return null;
147         }
148
149         msg=new Message(null, null, buf);
150         retval=super.castMessage(dests, msg, mode, timeout);
151         if(log.isTraceEnabled()) log.trace("responses: " + retval);
152         return retval;
153     }
154
155
156
157
158
159     public Object JavaDoc callRemoteMethod(Address dest, String JavaDoc method_name, Object JavaDoc[] args,
160                                    Class JavaDoc[] types, int mode, long timeout)
161             throws TimeoutException, SuspectedException {
162         MethodCall method_call=new MethodCall(method_name, args, types);
163         return callRemoteMethod(dest, method_call, mode, timeout);
164     }
165
166     public Object JavaDoc callRemoteMethod(Address dest, String JavaDoc method_name, Object JavaDoc[] args,
167                                    String JavaDoc[] signature, int mode, long timeout)
168             throws TimeoutException, SuspectedException {
169         MethodCall method_call=new MethodCall(method_name, args, signature);
170         return callRemoteMethod(dest, method_call, mode, timeout);
171     }
172
173     public Object JavaDoc callRemoteMethod(Address dest, MethodCall method_call, int mode, long timeout)
174             throws TimeoutException, SuspectedException {
175         byte[] buf=null;
176         Message msg=null;
177         Object JavaDoc retval=null;
178
179         if(log.isTraceEnabled())
180             log.trace("dest=" + dest + ", method_call=" + method_call + ", mode=" + mode + ", timeout=" + timeout);
181
182         try {
183             buf=marshaller != null? marshaller.objectToByteBuffer(method_call) : Util.objectToByteBuffer(method_call);
184         }
185         catch(Exception JavaDoc e) {
186             if(log.isErrorEnabled()) log.error("exception=" + e);
187             return null;
188         }
189
190         msg=new Message(dest, null, buf);
191         retval=super.sendMessage(msg, mode, timeout);
192         if(log.isTraceEnabled()) log.trace("retval: " + retval);
193         return retval;
194     }
195
196
197
198
199
200     /**
201      * Message contains MethodCall. Execute it against *this* object and return result.
202      * Use MethodCall.invoke() to do this. Return result.
203      */

204     public Object JavaDoc handle(Message req) {
205         Object JavaDoc body=null;
206         MethodCall method_call;
207
208         if(server_obj == null) {
209             if(log.isErrorEnabled()) log.error("no method handler is registered. Discarding request.");
210             return null;
211         }
212
213         if(req == null || req.getLength() == 0) {
214             if(log.isErrorEnabled()) log.error("message or message buffer is null");
215             return null;
216         }
217
218         try {
219             body=marshaller != null? marshaller.objectFromByteBuffer(req.getBuffer()) : req.getObject();
220         }
221         catch(Throwable JavaDoc e) {
222             if(log.isErrorEnabled()) log.error("exception=" + e);
223             return e;
224         }
225
226         if(body == null || !(body instanceof MethodCall)) {
227             if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object");
228             return null;
229         }
230
231         method_call=(MethodCall)body;
232
233         try {
234             if(log.isTraceEnabled())
235                 log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call);
236             return method_call.invoke(server_obj);
237         }
238         catch(Throwable JavaDoc x) {
239             log.error("failed invoking method", x);
240             return x;
241         }
242     }
243
244     /**
245      * Add a new channel listener to be notified on the channel's state change.
246      *
247      * @return true if the listener was added or false if the listener was already in the list.
248      */

249     public boolean addChannelListener(ChannelListener l) {
250
251         synchronized(additionalChannelListeners) {
252             if (additionalChannelListeners.contains(l)) {
253                return false;
254             }
255             additionalChannelListeners.add(l);
256             return true;
257         }
258     }
259
260
261     /**
262      *
263      * @return true if the channel was removed indeed.
264      */

265     public boolean removeChannelListener(ChannelListener l) {
266
267         synchronized(additionalChannelListeners) {
268             return additionalChannelListeners.remove(l);
269         }
270     }
271
272
273
274     /* --------------------- Interface ChannelListener ---------------------- */
275
276     public void channelConnected(Channel channel) {
277
278         start();
279
280         synchronized(additionalChannelListeners) {
281             for(Iterator JavaDoc i = additionalChannelListeners.iterator(); i.hasNext(); ) {
282                 ChannelListener l = (ChannelListener)i.next();
283                 try {
284                     l.channelConnected(channel);
285                 }
286                 catch(Throwable JavaDoc t) {
287                     log.warn("channel listener failed", t);
288                 }
289             }
290         }
291     }
292
293     public void channelDisconnected(Channel channel) {
294
295         stop();
296
297         synchronized(additionalChannelListeners) {
298             for(Iterator JavaDoc i = additionalChannelListeners.iterator(); i.hasNext(); ) {
299                 ChannelListener l = (ChannelListener)i.next();
300                 try {
301                     l.channelDisconnected(channel);
302                 }
303                 catch(Throwable JavaDoc t) {
304                     log.warn("channel listener failed", t);
305                 }
306             }
307         }
308     }
309
310     public void channelClosed(Channel channel) {
311
312         stop();
313
314         synchronized(additionalChannelListeners) {
315             for(Iterator JavaDoc i = additionalChannelListeners.iterator(); i.hasNext(); ) {
316                 ChannelListener l = (ChannelListener)i.next();
317                 try {
318                     l.channelClosed(channel);
319                 }
320                 catch(Throwable JavaDoc t) {
321                     log.warn("channel listener failed", t);
322                 }
323             }
324         }
325     }
326
327     public void channelShunned() {
328
329         synchronized(additionalChannelListeners) {
330             for(Iterator JavaDoc i = additionalChannelListeners.iterator(); i.hasNext(); ) {
331                 ChannelListener l = (ChannelListener)i.next();
332                 try {
333                     l.channelShunned();
334                 }
335                 catch(Throwable JavaDoc t) {
336                     log.warn("channel listener failed", t);
337                 }
338             }
339         }
340     }
341
342     public void channelReconnected(Address new_addr) {
343         if(log.isTraceEnabled())
344             log.trace("channel has been rejoined, old local_addr=" + local_addr + ", new local_addr=" + new_addr);
345
346         synchronized(additionalChannelListeners) {
347             for(Iterator JavaDoc i = additionalChannelListeners.iterator(); i.hasNext(); ) {
348                 ChannelListener l = (ChannelListener)i.next();
349                 try {
350                     l.channelReconnected(new_addr);
351                 }
352                 catch(Throwable JavaDoc t) {
353                    log.warn("channel listener failed", t);
354                 }
355             }
356         }
357     }
358     /* ----------------------------------------------------------------------- */
359
360 }
361
Popular Tags