1 22 package org.jboss.resource.adapter.jms; 23 24 import java.io.Serializable ; 25 import java.util.HashSet ; 26 import java.util.Iterator ; 27 28 import javax.jms.BytesMessage ; 29 import javax.jms.Destination ; 30 import javax.jms.IllegalStateException ; 31 import javax.jms.JMSException ; 32 import javax.jms.MessageListener ; 33 import javax.jms.MapMessage ; 34 import javax.jms.Message ; 35 import javax.jms.MessageConsumer ; 36 import javax.jms.MessageProducer ; 37 import javax.jms.ObjectMessage ; 38 import javax.jms.Queue ; 39 import javax.jms.QueueBrowser ; 40 import javax.jms.QueueReceiver ; 41 import javax.jms.QueueSender ; 42 import javax.jms.QueueSession ; 43 import javax.jms.Session ; 44 import javax.jms.StreamMessage ; 45 import javax.jms.TemporaryQueue ; 46 import javax.jms.TemporaryTopic ; 47 import javax.jms.TextMessage ; 48 import javax.jms.Topic ; 49 import javax.jms.TopicPublisher ; 50 import javax.jms.TopicSession ; 51 import javax.jms.TopicSubscriber ; 52 import javax.resource.spi.ConnectionEvent ; 53 54 import org.jboss.logging.Logger; 55 56 64 public class JmsSession implements Session , QueueSession , TopicSession 65 { 66 private static final Logger log = Logger.getLogger(JmsSession.class); 67 68 69 private JmsManagedConnection mc; 71 72 private JmsConnectionRequestInfo info; 73 74 75 private JmsSessionFactory sf; 76 77 78 private HashSet consumers = new HashSet (); 79 80 81 private HashSet producers = new HashSet (); 82 83 84 private boolean trace = log.isTraceEnabled(); 85 86 91 public JmsSession(final JmsManagedConnection mc, JmsConnectionRequestInfo info) 92 { 93 this.mc = mc; 94 this.info = info; 95 if (trace) 96 log.trace("new JmsSession " + this + " mc=" + mc + " cri=" + info); 97 } 98 99 public void setJmsSessionFactory(JmsSessionFactory sf) 100 { 101 this.sf = sf; 102 } 103 104 111 Session getSession() throws JMSException 112 { 113 if (mc == null) 115 throw new IllegalStateException ("The session is closed"); 116 117 Session session = mc.getSession(); 118 if (trace) 119 log.trace("getSession " + session + " for " + this); 120 return session; 121 } 122 123 125 public BytesMessage createBytesMessage() throws JMSException 126 { 127 Session session = getSession(); 128 if (trace) 129 log.trace("createBytesMessage" + session); 130 return session.createBytesMessage(); 131 } 132 133 public MapMessage createMapMessage() throws JMSException 134 { 135 Session session = getSession(); 136 if (trace) 137 log.trace("createMapMessage" + session); 138 return session.createMapMessage(); 139 } 140 141 public Message createMessage() throws JMSException 142 { 143 Session session = getSession(); 144 if (trace) 145 log.trace("createMessage" + session); 146 return session.createMessage(); 147 } 148 149 public ObjectMessage createObjectMessage() throws JMSException 150 { 151 Session session = getSession(); 152 if (trace) 153 log.trace("createObjectMessage" + session); 154 return session.createObjectMessage(); 155 } 156 157 public ObjectMessage createObjectMessage(Serializable object) throws JMSException 158 { 159 Session session = getSession(); 160 if (trace) 161 log.trace("createObjectMessage(Object)" + session); 162 return session.createObjectMessage(object); 163 } 164 165 public StreamMessage createStreamMessage() throws JMSException 166 { 167 Session session = getSession(); 168 if (trace) 169 log.trace("createStreamMessage" + session); 170 return session.createStreamMessage(); 171 } 172 173 public TextMessage createTextMessage() throws JMSException 174 { 175 Session session = getSession(); 176 if (trace) 177 log.trace("createTextMessage" + session); 178 return session.createTextMessage(); 179 } 180 181 public TextMessage createTextMessage(String string) throws JMSException 182 { 183 Session session = getSession(); 184 if (trace) 185 log.trace("createTextMessage(String)" + session); 186 return session.createTextMessage(string); 187 } 188 189 190 public boolean getTransacted() throws JMSException 191 { 192 getSession(); return info.isTransacted(); 194 } 195 196 201 public MessageListener getMessageListener() throws JMSException 202 { 203 throw new IllegalStateException ("Method not allowed"); 204 } 205 206 211 public void setMessageListener(MessageListener listener) throws JMSException 212 { 213 throw new IllegalStateException ("Method not allowed"); 214 } 215 216 221 public void run() 222 { 223 throw new Error ("Method not allowed"); 225 } 226 227 233 public void close() throws JMSException 234 { 235 sf.closeSession(this); 236 closeSession(); 237 } 238 239 public void commit() throws JMSException 241 { 242 Session session = getSession(); 243 if (info.isTransacted() == false) 244 throw new IllegalStateException ("Session is not transacted"); 245 if (trace) 246 log.trace("Commit session " + this); 247 session.commit(); 248 } 249 250 public void rollback() throws JMSException 251 { 252 Session session = getSession(); 253 if (info.isTransacted() == false) 254 throw new IllegalStateException ("Session is not transacted"); 255 if (trace) 256 log.trace("Rollback session " + this); 257 session.rollback(); 258 } 259 260 public void recover() throws JMSException 261 { 262 Session session = getSession(); 263 if (info.isTransacted()) 264 throw new IllegalStateException ("Session is transacted"); 265 if (trace) 266 log.trace("Recover session " + this); 267 session.recover(); 268 } 269 270 272 public Topic createTopic(String topicName) throws JMSException 273 { 274 Session session = getSession(); 275 if (trace) 276 log.trace("createTopic " + session + " topicName=" + topicName); 277 Topic result = session.createTopic(topicName); 278 if (trace) 279 log.trace("createdTopic " + session + " topic=" + result); 280 return result; 281 } 282 283 public TopicSubscriber createSubscriber(Topic topic) throws JMSException 284 { 285 TopicSession session = getTopicSession(); 286 if (trace) 287 log.trace("createSubscriber " + session + " topic=" + topic); 288 TopicSubscriber result = session.createSubscriber(topic); 289 result = new JmsTopicSubscriber(result, this); 290 if (trace) 291 log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result); 292 addConsumer(result); 293 return result; 294 } 295 296 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException 297 { 298 TopicSession session = getTopicSession(); 299 if (trace) 300 log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal); 301 TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal); 302 result = new JmsTopicSubscriber(result, this); 303 if (trace) 304 log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result); 305 addConsumer(result); 306 return result; 307 } 308 309 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException 310 { 311 TopicSession session = getTopicSession(); 312 if (trace) 313 log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name); 314 TopicSubscriber result = session.createDurableSubscriber(topic, name); 315 result = new JmsTopicSubscriber(result, this); 316 if (trace) 317 log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result); 318 addConsumer(result); 319 return result; 320 } 321 322 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) 323 throws JMSException 324 { 325 Session session = getSession(); 326 if (trace) 327 log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal); 328 TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal); 329 result = new JmsTopicSubscriber(result, this); 330 if (trace) 331 log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result); 332 addConsumer(result); 333 return result; 334 } 335 336 public TopicPublisher createPublisher(Topic topic) throws JMSException 337 { 338 TopicSession session = getTopicSession(); 339 if (trace) 340 log.trace("createPublisher " + session + " topic=" + topic); 341 TopicPublisher result = session.createPublisher(topic); 342 if (trace) 343 log.trace("createdPublisher " + session + " publisher=" + result); 344 addProducer(result); 345 return result; 346 } 347 348 public TemporaryTopic createTemporaryTopic() throws JMSException 349 { 350 Session session = getSession(); 351 if (trace) 352 log.trace("createTemporaryTopic " + session); 353 TemporaryTopic temp = session.createTemporaryTopic(); 354 if (trace) 355 log.trace("createdTemporaryTopic " + session + " temp=" + temp); 356 sf.addTemporaryTopic(temp); 357 return temp; 358 } 359 360 public void unsubscribe(String name) throws JMSException 361 { 362 Session session = getSession(); 363 if (trace) 364 log.trace("unsubscribe " + session + " name=" + name); 365 session.unsubscribe(name); 366 } 367 368 370 public QueueBrowser createBrowser(Queue queue) throws JMSException 371 { 372 Session session = getSession(); 373 if (trace) 374 log.trace("createBrowser " + session + " queue=" + queue); 375 QueueBrowser result = session.createBrowser(queue); 376 if (trace) 377 log.trace("createdBrowser " + session + " browser=" + result); 378 return result; 379 } 380 381 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException 382 { 383 Session session = getSession(); 384 if (trace) 385 log.trace("createBrowser " + session + " queue=" + queue + " selector=" + messageSelector); 386 QueueBrowser result = session.createBrowser(queue, messageSelector); 387 if (trace) 388 log.trace("createdBrowser " + session + " browser=" + result); 389 return result; 390 } 391 392 public Queue createQueue(String queueName) throws JMSException 393 { 394 Session session = getSession(); 395 if (trace) 396 log.trace("createQueue " + session + " queueName=" + queueName); 397 Queue result = session.createQueue(queueName); 398 if (trace) 399 log.trace("createdQueue " + session + " queue=" + result); 400 return result; 401 } 402 403 public QueueReceiver createReceiver(Queue queue) throws JMSException 404 { 405 QueueSession session = getQueueSession(); 406 if (trace) 407 log.trace("createReceiver " + session + " queue=" + queue); 408 QueueReceiver result = session.createReceiver(queue); 409 result = new JmsQueueReceiver(result, this); 410 if (trace) 411 log.trace("createdReceiver " + session + " receiver=" + result); 412 addConsumer(result); 413 return result; 414 } 415 416 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException 417 { 418 QueueSession session = getQueueSession(); 419 if (trace) 420 log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector); 421 QueueReceiver result = session.createReceiver(queue, messageSelector); 422 result = new JmsQueueReceiver(result, this); 423 if (trace) 424 log.trace("createdReceiver " + session + " receiver=" + result); 425 addConsumer(result); 426 return result; 427 } 428 429 public QueueSender createSender(Queue queue) throws JMSException 430 { 431 QueueSession session = getQueueSession(); 432 if (trace) 433 log.trace("createSender " + session + " queue=" + queue); 434 QueueSender result = session.createSender(queue); 435 if (trace) 436 log.trace("createdSender " + session + " sender=" + result); 437 addProducer(result); 438 return result; 439 } 440 441 public TemporaryQueue createTemporaryQueue() throws JMSException 442 { 443 Session session = getSession(); 444 if (trace) 445 log.trace("createTemporaryQueue " + session); 446 TemporaryQueue temp = session.createTemporaryQueue(); 447 if (trace) 448 log.trace("createdTemporaryQueue " + session + " temp=" + temp); 449 sf.addTemporaryQueue(temp); 450 return temp; 451 } 452 453 455 public MessageConsumer createConsumer(Destination destination) throws JMSException 456 { 457 Session session = getSession(); 458 if (trace) 459 log.trace("createConsumer " + session + " dest=" + destination); 460 MessageConsumer result = session.createConsumer(destination); 461 result = new JmsMessageConsumer(result, this); 462 if (trace) 463 log.trace("createdConsumer " + session + " consumer=" + result); 464 addConsumer(result); 465 return result; 466 } 467 468 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException 469 { 470 Session session = getSession(); 471 if (trace) 472 log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector); 473 MessageConsumer result = session.createConsumer(destination, messageSelector); 474 result = new JmsMessageConsumer(result, this); 475 if (trace) 476 log.trace("createdConsumer " + session + " consumer=" + result); 477 addConsumer(result); 478 return result; 479 } 480 481 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) 482 throws JMSException 483 { 484 Session session = getSession(); 485 if (trace) 486 log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal); 487 MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal); 488 result = new JmsMessageConsumer(result, this); 489 if (trace) 490 log.trace("createdConsumer " + session + " consumer=" + result); 491 addConsumer(result); 492 return result; 493 } 494 495 public MessageProducer createProducer(Destination destination) throws JMSException 496 { 497 Session session = getSession(); 498 if (trace) 499 log.trace("createProducer " + session + " dest=" + destination); 500 MessageProducer result = getSession().createProducer(destination); 501 if (trace) 502 log.trace("createdProducer " + session + " producer=" + result); 503 addProducer(result); 504 return result; 505 } 506 507 public int getAcknowledgeMode() throws JMSException 508 { 509 getSession(); return info.getAcknowledgeMode(); 511 } 512 513 515 void setManagedConnection(final JmsManagedConnection mc) 516 { 517 if (this.mc != null) 518 this.mc.removeHandle(this); 519 this.mc = mc; 520 } 521 522 void destroy() 523 { 524 mc = null; 525 } 526 527 void start() throws JMSException 528 { 529 if (mc != null) 530 mc.start(); 531 } 532 533 void stop() throws JMSException 534 { 535 if (mc != null) 536 mc.stop(); 537 } 538 539 void checkStrict() throws JMSException 540 { 541 if (mc != null && mc.getManagedConnectionFactory().isStrict()) 542 throw new IllegalStateException (JmsSessionFactory.ISE); 543 } 544 545 void closeSession() throws JMSException 546 { 547 if (mc != null) 548 { 549 log.trace("Closing session"); 550 551 try 552 { 553 mc.stop(); 554 } 555 catch (Throwable t) 556 { 557 log.trace("Error stopping managed connection", t); 558 } 559 560 synchronized (consumers) 561 { 562 for (Iterator i = consumers.iterator(); i.hasNext();) 563 { 564 JmsMessageConsumer consumer = (JmsMessageConsumer) i.next(); 565 try 566 { 567 consumer.closeConsumer(); 568 } 569 catch (Throwable t) 570 { 571 log.trace("Error closing consumer", t); 572 } 573 i.remove(); 574 } 575 } 576 577 synchronized (producers) 578 { 579 for (Iterator i = producers.iterator(); i.hasNext();) 580 { 581 MessageProducer producer = (MessageProducer ) i.next(); 582 try 583 { 584 producer.close(); 585 } 586 catch (Throwable t) 587 { 588 log.trace("Error closing producer", t); 589 } 590 i.remove(); 591 } 592 } 593 594 mc.removeHandle(this); 595 ConnectionEvent ev = new ConnectionEvent (mc, ConnectionEvent.CONNECTION_CLOSED); 596 ev.setConnectionHandle(this); 597 mc.sendEvent(ev); 598 mc = null; 599 } 600 } 601 602 void addConsumer(MessageConsumer consumer) 603 { 604 synchronized (consumers) 605 { 606 consumers.add(consumer); 607 } 608 } 609 610 void removeConsumer(MessageConsumer consumer) 611 { 612 synchronized (consumers) 613 { 614 consumers.remove(consumer); 615 } 616 } 617 618 void addProducer(MessageProducer producer) 619 { 620 synchronized (producers) 621 { 622 producers.add(producer); 623 } 624 } 625 626 void removeProducer(MessageProducer producer) 627 { 628 synchronized (producers) 629 { 630 producers.remove(producer); 631 } 632 } 633 634 QueueSession getQueueSession() throws JMSException 635 { 636 return (QueueSession ) getSession(); 637 } 638 639 TopicSession getTopicSession() throws JMSException 640 { 641 return (TopicSession ) getSession(); 642 } 643 } 644 | Popular Tags |