1 18 package org.apache.activemq.broker; 19 20 import javax.jms.DeliveryMode ; 21 22 import junit.framework.Test; 23 24 import org.apache.activemq.command.ActiveMQDestination; 25 import org.apache.activemq.command.ActiveMQQueue; 26 import org.apache.activemq.command.ActiveMQTopic; 27 import org.apache.activemq.command.ConnectionInfo; 28 import org.apache.activemq.command.ConsumerInfo; 29 import org.apache.activemq.command.LocalTransactionId; 30 import org.apache.activemq.command.Message; 31 import org.apache.activemq.command.MessageAck; 32 import org.apache.activemq.command.ProducerInfo; 33 import org.apache.activemq.command.SessionInfo; 34 35 40 public class RecoveryBrokerTest extends BrokerRestartTestSupport { 41 42 public void testConsumedQueuePersistentMessagesLostOnRestart() throws Exception { 43 44 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 45 46 StubConnection connection = createConnection(); 48 ConnectionInfo connectionInfo = createConnectionInfo(); 49 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 50 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 51 connection.send(connectionInfo); 52 connection.send(sessionInfo); 53 connection.send(producerInfo); 54 55 for( int i=0; i < 4; i++) { 56 Message message = createMessage(producerInfo, destination); 57 message.setPersistent(true); 58 connection.send(message); 59 } 60 61 connection = createConnection(); 63 connectionInfo = createConnectionInfo(); 64 sessionInfo = createSessionInfo(connectionInfo); 65 connection.send(connectionInfo); 66 connection.send(sessionInfo); 67 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 68 connection.send(consumerInfo); 69 70 for( int i=0; i < 4 ; i++ ) { 72 Message m2 = receiveMessage(connection); 73 assertNotNull(m2); 74 } 75 76 restartBroker(); 78 79 Message m = receiveMessage(connection); 81 assertNull(m); 82 } 83 84 public void testQueuePersistentUncommitedMessagesLostOnRestart() throws Exception { 85 86 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 87 88 StubConnection connection = createConnection(); 90 ConnectionInfo connectionInfo = createConnectionInfo(); 91 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 92 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 93 connection.send(connectionInfo); 94 connection.send(sessionInfo); 95 connection.send(producerInfo); 96 97 LocalTransactionId txid = createLocalTransaction(sessionInfo); 99 connection.send(createBeginTransaction(connectionInfo, txid)); 100 101 for( int i=0; i < 4; i++) { 102 Message message = createMessage(producerInfo, destination); 103 message.setPersistent(true); 104 message.setTransactionId(txid); 105 connection.send(message); 106 } 107 108 110 restartBroker(); 112 113 connection = createConnection(); 115 connectionInfo = createConnectionInfo(); 116 sessionInfo = createSessionInfo(connectionInfo); 117 connection.send(connectionInfo); 118 connection.send(sessionInfo); 119 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 120 connection.send(consumerInfo); 121 122 Message m = receiveMessage(connection); 124 assertNull(m); 125 } 126 127 public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception { 128 129 ActiveMQDestination destination = new ActiveMQTopic("TEST"); 130 131 StubConnection connection1 = createConnection(); 133 ConnectionInfo connectionInfo1 = createConnectionInfo(); 134 connectionInfo1.setClientId("A"); 135 SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); 136 ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); 137 connection1.send(connectionInfo1); 138 connection1.send(sessionInfo1); 139 connection1.send(producerInfo1); 140 141 ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); 143 consumerInfo1.setSubscriptionName("test"); 144 consumerInfo1.setPrefetchSize(100); 145 connection1.send(consumerInfo1); 146 147 connection1.send(closeConsumerInfo(consumerInfo1)); 149 150 connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); 152 connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); 153 connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); 154 connection1.send(createMessage(producerInfo1, destination, DeliveryMode.PERSISTENT)); 155 connection1.request(closeConnectionInfo(connectionInfo1)); 156 restartBroker(); 158 159 StubConnection connection2 = createConnection(); 161 ConnectionInfo connectionInfo2 = createConnectionInfo(); 162 connectionInfo2.setClientId("A"); 163 SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); 164 connection2.send(connectionInfo2); 165 connection2.send(sessionInfo2); 166 167 ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); 169 consumerInfo2.setSubscriptionName("test"); 170 consumerInfo2.setPrefetchSize(100); 171 connection2.send(consumerInfo2); 172 173 for( int i=0; i < 4 ; i++ ) { 175 Message m2 = receiveMessage(connection2); 176 assertNotNull(m2); 177 } 178 assertNoMessagesLeft(connection2); 179 } 180 181 public void testQueuePersistentMessagesNotLostOnRestart() throws Exception { 182 183 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 184 185 StubConnection connection = createConnection(); 187 ConnectionInfo connectionInfo = createConnectionInfo(); 188 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 189 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 190 connection.send(connectionInfo); 191 connection.send(sessionInfo); 192 connection.send(producerInfo); 193 Message message = createMessage(producerInfo, destination); 194 message.setPersistent(true); 195 connection.send(message); 196 connection.request(closeConnectionInfo(connectionInfo)); 197 198 restartBroker(); 200 201 connection = createConnection(); 203 connectionInfo = createConnectionInfo(); 204 sessionInfo = createSessionInfo(connectionInfo); 205 connection.send(connectionInfo); 206 connection.send(sessionInfo); 207 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 208 connection.send(consumerInfo); 209 210 Message m = receiveMessage(connection); 212 assertNotNull("Should have received a message by now!", m); 213 assertEquals( m.getMessageId(), message.getMessageId() ); 214 } 215 216 public void testQueueNonPersistentMessagesLostOnRestart() throws Exception { 217 218 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 219 220 StubConnection connection = createConnection(); 222 ConnectionInfo connectionInfo = createConnectionInfo(); 223 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 224 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 225 connection.send(connectionInfo); 226 connection.send(sessionInfo); 227 connection.send(producerInfo); 228 Message message = createMessage(producerInfo, destination); 229 message.setPersistent(false); 230 connection.send(message); 231 232 restartBroker(); 234 235 connection = createConnection(); 237 connectionInfo = createConnectionInfo(); 238 sessionInfo = createSessionInfo(connectionInfo); 239 connection.send(connectionInfo); 240 connection.send(sessionInfo); 241 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 242 connection.send(consumerInfo); 243 244 assertNoMessagesLeft(connection); 246 } 247 248 public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception { 249 250 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 251 252 StubConnection connection = createConnection(); 254 ConnectionInfo connectionInfo = createConnectionInfo(); 255 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 256 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 257 connection.send(connectionInfo); 258 connection.send(sessionInfo); 259 connection.send(producerInfo); 260 261 LocalTransactionId txid = createLocalTransaction(sessionInfo); 263 connection.send(createBeginTransaction(connectionInfo, txid)); 264 265 for( int i=0; i < 4; i++) { 266 Message message = createMessage(producerInfo, destination); 267 message.setPersistent(true); 268 message.setTransactionId(txid); 269 connection.send(message); 270 } 271 272 connection.send(createCommitTransaction1Phase(connectionInfo, txid)); 274 connection.request(closeConnectionInfo(connectionInfo)); 275 restartBroker(); 277 278 connection = createConnection(); 280 connectionInfo = createConnectionInfo(); 281 sessionInfo = createSessionInfo(connectionInfo); 282 connection.send(connectionInfo); 283 connection.send(sessionInfo); 284 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 285 connection.send(consumerInfo); 286 287 for( int i=0; i < 4 ;i ++ ) { 288 Message m = receiveMessage(connection); 289 assertNotNull(m); 290 } 291 292 assertNoMessagesLeft(connection); 293 } 294 295 public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception { 296 297 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 298 299 StubConnection connection = createConnection(); 301 ConnectionInfo connectionInfo = createConnectionInfo(); 302 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 303 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 304 connection.send(connectionInfo); 305 connection.send(sessionInfo); 306 connection.send(producerInfo); 307 308 for( int i=0; i < 4; i++) { 309 Message message = createMessage(producerInfo, destination); 310 message.setPersistent(true); 311 connection.send(message); 312 } 313 314 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 316 connection.send(consumerInfo); 317 318 LocalTransactionId txid = createLocalTransaction(sessionInfo); 320 connection.send(createBeginTransaction(connectionInfo, txid)); 321 for( int i=0; i < 4 ;i ++ ) { 322 Message m = receiveMessage(connection); 323 assertNotNull(m); 324 MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE); 325 ack.setTransactionId(txid); 326 connection.send(ack); 327 } 328 connection.send(createCommitTransaction1Phase(connectionInfo, txid)); 330 connection.request(closeConnectionInfo(connectionInfo)); 331 restartBroker(); 333 334 connection = createConnection(); 336 connectionInfo = createConnectionInfo(); 337 sessionInfo = createSessionInfo(connectionInfo); 338 connection.send(connectionInfo); 339 connection.send(sessionInfo); 340 consumerInfo = createConsumerInfo(sessionInfo, destination); 341 connection.send(consumerInfo); 342 343 Message m = receiveMessage(connection); 345 assertNull(m); 346 } 347 348 public void testQueuePersistentUncommitedAcksLostOnRestart() throws Exception { 349 350 ActiveMQDestination destination = new ActiveMQQueue("TEST"); 351 352 StubConnection connection = createConnection(); 354 ConnectionInfo connectionInfo = createConnectionInfo(); 355 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 356 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 357 connection.send(connectionInfo); 358 connection.send(sessionInfo); 359 connection.send(producerInfo); 360 361 for( int i=0; i < 4; i++) { 362 Message message = createMessage(producerInfo, destination); 363 message.setPersistent(true); 364 connection.send(message); 365 } 366 367 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 369 connection.send(consumerInfo); 370 371 LocalTransactionId txid = createLocalTransaction(sessionInfo); 373 connection.send(createBeginTransaction(connectionInfo, txid)); 374 for( int i=0; i < 4 ;i ++ ) { 375 Message m = receiveMessage(connection); 376 assertNotNull(m); 377 MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE); 378 ack.setTransactionId(txid); 379 connection.send(ack); 380 } 381 383 restartBroker(); 385 386 connection = createConnection(); 388 connectionInfo = createConnectionInfo(); 389 sessionInfo = createSessionInfo(connectionInfo); 390 connection.send(connectionInfo); 391 connection.send(sessionInfo); 392 consumerInfo = createConsumerInfo(sessionInfo, destination); 393 connection.send(consumerInfo); 394 395 for( int i=0; i < 4 ;i ++ ) { 397 Message m = receiveMessage(connection); 398 assertNotNull(m); 399 } 400 401 assertNoMessagesLeft(connection); 402 } 403 404 public static Test suite() { 405 return suite(RecoveryBrokerTest.class); 406 } 407 408 public static void main(String [] args) { 409 junit.textui.TestRunner.run(suite()); 410 } 411 412 } 413 | Popular Tags |