1 21 package com.presumo.jms.client; 22 23 import com.presumo.jms.message.AckHelper; 24 import com.presumo.jms.message.JmsBytesMessage; 25 import com.presumo.jms.message.JmsMessage; 26 import com.presumo.jms.message.JmsMapMessage; 27 import com.presumo.jms.message.JmsObjectMessage; 28 import com.presumo.jms.message.JmsStreamMessage; 29 import com.presumo.jms.message.JmsTextMessage; 30 import com.presumo.jms.message.MessageStateListener; 31 32 import com.presumo.jms.resources.Resources; 33 import com.presumo.jms.router.Router; 34 import com.presumo.jms.router.RouterAdapter; 35 import com.presumo.jms.router.RoutingTarget; 36 import com.presumo.jms.selector.Parser; 37 import com.presumo.jms.selector.JmsOperand; 38 import com.presumo.jms.plugin.implementation.MemoryMessageQueue; 39 import com.presumo.util.log.Logger; 40 import com.presumo.util.log.LoggerFactory; 41 42 import java.io.IOException ; 43 import java.io.Serializable ; 44 import java.util.BitSet ; 45 import java.util.ArrayList ; 46 47 import javax.jms.BytesMessage ; 48 import javax.jms.Destination ; 49 import javax.jms.ExceptionListener ; 50 import javax.jms.IllegalStateException ; 51 import javax.jms.JMSException ; 52 import javax.jms.MapMessage ; 53 import javax.jms.Message ; 54 import javax.jms.MessageListener ; 55 import javax.jms.ObjectMessage ; 56 import javax.jms.Session ; 57 import javax.jms.StreamMessage ; 58 import javax.jms.TextMessage ; 59 60 61 68 public abstract class JmsSession extends RouterAdapter 69 implements Session , RoutingTarget, MessageStateListener 70 { 71 72 static final int QUEUE_RECEIVER_CRT = 0; 75 static final int QUEUE_RECEIVER_CLOSE = 1; 76 static final int QUEUE_BROWSER_CRT = 2; 77 static final int QUEUE_BROWSER_CLOSE = 3; 78 static final int DURABLE_SUBSCRIBER_CRT = 4; 79 static final int DURABLE_SUBSCRIBER_CLOSE = 5; 80 static final int DURABLE_SUBSCRIBER_DELETE = 6; 81 82 83 84 protected final boolean transacted; 85 86 87 protected final int acknowledgeMode; 88 89 90 protected int asynchCount; 91 92 93 protected volatile boolean closed; 94 95 96 protected final Router router; 97 98 99 protected final JmsConnection connx; 100 101 102 protected final Parser parser = Parser.getInstance(); 103 104 105 protected JmsOperand joinedFilter; 106 107 108 private final ArrayList consumers = new ArrayList (0); 109 110 111 private int numOfConsumers; 112 113 114 private final ArrayList producers = new ArrayList (0); 115 116 private final ArrayList outgoingMsgs; 117 private final ArrayList unacknowledgedMsgs; 118 119 public JmsSession(Router router, 123 boolean transacted, 124 int acknowledgeMode, 125 JmsConnection connx) 126 throws JMSException 127 { 128 super(new MemoryMessageQueue(), 1, "JmsSession Router"); 129 130 logger.entry("JmsSession"); 131 132 this.router = router; 133 this.transacted = transacted; 134 this.acknowledgeMode = acknowledgeMode; 135 this.connx = connx; 136 137 unacknowledgedMsgs = new ArrayList (); 138 if (transacted == true) { 139 outgoingMsgs = new ArrayList (); 140 } else { 141 outgoingMsgs = null; 142 } 143 144 145 150 logger.exit("JmsSession"); 151 } 152 153 157 public final BytesMessage createBytesMessage() throws JMSException 158 { 159 return new JmsBytesMessage(router.getName()); 160 } 161 162 163 public final MapMessage createMapMessage() throws JMSException 164 { 165 return new JmsMapMessage(router.getName()); 166 } 167 168 169 public final Message createMessage() throws JMSException 170 { 171 return new JmsMessage(router.getName()); 172 } 173 174 175 public final ObjectMessage createObjectMessage() throws JMSException 176 { 177 return new JmsObjectMessage(router.getName()); 178 } 179 180 181 public final ObjectMessage createObjectMessage(Serializable object) 182 throws JMSException 183 { 184 ObjectMessage msg = new JmsObjectMessage(router.getName()); 185 msg.setObject(object); 186 return msg; 187 } 188 189 190 public final StreamMessage createStreamMessage() throws JMSException 191 { 192 return new JmsStreamMessage(router.getName()); 193 } 194 195 196 public final TextMessage createTextMessage() throws JMSException 197 { 198 return new JmsTextMessage(router.getName()); 199 } 200 201 202 public final TextMessage createTextMessage(String text) throws JMSException 203 { 204 TextMessage msg = new JmsTextMessage(router.getName()); 205 msg.setText(text); 206 return msg; 207 } 208 209 public final boolean getTransacted() throws JMSException 210 { 211 return this.transacted; 212 } 213 214 215 public final void commit() throws JMSException 216 { 217 if (!transacted) { 218 throw new IllegalStateException ("Commit called on non-transacted session"); 219 } 220 221 synchronized(unacknowledgedMsgs) { 223 for (int i=0; i < unacknowledgedMsgs.size(); i++) { 224 JmsMessage msg = (JmsMessage) unacknowledgedMsgs.get(i); 225 msg.getAckHelper().routedAck(this); 226 } 227 } 228 229 JmsMessage [] msgs = new JmsMessage[outgoingMsgs.size()]; 231 msgs = (JmsMessage[]) outgoingMsgs.toArray(msgs); 232 try { 233 router.routeMessages(msgs); 234 } catch(IOException ioe) { 235 JMSException jmsex = new JMSException (""); 236 jmsex.setLinkedException(ioe); 237 throw jmsex; 238 } 239 240 outgoingMsgs.clear(); 241 242 synchronized(unacknowledgedMsgs) { 244 while(unacknowledgedMsgs.size() != 0) { 245 try { 246 unacknowledgedMsgs.wait(3000); 247 } catch (InterruptedException ie) {} 248 } 249 } 250 } 251 252 public final void rollback() 253 throws JMSException 254 { 255 if (!transacted) { 256 throw new IllegalStateException ("rollback called on non-transacted session"); 257 } 258 259 outgoingMsgs.clear(); 260 JmsMessage [] msgs = new JmsMessage[unacknowledgedMsgs.size()]; 261 msgs = (JmsMessage[])unacknowledgedMsgs.toArray(msgs); 262 unacknowledgedMsgs.clear(); 263 try { 264 queueMessages(msgs); 265 } catch (IOException ioe) { 266 ioe.printStackTrace(); 268 } 269 270 } 271 272 273 public final void recover() throws JMSException 274 { 275 if (transacted) { 276 throw new IllegalStateException ("Recover called on transacted session"); 277 } 278 279 if (unacknowledgedMsgs != null) { 280 JmsMessage [] msgs = new JmsMessage[unacknowledgedMsgs.size()]; 281 msgs = (JmsMessage[]) unacknowledgedMsgs.toArray(msgs); 282 unacknowledgedMsgs.clear(); 283 try { 284 queueMessages(msgs); 285 } catch (IOException ioe) { 286 ioe.printStackTrace(); 288 } 289 } 290 } 291 292 293 public void close() throws JMSException 294 { 295 logger.entry("close"); 296 297 if (!closed) { 298 super.stopRouter(); 299 super.closeRouter(); 300 301 closed = true; 302 synchronized (consumers) { 303 while( consumers.size() > 0) { 304 ( (JmsMessageConsumer) consumers.get(consumers.size()-1) ).close(); 305 } 306 } 307 308 synchronized (producers) { 309 while( producers.size() > 0) { 310 ( (JmsMessageProducer) producers.get(producers.size()-1) ).close(); 311 } 312 } 313 connx.removeSession(this); 314 } 315 logger.exit("close"); 316 } 317 318 319 public MessageListener getMessageListener() throws JMSException 320 { 321 throw new JMSException ("Operation not supported"); 322 } 323 324 public void setMessageListener(MessageListener listener) throws JMSException 325 { 326 throw new JMSException ("Operation not supported"); 327 } 328 329 330 public void acknowledge() 331 { 332 logger.entry("acknowledge"); 333 synchronized (unacknowledgedMsgs) { 334 335 Object [] acks = unacknowledgedMsgs.toArray(); 338 for (int i=0; i < acks.length; i++) { 339 JmsMessage msg = (JmsMessage) acks[i]; 340 logger.debug("acknowledge - Acking message: " + msg.toString()); 341 msg.getAckHelper().routedAck(this); 342 } 343 344 if (acknowledgeMode != Session.DUPS_OK_ACKNOWLEDGE) { 345 while(unacknowledgedMsgs.size() != 0) { 346 try { 347 logger.debug("acknowledge - waiting for deletion acks"); 348 unacknowledgedMsgs.wait(3000); 349 } catch (InterruptedException ie) {} 350 } 351 } 352 } 353 logger.exit("acknowledge"); 354 } 355 356 357 361 364 public final JmsMessage takeMessage(JmsMessage msg) 365 { 366 if (logger.isDebugEnabled()) 367 logger.entry("takeMessage " + msg.toString()); 368 369 boolean cloned = false; 370 synchronized (consumers) 371 { 372 int size = consumers.size(); 373 BitSet routingMask = null;; 374 for (int i=0; i < size; ++i) { 375 JmsMessageConsumer c = (JmsMessageConsumer) consumers.get(i); 376 if (parser.evaluate(c.getFilter(), msg)) { 377 if (!cloned) { 378 msg = (JmsMessage) msg.clone(); 379 cloned = true; 380 routingMask = new BitSet (size); 381 } 382 routingMask.set(i); 383 } 384 } 385 if (cloned) { 386 msg.setRoutingMask(routingMask); 387 388 if (msg.getJMSDeliveryMode() == javax.jms.DeliveryMode.PERSISTENT) { 390 AckHelper ack = msg.getAckHelper(); 391 ack.addDeletionListener(this); 392 ack.safeAck(this); 393 394 if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) { 395 msg.setSessionCallback(this); 396 } 397 } 398 399 try { 400 queueMessage(msg); 401 } catch (IOException ioe) { 402 ioe.printStackTrace(); 404 } 405 } 406 } 407 408 if (logger.isDebugEnabled()) 409 logger.exit("takeMessage", new Boolean (cloned)); 410 return msg; 411 } 412 413 416 public final JmsOperand getRoutingFilter() 417 { 418 synchronized(consumers) { 419 return joinedFilter; 420 } 421 } 422 423 426 public final void setRemoteRoutingFilter(JmsOperand filter, boolean add) 427 { 428 } 431 432 435 public final void setTargetID(int id) 436 { 437 } 438 439 442 public final boolean needsFilterUpdates() 443 { 444 return false; 445 } 446 447 448 449 453 public final void messageRouted(JmsMessage msg) 454 { 455 } 456 457 public final void messageDeleted(JmsMessage msg) 458 { 459 logger.entry("messageDeleted", msg); 460 461 synchronized (unacknowledgedMsgs) { 462 463 logger.debug("messageDeleted - unacks size: " + unacknowledgedMsgs.size()); 464 unacknowledgedMsgs.remove(msg); 465 if (unacknowledgedMsgs.size() == 0) { 466 logger.debug("messageDelted - notifyall"); 467 unacknowledgedMsgs.notifyAll(); 468 } else { 469 logger.debug("messageDeleted - unacks size: " + unacknowledgedMsgs.size()); 470 } 471 472 } 473 logger.exit("messageDeleted"); 474 } 475 476 477 481 482 final void start() 483 { 484 logger.entry("start"); 485 486 this.startRouter(); 487 488 logger.exit("start"); 489 } 490 491 final void stop() 492 { 493 logger.entry("stop"); 494 495 this.stopRouter(); 496 497 logger.exit("stop"); 498 } 499 500 501 final void send(JmsMessage msg) throws JMSException 502 { 503 if (transacted) { 504 outgoingMsgs.add(msg); 505 } else { 506 try { 507 router.routeMessage(msg); 508 } catch(IOException ioe) { 509 JMSException jmsex = new JMSException (""); 510 jmsex.setLinkedException(ioe); 511 throw jmsex; 512 } 513 } 514 } 515 516 final boolean autoAcknowledge() 517 { 518 return ( (!transacted) && 519 ((acknowledgeMode == Session.AUTO_ACKNOWLEDGE) || 520 (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)) ); 521 } 522 523 final void addProducer(JmsMessageProducer producer) 524 { 525 synchronized (producers) { 526 producers.add(producer); 527 } 528 } 529 530 final void removeProducer(JmsMessageProducer producer) 531 { 532 synchronized (producers) { 533 producers.remove(producer); 534 } 535 } 536 537 540 void addConsumer(JmsMessageConsumer consumer) 541 { 542 synchronized (consumers) { 543 consumers.add(consumer); 544 ++numOfConsumers; 545 this.recalculateJoinedFilter(); 546 if (numOfConsumers == 1) 547 router.addTarget(this); 548 else 549 router.recalculateFilters(true); 550 } 551 } 552 553 556 void removeConsumer(JmsMessageConsumer consumer) 557 { 558 synchronized (consumers) { 559 560 int index = consumers.indexOf(consumer); 561 if (index == consumers.size()-1) 562 consumers.remove(consumer); 563 else 564 consumers.set(index, null); --numOfConsumers; 567 this.recalculateJoinedFilter(); 568 if (numOfConsumers == 0) 569 router.removeTarget(this); 570 else 571 router.recalculateFilters(false); 572 } 573 } 574 575 578 final void reportException(Exception e) 579 { 580 logger.exception(e); 581 ExceptionListener el = null; 582 try { 583 el = connx.getExceptionListener(); 584 } catch (JMSException jmsex) {} 585 586 if (el != null) { 587 JMSException jmsex; 588 if (e instanceof JMSException ) { 589 jmsex = (JMSException ) e; 590 } else { 591 jmsex = new JMSException ("An exception within the connection: " + 592 e.toString()); 593 jmsex.setLinkedException(e); 594 } 595 el.onException(jmsex); 596 } 597 } 598 599 603 final void addAsynch() 604 { 605 ++asynchCount; 606 } 607 608 612 final void removeAsynch() 613 { 614 --asynchCount; 615 } 616 617 621 final boolean hasAsynchronousListeners() 622 { 623 return asynchCount != 0; 624 } 625 626 627 631 final void sendQueueRequest(String queueName, 632 String receiverID, 633 String filter, 634 int type) 635 throws JMSException 636 { 637 throw new RuntimeException ("Not implemented"); 638 } 639 640 644 648 protected final void routeMessages(int batchSize) 649 { 650 logger.entry("routeMessages"); 651 652 synchronized (consumers) { 653 int i, j; 654 JmsMessage [] msgs = null; 655 try { 656 msgs = super.getNext(batchSize); 657 } catch (IOException ioe) { 658 ioe.printStackTrace(); 660 } 661 662 if (msgs == null) return; 663 for (i=0; i < msgs.length; i++) { 664 665 JmsMessage msg = msgs[i]; 666 667 logger.debug("routeMessages: Processing message: " + msg); 668 BitSet routingMask = msg.getRoutingMask(); 669 670 if (msg.getJMSDeliveryMode() == javax.jms.DeliveryMode.PERSISTENT) { 671 synchronized(unacknowledgedMsgs) { 673 unacknowledgedMsgs.add(msg); 674 boolean msgSent = false; 675 for (j = 0; j < consumers.size(); j++) { 676 if (routingMask.get(j)) { 677 JmsMessageConsumer target = (JmsMessageConsumer) consumers.get(j); 678 if (target != null) { target.takeMessage(msg); 680 msgSent = true; 681 } 682 } 683 } 684 685 if (!msgSent) { 688 unacknowledgedMsgs.remove(msg); 689 msg.getAckHelper().routedAck(this); 690 } 691 } 692 } 693 else { 694 for (j = 0; j < consumers.size(); j++) { 696 if (routingMask.get(j)) { 697 JmsMessageConsumer target = (JmsMessageConsumer) consumers.get(j); 698 if (target != null) { target.takeMessage(msg); 700 } 701 } 702 } 703 } 704 705 706 } } 709 logger.exit("routeMessages"); 710 } 711 712 715 protected void finalize() throws Throwable 716 { 717 this.close(); 718 } 719 720 724 private void recalculateJoinedFilter() 725 { 726 if (numOfConsumers == 0) { 727 joinedFilter = null; 728 return; 729 } 730 731 JmsOperand [] allFilters = new JmsOperand[numOfConsumers]; 732 for (int i=0, j=0; i < consumers.size(); ++i) { 733 JmsMessageConsumer c = (JmsMessageConsumer) consumers.get(i); 734 if (c != null) { 735 allFilters[j] = c.getFilter(); 736 ++j; 737 } 738 } 739 joinedFilter = parser.orTogether(allFilters); 740 } 741 742 744 private static Logger logger = 745 LoggerFactory.getLogger(JmsSession.class, Resources.getBundle()); 746 747 749 } 750 751 752 753 | Popular Tags |