1 18 package org.apache.activemq.test.retroactive; 19 20 import org.apache.activemq.ActiveMQConnectionFactory; 21 import org.apache.activemq.EmbeddedBrokerTestSupport; 22 import org.apache.activemq.broker.BrokerFactory; 23 import org.apache.activemq.broker.BrokerService; 24 import org.apache.activemq.util.MessageIdList; 25 import org.apache.activemq.xbean.BrokerFactoryBean; 26 import org.springframework.core.io.ClassPathResource; 27 28 import javax.jms.Connection ; 29 import javax.jms.ConnectionFactory ; 30 import javax.jms.MessageConsumer ; 31 import javax.jms.MessageProducer ; 32 import javax.jms.Session ; 33 import javax.jms.TextMessage ; 34 35 import java.net.URI ; 36 import java.util.Date ; 37 38 42 public class RetroactiveConsumerWithMessageQueryTest extends EmbeddedBrokerTestSupport { 43 protected int messageCount = 20; 44 protected Connection connection; 45 protected Session session; 46 47 public void testConsumeAndReceiveInitialQueryBeforeUpdates() throws Exception { 48 49 connection = createConnection(); 51 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 52 connection.start(); 53 54 MessageConsumer consumer = session.createConsumer(destination); 55 MessageIdList listener = new MessageIdList(); 56 listener.setVerbose(true); 57 consumer.setMessageListener(listener); 58 59 MessageProducer producer = session.createProducer(destination); 60 int updateMessageCount = messageCount - DummyMessageQuery.messageCount; 61 for (int i = 0; i < updateMessageCount; i++) { 62 TextMessage message = session.createTextMessage("Update Message: " + i + " sent at: " + new Date ()); 63 producer.send(message); 64 } 65 producer.close(); 66 log.info("Sent: " + updateMessageCount + " update messages"); 67 68 listener.assertMessagesReceived(messageCount); 69 } 70 71 protected void setUp() throws Exception { 72 useTopic = true; 73 bindAddress = "vm://localhost"; 74 super.setUp(); 75 } 76 77 protected void tearDown() throws Exception { 78 if (session != null) { 79 session.close(); 80 session = null; 81 } 82 if (connection != null) { 83 connection.close(); 84 } 85 super.tearDown(); 86 } 87 88 protected ConnectionFactory createConnectionFactory() throws Exception { 89 ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(bindAddress); 90 answer.setUseRetroactiveConsumer(true); 91 return answer; 92 } 93 94 protected BrokerService createBroker() throws Exception { 95 String uri = getBrokerXml(); 96 log.info("Loading broker configuration from the classpath with URI: " + uri); 97 return BrokerFactory.createBroker(new URI ("xbean:"+uri)); 98 } 99 100 protected void startBroker() throws Exception { 101 } 103 104 protected String getBrokerXml() { 105 return "org/apache/activemq/test/retroactive/activemq-message-query.xml"; 106 } 107 108 } 109 | Popular Tags |