package org.sapia.ubik.rmi.server.invocation;

import org.sapia.ubik.rmi.Consts;
import org.sapia.ubik.rmi.server.*;
import org.sapia.ubik.rmi.server.command.ResponseLock;
import org.sapia.ubik.rmi.server.command.ResponseQueue;
import org.sapia.ubik.rmi.server.command.ResponseTimeOutException;
import org.sapia.ubik.rmi.server.perf.PerfAnalyzer;
import org.sapia.ubik.rmi.server.transport.Connections;
import org.sapia.ubik.rmi.server.transport.MarshalledObject;
import org.sapia.ubik.rmi.server.transport.RmiConnection;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;


/**
 * Dispatches client-side invocation commands.
 * <p>
 * When the target VM identifier equals the identifier of the current VM, the
 * call is performed reflectively on the object registered in the server-side
 * object table (a "colocated" call). Otherwise the command is sent over a
 * connection acquired from the given {@link Connections} pool.
 * <p>
 * A {@link ClientPreInvokeEvent} and a {@link ClientPostInvokeEvent} are
 * dispatched through the client runtime's event dispatcher around the call.
 */
public class InvocationDispatcher {
  /** Callback timeout used when none is configured, or when the configured value is not a number. */
  static final long DEFAULT_CALLBACK_TIMEOUT = 30000;

  /** Timeout handed to {@link ResponseLock#waitResponse(long)} when waiting for a callback response. */
  private static long _timeout = DEFAULT_CALLBACK_TIMEOUT;

  static {
    // Read the *value* of the system property whose name is given by
    // Consts.CLIENT_CALLBACK_TIMEOUT (parsing the property name itself would
    // always fail and leave the default in place).
    String configured = System.getProperty(Consts.CLIENT_CALLBACK_TIMEOUT);

    if (configured != null) {
      try {
        _timeout = Long.parseLong(configured.trim());
      } catch (NumberFormatException e) {
        // Best effort: keep the default, but make the misconfiguration visible.
        Log.error(InvocationDispatcher.class,
          "Invalid value for " + Consts.CLIENT_CALLBACK_TIMEOUT + ": '" +
          configured + "'; using default: " + DEFAULT_CALLBACK_TIMEOUT);
      }
    }
  }

  private PerfAnalyzer  _perf      = PerfAnalyzer.getInstance();
  private ResponseQueue _responses = ResponseQueue.getInstance();

  /**
   * Creates a new dispatcher.
   */
  public InvocationDispatcher() {
    super();
  }

  /**
   * Dispatches the given command and blocks until its result is available.
   * <p>
   * The pre-invocation event is dispatched first. For a colocated call the
   * target method is invoked reflectively and any exception it throws is
   * rethrown as-is (unwrapped from {@link InvocationTargetException}). For a
   * remote call the command held by the pre-invocation event is sent on a
   * pooled connection, the response is read from the same connection, and the
   * connection is always released back to the pool. If the command uses
   * marshalled objects, a non-null response is unwrapped with the current
   * thread's context class loader.
   *
   * @param vmId the identifier of the VM hosting the target object.
   * @param pool the pool from which a connection is acquired for remote calls.
   * @param cmd  the command describing the invocation to perform.
   * @return the invocation result (possibly <code>null</code>).
   * @throws java.io.IOException    if an IO problem occurs while communicating.
   * @throws ClassNotFoundException if a class needed for the response could not be found.
   * @throws Throwable              any exception thrown by the invoked target method
   *                                on a colocated call, or any other failure.
   */
  public Object dispatchInvocation(VmId vmId, Connections pool,
    InvokeCommand cmd)
    throws java.io.IOException, ClassNotFoundException, Throwable {
    Object toReturn;

    if (Log.isDebug()) {
      Log.debug(getClass(), "sending invocation for: " + cmd.getOID());
    }

    ClientPreInvokeEvent pre = new ClientPreInvokeEvent(cmd);
    Hub.clientRuntime.dispatcher.dispatch(pre);

    if (VmId.getInstance().equals(vmId)) {
      if (Log.isInfo()) {
        Log.info(getClass(), "performing colocated call");
      }

      Object target = Hub.serverRuntime.objectTable.getObjectFor(cmd.getOID());
      Method toCall = target.getClass().getMethod(cmd.getMethodName(),
          cmd.getParameterTypes());

      try {
        toReturn = toCall.invoke(target, cmd.getParams());
      } catch (InvocationTargetException e) {
        // Surface the target's own exception rather than the reflection wrapper.
        throw e.getTargetException();
      }
    } else {
      startTopic(".AcquireConnection");

      RmiConnection conn = pool.acquire();

      endTopic(".AcquireConnection");

      try {
        startTopic(".InvokeSend");

        conn.send(pre.getCommand(), cmd.getVmId(),
          conn.getServerAddress().getTransportType());

        endTopic(".InvokeSend");
        startTopic(".InvokeReceive");

        toReturn = conn.receive();

        endTopic(".InvokeReceive");
      } finally {
        pool.release(conn);
      }

      if (cmd.usesMarshalledObjects() && (toReturn != null)) {
        toReturn = unmarshal(toReturn);
      }
    }

    ClientPostInvokeEvent post = new ClientPostInvokeEvent(pre.getCommand(),
        toReturn);

    if (Log.isDebug()) {
      Log.debug(getClass(), "dispatching post-invocation event");
    }

    Hub.clientRuntime.dispatcher.dispatch(post);

    if (Log.isDebug()) {
      Log.debug(getClass(), "returning invocation response");
    }

    return toReturn;
  }

