1 22 package org.jboss.mq.il.uil2; 23 24 import java.io.IOException ; 25 import java.io.ObjectInputStream ; 26 import java.io.ObjectOutputStream ; 27 import java.net.InetAddress ; 28 import java.net.Socket ; 29 import java.util.Iterator ; 30 31 import javax.jms.JMSException ; 32 33 import org.jboss.logging.Logger; 34 import org.jboss.mq.il.uil2.msgs.BaseMsg; 35 import org.jboss.util.stream.NotifyingBufferedInputStream; 36 import org.jboss.util.stream.NotifyingBufferedOutputStream; 37 38 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 39 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 40 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 41 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 42 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 43 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; 44 45 53 public class SocketManager 54 { 55 private static Logger log = Logger.getLogger(SocketManager.class); 56 57 private static final int STOPPED = 0; 58 private static final int STARTED = 1; 59 private static final int STOPPING = 2; 60 private static SynchronizedInt taskID = new SynchronizedInt(0); 61 62 63 private Socket socket; 64 65 private ObjectInputStream in; 66 67 NotifyingBufferedInputStream bufferedInput; 68 69 private ObjectOutputStream out; 70 71 NotifyingBufferedOutputStream bufferedOutput; 72 73 private Thread writeThread; 74 75 private Thread readThread; 76 77 PooledExecutor pool; 78 79 private int readState = STOPPED; 80 81 private int writeState = STOPPED; 82 83 private SynchronizedBoolean running = new SynchronizedBoolean(false); 84 85 private LinkedQueue sendQueue; 86 87 private ConcurrentHashMap replyMap; 88 89 private SocketManagerHandler handler; 90 91 private int bufferSize = 1; 92 93 private int chunkSize = 0x40000000; 94 95 private boolean trace; 96 97 public SocketManager(Socket s) throws IOException 98 { 99 socket = s; 100 sendQueue = new LinkedQueue(); 101 replyMap = new ConcurrentHashMap(); 102 trace = log.isTraceEnabled(); 103 } 104 105 109 public void start(ThreadGroup tg) 110 { 111 if (trace) 112 log.trace("start called", new Exception ("Start stack trace")); 113 114 InetAddress inetAddr = socket.getInetAddress(); 115 String ipAddress = (inetAddr != null) ? inetAddr.getHostAddress() : "<unknown>"; 116 ipAddress += ":" + socket.getPort(); 117 if (pool == null) 118 { 119 pool = new PooledExecutor(5); 121 pool.setMinimumPoolSize(1); 122 pool.setKeepAliveTime(1000 * 60); 123 pool.runWhenBlocked(); 124 String id = "SocketManager.MsgPool@"+ 125 Integer.toHexString(System.identityHashCode(this)) 126 + " client=" + ipAddress; 127 pool.setThreadFactory(new UILThreadFactory(id)); 128 } 129 130 ReadTask readTask = new ReadTask(); 131 readThread = new Thread (tg, readTask, "UIL2.SocketManager.ReadTask#" + taskID.increment() + " client=" + ipAddress); 132 readThread.setDaemon(true); 133 134 WriteTask writeTask = new WriteTask(); 135 writeThread = new Thread (tg, writeTask, "UIL2.SocketManager.WriteTask#" + taskID.increment() + " client=" + ipAddress); 136 writeThread.setDaemon(true); 137 138 synchronized (running) 139 { 140 readState = STARTED; 141 writeState = STARTED; 142 running.set(true); 143 } 144 145 readThread.start(); 146 writeThread.start(); 147 } 148 149 151 public void stop() 152 { 153 synchronized (running) 154 { 155 if (readState == STARTED) 156 { 157 readState = STOPPING; 158 readThread.interrupt(); 159 } 160 if (writeState == STARTED) 161 { 162 writeState = STOPPING; 163 writeThread.interrupt(); 164 } 165 running.set(false); 166 if (pool != null) 167 { 168 pool.shutdownNow(); 169 pool = null; 170 } 171 } 172 } 173 174 180 public void setHandler(SocketManagerHandler handler) 181 { 182 this.handler = handler; 183 if (bufferedInput != null) 184 bufferedInput.setStreamListener(handler); 185 if (bufferedOutput != null) 186 bufferedOutput.setStreamListener(handler); 187 } 188 189 194 public void setBufferSize(int size) 195 { 196 this.bufferSize = size; 197 } 198 199 204 public void setChunkSize(int size) 205 { 206 this.chunkSize = size; 207 } 208 209 218 public void sendMessage(BaseMsg msg) throws Exception 219 { 220 internalSendMessage(msg, true); 221 if (msg.error != null) 222 { 223 if (trace) 224 log.trace("sendMessage will throw error", msg.error); 225 throw msg.error; 226 } 227 } 228 229 235 public void sendReply(BaseMsg msg) throws Exception 236 { 237 msg.trimReply(); 238 internalSendMessage(msg, false); 239 } 240 241 247 public void sendOneWay(BaseMsg msg) throws Exception 248 { 249 msg.getMsgID(); 250 internalSendMessage(msg, false); 251 } 252 253 261 private void internalSendMessage(BaseMsg msg, boolean waitOnReply) throws Exception 262 { 263 if (running.get() == false) 264 throw new IOException ("Client is not connected"); 265 266 if (waitOnReply) 267 { synchronized (msg) 269 { 270 msg.getMsgID(); 272 if (trace) 273 log.trace("Begin internalSendMessage, round-trip msg=" + msg); 274 replyMap.put(msg, msg); 276 sendQueue.put(msg); 277 msg.wait(); 279 } 280 } 281 else 282 { if (trace) 284 log.trace("Begin internalSendMessage, one-way msg=" + msg); 285 sendQueue.put(msg); 286 } 287 if (trace) 288 log.trace("End internalSendMessage, msg=" + msg); 289 } 290 291 294 public class ReadTask implements Runnable 295 { 296 public void run() 297 { 298 int msgType = 0; 299 log.debug("Begin ReadTask.run"); 300 try 301 { 302 bufferedInput = new NotifyingBufferedInputStream(socket.getInputStream(), bufferSize, chunkSize, handler); 303 in = new ObjectInputStream (bufferedInput); 304 log.debug("Created ObjectInputStream"); 305 } 306 catch (IOException e) 307 { 308 handleStop("Failed to create ObjectInputStream", e); 309 return; 310 } 311 312 while (true) 313 { 314 try 315 { 316 msgType = in.readByte(); 317 int msgID = in.readInt(); 318 if (trace) 319 log.trace("Read msgType: " + BaseMsg.toString(msgType) + ", msgID: " + msgID); 320 BaseMsg key = new BaseMsg(msgType, msgID); 322 BaseMsg msg = (BaseMsg) replyMap.remove(key); 323 if (msg == null) 324 { 325 msg = BaseMsg.createMsg(msgType); 326 msg.setMsgID(msgID); 327 msg.read(in); 328 if (trace) 329 log.trace("Read new msg: " + msg); 330 331 if (pool == null) 333 break; 334 msg.setHandler(this); 335 pool.execute(msg); 336 } 337 else 338 { 339 if (trace) 340 log.trace("Found replyMap msg: " + msg); 341 msg.setMsgID(msgID); 342 try 343 { 344 msg.read(in); 345 if (trace) 346 log.trace("Read msg reply: " + msg); 347 } 348 catch (Throwable e) 349 { 350 msg.setError(e); 352 throw e; 353 } 354 finally 356 { 357 synchronized (msg) 358 { 359 msg.notify(); 360 } 361 } 362 } 363 } 364 catch (ClassNotFoundException e) 365 { 366 handleStop("Failed to read msgType:" + msgType, e); 367 break; 368 } 369 catch (IOException e) 370 { 371 handleStop("Exiting on IOE", e); 372 break; 373 } 374 catch (InterruptedException e) 375 { 376 handleStop("Exiting on interrupt", e); 377 break; 378 } 379 catch (Throwable e) 380 { 381 handleStop("Exiting on unexpected error in read task", e); 382 break; 383 } 384 } 385 log.debug("End ReadTask.run"); 386 } 387 388 391 public void handleMsg(BaseMsg msg) 392 { 393 try 394 { 395 handler.handleMsg(msg); 396 } 397 catch (Throwable e) 398 { 399 if (e instanceof JMSException ) 400 log.trace("Failed to handle: " + msg.toString(), e); 401 else if (e instanceof RuntimeException || e instanceof Error ) 402 log.error("Failed to handle: " + msg.toString(), e); 403 else 404 log.debug("Failed to handle: " + msg.toString(), e); 405 msg.setError(e); 406 try 407 { 408 internalSendMessage(msg, false); 409 } 410 catch (Exception ie) 411 { 412 log.debug("Failed to send error reply", ie); 413 } 414 } 415 } 416 417 420 private void handleStop(String error, Throwable e) 421 { 422 synchronized (running) 423 { 424 readState = STOPPING; 425 running.set(false); 426 } 427 428 if (e instanceof IOException || e instanceof InterruptedException ) 429 { 430 if (trace) 431 log.trace(error, e); 432 } 433 else 434 log.debug(error, e); 435 436 replyAll(e); 437 if (handler != null) 438 { 439 handler.asynchFailure(error, e); 440 handler.close(); 441 } 442 443 synchronized (running) 444 { 445 readState = STOPPED; 446 if (writeState == STARTED) 447 { 448 writeState = STOPPING; 449 writeThread.interrupt(); 450 } 451 } 452 453 try 454 { 455 in.close(); 456 } 457 catch (Exception ignored) 458 { 459 if (trace) 460 log.trace(ignored.getMessage(), ignored); 461 } 462 463 try 464 { 465 socket.close(); 466 } 467 catch (Exception ignored) 468 { 469 if (trace) 470 log.trace(ignored.getMessage(), ignored); 471 } 472 } 473 474 private void replyAll(Throwable e) 475 { 476 Thread.interrupted(); 478 479 for (Iterator iterator = replyMap.keySet().iterator(); iterator.hasNext();) 480 { 481 BaseMsg msg = (BaseMsg) iterator.next(); 482 msg.setError(e); 483 synchronized (msg) 484 { 485 msg.notify(); 486 } 487 iterator.remove(); 488 } 489 } 490 } 491 492 495 public class WriteTask implements Runnable 496 { 497 public void run() 498 { 499 log.debug("Begin WriteTask.run"); 500 try 501 { 502 bufferedOutput = 503 new NotifyingBufferedOutputStream(socket.getOutputStream(), bufferSize, chunkSize, handler); 504 out = new ObjectOutputStream (bufferedOutput); 505 log.debug("Created ObjectOutputStream"); 506 } 507 catch (IOException e) 508 { 509 handleStop(null, "Failed to create ObjectOutputStream", e); 510 return; 511 } 512 513 while (true) 514 { 515 BaseMsg msg = null; 516 try 517 { 518 msg = (BaseMsg) sendQueue.take(); 519 if (trace) 520 log.trace("Write msg: " + msg); 521 msg.write(out); 522 out.reset(); 523 out.flush(); 524 } 525 catch (InterruptedException e) 526 { 527 handleStop(msg, "WriteTask was interrupted", e); 528 break; 529 } 530 catch (IOException e) 531 { 532 handleStop(msg, "Exiting on IOE", e); 533 break; 534 } 535 catch (Throwable e) 536 { 537 handleStop(msg, "Failed to write msgType:" + msg, e); 538 break; 539 } 540 } 541 log.debug("End WriteTask.run"); 542 } 543 544 547 private void handleStop(BaseMsg msg, String error, Throwable e) 548 { 549 synchronized (running) 550 { 551 writeState = STOPPING; 552 running.set(false); 553 } 554 555 if (e instanceof InterruptedException || e instanceof IOException ) 556 { 557 if (trace) 558 log.trace(error, e); 559 } 560 else 561 log.debug(error, e); 562 563 if (msg != null) 564 { 565 msg.setError(e); 566 synchronized (msg) 567 { 568 msg.notify(); 569 } 570 } 571 572 synchronized (running) 573 { 574 writeState = STOPPED; 575 if (readState == STARTED) 576 { 577 readState = STOPPING; 578 readThread.interrupt(); 579 } 580 } 581 582 try 583 { 584 out.close(); 585 } 586 catch (Exception ignored) 587 { 588 if (trace) 589 log.trace(ignored.getMessage(), ignored); 590 } 591 592 try 593 { 594 socket.close(); 595 } 596 catch (Exception ignored) 597 { 598 if (trace) 599 log.trace(ignored.getMessage(), ignored); 600 } 601 } 602 } 603 604 static class UILThreadFactory implements ThreadFactory 605 { 606 private String id; 607 private int count; 608 609 UILThreadFactory(String id) 610 { 611 this.id = id; 612 } 613 public Thread newThread(Runnable command) 614 { 615 synchronized( this ) 616 { 617 count ++; 618 } 619 Thread t = new Thread (command, "UIL2("+id+")#"+count); 620 return t; 621 } 622 } 623 } 624 | Popular Tags |