1 7 package org.jboss.mq.il.oil; 8 9 import java.io.BufferedInputStream ; 10 import java.io.BufferedOutputStream ; 11 import java.io.IOException ; 12 import java.io.ObjectInputStream ; 13 import java.io.ObjectOutputStream ; 14 import java.net.InetAddress ; 15 import java.net.Socket ; 16 import java.rmi.RemoteException ; 17 18 import org.jboss.logging.Logger; 19 import org.jboss.mq.ReceiveRequest; 20 import org.jboss.mq.SpyDestination; 21 import org.jboss.mq.il.ClientIL; 22 23 31 public final class OILClientIL 32 implements ClientIL, 33 java.io.Serializable 34 { 35 static final long serialVersionUID = 7812173621233374692L; 36 private final static Logger log = Logger.getLogger(OILClientIL.class); 37 38 private InetAddress addr; 39 private int port; 40 43 protected boolean enableTcpNoDelay=false; 44 45 46 private transient ObjectInputStream in; 47 private transient ObjectOutputStream out; 48 private transient Socket socket; 49 50 OILClientIL(InetAddress addr, int port, boolean enableTcpNoDelay) 51 { 52 this.addr = addr; 53 this.port = port; 54 this.enableTcpNoDelay = enableTcpNoDelay; 55 } 56 57 62 public synchronized void close() 63 throws Exception 64 { 65 if ( log.isTraceEnabled()) 66 log.trace("Closing OILClientIL"); 67 checkSocket(); 68 out.writeByte(OILConstants.CLOSE); 69 waitAnswer(); 70 try 71 { 72 socket.close(); 73 in.close(); 74 out.close(); 75 } 76 catch(Exception e) 77 { 78 if(log.isDebugEnabled()) 79 log.debug("Error closing the socket connection", e); 80 } 81 } 82 83 89 public synchronized void deleteTemporaryDestination(SpyDestination dest) 90 throws Exception 91 { 92 checkSocket(); 93 out.writeByte(OILConstants.DELETE_TEMPORARY_DESTINATION); 94 out.writeObject(dest); 95 waitAnswer(); 96 } 97 98 104 public synchronized void pong(long serverTime) 105 throws Exception 106 { 107 checkSocket(); 108 out.writeByte(OILConstants.PONG); 109 out.writeLong(serverTime); 110 waitAnswer(); 111 } 112 113 119 public synchronized void receive(ReceiveRequest messages[]) 120 throws Exception 121 { 122 boolean trace = log.isTraceEnabled(); 123 if( trace ) 124 log.trace("Checking socket"); 125 checkSocket(); 126 if( trace ) 127 log.trace("Writing request"); 128 out.writeByte(OILConstants.RECEIVE); 129 out.writeInt(messages.length); 130 for (int i = 0; i < messages.length; ++i) 131 { 132 messages[i].writeExternal(out); 133 } 134 if( trace ) 135 log.trace("Waiting for anwser"); 136 waitAnswer(); 137 if( trace ) 138 log.trace("Done"); 139 } 140 141 146 private void checkSocket() 147 throws RemoteException 148 { 149 if (socket == null) 150 { 151 createConnection(); 152 } 153 } 154 155 160 private void createConnection() 161 throws RemoteException 162 { 163 try 164 { 165 if (log.isDebugEnabled()) { 166 log.debug("ConnectionReceiverOILClient is connecting to: " + 167 addr.getHostAddress() + ":" + port); 168 } 169 170 socket = new Socket (addr, port); 171 out = new ObjectOutputStream (new BufferedOutputStream (socket.getOutputStream())); 172 out.flush(); 173 in = new ObjectInputStream (new BufferedInputStream (socket.getInputStream())); 174 } 175 catch (Exception e) 176 { 177 log.error("Cannot connect to the ConnectionReceiver/Server", e); 178 throw new RemoteException ("Cannot connect to the ConnectionReceiver/Server"); 179 } 180 } 181 182 187 private void waitAnswer() 188 throws Exception 189 { 190 Exception throwException = null; 191 try 192 { 193 out.reset(); 194 out.flush(); 195 int val = in.readByte(); 196 switch (val) 197 { 198 case OILConstants.EXCEPTION: 199 Exception e = (Exception )in.readObject(); 200 throwException = new RemoteException ("", e); 201 break; 202 } 203 } 204 catch (IOException e) 205 { 206 throw new RemoteException ("Cannot contact the remote object", e); 207 } 208 209 if (throwException != null) 210 { 211 throw throwException; 212 } 213 } 214 } 215 | Popular Tags |