1 18 package org.apache.activemq; 19 20 import java.util.Date ; 21 22 import javax.jms.Connection ; 23 import javax.jms.DeliveryMode ; 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Session ; 30 import javax.jms.Topic ; 31 32 35 public class JmsSendReceiveWithMessageExpirationTest extends TestSupport { 36 37 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 38 .getLog(JmsSendReceiveWithMessageExpirationTest.class); 39 40 protected int messageCount = 100; 41 protected String [] data; 42 protected Session session; 43 protected Destination consumerDestination; 44 protected Destination producerDestination; 45 protected boolean durable = false; 46 protected int deliveryMode = DeliveryMode.PERSISTENT; 47 protected long timeToLive = 5000; 48 protected boolean verbose = false; 49 50 protected Connection connection; 51 52 protected void setUp() throws Exception { 53 54 super.setUp(); 55 56 data = new String [messageCount]; 57 58 for (int i = 0; i < messageCount; i++) { 59 data[i] = "Text for message: " + i + " at " + new Date (); 60 } 61 62 connectionFactory = createConnectionFactory(); 63 connection = createConnection(); 64 65 if (durable) { 66 connection.setClientID(getClass().getName()); 67 } 68 69 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 70 } 71 72 77 public void testConsumeExpiredQueue() throws Exception { 78 79 MessageProducer producer = createProducer(timeToLive); 80 81 consumerDestination = session.createQueue(getConsumerSubject()); 82 producerDestination = session.createQueue(getProducerSubject()); 83 84 MessageConsumer consumer = createConsumer(); 85 connection.start(); 86 87 for (int i = 0; i < data.length; i++) { 88 Message message = session.createTextMessage(data[i]); 89 message.setStringProperty("stringProperty",data[i]); 90 message.setIntProperty("intProperty",i); 91 92 if (verbose) { 93 if (log.isDebugEnabled()) { 94 log.debug("About to send a queue message: " + message + " with text: " + data[i]); 95 } 96 } 97 98 producer.send(producerDestination, message); 99 } 100 101 Thread.sleep(timeToLive + 1000); 104 105 assertNull(consumer.receive(1000)); 107 } 108 109 114 public void testConsumeQueue() throws Exception { 115 116 MessageProducer producer = createProducer(0); 117 118 consumerDestination = session.createQueue(getConsumerSubject()); 119 producerDestination = session.createQueue(getProducerSubject()); 120 121 MessageConsumer consumer = createConsumer(); 122 connection.start(); 123 124 for (int i = 0; i < data.length; i++) { 125 Message message = session.createTextMessage(data[i]); 126 message.setStringProperty("stringProperty",data[i]); 127 message.setIntProperty("intProperty",i); 128 129 if (verbose) { 130 if (log.isDebugEnabled()) { 131 log.debug("About to send a queue message: " + message + " with text: " + data[i]); 132 } 133 } 134 135 producer.send(producerDestination, message); 136 } 137 138 assertNotNull(consumer.receive(1000)); 140 } 141 142 147 public void testConsumeExpiredTopic() throws Exception { 148 149 MessageProducer producer = createProducer(timeToLive); 150 151 consumerDestination = session.createTopic(getConsumerSubject()); 152 producerDestination = session.createTopic(getProducerSubject()); 153 154 MessageConsumer consumer = createConsumer(); 155 connection.start(); 156 157 for (int i = 0; i < data.length; i++) { 158 Message message = session.createTextMessage(data[i]); 159 message.setStringProperty("stringProperty",data[i]); 160 message.setIntProperty("intProperty",i); 161 162 if (verbose) { 163 if (log.isDebugEnabled()) { 164 log.debug("About to send a topic message: " + message + " with text: " + data[i]); 165 } 166 } 167 168 producer.send(producerDestination, message); 169 } 170 171 Thread.sleep(timeToLive + 1000); 174 175 assertNull(consumer.receive(1000)); 177 } 178 179 184 public void testConsumeTopic() throws Exception { 185 186 MessageProducer producer = createProducer(0); 187 188 consumerDestination = session.createTopic(getConsumerSubject()); 189 producerDestination = session.createTopic(getProducerSubject()); 190 191 MessageConsumer consumer = createConsumer(); 192 connection.start(); 193 194 for (int i = 0; i < data.length; i++) { 195 Message message = session.createTextMessage(data[i]); 196 message.setStringProperty("stringProperty",data[i]); 197 message.setIntProperty("intProperty",i); 198 199 if (verbose) { 200 if (log.isDebugEnabled()) { 201 log.debug("About to send a topic message: " + message + " with text: " + data[i]); 202 } 203 } 204 205 producer.send(producerDestination, message); 206 } 207 208 assertNotNull(consumer.receive(1000)); 210 } 211 212 213 214 protected MessageProducer createProducer(long timeToLive) throws JMSException { 215 MessageProducer producer = session.createProducer(null); 216 producer.setDeliveryMode(deliveryMode); 217 producer.setTimeToLive(timeToLive); 218 219 return producer; 220 } 221 222 protected MessageConsumer createConsumer() throws JMSException { 223 if (durable) { 224 log.info("Creating durable consumer"); 225 return session.createDurableSubscriber((Topic ) consumerDestination, getName()); 226 } 227 return session.createConsumer(consumerDestination); 228 } 229 230 protected void tearDown() throws Exception { 231 log.info("Dumping stats..."); 232 log.info("Closing down connection"); 233 234 session.close(); 235 connection.close(); 236 } 237 238 } 239 | Popular Tags |