1 package org.sapia.ubik.rmi.server.transport.nio.tcp; 2 3 import java.io.EOFException ; 4 import java.io.IOException ; 5 import java.nio.ByteBuffer ; 6 import java.rmi.RemoteException ; 7 8 import org.sapia.ubik.net.ServerAddress; 9 import org.sapia.ubik.net.nio.ChannelHandler; 10 import org.sapia.ubik.net.nio.Cycle; 11 import org.sapia.ubik.rmi.server.Config; 12 import org.sapia.ubik.rmi.server.Log; 13 import org.sapia.ubik.rmi.server.RMICommand; 14 import org.sapia.ubik.rmi.server.VmId; 15 import org.sapia.ubik.rmi.server.invocation.InvokeCommand; 16 import org.sapia.ubik.rmi.server.perf.PerfAnalyzer; 17 import org.sapia.ubik.rmi.server.transport.RmiConnection; 18 import org.sapia.ubik.util.ByteVector; 19 20 35 public class RmiChannelHandler implements ChannelHandler { 36 37 private static final int READ_HEADER = 0; 38 private static final int READ_PAYLOAD = 1; 39 private static final int WRITE_PAYLOAD = 2; 40 41 private final int HEADER_BYTES = 4; 42 private final int DEFAULT_BUFSZ = 1024; 43 private final int INCREMENT = 10; 44 45 private PerfAnalyzer _perf = PerfAnalyzer.getInstance(); 46 private int _state = READ_HEADER; 47 private int _length = -1; 48 private int _processed = 0; 49 private ByteVector _bytes; 50 private byte[] _buf = new byte[DEFAULT_BUFSZ]; 51 private RmiChannelHandlerFactory _fac; 52 private RmiConnection _conn; 53 54 RmiChannelHandler(RmiChannelHandlerFactory fac, int bufsize) { 55 _fac = fac; 56 _bytes = new ByteVector(bufsize, INCREMENT); 57 _conn = new NioTcpRmiServerConnection(fac.getServerAddress(), this); 58 } 59 60 63 public void completed(Cycle cycle) { 64 _state = READ_HEADER; 65 _processed = 0; 66 _length = -1; 67 _bytes.clear(false); 68 if(cycle.state() == Cycle.STATE_COMPLETE) { 69 _fac.release(this); 70 } 71 } 72 73 76 public void error(Cycle cycle) { 77 _state = READ_HEADER; 78 _processed = 0; 79 _length = -1; 80 _bytes.clear(false); 81 if(cycle.state() == Cycle.STATE_ERROR) { 82 _fac.release(this); 83 } 84 } 85 86 89 public void process(Cycle cycle) { 90 91 RMICommand cmd = null; 92 try { 93 if(Log.isDebug()) { 94 Log.debug(getClass(), "receiving command"); 95 } 96 97 cmd = (RMICommand) _conn.receive(); 98 99 if(Log.isDebug()) { 100 Log.debug(getClass(), "command received: " + cmd.getClass().getName() 101 + " from " + cmd.getServerAddress() + '@' + cmd.getVmId()); 102 } 103 104 cmd.init(new Config(_fac.getServerAddress(), _conn)); 105 106 Object resp = null; 107 108 try { 109 if(_perf.isEnabled()) { 110 if(cmd instanceof InvokeCommand) { 111 _perf.getTopic(getClass().getName() + ".RemoteCall").start(); 112 } 113 } 114 115 resp = cmd.execute(); 116 117 if(_perf.isEnabled()) { 118 if(cmd instanceof InvokeCommand) { 119 _perf.getTopic(getClass().getName() + ".RemoteCall").end(); 120 } 121 } 122 } catch(Throwable t) { 123 t.printStackTrace(); 124 t.fillInStackTrace(); 125 resp = t; 126 } 127 128 if(_perf.isEnabled()) { 129 if(cmd instanceof InvokeCommand) { 130 _perf.getTopic(getClass().getName() + ".SendResponse").start(); 131 } 132 } 133 134 doSend(resp, cmd.getVmId(), cmd.getServerAddress()); 135 cycle.state(Cycle.STATE_WRITE); 136 }catch(RemoteException e){ 137 Log.error(getClass(), "RemoteException caught sending response", e); 138 _conn.close(); 139 cycle.state(Cycle.STATE_ERROR); 140 return; 141 }catch(EOFException e){ 142 Log.error(getClass(), "EOFException caught sending response; client probably down", e); 143 _conn.close(); 144 cycle.state(Cycle.STATE_ERROR); 145 return; 146 }catch(RuntimeException e){ 147 Log.error(getClass(), "RuntimeException caught sending response", e); 148 try{ 149 doSend(e, null, null); 150 cycle.state(Cycle.STATE_WRITE); 151 }catch(Exception e2){ 152 _conn.close(); 153 cycle.state(Cycle.STATE_ERROR); 154 } 155 return; 156 }catch(Exception e){ 157 Log.error(getClass(), "Exception caught sending response", e); 158 try{ 159 doSend(e, null, null); 160 cycle.state(Cycle.STATE_WRITE); 161 }catch(Exception e2){ 162 _conn.close(); 163 cycle.state(Cycle.STATE_ERROR); 164 } 165 return; 166 } 167 168 if(_perf.isEnabled()) { 169 if(cmd != null && cmd instanceof InvokeCommand) { 170 _perf.getTopic(getClass().getName() + ".SendResponse").end(); 171 } 172 } 173 cycle.state(Cycle.STATE_WRITE); 174 } 175 176 private void doSend(Object response, VmId vmid, ServerAddress addr) throws IOException , RemoteException { 177 if(vmid != null){ 178 _conn.send(response, vmid, addr.getTransportType()); 179 } 180 else{ 181 _conn.send(response); 182 } 183 } 184 185 188 public boolean read(Cycle cycle) { 189 switch(_state){ 190 case READ_HEADER: 191 if(cycle.getByteBuffer().remaining() >= HEADER_BYTES) { 192 _length = cycle.getByteBuffer().getInt(); 193 _state = READ_PAYLOAD; 194 } 195 return false; 196 case READ_PAYLOAD: 197 ByteBuffer input = cycle.getByteBuffer(); 198 while(input.hasRemaining()) { 199 int toRead = input.remaining(); 200 _bytes.write(input); 201 _processed += toRead; 202 } 203 if(_processed >= _length) { 204 cycle.state(Cycle.STATE_PROCESS); 205 return true; 206 } 207 return false; 208 default: 209 return false; 210 } 211 } 212 213 public boolean write(Cycle cycle) { 214 ByteBuffer output = cycle.getByteBuffer(); 215 switch(_state){ 216 case READ_PAYLOAD: 217 _bytes.reset(); 218 _state = WRITE_PAYLOAD; 219 output.putInt(_bytes.length()); 220 _processed = 0; 221 case WRITE_PAYLOAD: 222 if(_bytes.hasRemaining()) { 223 _bytes.read(output); 224 } 225 if(_bytes.hasRemaining()){ 226 return false; 227 } 228 cycle.state(Cycle.STATE_RECYCLE); 229 return true; 230 default: 231 return false; 232 } 233 } 234 235 ByteVector getByteVector() { 236 return _bytes; 237 } 238 239 ByteVector getContent() { 240 _bytes.reset(); 241 return _bytes; 242 } 243 244 } 245 | Popular Tags |