1 18 19 package org.apache.activemq; 20 21 import javax.jms.BytesMessage ; 22 import javax.jms.Connection ; 23 import javax.jms.DeliveryMode ; 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageListener ; 29 import javax.jms.MessageProducer ; 30 import javax.jms.Session ; 31 import javax.jms.Topic ; 32 33 import org.apache.activemq.ActiveMQConnection; 34 import org.apache.activemq.ActiveMQConnectionFactory; 35 import org.apache.activemq.command.ActiveMQMessage; 36 import org.apache.activemq.command.ActiveMQQueue; 37 import org.apache.activemq.command.ActiveMQTopic; 38 import org.apache.activemq.util.IdGenerator; 39 40 import java.util.concurrent.atomic.AtomicInteger ; 41 42 45 public class LargeMessageTestSupport extends ClientTestSupport implements MessageListener { 46 47 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LargeMessageTestSupport.class); 48 49 protected static final int LARGE_MESSAGE_SIZE = 128 * 1024; 50 protected static final int MESSAGE_COUNT = 100; 51 protected Connection producerConnection; 52 protected Connection consumerConnection; 53 protected MessageConsumer consumer; 54 protected MessageProducer producer; 55 protected Session producerSession; 56 protected Session consumerSession; 57 protected byte[] largeMessageData; 58 protected Destination destination; 59 protected boolean isTopic = true; 60 protected boolean isDurable = true; 61 protected int deliveryMode = DeliveryMode.PERSISTENT; 62 protected IdGenerator idGen = new IdGenerator(); 63 protected boolean validMessageConsumption = true; 64 protected AtomicInteger messageCount = new AtomicInteger (0); 65 66 protected int prefetchValue = 10000000; 67 68 protected Destination createDestination() { 69 String subject = getClass().getName(); 70 if (isTopic) { 71 return new ActiveMQTopic(subject); 72 } 73 else { 74 return new ActiveMQQueue(subject); 75 } 76 } 77 78 protected MessageConsumer createConsumer() throws JMSException { 79 if (isTopic && isDurable) { 80 return consumerSession.createDurableSubscriber((Topic ) destination, idGen.generateId()); 81 } 82 else { 83 return consumerSession.createConsumer(destination); 84 } 85 } 86 87 public void setUp() throws Exception { 88 super.setUp(); 89 ClientTestSupport.removeMessageStore(); 90 log.info("Setting up . . . . . "); 91 messageCount.set(0); 92 93 destination = createDestination(); 94 largeMessageData = new byte[LARGE_MESSAGE_SIZE]; 95 for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) { 96 if (i % 2 == 0) { 97 largeMessageData[i] = 'a'; 98 } 99 else { 100 largeMessageData[i] = 'z'; 101 } 102 } 103 104 try { 105 Thread.sleep(1000); } 107 catch (InterruptedException e) { 108 throw new JMSException (e.getMessage()); 109 } 110 111 ActiveMQConnectionFactory fac = getConnectionFactory(); 112 producerConnection = fac.createConnection(); 113 setPrefetchPolicy((ActiveMQConnection) producerConnection); 114 producerConnection.start(); 115 116 consumerConnection = fac.createConnection(); 117 setPrefetchPolicy((ActiveMQConnection) consumerConnection); 118 consumerConnection.setClientID(idGen.generateId()); 119 consumerConnection.start(); 120 producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 121 producer = producerSession.createProducer(createDestination()); 122 producer.setDeliveryMode(deliveryMode); 123 consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 124 consumer = createConsumer(); 125 consumer.setMessageListener(this); 126 log.info("Setup complete"); 127 } 128 129 protected void setPrefetchPolicy(ActiveMQConnection activeMQConnection) { 130 activeMQConnection.getPrefetchPolicy().setTopicPrefetch(prefetchValue); 131 activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue); 132 activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue); 133 activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue); 134 activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(prefetchValue); 135 } 136 137 public void tearDown() throws Exception { 138 Thread.sleep(1000); 139 producerConnection.close(); 140 consumerConnection.close(); 141 142 super.tearDown(); 143 144 largeMessageData = null; 145 } 146 147 protected boolean isSame(BytesMessage msg1) throws Exception { 148 boolean result = false; 149 ((ActiveMQMessage) msg1).setReadOnlyBody(true); 150 151 for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) { 152 result = msg1.readByte() == largeMessageData[i]; 153 if (!result) 154 break; 155 } 156 157 return result; 158 } 159 160 public void onMessage(Message msg) { 161 try { 162 BytesMessage ba = (BytesMessage ) msg; 163 validMessageConsumption &= isSame(ba); 164 assertTrue(ba.getBodyLength() == LARGE_MESSAGE_SIZE); 165 if (messageCount.incrementAndGet() >= MESSAGE_COUNT) { 166 synchronized (messageCount) { 167 messageCount.notify(); 168 } 169 } 170 log.info("got message = " + messageCount); 171 if (messageCount.get() % 50 == 0) { 172 log.info("count = " + messageCount); 173 } 174 } 175 catch (Exception e) { 176 e.printStackTrace(); 177 } 178 } 179 180 public void testLargeMessages() throws Exception { 181 for (int i = 0; i < MESSAGE_COUNT; i++) { 182 log.info("Sending message: " + i); 183 BytesMessage msg = producerSession.createBytesMessage(); 184 msg.writeBytes(largeMessageData); 185 producer.send(msg); 186 } 187 long now = System.currentTimeMillis(); 188 while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) { 189 log.info("message count = " + messageCount); 190 synchronized (messageCount) { 191 messageCount.wait(1000); 192 } 193 } 194 log.info("Finished count = " + messageCount); 195 assertTrue("Not enough messages - expected " + MESSAGE_COUNT + " but got " + messageCount, messageCount.get() == MESSAGE_COUNT); 196 assertTrue("received messages are not valid", validMessageConsumption); 197 Thread.sleep(1000); 198 log.info("FINAL count = " + messageCount); 199 } 200 } 201 | Popular Tags |