1 7 package org.jboss.mq.il.oil2; 8 9 import java.io.IOException ; 10 import java.io.ObjectInputStream ; 11 import java.io.ObjectOutputStream ; 12 import java.util.Iterator ; 13 14 import org.jboss.logging.Logger; 15 16 import EDU.oswego.cs.dl.util.concurrent.Channel; 17 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 18 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 19 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 20 import EDU.oswego.cs.dl.util.concurrent.Slot; 21 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 22 23 29 public final class OIL2SocketHandler implements java.lang.Cloneable , Runnable 30 { 31 final static private Logger log = Logger.getLogger(OIL2SocketHandler.class); 32 33 36 private ObjectInputStream in; 37 38 41 private ObjectOutputStream out; 42 43 46 private boolean running; 47 48 51 private final ThreadGroup partentThreadGroup; 52 53 56 private Thread worker; 57 58 61 private static int threadNumber = 0; 62 63 69 volatile ConcurrentHashMap responseSlots = new ConcurrentHashMap(); 70 71 75 OIL2RequestListner requestListner; 76 77 80 private volatile boolean pumpingData = false; 81 82 85 private Object pumpMutex = new Object (); 86 87 90 LinkedQueue requestQueue = new LinkedQueue(); 91 92 95 PooledExecutor pool; 96 97 103 public OIL2SocketHandler(ObjectInputStream in, ObjectOutputStream out, ThreadGroup partentThreadGroup) 104 { 105 this.in = in; 106 this.out = out; 107 this.partentThreadGroup = partentThreadGroup; 108 109 synchronized (OIL2SocketHandler.class) 110 { 111 if (pool == null) 112 { 113 pool = new PooledExecutor(50); 114 log.debug("Setting the OIL2SocketHandler's thread factory"); 116 pool.setThreadFactory( 117 new ThreadFactory() 118 { 119 private int threadNo = 0; 120 public Thread newThread(Runnable r) 121 { 122 Thread t = new Thread (OIL2SocketHandler.this.partentThreadGroup, r, "OIL2SocketHandler Thread-" + threadNo++); 123 t.setDaemon(true); 124 return t; 125 } 126 } 127 ); 128 pool.setMinimumPoolSize(1); 129 pool.setKeepAliveTime(1000 * 60); 130 pool.runWhenBlocked(); 131 pool.createThreads(1); 132 } 133 } 134 } 135 136 142 public void sendRequest(OIL2Request request) throws IOException 143 { 144 147 try 148 { 149 synchronized (out) 150 { 151 out.writeByte(1); 152 request.writeExternal(out); 153 out.reset(); 154 out.flush(); 155 } 156 } 157 catch (IOException e) 158 { 159 throw e; 160 } 161 162 } 163 164 167 private void registerResponseSlot(OIL2Request request, Slot responseSlot) throws IOException 168 { 169 responseSlots.put(request.requestId, responseSlot); 170 } 171 172 175 public void setRequestListner(OIL2RequestListner requestListner) 176 { 177 this.requestListner = requestListner; 178 } 179 180 186 public void sendResponse(OIL2Response response) throws IOException 187 { 188 191 try 192 { 193 synchronized (out) 194 { 195 out.writeByte(2); 196 response.writeExternal(out); 197 out.reset(); 198 out.flush(); 199 } 200 } 201 catch (IOException e) 202 { 203 throw e; 204 } 205 } 206 207 227 private Object pumpMessages(OIL2Request request, Channel mySlot) 228 throws IOException , ClassNotFoundException , InterruptedException 229 { 230 231 synchronized (pumpMutex) 232 { 233 if (pumpingData) 235 { 236 return null; 237 } 238 else 239 pumpingData = true; 240 } 241 242 try 243 { 244 while (true) 245 { 246 if (mySlot != null) 247 { 248 Object o; 250 while ((o = mySlot.peek()) != null) 251 { 252 o = mySlot.take(); 253 if (o != this) 254 { 255 return o; 256 } 257 } 258 } 259 260 byte code = in.readByte(); 261 switch (code) 262 { 263 case 1 : 265 OIL2Request newRequest = new OIL2Request(); 266 newRequest.readExternal(in); 267 268 if (request == null) 270 { 271 return newRequest; 272 } 273 else 274 { 275 requestQueue.put(newRequest); 276 } 277 278 break; 279 280 case 2 : 282 283 OIL2Response response = new OIL2Response(); 284 response.readExternal(in); 285 286 if (response.correlationRequestId == null) 288 continue; 289 290 if (request != null && request.requestId.equals(response.correlationRequestId)) 292 { 293 return response; 294 } 295 else 296 { 297 298 Slot slot = (Slot) responseSlots.remove(response.correlationRequestId); 299 300 if (slot != null) 301 { 302 slot.put(response); 303 } 304 else 305 { 306 if (log.isTraceEnabled()) 308 log.warn("No slot registered for: " + response); 309 } 310 } 311 break; 312 } } } 315 finally 316 { 317 synchronized (pumpMutex) 318 { 319 pumpingData = false; 320 } 321 322 Thread thread = Thread.currentThread(); 323 boolean interrupted = thread.isInterrupted(); 324 325 Iterator i = responseSlots.values().iterator(); 328 while (i.hasNext()) 329 { 330 Slot s = (Slot) i.next(); 331 if (s != mySlot) 332 s.offer(this, 0); 333 } 334 335 if (request != null) 338 { 339 requestQueue.put(this); 340 } 341 342 if (interrupted) 343 thread.interrupt(); 344 } 345 } 346 347 public OIL2Response synchRequest(OIL2Request request) 348 throws IOException , InterruptedException , ClassNotFoundException 349 { 350 351 354 Slot slot = new Slot(); 355 registerResponseSlot(request, slot); 356 sendRequest(request); 357 358 Object o = null; 359 while (true) 360 { 361 if (o != null) 363 { 364 if (o != this) 366 { 367 return (OIL2Response) o; 370 } 371 o = slot.peek(); 373 if (o != null) 374 o = slot.take(); 375 } 376 else 377 { 378 o = pumpMessages(request, slot); 381 if (o == null) 382 { 383 o = slot.take(); 386 } 387 } 388 } } 390 391 public class RequestRunner implements Runnable 392 { 393 OIL2Request request; 394 RequestRunner(OIL2Request request) 395 { 396 this.request = request; 397 } 398 public void run() 399 { 400 requestListner.handleRequest(request); 401 } 402 } 403 404 407 public void run() 408 { 409 try 410 { 411 412 Object o = null; 413 while (running) 414 { 415 if (o != null) 417 { 418 if (o != this) 420 { 421 pool.execute(new RequestRunner((OIL2Request) o)); 422 } 423 o = requestQueue.peek(); 425 if (o != null) 426 o = requestQueue.take(); 427 } 428 else 429 { 430 o = pumpMessages(null, requestQueue); 433 if (o == null) 434 { 435 o = requestQueue.take(); 438 } 439 } 440 } 442 } 443 catch (InterruptedException e) 444 { 445 if (log.isTraceEnabled()) 446 log.trace("Stopped due to interruption"); 447 } 448 catch (Exception e) 449 { 450 if (log.isTraceEnabled()) 451 log.trace("Stopping due to unexcpected exception: ", e); 452 requestListner.handleConnectionException(e); 453 } 454 455 running = false; 457 if (log.isTraceEnabled()) 458 log.trace("Stopped"); 459 } 460 461 public void start() { 463 if (log.isTraceEnabled()) 464 log.trace("Starting"); 465 466 running = true; 467 worker = new Thread (partentThreadGroup, this, "OIL2 Worker-" + threadNumber++); 468 worker.setDaemon(true); 469 worker.start(); 470 471 } 472 473 public void stop() 474 { 475 if (log.isTraceEnabled()) 476 log.trace("Stopping"); 477 running = false; 478 worker.interrupt(); 479 } 480 481 } 482 | Popular Tags |