1 package org.sapia.ubik.rmi.server.transport.nio.tcp; 2 3 import java.io.DataInputStream ; 4 import java.io.DataOutputStream ; 5 import java.io.EOFException ; 6 import java.io.IOException ; 7 import java.io.OutputStream ; 8 import java.net.Socket ; 9 import java.net.SocketException ; 10 import java.rmi.RemoteException ; 11 12 import org.sapia.ubik.net.ServerAddress; 13 import org.sapia.ubik.rmi.server.VmId; 14 import org.sapia.ubik.rmi.server.transport.MarshalInputStream; 15 import org.sapia.ubik.rmi.server.transport.MarshalOutputStream; 16 import org.sapia.ubik.rmi.server.transport.RmiConnection; 17 import org.sapia.ubik.util.ByteVector; 18 import org.sapia.ubik.util.ByteVectorOutputStream; 19 20 38 public class NioTcpRmiClientConnection implements RmiConnection { 39 static final long RESET_INTERVAL = 2000; 40 static final int BUFSZ = 1000; 41 private Socket _sock; 42 private ByteVector _bytes; 43 private MarshalOutputStream _mos; 44 private MarshalInputStream _mis; 45 private DataInputStream _is; 46 private byte[] _readBuf, _headers; 47 private long _lastReset = System.currentTimeMillis(); 48 private ServerAddress _address; 49 50 55 public NioTcpRmiClientConnection(Socket sock, int bufsize) throws IOException { 56 _sock = sock; 57 _address = new NioAddress(sock.getInetAddress().getHostAddress(), sock 58 .getPort()); 59 _bytes = new ByteVector(bufsize, 10); 60 _readBuf = new byte[bufsize]; 61 } 62 63 66 public ServerAddress getServerAddress() { 67 return _address; 68 } 69 70 74 public void send(Object o, VmId vmId, String tranportType) 75 throws IOException , RemoteException { 76 try { 77 _mos.setUp(vmId, tranportType); 78 send(o); 79 } catch(java.net.SocketException e) { 80 throw new RemoteException ( 81 "communication with server interrupted; server probably disappeared", 82 e); 83 } 84 } 85 86 89 public void send(Object o) throws IOException , RemoteException { 90 _bytes.clear(false); 91 if((System.currentTimeMillis() - _lastReset) >= RESET_INTERVAL && _mos != null) { 92 _mos.reset(); 93 _lastReset = System.currentTimeMillis(); 94 } 95 if(_mos == null){ 96 _mos = new MarshalOutputStream(new ByteVectorOutputStream(_bytes)); 97 } 98 _mos.writeObject(o); 99 _mos.flush(); 100 _bytes.reset(); 101 102 OutputStream sos = _sock.getOutputStream(); 103 DataOutputStream dos = new DataOutputStream (sos); 104 dos.writeInt(_bytes.length()); 105 _bytes.read(dos); 106 dos.flush(); 107 } 108 109 112 public Object receive() throws IOException , ClassNotFoundException , 113 RemoteException { 114 try { 115 _bytes.clear(false); 116 if(_is == null){ 117 _is = new DataInputStream (_sock.getInputStream()); 118 _is.readInt(); 119 _mis = new MarshalInputStream(_is); 120 } 121 else{ 122 _is.readInt(); 123 } 124 return _mis.readObject(); 125 } catch(EOFException e) { 126 throw new RemoteException ( 127 "Communication with server interrupted; server probably disappeared", 128 e); 129 } catch(SocketException e) { 130 throw new RemoteException ( 131 "Connection could not be opened; server is probably down", e); 132 } 133 } 134 135 138 public void close() { 139 try { 140 _sock.close(); 141 } catch(Throwable t) { 142 } 144 } 145 } 146 | Popular Tags |