1 18 package org.apache.activemq.test.retroactive; 19 20 import java.net.URI ; 21 import java.util.Date ; 22 23 import javax.jms.Connection ; 24 import javax.jms.ConnectionFactory ; 25 import javax.jms.MessageConsumer ; 26 import javax.jms.MessageProducer ; 27 import javax.jms.Session ; 28 import javax.jms.TextMessage ; 29 import javax.jms.JMSException ; 30 31 import org.apache.activemq.ActiveMQConnectionFactory; 32 import org.apache.activemq.EmbeddedBrokerTestSupport; 33 import org.apache.activemq.broker.BrokerFactory; 34 import org.apache.activemq.broker.BrokerService; 35 import org.apache.activemq.util.MessageIdList; 36 37 41 public class RetroactiveConsumerTestWithSimpleMessageListTest extends EmbeddedBrokerTestSupport { 42 protected int messageCount = 20; 43 protected Connection connection; 44 protected Session session; 45 46 public void testSendThenConsume() throws Exception { 47 48 connection = createConnection(); 50 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 51 MessageProducer producer = createProducer(); 52 for (int i = 0; i < messageCount; i++) { 53 TextMessage message = session.createTextMessage("Message: " + i + " sent at: " + new Date ()); 54 sendMessage(producer, message); 55 } 56 producer.close(); 57 session.close(); 58 connection.close(); 59 60 connection = createConnection(); 61 connection.start(); 62 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 63 64 MessageConsumer consumer = createConsumer(); 65 MessageIdList listener = new MessageIdList(); 66 consumer.setMessageListener(listener); 67 listener.waitForMessagesToArrive(messageCount); 68 listener.assertMessagesReceived(messageCount); 69 70 } 71 72 protected void setUp() throws Exception { 73 useTopic = true; 74 bindAddress = "vm://localhost"; 75 super.setUp(); 76 } 77 78 protected void tearDown() throws Exception { 79 if (session != null) { 80 session.close(); 81 session = null; 82 } 83 if (connection != null) { 84 connection.close(); 85 } 86 super.tearDown(); 87 } 88 89 protected ConnectionFactory createConnectionFactory() throws Exception { 90 ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(bindAddress); 91 answer.setUseRetroactiveConsumer(true); 92 return answer; 93 } 94 95 protected BrokerService createBroker() throws Exception { 96 String uri = getBrokerXml(); 97 log.info("Loading broker configuration from the classpath with URI: " + uri); 98 return BrokerFactory.createBroker(new URI ("xbean:"+uri)); 99 } 100 101 protected void startBroker() throws Exception { 102 } 104 105 protected String getBrokerXml() { 106 return "org/apache/activemq/test/retroactive/activemq-fixed-buffer.xml"; 107 } 108 109 110 protected MessageProducer createProducer() throws JMSException { 111 return session.createProducer(destination); 112 } 113 114 protected void sendMessage(MessageProducer producer, TextMessage message) throws JMSException { 115 producer.send(message); 116 } 117 118 protected MessageConsumer createConsumer() throws JMSException { 119 return session.createConsumer(destination); 120 } 121 } 122 | Popular Tags |