1 18 package org.apache.activemq.usecases; 19 20 import org.apache.activemq.ActiveMQConnectionFactory; 21 import org.apache.activemq.test.TestSupport; 22 23 import javax.jms.Connection ; 24 import javax.jms.DeliveryMode ; 25 import javax.jms.Message ; 26 import javax.jms.MessageProducer ; 27 import javax.jms.Session ; 28 import javax.jms.TextMessage ; 29 import javax.jms.Topic ; 30 import javax.jms.TopicSubscriber ; 31 32 36 public class SubscribeClosePublishThenConsumeTest extends TestSupport { 37 38 public void testDurableTopic() throws Exception { 39 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://locahost"); 40 41 String topicName = "TestTopic"; 42 String clientID = getName(); 43 String subscriberName = "MySubscriber:"+System.currentTimeMillis(); 44 45 Connection connection = connectionFactory.createConnection(); 46 connection.setClientID(clientID); 47 48 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 49 Topic topic = session.createTopic(topicName); 50 51 TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriberName); 54 connection.start(); 55 56 topic = null; 57 subscriber.close(); 58 subscriber = null; 59 session.close(); 60 session = null; 61 62 Connection t = connectionFactory.createConnection(); 65 connection.close(); 66 connection = t; 67 68 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 69 topic = session.createTopic(topicName); 70 MessageProducer producer = session.createProducer(topic); 71 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 72 TextMessage textMessage = session.createTextMessage("Hello World"); 73 producer.send(textMessage); 74 textMessage = null; 75 76 topic = null; 77 session.close(); 78 session = null; 79 80 t = connectionFactory.createConnection(); 83 connection.close(); 84 connection = t; 85 86 connection.setClientID(clientID); 87 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 88 topic = session.createTopic(topicName); 89 90 subscriber = session.createDurableSubscriber(topic, subscriberName); 91 connection.start(); 92 93 log.info("Started connection - now about to try receive the textMessage"); 94 95 long time = System.currentTimeMillis(); 96 Message message = subscriber.receive(15000L); 97 long elapsed = System.currentTimeMillis() - time; 98 99 log.info("Waited for: " + elapsed + " millis"); 100 101 assertNotNull("Should have received the message we published by now", message); 102 assertTrue("should be text textMessage", message instanceof TextMessage ); 103 textMessage = (TextMessage ) message; 104 assertEquals("Hello World", textMessage.getText()); 105 } 106 } 107 | Popular Tags |