1 18 package org.apache.activemq.broker; 19 20 import javax.jms.DeliveryMode ; 21 22 import junit.framework.Test; 23 24 import org.apache.activemq.command.ActiveMQDestination; 25 import org.apache.activemq.command.ConnectionInfo; 26 import org.apache.activemq.command.ConsumerInfo; 27 import org.apache.activemq.command.LocalTransactionId; 28 import org.apache.activemq.command.Message; 29 import org.apache.activemq.command.MessageAck; 30 import org.apache.activemq.command.ProducerInfo; 31 import org.apache.activemq.command.SessionInfo; 32 import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy; 33 import org.apache.activemq.broker.region.policy.PolicyMap; 34 import org.apache.activemq.broker.region.policy.PolicyEntry; 35 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; 36 import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; 37 38 public class MessageExpirationTest extends BrokerTestSupport { 39 40 public ActiveMQDestination destination; 41 public int deliveryMode; 42 public int prefetch; 43 public byte destinationType; 44 public boolean durableConsumer; 45 46 protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode, int timeToLive) { 47 Message message = createMessage(producerInfo, destination, deliveryMode); 48 long now = System.currentTimeMillis(); 49 message.setTimestamp(now); 50 message.setExpiration(now+timeToLive); 51 return message; 52 } 53 54 55 public void initCombosForTestMessagesWaitingForUssageDecreaseExpire() { 56 addCombinationValues( "deliveryMode", new Object []{ 57 new Integer (DeliveryMode.NON_PERSISTENT), 58 new Integer (DeliveryMode.PERSISTENT)} ); 59 addCombinationValues( "destinationType", new Object []{ 60 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 61 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE), 62 new Byte (ActiveMQDestination.QUEUE_TYPE), 63 new Byte (ActiveMQDestination.TOPIC_TYPE), 64 } ); 65 } 66 67 @Override 68 protected BrokerService createBroker() throws Exception { 69 BrokerService broker = new BrokerService(); 70 broker.setPersistent(false); 71 return broker; 72 } 73 74 protected PolicyEntry getDefaultPolicy() { 75 PolicyEntry policy = super.getDefaultPolicy(); 76 policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy()); 78 return policy; 79 } 80 81 public void testMessagesWaitingForUssageDecreaseExpire() throws Exception { 82 83 final StubConnection connection = createConnection(); 85 ConnectionInfo connectionInfo = createConnectionInfo(); 86 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 87 final ProducerInfo producerInfo = createProducerInfo(sessionInfo); 88 connection.send(connectionInfo); 89 connection.send(sessionInfo); 90 connection.send(producerInfo); 91 92 93 final StubConnection connection2 = createConnection(); 95 ConnectionInfo connectionInfo2 = createConnectionInfo(); 96 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 97 connection2.send(connectionInfo2); 98 connection2.send(sessionInfo2); 99 100 destination = createDestinationInfo(connection2, connectionInfo2, destinationType); 101 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 102 consumerInfo2.setPrefetchSize(1); 103 connection2.request(consumerInfo2); 104 105 broker.getMemoryManager().setLimit(1); 107 108 final Message m1 = createMessage(producerInfo, destination, deliveryMode); 109 final Message m2 = createMessage(producerInfo, destination, deliveryMode, 1000); 110 final Message m3 = createMessage(producerInfo, destination, deliveryMode); 111 final Message m4 = createMessage(producerInfo, destination, deliveryMode, 1000); 112 113 new Thread () { 115 public void run() { 116 try { 118 connection.send(m1); 119 connection.send(m2); 120 connection.send(m3); 121 connection.send(m4); 122 } catch (Exception e) { 123 e.printStackTrace(); 124 } 125 } 126 }.start(); 127 128 129 Message m = receiveMessage(connection2); 131 assertNotNull(m); 132 assertEquals(m1.getMessageId(), m.getMessageId()); 133 assertNoMessagesLeft(connection); 134 135 Thread.sleep(1500); 137 connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE)); 138 139 m = receiveMessage(connection2); 141 assertNotNull(m); 142 assertEquals(m3.getMessageId(), m.getMessageId()); 143 144 Thread.sleep(1500); 146 connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE)); 147 148 assertNoMessagesLeft(connection2); 150 151 connection.send(closeConnectionInfo(connectionInfo)); 152 connection.send(closeConnectionInfo(connectionInfo2)); 153 } 154 155 156 public void initCombosForTestMessagesInLongTransactionExpire() { 157 addCombinationValues( "deliveryMode", new Object []{ 158 new Integer (DeliveryMode.NON_PERSISTENT), 159 new Integer (DeliveryMode.PERSISTENT)} ); 160 addCombinationValues( "destinationType", new Object []{ 161 new Byte (ActiveMQDestination.QUEUE_TYPE), 162 new Byte (ActiveMQDestination.TOPIC_TYPE), 163 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 164 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) 165 } ); 166 } 167 168 public void testMessagesInLongTransactionExpire() throws Exception { 169 170 StubConnection connection = createConnection(); 172 ConnectionInfo connectionInfo = createConnectionInfo(); 173 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 174 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 175 connection.send(connectionInfo); 176 connection.send(sessionInfo); 177 connection.send(producerInfo); 178 179 destination = createDestinationInfo(connection, connectionInfo, destinationType); 180 181 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 182 consumerInfo.setPrefetchSize(1000); 183 connection.send(consumerInfo); 184 185 LocalTransactionId txid = createLocalTransaction(sessionInfo); 187 connection.send(createBeginTransaction(connectionInfo, txid)); 188 189 Message m1 = createMessage(producerInfo, destination, deliveryMode); 191 m1.setTransactionId(txid); 192 connection.send(m1); 193 Message m = createMessage(producerInfo, destination, deliveryMode, 1000); 194 m.setTransactionId(txid); 195 connection.send(m); 196 Message m3 = createMessage(producerInfo, destination, deliveryMode); 197 m3.setTransactionId(txid); 198 connection.send(m3); 199 m = createMessage(producerInfo, destination, deliveryMode, 1000); 200 m.setTransactionId(txid); 201 connection.send(m); 202 203 Thread.sleep(1500); 205 connection.send(createCommitTransaction1Phase(connectionInfo, txid)); 206 207 m = receiveMessage(connection); 208 assertNotNull(m); 209 assertEquals(m1.getMessageId(), m.getMessageId()); 210 connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); 211 212 m = receiveMessage(connection); 214 assertNotNull(m); 215 assertEquals(m3.getMessageId(), m.getMessageId()); 216 connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); 217 218 assertNoMessagesLeft(connection); 220 221 connection.send(closeConnectionInfo(connectionInfo)); 222 } 223 224 225 public void TestMessagesInSubscriptionPendingListExpire() { 226 addCombinationValues( "deliveryMode", new Object []{ 227 new Integer (DeliveryMode.NON_PERSISTENT), 228 new Integer (DeliveryMode.PERSISTENT)} ); 229 addCombinationValues( "destinationType", new Object []{ 230 new Byte (ActiveMQDestination.QUEUE_TYPE), 231 new Byte (ActiveMQDestination.TOPIC_TYPE), 232 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 233 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) 234 } ); 235 } 236 237 public void initCombosForTestMessagesInSubscriptionPendingListExpire() { 238 addCombinationValues( "deliveryMode", new Object []{ 239 new Integer (DeliveryMode.NON_PERSISTENT), 240 new Integer (DeliveryMode.PERSISTENT)} ); 241 addCombinationValues( "destinationType", new Object []{ 242 new Byte (ActiveMQDestination.QUEUE_TYPE), 243 new Byte (ActiveMQDestination.TOPIC_TYPE), 244 new Byte (ActiveMQDestination.TEMP_QUEUE_TYPE), 245 new Byte (ActiveMQDestination.TEMP_TOPIC_TYPE) 246 } ); 247 } 248 249 public void testMessagesInSubscriptionPendingListExpire() throws Exception { 250 251 StubConnection connection = createConnection(); 253 ConnectionInfo connectionInfo = createConnectionInfo(); 254 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 255 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 256 connection.send(connectionInfo); 257 connection.send(sessionInfo); 258 connection.send(producerInfo); 259 260 destination = createDestinationInfo(connection, connectionInfo, destinationType); 261 262 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 263 consumerInfo.setPrefetchSize(1); 264 connection.send(consumerInfo); 265 266 Message m1 = createMessage(producerInfo, destination, deliveryMode); 268 connection.send(m1); 269 connection.send(createMessage(producerInfo, destination, deliveryMode, 1000)); 270 Message m3 = createMessage(producerInfo, destination, deliveryMode); 271 connection.send(m3); 272 connection.send(createMessage(producerInfo, destination, deliveryMode, 1000)); 273 274 Message m = receiveMessage(connection); 276 assertNotNull(m); 277 assertEquals(m1.getMessageId(), m.getMessageId()); 278 assertNoMessagesLeft(connection); 279 280 Thread.sleep(1500); 282 connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); 283 284 m = receiveMessage(connection); 286 assertNotNull(m); 287 assertEquals(m3.getMessageId(), m.getMessageId()); 288 connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); 289 290 assertNoMessagesLeft(connection); 292 293 connection.send(closeConnectionInfo(connectionInfo)); 294 } 295 296 public static Test suite() { 297 return suite(MessageExpirationTest.class); 298 } 299 300 public static void main(String [] args) { 301 junit.textui.TestRunner.run(suite()); 302 } 303 304 } 305 | Popular Tags |