1 package org.sapia.ubik.rmi.server.invocation; 2 3 import org.sapia.ubik.rmi.server.*; 4 import org.sapia.ubik.rmi.server.command.CommandProcessor; 5 import org.sapia.ubik.rmi.server.command.Destination; 6 import org.sapia.ubik.rmi.server.command.ResponseQueue; 7 import org.sapia.ubik.rmi.server.command.ResponseSender; 8 import org.sapia.ubik.rmi.server.transport.RmiConnection; 9 import org.sapia.ubik.rmi.server.transport.TransportManager; 10 11 import java.io.IOException ; 12 import java.io.ObjectInput ; 13 import java.io.ObjectOutput ; 14 15 import java.util.List ; 16 17 18 28 public class RMICommandProcessor extends CommandProcessor { 29 private ResponseQueue _responses = ResponseQueue.getInstance(); 30 31 34 public RMICommandProcessor(int maxThreads) { 35 super(maxThreads); 36 super.setResponseSender(new RMIResponseSender()); 37 } 38 39 42 public void shutdown(long timeout) throws InterruptedException { 43 super.shutdown(timeout); 44 Log.warning(getClass(), "Shutting down incoming response queue"); 45 ResponseQueue.getInstance().shutdown(timeout); 46 _responses.shutdown(timeout); 47 } 48 49 52 static final class RMIResponseSender implements ResponseSender { 53 56 public void sendResponses(Destination dest, List responses) { 57 if (Log.isDebug()) { 58 Log.debug(getClass(), "Sending callback responses to " + dest); 59 } 60 61 if (responses.size() > 0) { 62 RmiConnection conn = null; 63 64 try { 65 conn = TransportManager.getConnectionsFor(dest.getServerAddress()) 66 .acquire(); 67 68 conn.send(new ResponseListCommand(responses), dest.getVmId(), 70 dest.getServerAddress().getTransportType()); 71 conn.receive(); 72 } catch (Exception e) { 73 Log.error(RMICommandProcessor.class, e); 74 75 if (conn != null) { 76 conn.close(); 77 } 78 79 return; 80 } 81 82 if (conn != null) { 83 try { 84 TransportManager.getConnectionsFor(dest.getServerAddress()).release(conn); 85 } catch (Exception e) { 86 Log.error(RMICommandProcessor.class, e); 87 } 88 } 89 } 90 } 91 } 92 93 public static final class ResponseListCommand extends RMICommand { 94 private List _responses; 95 96 public ResponseListCommand() { 97 } 98 99 ResponseListCommand(List responses) { 100 _responses = responses; 101 } 102 103 106 public Object execute() throws Throwable { 107 if (Log.isDebug()) { 108 Log.debug(getClass(), "Receiving callbacks"); 109 } 110 111 ResponseQueue.getInstance().onResponses(_responses); 112 113 return new Integer (0); 114 } 115 116 119 public void writeExternal(ObjectOutput out) throws IOException { 120 super.writeExternal(out); 121 out.writeObject(_responses); 122 } 123 124 127 public void readExternal(ObjectInput in) 128 throws IOException , ClassNotFoundException { 129 super.readExternal(in); 130 _responses = (List ) in.readObject(); 131 } 132 } 133 } 134 | Popular Tags |