1 package org.sapia.ubik.rmi.server; 2 3 import java.io.Externalizable ; 4 import java.io.IOException ; 5 import java.io.ObjectInput ; 6 import java.io.ObjectOutput ; 7 import java.lang.reflect.Method ; 8 import java.rmi.RemoteException ; 9 import java.util.Collections ; 10 import java.util.HashSet ; 11 import java.util.Iterator ; 12 import java.util.LinkedList ; 13 import java.util.List ; 14 import java.util.Set ; 15 16 import javax.naming.Name ; 17 18 import org.sapia.ubik.net.Connection; 19 import org.sapia.ubik.net.ServerAddress; 20 import org.sapia.ubik.rmi.Consts; 21 import org.sapia.ubik.rmi.server.invocation.CallBackInvokeCommand; 22 import org.sapia.ubik.rmi.server.invocation.InvokeCommand; 23 import org.sapia.ubik.rmi.server.transport.Connections; 24 import org.sapia.ubik.rmi.server.transport.TransportManager; 25 26 27 46 public class RemoteRefStateless implements StubInvocationHandler, 47 Externalizable , HealthCheck { 48 static final long serialVersionUID = 1L; 49 protected Name _name; 51 protected String _domain; 52 protected String _mcastAddress; 53 protected int _mcastPort; 54 protected OID _oid = new OID(UIDGenerator.createdUID()); 55 protected transient boolean _isRegistered; 56 protected List _serviceInfos = Collections.synchronizedList(new LinkedList ()); 57 58 61 public RemoteRefStateless() { 62 super(); 63 } 64 65 68 public RemoteRefStateless(Name name, String domain) { 69 _name = name; 70 _domain = domain; 71 } 72 73 76 public OID getOID(){ 77 return _oid; 78 } 79 80 83 public Object invoke(Object obj, Method toCall, Object [] params) 84 throws Throwable { 85 Object toReturn = null; 86 87 ServiceInfo info = acquire(); 88 89 try { 90 toReturn = doInvoke(info, obj, toCall, params); 91 } catch (ShutdownException e) { 92 toReturn = handleError(info, obj, toCall, params, e); 93 } catch (RemoteException e) { 94 toReturn = handleError(info, obj, toCall, params, e); 95 } 96 97 return toReturn; 98 } 99 100 103 public boolean isValid() { 104 try { 105 return clean(); } catch (Throwable t) { 107 Log.error(getClass(), "Stub not valid", t); 108 109 return false; 110 } 111 } 112 113 116 public StubContainer toStubContainer(Object proxy) { 117 Set interfaces = new HashSet (); 118 ServerTable.appendInterfaces(proxy.getClass(), interfaces); 119 120 String [] names = new String [interfaces.size()]; 121 int count = 0; 122 123 for (Iterator iter = interfaces.iterator(); iter.hasNext();) { 124 names[count++] = ((Class ) iter.next()).getName(); 125 } 126 127 return new StubContainerBase(names, this); 128 } 129 130 133 public void readExternal(ObjectInput in) 134 throws IOException , ClassNotFoundException { 135 _name = (Name ) in.readObject(); 136 _domain = in.readUTF(); 137 _mcastAddress = in.readUTF(); 138 _mcastPort = in.readInt(); 139 _oid = (OID) in.readObject(); 140 _serviceInfos = (List ) in.readObject(); 141 142 ServiceInfo info; 143 144 for (int i = 0; i < _serviceInfos.size(); i++) { 145 info = (ServiceInfo) _serviceInfos.get(i); 146 processServiceInfo(info); 147 } 148 149 if (Log.isInfo()) { 150 Log.info(getClass(), "Deserializing stateless stub; endpoints: " + _serviceInfos); 151 } 152 153 StatelessStubTable.registerStatelessRef(this); 154 } 155 156 159 public void writeExternal(ObjectOutput out) throws IOException { 160 out.writeObject(_name); 161 out.writeUTF(_domain); 162 out.writeUTF(_mcastAddress); 163 out.writeInt(_mcastPort); 164 out.writeObject(_oid); 165 out.writeObject(_serviceInfos); 166 } 167 168 176 public static RemoteRefStateless fromRemoteRefs(Name name, String domain, 177 List remoteRefs) { 178 RemoteRefStateless ref = new RemoteRefStateless(name, domain); 179 RemoteRef current; 180 ServiceInfo info; 181 182 String mcastAddress = Consts.DEFAULT_MCAST_ADDR; 183 int mcastPort = Consts.DEFAULT_MCAST_PORT; 184 185 try { 186 if (System.getProperty(org.sapia.ubik.rmi.Consts.MCAST_PORT_KEY) != null) { 187 mcastPort = Integer.parseInt(System.getProperty( 188 org.sapia.ubik.rmi.Consts.MCAST_PORT_KEY)); 189 } 190 } catch (NumberFormatException e) { 191 } 193 194 if (System.getProperty(org.sapia.ubik.rmi.Consts.MCAST_ADDR_KEY) != null) { 195 mcastAddress = System.getProperty(org.sapia.ubik.rmi.Consts.MCAST_ADDR_KEY); 196 } 197 198 for (int i = 0; i < remoteRefs.size(); i++) { 199 current = (RemoteRef) remoteRefs.get(i); 200 201 if (!ref._serviceInfos.contains(current)) { 202 info = new ServiceInfo(current._serverAddress, current._oid, 203 current._callBack, current._vmId, current._isFirstVoyage); 204 ref._serviceInfos.add(info); 205 ref._mcastAddress = mcastAddress; 206 ref._mcastPort = mcastPort; 207 } 208 } 209 210 return ref; 211 } 212 213 218 public void addSibling(RemoteRefStateless other) { 219 if (other._oid.equals(_oid)) { 220 return; 221 } 222 223 synchronized (_serviceInfos) { 224 for (int i = 0; i < other._serviceInfos.size(); i++) { 225 ServiceInfo toAdd = (ServiceInfo) other._serviceInfos.get(i); 226 if (!_serviceInfos.contains(toAdd)) { 227 Log.info(getClass(), 228 "Remote server " + other + "added to stateless stub: " + 229 toString() + " for name: " + other._name); 230 _serviceInfos.add(toAdd); 231 } 232 } 233 if(Log.isInfo()) 234 Log.info(getClass(), "Got " + _serviceInfos.size() + " endpoints : " + _serviceInfos); 235 } 236 } 237 238 243 List getInfos() { 244 return _serviceInfos; 245 } 246 247 public int hashCode() { 248 return _oid.hashCode(); 249 } 250 251 public boolean equals(Object o) { 252 try { 253 return (this == o) || ((RemoteRefStateless) o)._oid.equals(_oid); 254 } catch (ClassCastException e) { 255 return false; 256 } 257 } 258 259 public String toString() { 260 return _serviceInfos.toString(); 261 } 262 263 protected boolean clean(){ 264 synchronized(_serviceInfos){ 265 for(int i = 0; i < _serviceInfos.size(); i++){ 266 ServiceInfo info = (ServiceInfo)_serviceInfos.get(i); 267 Connections pool = null; 268 Connection conn = null; 269 try{ 270 pool = TransportManager.getConnectionsFor(info.address); 271 conn = pool.acquire(); 272 conn.send(new CommandPing()); 273 conn.receive(); 274 pool.release(conn); 275 }catch(RemoteException e){ 276 _serviceInfos.remove(i--); 277 Log.info(getClass(), "Cleaning up invalid endpoint: " + info.address); 278 if(pool != null){ 279 pool.clear(); 280 } 281 if(conn != null){ 282 conn.close(); 283 } 284 }catch(Exception e){ 285 Log.error(getClass(), e); 286 if(conn != null){ 287 conn.close(); 288 } 289 } 290 } 291 } 292 return _serviceInfos.size() > 0; 293 } 294 295 protected Object sendCommand(RMICommand cmd) throws Throwable { 296 ServiceInfo info = acquire(); 297 Connections pool = TransportManager.getConnectionsFor(info.address); 298 Connection conn = null; 299 Object toReturn = null; 300 301 try { 302 while (_serviceInfos.size() > 0) { 303 try { 304 conn = pool.acquire(); 305 conn.send(cmd); 306 toReturn = conn.receive(); 307 308 if (!(toReturn instanceof ShutdownException)) { 309 break; 310 } else { 311 pool.clear(); 312 info = removeAcquire(info); 313 pool = TransportManager.getConnectionsFor(info.address); 314 } 316 } catch (RemoteException e) { 317 pool.clear(); 318 319 try { 320 conn = pool.acquire(); 321 322 conn.send(new CommandPing()); 324 conn.receive(); 325 } catch (RemoteException e2) { 326 if (Log.isInfo()) { 327 Log.info(getClass(), 328 "Invalid endpoint for stateless stub: " + info); 329 Log.info(getClass(), "Got " + _serviceInfos.size() + " remaining endpoints: " + _serviceInfos); 330 } 331 332 pool.clear(); 333 info = removeAcquire(info); 334 pool = TransportManager.getConnectionsFor(info.address); 335 } 336 } 337 } 338 339 if ((_serviceInfos.size() == 0) || (conn == null)) { 340 throw new RemoteException ("No connection available"); 341 } 342 343 if (toReturn == null) { 344 return toReturn; 345 } else if (toReturn instanceof Throwable ) { 346 Throwable err = (Throwable ) toReturn; 347 err.fillInStackTrace(); 348 throw err; 349 } 350 351 return toReturn; 352 } finally { 353 if (conn != null) { 354 pool.release(conn); 355 } 356 } 357 } 358 359 protected Object doInvoke(ServiceInfo info, Object obj, Method toCall, 360 Object [] params) throws Throwable { 361 Object toReturn = null; 362 363 if (info.callback && 364 Hub.clientRuntime.isCallback(info.address.getTransportType())) { 365 if (Log.isDebug()) { 366 Log.debug(getClass(), "invoking (callback): " + toCall); 367 } 368 369 Connections pool = TransportManager.getConnectionsFor(info.address); 370 toReturn = ClientRuntime.invoker.dispatchInvocation(info.vmId, pool, 371 new CallBackInvokeCommand(info.oid, toCall.getName(), params, 372 toCall.getParameterTypes(), info.address.getTransportType())); 373 } else { 374 if (Log.isDebug()) { 375 Log.debug(getClass(), "invoking (no callback): " + toCall); 376 } 377 378 Connections pool = TransportManager.getConnectionsFor(info.address); 379 380 toReturn = ClientRuntime.invoker.dispatchInvocation(info.vmId, pool, 381 new InvokeCommand(info.oid, toCall.getName(), params, 382 toCall.getParameterTypes(), info.address.getTransportType())); 383 } 384 385 if (toReturn == null) { 386 return toReturn; 387 } else if (toReturn instanceof Throwable ) { 388 Throwable err = (Throwable ) toReturn; 389 err.fillInStackTrace(); 390 throw err; 391 } 392 393 return toReturn; 394 } 395 396 protected ServiceInfo acquire() throws RemoteException { 397 if (_serviceInfos.size() == 0) { 398 throw new RemoteException ("No connection available"); 399 } 400 401 ServiceInfo toReturn; 402 403 synchronized (_serviceInfos) { 404 toReturn = (ServiceInfo) _serviceInfos.remove(0); 405 _serviceInfos.add(toReturn); 406 } 407 408 return toReturn; 409 } 410 411 protected ServiceInfo removeAcquire(ServiceInfo toRemove) 412 throws RemoteException { 413 synchronized (_serviceInfos) { 414 if(Log.isInfo()) 415 Log.info(getClass(), "Removing invalid instance: " + toRemove.address); 416 _serviceInfos.remove(toRemove); 417 if(Log.isInfo()) 418 Log.info(getClass(), "Remaining: " + _serviceInfos); 419 } 420 421 return acquire(); 422 } 423 424 protected Object handleError(ServiceInfo info, Object obj, Method toCall, 425 Object [] params, Throwable err) throws Throwable { 426 do { 427 info = removeAcquire(info); 428 429 try { 430 return doInvoke(info, obj, toCall, params); 431 } catch (Throwable t) { 432 if (t instanceof RemoteException || t instanceof ShutdownException) { 433 err = t; 434 } 435 } 436 } while ((err instanceof ShutdownException || 437 err instanceof RemoteException ) && (_serviceInfos.size() > 0)); 438 439 throw err; 440 } 441 442 private void processServiceInfo(ServiceInfo info) throws IOException { 443 Hub.clientRuntime.gc.register(info.address, info.oid, this); 444 445 if (info.isFirstVoyage) { 446 info.isFirstVoyage = false; 447 } else { 448 try { 449 Hub.createReference(info.address, info.oid); 450 } catch (RemoteException e) { 451 _serviceInfos.remove(info); 452 } 453 } 454 } 455 456 459 public static class ServiceInfo implements java.io.Serializable { 460 ServerAddress address; 461 OID oid; 462 boolean callback; 463 boolean isFirstVoyage; 464 VmId vmId; 465 466 public ServiceInfo(ServerAddress addr, OID id, boolean callback, VmId vmId, 467 boolean isFirstVoyage) { 468 this.address = addr; 469 this.oid = id; 470 this.callback = callback; 471 this.vmId = vmId; 472 this.isFirstVoyage = isFirstVoyage; 473 } 474 475 public int hashCode() { 476 return oid.hashCode(); 477 } 478 479 public boolean equals(Object other) { 480 try { 481 ServiceInfo otherInfo = (ServiceInfo) other; 482 483 return otherInfo.oid.equals(oid); 484 } catch (ClassCastException e) { 485 return false; 486 } 487 } 488 489 public String toString() { 490 return "[ oid=" + oid + ", address=" + address + " ]"; 491 } 492 } 493 } 494 | Popular Tags |