1 3 package org.jgroups.blocks; 4 5 6 import org.jgroups.util.RspList; 7 import org.jgroups.util.Util; 8 import org.jgroups.ChannelListener; 9 import org.jgroups.Channel; 10 import org.jgroups.MessageListener; 11 import org.jgroups.MembershipListener; 12 import org.jgroups.Transport; 13 import org.jgroups.Message; 14 import org.jgroups.TimeoutException; 15 import org.jgroups.SuspectedException; 16 import org.jgroups.Address; 17 18 import java.io.Serializable ; 19 import java.util.Vector ; 20 import java.util.List ; 21 import java.util.ArrayList ; 22 import java.util.Iterator ; 23 24 25 26 27 32 public class RpcDispatcher extends MessageDispatcher implements ChannelListener { 33 protected Object server_obj=null; 34 protected Marshaller marshaller=null; 35 protected List additionalChannelListeners=null; 36 37 38 public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj) { 39 super(channel, l, l2); 40 channel.setChannelListener(this); 41 this.server_obj=server_obj; 42 additionalChannelListeners = new ArrayList (); 43 } 44 45 46 public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj, 47 boolean deadlock_detection) { 48 super(channel, l, l2, deadlock_detection); 49 channel.setChannelListener(this); 50 this.server_obj=server_obj; 51 additionalChannelListeners = new ArrayList (); 52 } 53 54 public RpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object server_obj, 55 boolean deadlock_detection, boolean concurrent_processing) { 56 super(channel, l, l2, deadlock_detection, concurrent_processing); 57 channel.setChannelListener(this); 58 this.server_obj=server_obj; 59 additionalChannelListeners = new ArrayList (); 60 } 61 62 63 64 public RpcDispatcher(PullPushAdapter adapter, Serializable id, 65 MessageListener l, MembershipListener l2, Object server_obj) { 66 super(adapter, id, l, l2); 67 68 if(this.adapter != null) { 71 Transport t=this.adapter.getTransport(); 72 if(t != null && t instanceof Channel) { 73 ((Channel)t).setChannelListener(this); 74 } 75 } 76 77 this.server_obj=server_obj; 78 additionalChannelListeners = new ArrayList (); 79 } 80 81 82 public interface Marshaller { 83 byte[] objectToByteBuffer(Object obj) throws Exception ; 84 Object objectFromByteBuffer(byte[] buf) throws Exception ; 85 } 86 87 88 public String getName() {return "RpcDispatcher";} 89 90 public void setMarshaller(Marshaller m) {this.marshaller=m;} 91 92 public Marshaller getMarshaller() {return marshaller;} 93 94 public Object getServerObject() {return server_obj;} 95 96 97 public RspList castMessage(Vector dests, Message msg, int mode, long timeout) { 98 if(log.isErrorEnabled()) log.error("this method should not be used with " + 99 "RpcDispatcher, but MessageDispatcher. Returning null"); 100 return null; 101 } 102 103 public Object sendMessage(Message msg, int mode, long timeout) throws TimeoutException, SuspectedException { 104 if(log.isErrorEnabled()) log.error("this method should not be used with " + 105 "RpcDispatcher, but MessageDispatcher. Returning null"); 106 return null; 107 } 108 109 110 111 112 113 public RspList callRemoteMethods(Vector dests, String method_name, Object [] args, 114 Class [] types, int mode, long timeout) { 115 MethodCall method_call=new MethodCall(method_name, args, types); 116 return callRemoteMethods(dests, method_call, mode, timeout); 117 } 118 119 public RspList callRemoteMethods(Vector dests, String method_name, Object [] args, 120 String [] signature, int mode, long timeout) { 121 MethodCall method_call=new MethodCall(method_name, args, signature); 122 return callRemoteMethods(dests, method_call, mode, timeout); 123 } 124 125 126 public RspList callRemoteMethods(Vector dests, MethodCall method_call, int mode, long timeout) { 127 byte[] buf=null; 128 Message msg=null; 129 RspList retval=null; 130 131 if(log.isTraceEnabled()) 132 log.trace("dests=" + dests + ", method_call=" + method_call + ", mode=" + mode + ", timeout=" + timeout); 133 134 if(dests != null && dests.size() == 0) { 135 if(log.isTraceEnabled()) 137 log.trace("destination list is non-null and empty: no need to send message"); 138 return new RspList(); 139 } 140 141 try { 142 buf=marshaller != null? marshaller.objectToByteBuffer(method_call) : Util.objectToByteBuffer(method_call); 143 } 144 catch(Exception e) { 145 if(log.isErrorEnabled()) log.error("exception=" + e); 146 return null; 147 } 148 149 msg=new Message(null, null, buf); 150 retval=super.castMessage(dests, msg, mode, timeout); 151 if(log.isTraceEnabled()) log.trace("responses: " + retval); 152 return retval; 153 } 154 155 156 157 158 159 public Object callRemoteMethod(Address dest, String method_name, Object [] args, 160 Class [] types, int mode, long timeout) 161 throws TimeoutException, SuspectedException { 162 MethodCall method_call=new MethodCall(method_name, args, types); 163 return callRemoteMethod(dest, method_call, mode, timeout); 164 } 165 166 public Object callRemoteMethod(Address dest, String method_name, Object [] args, 167 String [] signature, int mode, long timeout) 168 throws TimeoutException, SuspectedException { 169 MethodCall method_call=new MethodCall(method_name, args, signature); 170 return callRemoteMethod(dest, method_call, mode, timeout); 171 } 172 173 public Object callRemoteMethod(Address dest, MethodCall method_call, int mode, long timeout) 174 throws TimeoutException, SuspectedException { 175 byte[] buf=null; 176 Message msg=null; 177 Object retval=null; 178 179 if(log.isTraceEnabled()) 180 log.trace("dest=" + dest + ", method_call=" + method_call + ", mode=" + mode + ", timeout=" + timeout); 181 182 try { 183 buf=marshaller != null? marshaller.objectToByteBuffer(method_call) : Util.objectToByteBuffer(method_call); 184 } 185 catch(Exception e) { 186 if(log.isErrorEnabled()) log.error("exception=" + e); 187 return null; 188 } 189 190 msg=new Message(dest, null, buf); 191 retval=super.sendMessage(msg, mode, timeout); 192 if(log.isTraceEnabled()) log.trace("retval: " + retval); 193 return retval; 194 } 195 196 197 198 199 200 204 public Object handle(Message req) { 205 Object body=null; 206 MethodCall method_call; 207 208 if(server_obj == null) { 209 if(log.isErrorEnabled()) log.error("no method handler is registered. Discarding request."); 210 return null; 211 } 212 213 if(req == null || req.getLength() == 0) { 214 if(log.isErrorEnabled()) log.error("message or message buffer is null"); 215 return null; 216 } 217 218 try { 219 body=marshaller != null? marshaller.objectFromByteBuffer(req.getBuffer()) : req.getObject(); 220 } 221 catch(Throwable e) { 222 if(log.isErrorEnabled()) log.error("exception=" + e); 223 return e; 224 } 225 226 if(body == null || !(body instanceof MethodCall)) { 227 if(log.isErrorEnabled()) log.error("message does not contain a MethodCall object"); 228 return null; 229 } 230 231 method_call=(MethodCall)body; 232 233 try { 234 if(log.isTraceEnabled()) 235 log.trace("[sender=" + req.getSrc() + "], method_call: " + method_call); 236 return method_call.invoke(server_obj); 237 } 238 catch(Throwable x) { 239 log.error("failed invoking method", x); 240 return x; 241 } 242 } 243 244 249 public boolean addChannelListener(ChannelListener l) { 250 251 synchronized(additionalChannelListeners) { 252 if (additionalChannelListeners.contains(l)) { 253 return false; 254 } 255 additionalChannelListeners.add(l); 256 return true; 257 } 258 } 259 260 261 265 public boolean removeChannelListener(ChannelListener l) { 266 267 synchronized(additionalChannelListeners) { 268 return additionalChannelListeners.remove(l); 269 } 270 } 271 272 273 274 275 276 public void channelConnected(Channel channel) { 277 278 start(); 279 280 synchronized(additionalChannelListeners) { 281 for(Iterator i = additionalChannelListeners.iterator(); i.hasNext(); ) { 282 ChannelListener l = (ChannelListener)i.next(); 283 try { 284 l.channelConnected(channel); 285 } 286 catch(Throwable t) { 287 log.warn("channel listener failed", t); 288 } 289 } 290 } 291 } 292 293 public void channelDisconnected(Channel channel) { 294 295 stop(); 296 297 synchronized(additionalChannelListeners) { 298 for(Iterator i = additionalChannelListeners.iterator(); i.hasNext(); ) { 299 ChannelListener l = (ChannelListener)i.next(); 300 try { 301 l.channelDisconnected(channel); 302 } 303 catch(Throwable t) { 304 log.warn("channel listener failed", t); 305 } 306 } 307 } 308 } 309 310 public void channelClosed(Channel channel) { 311 312 stop(); 313 314 synchronized(additionalChannelListeners) { 315 for(Iterator i = additionalChannelListeners.iterator(); i.hasNext(); ) { 316 ChannelListener l = (ChannelListener)i.next(); 317 try { 318 l.channelClosed(channel); 319 } 320 catch(Throwable t) { 321 log.warn("channel listener failed", t); 322 } 323 } 324 } 325 } 326 327 public void channelShunned() { 328 329 synchronized(additionalChannelListeners) { 330 for(Iterator i = additionalChannelListeners.iterator(); i.hasNext(); ) { 331 ChannelListener l = (ChannelListener)i.next(); 332 try { 333 l.channelShunned(); 334 } 335 catch(Throwable t) { 336 log.warn("channel listener failed", t); 337 } 338 } 339 } 340 } 341 342 public void channelReconnected(Address new_addr) { 343 if(log.isTraceEnabled()) 344 log.trace("channel has been rejoined, old local_addr=" + local_addr + ", new local_addr=" + new_addr); 345 346 synchronized(additionalChannelListeners) { 347 for(Iterator i = additionalChannelListeners.iterator(); i.hasNext(); ) { 348 ChannelListener l = (ChannelListener)i.next(); 349 try { 350 l.channelReconnected(new_addr); 351 } 352 catch(Throwable t) { 353 log.warn("channel listener failed", t); 354 } 355 } 356 } 357 } 358 359 360 } 361 | Popular Tags |