  /**
   * Dispatches the given callback command.
   * <p>
   * For a colocated call the target method is invoked reflectively and any
   * exception it throws is rethrown as-is (unwrapped from
   * {@link InvocationTargetException}), consistently with the non-callback
   * variant. For a remote call a {@link ResponseLock} is created, the command
   * is set up with the lock identifier and this client's callback address,
   * the pre-invocation event is dispatched, the command is sent and an
   * acknowledgement is read on a pooled connection (which is then released),
   * and the calling thread finally waits on the lock for the response, using
   * the configured callback timeout.
   * <p>
   * NOTE(review): the pre-invocation event is only dispatched on the remote
   * path, whereas the post-invocation event is dispatched on both paths -
   * confirm this asymmetry is intended.
   *
   * @param vmId the identifier of the VM hosting the target object.
   * @param pool the pool from which a connection is acquired for remote calls.
   * @param cmd  the callback command describing the invocation to perform.
   * @return the invocation result (possibly <code>null</code>).
   * @throws java.io.IOException      if an IO problem occurs, or if the calling thread
   *                                  is interrupted while waiting for the response.
   * @throws ClassNotFoundException   if a class needed for the response could not be found.
   * @throws ResponseTimeOutException propagated from {@link ResponseLock#waitResponse(long)}.
   * @throws Throwable                any exception thrown by the invoked target method
   *                                  on a colocated call, or any other failure.
   */
  public Object dispatchInvocation(VmId vmId, Connections pool,
    CallBackInvokeCommand cmd)
    throws java.io.IOException, ClassNotFoundException,
      ResponseTimeOutException, Throwable {
    Object toReturn;

    ClientPreInvokeEvent pre = new ClientPreInvokeEvent(cmd);

    if (VmId.getInstance().equals(vmId)) {
      if (Log.isInfo()) {
        Log.info(getClass(), "performing colocated call");
      }

      Object target = Hub.serverRuntime.objectTable.getObjectFor(cmd.getOID());
      Method toCall = target.getClass().getMethod(cmd.getMethodName(),
          cmd.getParameterTypes());

      try {
        toReturn = toCall.invoke(target, cmd.getParams());
      } catch (InvocationTargetException e) {
        // Surface the target's own exception rather than the reflection wrapper.
        throw e.getTargetException();
      }
    } else {
      ResponseLock lock = _responses.createResponseLock();

      if (Log.isDebug()) {
        Log.debug(getClass(), "sending callback invocation " + lock.getId());
      }

      cmd.setUp(lock.getId(),
        Hub.clientRuntime.getCallbackAddress(pool.getTransportType()));
      Hub.clientRuntime.dispatcher.dispatch(pre);

      RmiConnection conn = pool.acquire();

      try {
        conn.send(pre.getCommand(), cmd.getVmId(),
          conn.getServerAddress().getTransportType());

        if (Log.isDebug()) {
          Log.debug(getClass(), "receiving ACK");
        }

        // The value read here is only an acknowledgement; the actual response
        // is obtained from the response lock below.
        conn.receive();
      } finally {
        pool.release(conn);
      }

      try {
        if (Log.isDebug()) {
          Log.debug(getClass(), "waiting for response...");
        }

        toReturn = lock.waitResponse(_timeout);
      } catch (ResponseTimeOutException e) {
        throw e;
      } catch (InterruptedException e) {
        lock.release();

        // Restore the interrupt status so code further up the stack can see it.
        Thread.currentThread().interrupt();

        java.io.IOException ioe = new java.io.IOException(
            "response queue thread interrupted");
        ioe.initCause(e);
        throw ioe;
      }

      if (cmd.usesMarshalledObjects() && (toReturn != null)) {
        toReturn = unmarshal(toReturn);
      }
    }

    ClientPostInvokeEvent post = new ClientPostInvokeEvent(pre.getCommand(),
        toReturn);
    Hub.clientRuntime.dispatcher.dispatch(post);

    return toReturn;
  }

  /** Starts the perf topic named after this instance's class plus the given suffix, if perf analysis is enabled. */
  private void startTopic(String suffix) {
    if (_perf.isEnabled()) {
      _perf.getTopic(getClass().getName() + suffix).start();
    }
  }

  /** Ends the perf topic named after this instance's class plus the given suffix, if perf analysis is enabled. */
  private void endTopic(String suffix) {
    if (_perf.isEnabled()) {
      _perf.getTopic(getClass().getName() + suffix).end();
    }
  }

  /**
   * Unwraps a non-null response expected to be a {@link MarshalledObject},
   * using the current thread's context class loader. A
   * {@link ClassCastException} raised during the unwrapping is logged and
   * rethrown with a message describing the offending response.
   */
  private Object unmarshal(Object response) throws Throwable {
    try {
      return ((MarshalledObject) response).get(Thread.currentThread()
                                                     .getContextClassLoader());
    } catch (ClassCastException e) {
      String aMessage = "Could not cast to MarshalledObject: " +
        response.getClass() + "\n" + response;
      Log.error(getClass(), aMessage);
      throw new ClassCastException(aMessage);
    }
  }
}
Popular Tags |