1 9 10 package org.jboss.remoting.transport.socket; 11 12 import java.io.InputStream ; 13 import java.io.InterruptedIOException ; 14 import java.io.OutputStream ; 15 import java.lang.reflect.Constructor ; 16 import java.net.Socket ; 17 import java.util.LinkedList ; 18 import org.jboss.logging.Logger; 19 import org.jboss.remoting.marshal.MarshalFactory; 20 import org.jboss.remoting.marshal.Marshaller; 21 import org.jboss.remoting.marshal.UnMarshaller; 22 23 38 public class ServerThread extends Thread 39 { 40 final static private Logger log = Logger.getLogger(ServerThread.class); 41 42 protected SocketServerInvoker invoker; 43 protected LRUPool clientpool; 44 protected LinkedList threadpool; 45 protected volatile boolean running = true; 46 protected volatile boolean handlingResponse = true; protected volatile boolean shutdown = false; 48 protected static int id = 0; 49 50 private SocketWrapper socketWrapper = null; 51 protected String serverSocketClass = ServerSocketWrapper.class.getName(); 52 private Constructor serverSocketConstructor = null; 53 54 55 public static synchronized int nextID() 56 { 57 int nextID = id++; 58 return nextID; 59 } 60 61 public ServerThread(Socket socket, SocketServerInvoker invoker, LRUPool clientpool, 62 LinkedList threadpool, int timeout, String serverSocketClass) throws Exception 63 { 64 super("SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID()); 65 this.serverSocketClass = serverSocketClass; 66 this.socketWrapper = createServerSocket(socket, timeout); 67 this.invoker = invoker; 68 this.clientpool = clientpool; 69 this.threadpool = threadpool; 70 } 71 72 public void shutdown() 73 { 74 shutdown = true; 75 running = false; 76 81 if(!handlingResponse) 85 { 86 try 87 { 88 this.interrupt(); 89 Thread.interrupted(); } 91 catch(Exception ignored) 92 { 93 } 94 } 95 96 } 97 98 private SocketWrapper createServerSocket(Socket socket, int timeout) throws Exception 99 { 100 if(serverSocketConstructor == null) 101 { 102 ClassLoader classLoader = null; 104 if(classLoader == null) 105 { 106 classLoader = Thread.currentThread().getContextClassLoader(); 107 108 if(classLoader == null) 109 { 110 classLoader = getClass().getClassLoader(); 111 } 112 } 113 Class cl = classLoader.loadClass(serverSocketClass); 114 115 serverSocketConstructor = cl.getConstructor(new Class []{Socket .class}); 116 } 117 SocketWrapper serverSocketWrapper = (SocketWrapper) serverSocketConstructor.newInstance(new Object []{socket}); 118 serverSocketWrapper.setTimeout(timeout); 119 120 return serverSocketWrapper; 121 } 122 123 124 public void evict() 125 { 126 running = false; 127 134 135 if(!handlingResponse) 139 { 140 try 141 { 142 this.interrupt(); 143 Thread.interrupted(); } 145 catch(Exception ignored) 146 { 147 } 148 } 149 } 150 151 152 public synchronized void wakeup(Socket socket, int timeout) throws Exception 153 { 154 this.socketWrapper = createServerSocket(socket, timeout); 155 String name = "SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID(); 156 super.setName(name); 157 running = true; 158 handlingResponse = true; 159 this.notify(); 160 } 161 162 public void run() 163 { 164 try 165 { 166 while(true) 167 { 168 dorun(); 169 if(shutdown) 170 { 171 synchronized(clientpool) 172 { 173 clientpool.remove(this); 174 } 175 return; } 177 else 178 { 179 synchronized(this) 180 { 181 synchronized(clientpool) 182 { 183 synchronized(threadpool) 184 { 185 clientpool.remove(this); 186 threadpool.add(this); 187 Thread.interrupted(); clientpool.notify(); 189 } 190 } 191 log.debug("begin thread wait"); 192 this.wait(); 193 log.debug("WAKEUP in SERVER THREAD"); 194 } 195 } 196 } 197 } 198 catch(Exception ignored) 199 { 200 log.debug("Exiting run on exception", ignored); 201 } 202 } 203 204 protected void acknowledge() throws Exception 205 { 206 handlingResponse = true; 213 214 socketWrapper.checkConnection(); 215 216 handlingResponse = false; 217 } 218 219 protected void processInvocation() throws Exception 220 { 221 handlingResponse = true; 222 224 225 UnMarshaller unmarshaller = MarshalFactory.getUnMarshaller(invoker.getLocator(), this.getClass().getClassLoader()); 227 if(unmarshaller == null) 228 { 229 unmarshaller = MarshalFactory.getUnMarshaller(invoker.getDataType()); 230 } 231 Object obj = unmarshaller.read(socketWrapper.getInputStream(), null); 232 233 Object resp = null; 234 try 235 { 236 boolean interrupted = Thread.interrupted(); 238 resp = invoker.invoke(obj); 240 246 } 247 catch(Exception ex) 248 { 249 resp = ex; 250 } 251 252 Thread.interrupted(); 254 Marshaller marshaller = MarshalFactory.getMarshaller(invoker.getLocator(), this.getClass().getClassLoader()); 255 256 if(marshaller == null) 257 { 258 marshaller = MarshalFactory.getMarshaller(invoker.getDataType()); 259 } 260 marshaller.write(resp, socketWrapper.getOutputStream()); 261 262 handlingResponse = false; 263 } 264 265 268 protected void dorun() 269 { 270 log.debug("beginning dorun"); 271 running = true; 272 handlingResponse = true; 273 274 try 276 { 277 processInvocation(); 278 } 279 catch(Exception ex) 280 { 281 log.debug("failed to process invocation.", ex); 282 running = false; 283 } 284 285 while(running) 287 { 288 try 289 { 290 acknowledge(); 291 processInvocation(); 292 } 293 catch(InterruptedIOException e) 294 { 295 log.debug("socket timed out", e); 296 running = false; 297 } 298 catch(InterruptedException e) 299 { 300 log.debug("interrupted", e); 301 } 302 catch(Exception ex) 303 { 304 log.debug("failed", ex); 305 running = false; 306 } 307 Thread.interrupted(); 309 } 310 try 312 { 313 InputStream in = socketWrapper.getInputStream(); 314 if(in != null) 315 { 316 in.close(); 317 } 318 OutputStream out = socketWrapper.getOutputStream(); 319 if(out != null) 320 { 321 out.close(); 322 } 323 } 324 catch(Exception ex) 325 { 326 } 327 try 328 { 329 socketWrapper.close(); 330 } 331 catch(Exception ex) 332 { 333 log.error("Failed cleanup", ex); 334 } 335 socketWrapper = null; 336 } 337 } 338 | Popular Tags |