1 22 package org.jboss.invocation.pooled.server; 23 24 import java.io.BufferedInputStream ; 25 import java.io.BufferedOutputStream ; 26 import java.io.InterruptedIOException ; 27 import java.io.ObjectInputStream ; 28 import java.io.ObjectOutputStream ; 29 import java.net.Socket ; 30 import java.util.LinkedList ; 31 32 import org.jboss.invocation.Invocation; 33 import org.jboss.invocation.pooled.interfaces.OptimizedObjectInputStream; 34 import org.jboss.invocation.pooled.interfaces.OptimizedObjectOutputStream; 35 import org.jboss.logging.Logger; 36 37 55 public class ServerThread extends Thread 56 { 57 final static private Logger log = Logger.getLogger(ServerThread.class); 58 59 protected ObjectInputStream in; 60 protected ObjectOutputStream out; 61 protected Socket socket; 62 protected PooledInvoker invoker; 63 protected LRUPool clientpool; 64 protected LinkedList threadpool; 65 protected volatile boolean running = true; 66 protected volatile boolean handlingResponse = true; protected volatile boolean shutdown = false; 68 protected boolean trace; 69 protected static int id = 0; 70 71 public static synchronized int nextID() 72 { 73 int nextID = id ++; 74 return nextID; 75 } 76 77 public ServerThread(Socket socket, PooledInvoker invoker, LRUPool clientpool, 78 LinkedList threadpool, int timeout) throws Exception 79 { 80 super("PooledInvokerThread-" + socket.getInetAddress().getHostAddress()+"-"+nextID()); 81 this.socket = socket; 82 this.invoker = invoker; 83 this.clientpool = clientpool; 84 this.threadpool = threadpool; 85 this.trace = log.isTraceEnabled(); 86 socket.setSoTimeout(timeout); 87 } 88 89 public void shutdown() 90 { 91 shutdown = true; 92 running = false; 93 98 if (!handlingResponse) 102 { 103 try 104 { 105 this.interrupt(); 106 Thread.interrupted(); } 108 catch (Exception ignored) {} 109 } 110 111 } 112 113 public void evict() 114 { 115 running = false; 116 123 124 if (!handlingResponse) 128 { 129 try 130 { 131 this.interrupt(); 132 Thread.interrupted(); } 134 catch (Exception ignored) {} 135 } 136 } 137 138 139 public synchronized void wakeup(Socket socket, int timeout) throws Exception 140 { 141 this.socket = socket; 142 String name = "PooledInvokerThread-" + socket.getInetAddress().getHostAddress()+"-"+nextID(); 143 super.setName(name); 144 socket.setSoTimeout(timeout); 145 running = true; 146 handlingResponse = true; 147 this.notify(); 148 } 149 150 public void run() 151 { 152 try 153 { 154 while (true) 155 { 156 dorun(); 157 if (shutdown) 159 { 160 synchronized (clientpool) 162 { 163 clientpool.remove(this); 164 } 165 return; } 167 else 168 { 169 synchronized (this) 171 { 172 synchronized(clientpool) 174 { 175 synchronized(threadpool) 177 { 178 clientpool.remove(this); 180 threadpool.add(this); 182 Thread.interrupted(); clientpool.notify(); 184 } 185 } 186 if( trace ) 187 log.trace("begin thread wait"); 188 this.wait(); 189 if( trace ) 190 log.trace("WAKEUP in SERVER THREAD"); 191 } 192 } 193 } 194 } 195 catch (Exception ignored) 196 { 197 if( trace ) 198 log.trace("Exiting run on exception", ignored); 199 } 200 } 201 202 protected void acknowledge() throws Exception 203 { 204 byte ACK = in.readByte(); 208 210 handlingResponse = true; 217 218 out.writeByte(ACK); 219 out.flush(); 220 } 221 222 protected void processInvocation() throws Exception 223 { 224 handlingResponse = true; 225 Invocation invocation = (Invocation)in.readObject(); 227 in.readObject(); Object response = null; 229 try 230 { 231 boolean interrupted = Thread.interrupted(); 233 response = invoker.invoke(invocation); 234 } 235 catch (Exception ex) 236 { 237 response = ex; 238 } 239 Thread.interrupted(); out.writeObject(response); 241 out.reset(); 242 out.writeObject(Boolean.TRUE); 246 out.flush(); 247 out.reset(); 248 handlingResponse = false; 249 } 250 251 254 protected void dorun() 255 { 256 log.debug("beginning dorun"); 257 running = true; 258 handlingResponse = true; 259 try 260 { 261 BufferedOutputStream bos = new BufferedOutputStream (socket.getOutputStream()); 262 out = new OptimizedObjectOutputStream(bos); 263 out.flush(); 264 BufferedInputStream bis = new BufferedInputStream (socket.getInputStream()); 265 in = new OptimizedObjectInputStream(bis); 266 } 267 catch (Exception e) 268 { 269 log.error("Failed to initialize", e); 270 } 271 272 try 274 { 275 processInvocation(); 276 } 277 catch (Exception e) 278 { 279 running = false; 280 if( trace ) 281 log.trace("invocation failed", e); 282 } 283 284 while (running) 286 { 287 try 288 { 289 acknowledge(); 290 processInvocation(); 291 } 292 catch (InterruptedIOException e) 293 { 294 log.debug("socket timed out", e); 295 running = false; 296 } 297 catch (InterruptedException e) 298 { 299 log.debug("interrupted", e); 300 } 301 catch (Exception ex) 302 { 303 if( trace ) 304 log.debug("invocation failed", ex); 305 running = false; 306 } 307 Thread.interrupted(); 309 } 310 311 if( trace ) 312 log.trace("finished loop"); 313 try 315 { 316 if (in != null) in.close(); 317 if (out != null) out.close(); 318 } 319 catch (Exception ex) 320 { 321 } 322 try 323 { 324 socket.close(); 325 } 326 catch (Exception ex) 327 { 328 log.debug("Failed cleanup", ex); 329 } 330 socket = null; 331 in = null; 332 out = null; 333 } 334 } 335 | Popular Tags |