1 18 package org.apache.activemq; 19 20 import javax.jms.Connection ; 21 import javax.jms.Message ; 22 import javax.jms.MessageConsumer ; 23 import javax.jms.MessageListener ; 24 import javax.jms.MessageProducer ; 25 import javax.jms.Session ; 26 import javax.jms.Topic ; 27 28 import org.apache.activemq.TestSupport; 29 30 33 public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener { 34 35 private Connection connection; 36 private Session publisherSession; 37 private Session consumerSession; 38 private MessageConsumer consumer; 39 private MessageConsumer testConsumer; 40 private MessageProducer producer; 41 private Topic topic; 42 private Object lock = new Object (); 43 44 47 protected void setUp() throws Exception { 48 super.setUp(); 49 super.topic = true; 50 connection = createConnection(); 51 connection.setClientID("connection:" + getSubject()); 52 publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 53 consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 54 topic = (Topic )super.createDestination("Test.Topic"); 55 consumer = consumerSession.createConsumer(topic); 56 consumer.setMessageListener(this); 57 producer = publisherSession.createProducer(topic); 58 connection.start(); 59 } 60 61 64 protected void tearDown() throws Exception { 65 super.tearDown(); 66 connection.close(); 67 } 68 69 70 75 public void testCreateConsumer() throws Exception { 76 Message msg = super.createMessage(); 77 producer.send(msg); 78 if (testConsumer == null){ 79 synchronized(lock){ 80 lock.wait(3000); 81 } 82 } 83 assertTrue(testConsumer != null); 84 } 85 86 91 public void onMessage(Message message) { 92 try { 93 testConsumer = consumerSession.createConsumer(topic); 94 MessageProducer anotherProducer = consumerSession.createProducer(topic); 95 synchronized(lock){ 96 lock.notify(); 97 } 98 }catch (Exception ex){ 99 ex.printStackTrace(); 100 assertTrue(false); 101 } 102 } 103 } 104 | Popular Tags |