1 23 package org.objectweb.joram.client.jms.connection; 24 25 import java.util.Enumeration ; 26 import java.util.Hashtable ; 27 import java.util.Set ; 28 import java.util.Timer ; 29 import java.util.TimerTask ; 30 import java.util.Vector ; 31 32 import javax.jms.IllegalStateException ; 33 import javax.jms.InvalidDestinationException ; 34 import javax.jms.JMSException ; 35 import javax.jms.JMSSecurityException ; 36 37 import org.objectweb.joram.client.jms.Connection; 38 import org.objectweb.joram.shared.client.AbstractJmsReply; 39 import org.objectweb.joram.shared.client.AbstractJmsRequest; 40 import org.objectweb.joram.shared.client.ConsumerMessages; 41 import org.objectweb.joram.shared.client.JmsRequestGroup; 42 import org.objectweb.joram.shared.client.MomExceptionReply; 43 import org.objectweb.joram.shared.client.PingRequest; 44 import org.objectweb.joram.shared.client.SessDenyRequest; 45 46 import org.objectweb.joram.shared.JoramTracing; 47 import org.objectweb.util.monolog.api.BasicLevel; 48 49 public class RequestMultiplexer { 50 51 private static class Status { 52 public static final int OPEN = 0; 53 public static final int CLOSE = 1; 54 55 private static final String [] names = {"OPEN", "CLOSE"}; 56 57 public static String toString(int status) { 58 return names[status]; 59 } 60 } 61 62 private Connection cnx; 63 64 private volatile int status; 65 66 private RequestChannel channel; 67 68 public Hashtable requestsTable; 69 70 private int requestCounter; 71 72 private DemultiplexerDaemon demtpx; 73 74 private Timer timer; 75 76 80 private HeartBeatTask heartBeatTask; 81 82 private javax.jms.ExceptionListener exceptionListener; 83 84 87 private volatile long lastRequestDate; 88 89 public RequestMultiplexer(Connection cnx, 90 RequestChannel channel, 91 long heartBeat) throws JMSException { 92 this.channel = channel; 93 this.cnx = cnx; 94 requestsTable = new Hashtable (); 95 requestCounter = 0; 96 timer = new Timer (); 97 channel.setTimer(timer); 98 try { 99 channel.connect(); 100 } catch (Exception exc) { 101 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 102 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc); 103 throw new JMSException (exc.toString()); 104 } 105 106 demtpx = new DemultiplexerDaemon(); 107 demtpx.start(); 108 setStatus(Status.OPEN); 109 110 if (heartBeat > 0) { 111 heartBeatTask = new HeartBeatTask(heartBeat); 112 lastRequestDate = System.currentTimeMillis(); 113 try { 114 heartBeatTask.start(); 115 } catch (Exception exc) { 116 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 117 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc); 118 throw new JMSException (exc.toString()); 119 } 120 } 121 } 122 123 private void setStatus(int status) { 124 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 125 JoramTracing.dbgClient.log( 126 BasicLevel.DEBUG, 127 "RequestMultiplexer.setStatus(" + 128 Status.toString(status) + ')'); 129 this.status = status; 130 } 131 132 public boolean isClosed() { 133 return status == Status.CLOSE; 134 } 135 136 public void setExceptionListener( 137 javax.jms.ExceptionListener exceptionListener) { 138 this.exceptionListener = exceptionListener; 139 } 140 141 public javax.jms.ExceptionListener getExceptionListener() { 142 return exceptionListener; 143 } 144 145 public void sendRequest(AbstractJmsRequest request) throws JMSException { 146 sendRequest(request, null); 147 } 148 149 public void sendRequest(AbstractJmsRequest request, ReplyListener listener) 150 throws JMSException { 151 152 synchronized (this) { 153 if (status == Status.CLOSE) 154 throw new IllegalStateException ("Connection closed"); 155 156 if (requestCounter == Integer.MAX_VALUE) { 157 requestCounter = 0; 158 } 159 160 request.setRequestId(requestCounter++); 161 162 if (listener != null) { 163 requestsTable.put(new Integer (request.getRequestId()), listener); 164 } 165 166 if (heartBeatTask != null) { 167 lastRequestDate = System.currentTimeMillis(); 168 } 169 } 170 171 try { 172 channel.send(request); 173 } catch (Exception exc) { 174 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 175 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc); 176 JMSException jmsExc = new JMSException (exc.toString()); 177 jmsExc.setLinkedException(exc); 178 throw jmsExc; 179 } 180 } 181 182 public void setMultiThreadSync(int delay, int threshold) { 183 channel = new MultiThreadSyncChannel(channel, delay, threshold); 184 } 185 186 191 public void close() { 192 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 193 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "RequestMultiplexer.close()"); 194 195 synchronized (this) { 196 if (status == Status.CLOSE) 197 return; 198 setStatus(Status.CLOSE); 204 } 205 206 if (heartBeatTask != null) heartBeatTask.cancel(); 207 if (timer != null) timer.cancel(); 208 channel.close(); 209 demtpx.stop(); 210 211 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 212 JoramTracing.dbgClient.log( 213 BasicLevel.DEBUG, " -> requestsTable=" + requestsTable); 214 215 219 cleanup(); 220 } 221 222 227 public void cleanup() { 228 Integer [] requestIds; 231 synchronized (requestsTable) { 232 Set keySet = requestsTable.keySet(); 233 requestIds = new Integer [keySet.size()]; 234 keySet.toArray(requestIds); 235 } 236 for (int i = 0; i < requestIds.length; i++) { 237 ReplyListener rl = (ReplyListener) requestsTable.get(requestIds[i]); 238 if (rl != null) { 241 rl.replyAborted(requestIds[i].intValue()); 242 } 243 } 244 requestsTable.clear(); 245 } 246 247 252 public void abortRequest(int requestId) { 253 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 254 JoramTracing.dbgClient.log( 255 BasicLevel.DEBUG, 256 "RequestMultiplexer.abortRequest(" + requestId + ')'); 257 ReplyListener rl = doAbortRequest(requestId); 258 if (rl != null) { 259 rl.replyAborted(requestId); 260 } 261 } 262 263 private synchronized ReplyListener doAbortRequest(int requestId) { 264 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 265 JoramTracing.dbgClient.log( 266 BasicLevel.DEBUG, "RequestMultiplexer.doAbortRequest(" + 267 requestId + ')'); 268 if (status == Status.CLOSE) return null; 269 return (ReplyListener)requestsTable.remove( 270 new Integer (requestId)); 271 } 272 273 278 private void route(AbstractJmsReply reply) { 279 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 280 JoramTracing.dbgClient.log( 281 BasicLevel.DEBUG, 282 "RequestMultiplexer.route(" + reply + ')'); 283 int requestId = reply.getCorrelationId(); 284 Integer requestKey = new Integer (requestId); 285 ReplyListener rl = (ReplyListener)requestsTable.get(requestKey); 286 if (reply instanceof MomExceptionReply) { 287 MomExceptionReply excReply = (MomExceptionReply) reply; 288 int excType = excReply.getType(); 289 JMSException jmsExc = null; 290 if (excType == MomExceptionReply.AccessException) { 291 jmsExc = new JMSSecurityException (excReply.getMessage()); 292 } else if (excType == MomExceptionReply.DestinationException) { 293 jmsExc = new InvalidDestinationException (excReply.getMessage()); 294 } else { 295 jmsExc = new JMSException (excReply.getMessage()); 296 } 297 if (rl instanceof ErrorListener) { 298 ((ErrorListener)rl).errorReceived(requestId, jmsExc); 299 } else { 300 onException(jmsExc); 302 } 303 } else { 304 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 305 JoramTracing.dbgClient.log( 306 BasicLevel.DEBUG, " -> rl = " + rl + ')'); 307 if (rl != null) { 308 try { 309 if (rl.replyReceived(reply)) { 310 requestsTable.remove(requestKey); 311 } 312 } catch (AbortedRequestException exc) { 313 JoramTracing.dbgClient.log( 314 BasicLevel.WARN, 315 " -> Request aborted: " + requestId); 316 abortReply(reply); 317 } 318 } else { 319 if (JoramTracing.dbgClient.isLoggable(BasicLevel.WARN)) 320 JoramTracing.dbgClient.log( 321 BasicLevel.WARN, 322 " -> Listener not found for the reply: " + requestId); 323 abortReply(reply); 324 } 325 } 326 } 327 328 private void abortReply(AbstractJmsReply reply) { 329 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 330 JoramTracing.dbgClient.log(BasicLevel.DEBUG, 331 "RequestMultiplexer.abortReply(" + reply + ')'); 332 if (reply instanceof ConsumerMessages) { 333 deny((ConsumerMessages)reply); 334 } 335 } 337 338 public void deny(ConsumerMessages messages) { 339 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 340 JoramTracing.dbgClient.log( 341 BasicLevel.DEBUG, "RequestMultiplexer.deny(" + 342 messages + ')'); 343 344 Vector msgList = messages.getMessages(); 345 Vector ids = new Vector (); 346 for (int i = 0; i < msgList.size(); i++) { 347 ids.addElement(((org.objectweb.joram.shared.messages.Message) msgList.elementAt(i)).id); 348 } 349 SessDenyRequest deny = new SessDenyRequest(messages.comesFrom(), 350 ids, 351 messages.getQueueMode()); 352 try { 353 sendRequest(deny); 354 } catch (JMSException exc) { 355 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 356 JoramTracing.dbgClient.log( 357 BasicLevel.DEBUG, "", exc); 358 } 361 } 362 363 class onExceptionRunner implements Runnable { 364 Exception exc; 365 366 onExceptionRunner(Exception exc) { 367 this.exc = exc; 368 } 369 370 public void run() { 371 onException(exc); 372 } 373 } 374 375 private void onException(Exception exc) { 376 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 377 JoramTracing.dbgClient.log( 378 BasicLevel.DEBUG, "RequestMultiplexer.onException(" + exc + ')'); 379 JMSException jmsExc; 380 if (exc instanceof JMSException ) { 381 jmsExc = (JMSException ) exc; 382 } else { 383 jmsExc = new IllegalStateException (exc.getMessage()); 384 } 385 if (exceptionListener != null) 386 exceptionListener.onException(jmsExc); 387 } 388 389 public void schedule(TimerTask task, 390 long period) { 391 if (timer != null) { 392 try { 393 timer.schedule(task, period); 394 } catch (Exception exc) { 395 if (JoramTracing.dbgClient.isLoggable(BasicLevel.ERROR)) 396 JoramTracing.dbgClient.log(BasicLevel.ERROR, "", exc); 397 } 398 } 399 } 400 401 public void setDemultiplexerDaemonName(String name) { 402 demtpx.setName(name); 403 } 404 405 public String getDemultiplexerDaemonName() { 406 return demtpx.getName(); 407 } 408 409 private class DemultiplexerDaemon extends fr.dyade.aaa.util.Daemon { 410 DemultiplexerDaemon() { 411 super("Connection#?"); 415 } 416 417 public void run() { 418 try { 419 loop: 420 while (running) { 421 canStop = true; 422 AbstractJmsReply reply; 423 try { 424 reply = channel.receive(); 425 } catch (Exception exc) { 426 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 427 JoramTracing.dbgClient.log(BasicLevel.DEBUG, 428 "Exception during receive", exc); 429 if (! isClosed()) { 434 RequestMultiplexer.this.close(); 435 Closer closer = new Closer(exc); 441 new Thread (closer).start(); 442 } else { 443 onExceptionRunner oer = new onExceptionRunner(exc); 447 new Thread (oer).start(); 448 } 449 450 break loop; 451 } 452 canStop = false; 453 route(reply); 454 } 455 } finally { 456 finish(); 457 } 458 } 459 460 463 public void stop() { 464 if (isCurrentThread()) { 465 finish(); 466 } else { 467 super.stop(); 468 } 469 } 470 471 protected void shutdown() {} 472 473 protected void close() {} 474 } 475 476 private class Closer implements Runnable { 477 private Exception exc; 478 479 Closer(Exception e) { 480 exc = e; 481 } 482 483 public void run() { 484 try { 485 RequestMultiplexer.this.cnx.close(); 486 } catch (JMSException exc2) { 487 if (JoramTracing.dbgClient.isLoggable(BasicLevel.WARN)) 488 JoramTracing.dbgClient.log(BasicLevel.WARN, 489 "Error during close", exc2); 490 } 491 492 onException(exc); 493 } 494 } 495 496 502 private class HeartBeatTask extends TimerTask { 503 504 private long heartBeat; 505 506 HeartBeatTask(long heartBeat) { 507 this.heartBeat = heartBeat; 508 } 509 510 public void run() { 511 try { 512 long date = System.currentTimeMillis(); 513 if ((date - lastRequestDate) > heartBeat) { 514 sendRequest(new PingRequest()); 515 } 516 } catch (Exception exc) { 517 if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG)) 518 JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc); 519 } 520 } 521 522 public void start() throws Exception { 523 timer.schedule(this, heartBeat, heartBeat); 524 } 525 } 526 527 528 529 } 530 | Popular Tags |