package org.apache.activemq.broker.region.cursors;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Base class for tests that push {@link #MESSAGE_COUNT} messages through a
 * broker and verify that a consumer receives them in the order they were sent.
 * <p>
 * Subclasses supply the destination, the consumer and the broker configuration
 * through the three abstract methods.
 */
public abstract class CursorSupport extends TestCase {

    protected static final Log log = LogFactory.getLog(CursorSupport.class);
    /** Number of messages each test sends and expects to receive. */
    protected static final int MESSAGE_COUNT = 500;
    /** Prefetch value applied to the connection factory's prefetch policy. */
    protected static final int PREFETCH_SIZE = 50;

    /** Upper bound for a single blocking receive, so a lost message fails the test instead of hanging it. */
    private static final long RECEIVE_TIMEOUT_MS = 30000L;
    /** Upper bound for the asynchronous consumer to drain all messages. */
    private static final long LATCH_TIMEOUT_MS = 300000L;

    /** Broker started in {@link #setUp()} and stopped in {@link #tearDown()}. */
    protected BrokerService broker;
    /**
     * URI handed to the client connection factory.
     * NOTE(review): presumably subclasses also bind the broker's transport
     * connector to this address in configureBroker - confirm.
     */
    protected String bindAddress = "tcp://localhost:60706";

    /**
     * @param session session the destination is created from
     * @return the destination messages are produced to
     * @throws JMSException if the destination cannot be created
     */
    protected abstract Destination getDestination(Session session) throws JMSException;

    /**
     * @param connection started consumer connection
     * @return the consumer under test
     * @throws Exception if the consumer cannot be created
     */
    protected abstract MessageConsumer getConsumer(Connection connection) throws Exception;

    /**
     * Applies test-specific configuration to the broker before it is started.
     *
     * @param answer the not-yet-started broker
     * @throws Exception if configuration fails
     */
    protected abstract void configureBroker(BrokerService answer) throws Exception;

    /**
     * Sends every message while no consumer is connected, then connects a
     * consumer and checks it receives the same messages in the same order.
     *
     * @throws Exception on any JMS or broker failure
     */
    public void testSendFirstThenConsume() throws Exception {
        ConnectionFactory factory = createConnectionFactory();
        primeConsumer(factory);

        List<Message> senderList = new ArrayList<Message>();
        Connection producerConnection = factory.createConnection();
        try {
            producerConnection.start();
            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(getDestination(session));
            sendMessages(session, producer, senderList, 0, MESSAGE_COUNT);
        } finally {
            producerConnection.close();
        }

        Connection consumerConnection = getConsumerConnection(factory);
        try {
            MessageConsumer consumer = getConsumer(consumerConnection);
            List<Message> consumerList = new ArrayList<Message>();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Bounded receive: an unbounded receive() would hang the whole
                // test run if the broker ever failed to dispatch a message.
                Message msg = consumer.receive(RECEIVE_TIMEOUT_MS);
                assertNotNull("Timed out waiting for message " + i, msg);
                consumerList.add(msg);
            }
            assertEquals(senderList, consumerList);
        } finally {
            consumerConnection.close();
        }
    }

    /**
     * Sends the first tenth of the messages with no consumer connected, then
     * attaches a deliberately slow asynchronous listener and sends the rest
     * while it is consuming. Verifies count and ordering by JMS message id.
     *
     * @throws Exception on any JMS or broker failure
     */
    public void testSendWhilstConsume() throws Exception {
        ConnectionFactory factory = createConnectionFactory();
        primeConsumer(factory);

        List<Message> senderList = new ArrayList<Message>();
        final List<Message> consumerList = new ArrayList<Message>();
        final CountDownLatch latch = new CountDownLatch(1);

        Connection producerConnection = factory.createConnection();
        Connection consumerConnection = null;
        try {
            producerConnection.start();
            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(getDestination(session));
            sendMessages(session, producer, senderList, 0, MESSAGE_COUNT / 10);

            consumerConnection = getConsumerConnection(factory);
            MessageConsumer consumer = getConsumer(consumerConnection);
            consumer.setMessageListener(new MessageListener() {

                public void onMessage(Message msg) {
                    try {
                        // Slow the consumer down so the producer runs ahead of it.
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        log.warn("Interrupted while delaying message consumption", e);
                        // Preserve the interrupt for whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                    consumerList.add(msg);
                    if (consumerList.size() == MESSAGE_COUNT) {
                        latch.countDown();
                    }
                }
            });

            sendMessages(session, producer, senderList, MESSAGE_COUNT / 10, MESSAGE_COUNT);
            // The result is checked via latch.getCount() below, after the
            // connections have been closed.
            latch.await(LATCH_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } finally {
            // Same close order as before (producer, then consumer), but now
            // guaranteed even when sending or waiting fails.
            try {
                producerConnection.close();
            } finally {
                if (consumerConnection != null) {
                    consumerConnection.close();
                }
            }
        }

        assertEquals("Still dipatching - count down latch not sprung", 0L, latch.getCount());
        assertEquals("cosumerList - expected: " + MESSAGE_COUNT + " but was: " + consumerList.size(),
                senderList.size(), consumerList.size());
        for (int i = 0; i < senderList.size(); i++) {
            Message sent = senderList.get(i);
            Message consumed = consumerList.get(i);
            if (!sent.equals(consumed)) {
                System.err.println("BAD MATCH AT POS " + i);
                System.err.println(sent);
                System.err.println(consumed);
            }
            assertEquals("This should be the same at pos " + i + " in the list",
                    sent.getJMSMessageID(), consumed.getJMSMessageID());
        }
    }

    /**
     * Creates and starts a connection with the fixed client id
     * {@code "testConsumer"}.
     *
     * @param fac factory to create the connection from
     * @return the started connection; the caller must close it
     * @throws JMSException if the connection cannot be created or started
     */
    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
        Connection connection = fac.createConnection();
        connection.setClientID("testConsumer");
        connection.start();
        return connection;
    }

    /**
     * Starts the broker (if none is set yet) before the test runs.
     */
    @Override
    protected void setUp() throws Exception {
        if (broker == null) {
            broker = createBroker();
        }
        super.setUp();
    }

    /**
     * Stops the broker, even when the superclass tear-down fails.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            if (broker != null) {
                broker.stop();
            }
        }
    }

    /**
     * Builds a connection factory for {@link #bindAddress} with the durable
     * topic, optimized durable topic and queue prefetch values all set to
     * {@link #PREFETCH_SIZE}.
     *
     * @return the configured factory
     * @throws Exception if the factory cannot be configured
     */
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress);
        String prefetch = String.valueOf(PREFETCH_SIZE);
        Properties props = new Properties();
        props.setProperty("prefetchPolicy.durableTopicPrefetch", prefetch);
        props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", prefetch);
        props.setProperty("prefetchPolicy.queuePrefetch", prefetch);
        cf.setProperties(props);
        return cf;
    }

    /**
     * Creates a broker, lets the subclass configure it, and starts it.
     *
     * @return the started broker
     * @throws Exception if configuration or start-up fails
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        configureBroker(answer);
        answer.start();
        return answer;
    }

    /**
     * Connects, creates the consumer once and disconnects again before any
     * message is produced.
     * NOTE(review): presumably this registers the subscription for subclasses
     * that use durable subscribers - confirm against the subclasses.
     */
    private void primeConsumer(ConnectionFactory factory) throws Exception {
        Connection connection = getConsumerConnection(factory);
        try {
            getConsumer(connection);
        } finally {
            connection.close();
        }
    }

    /**
     * Sends text messages "test{from}" .. "test{to-1}" and records each one in
     * {@code sent} just before it is sent.
     */
    private void sendMessages(Session session, MessageProducer producer, List<Message> sent,
            int from, int to) throws JMSException {
        for (int i = from; i < to; i++) {
            TextMessage msg = session.createTextMessage("test" + i);
            sent.add(msg);
            producer.send(msg);
        }
    }
}