1 7 package org.jboss.mq.il.oil; 8 9 import java.io.BufferedInputStream ; 10 import java.io.BufferedOutputStream ; 11 import java.io.IOException ; 12 import java.io.ObjectInputStream ; 13 import java.io.ObjectOutputStream ; 14 import java.net.ServerSocket ; 15 import java.net.Socket ; 16 import java.rmi.RemoteException ; 17 18 import org.jboss.mq.Connection; 19 import org.jboss.mq.ReceiveRequest; 20 import org.jboss.mq.SpyDestination; 21 22 30 public final class OILClientILService 31 implements java.lang.Runnable , 32 org.jboss.mq.il.ClientILService 33 { 34 private final static org.jboss.logging.Logger cat = org.jboss.logging.Logger.getLogger(OILClientILService.class); 35 36 private OILClientIL clientIL; 38 39 private Thread worker; 41 42 private Socket socket = null; 44 45 private Connection connection; 47 48 private boolean running; 50 51 private ServerSocket serverSocket; 53 54 57 private static int threadNumber= 0; 58 59 62 private boolean enableTcpNoDelay=false; 63 64 70 public org.jboss.mq.il.ClientIL getClientIL() 71 throws java.lang.Exception 72 { 73 return clientIL; 74 } 75 76 83 public void init(org.jboss.mq.Connection connection, java.util.Properties props) 84 throws java.lang.Exception 85 { 86 this.connection = connection; 87 serverSocket = new ServerSocket (0); 88 89 String t = props.getProperty(OILServerILFactory.OIL_TCPNODELAY_KEY); 90 if (t != null) 91 enableTcpNoDelay = t.equals("yes"); 92 93 clientIL = new OILClientIL(java.net.InetAddress.getLocalHost(), serverSocket.getLocalPort(), enableTcpNoDelay); 94 95 } 96 97 100 public void run() 101 { 102 int code = 0; 103 ObjectOutputStream out = null; 104 ObjectInputStream in = null; 105 socket = null; 106 int serverPort = serverSocket.getLocalPort(); 107 108 try 109 { 110 if( cat.isDebugEnabled() ) 111 cat.debug("Waiting for the server to connect to me on port " +serverSocket.getLocalPort()); 112 113 serverSocket.setSoTimeout(1000); 117 while (running && socket == null) 118 { 119 try 120 { 121 socket = serverSocket.accept(); 122 } 123 catch (java.io.InterruptedIOException e) 124 { 125 continue; 127 } 128 catch (IOException e) 129 { 130 if (running) 131 connection.asynchFailure("Error accepting connection from server in OILClientILService.", e); 132 return; } 134 } 135 136 if(running) 137 { 138 socket.setTcpNoDelay(enableTcpNoDelay); 139 socket.setSoTimeout(0); 140 out = new ObjectOutputStream (new BufferedOutputStream (socket.getOutputStream())); 141 out.flush(); 142 in = new ObjectInputStream (new BufferedInputStream (socket.getInputStream())); 143 } 144 else 145 { 146 return; 150 } 151 } 152 catch (IOException e) 153 { 154 connection.asynchFailure("Could not initialize the OILClientIL Service.", e); 155 return; 156 } 157 finally 158 { 159 try 160 { 161 serverSocket.close(); 162 serverSocket = null; 163 } 164 catch (IOException e) 165 { 166 if(cat.isDebugEnabled()) 167 cat.debug("run: an error occured closing the server socket", e); 168 } 169 } 170 171 while (running) 174 { 175 try 176 { 177 code = in.readByte(); 178 } 179 catch (java.io.InterruptedIOException e) 180 { 181 continue; 182 } 183 catch (IOException e) 184 { 185 break; 187 } 188 189 try 190 { 191 192 switch (code) 193 { 194 case OILConstants.RECEIVE: 195 int numReceives = in.readInt(); 196 org.jboss.mq.ReceiveRequest[] messages = new org.jboss.mq.ReceiveRequest[numReceives]; 197 for (int i = 0; i < numReceives; ++i) 198 { 199 messages[i] = new ReceiveRequest(); 200 messages[i].readExternal(in); 201 } 202 connection.asynchDeliver(messages); 203 break; 204 205 case OILConstants.DELETE_TEMPORARY_DESTINATION: 206 connection.asynchDeleteTemporaryDestination((SpyDestination)in.readObject()); 207 break; 208 209 case OILConstants.CLOSE: 210 connection.asynchClose(); 211 break; 212 213 case OILConstants.PONG: 214 connection.asynchPong(in.readLong()); 215 break; 216 217 default: 218 throw new RemoteException ("Bad method code !"); 219 } 220 221 try 224 { 225 out.writeByte(OILConstants.SUCCESS); 226 out.flush(); 227 } 228 catch (IOException e) 229 { 230 connection.asynchFailure("Connection failure(1)", e); 231 break; } 233 } 234 catch (Exception e) 235 { 236 if (!running) 237 { 238 break; 241 } 242 243 try 244 { 245 cat.error("Exception handling server request", e); 246 out.writeByte(OILConstants.EXCEPTION); 247 out.writeObject(e); 248 out.reset(); 249 out.flush(); 250 } 251 catch (IOException e2) 252 { 253 connection.asynchFailure("Connection failure(2)", e2); 254 break; 255 } 256 } 257 } 259 try 262 { 263 cat.debug("Closing receiver connections on port: " + serverPort); 264 out.close(); 265 in.close(); 266 socket.close(); 267 socket = null; 268 } 269 catch (IOException e) 270 { 271 connection.asynchFailure("Connection failure", e); 272 } 273 274 running = false; 277 } 278 279 284 public void start() 285 throws java.lang.Exception 286 { 287 288 running = true; 289 worker = new Thread (Connection.getThreadGroup(), this, "OILClientILService-" +threadNumber++); 290 worker.setDaemon(true); 291 worker.start(); 292 293 } 294 295 298 public void stop() 299 throws java.lang.Exception 300 { 301 cat.trace("Stop called on OILClientService"); 302 running = false; 303 worker.interrupt(); 304 } 305 } 306 308 309 310 311 312 313 314 | Popular Tags |