1 18 package org.apache.activemq.broker; 19 20 import junit.framework.Test; 21 22 import org.apache.activemq.command.ActiveMQDestination; 23 import org.apache.activemq.command.ActiveMQQueue; 24 import org.apache.activemq.command.ConnectionInfo; 25 import org.apache.activemq.command.ConsumerInfo; 26 import org.apache.activemq.command.DataArrayResponse; 27 import org.apache.activemq.command.Message; 28 import org.apache.activemq.command.MessageAck; 29 import org.apache.activemq.command.ProducerInfo; 30 import org.apache.activemq.command.Response; 31 import org.apache.activemq.command.SessionInfo; 32 import org.apache.activemq.command.TransactionId; 33 import org.apache.activemq.command.TransactionInfo; 34 import org.apache.activemq.command.XATransactionId; 35 36 41 public class XARecoveryBrokerTest extends BrokerRestartTestSupport { 42 43 44 public void testPreparedTransactionRecoveredOnRestart() throws Exception { 45 46 ActiveMQDestination destination = createDestination(); 47 48 StubConnection connection = createConnection(); 50 ConnectionInfo connectionInfo = createConnectionInfo(); 51 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 52 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 53 connection.send(connectionInfo); 54 connection.send(sessionInfo); 55 connection.send(producerInfo); 56 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 57 connection.send(consumerInfo); 58 59 for( int i=0; i < 4; i++) { 61 XATransactionId txid = createXATransaction(sessionInfo); 63 connection.send(createBeginTransaction(connectionInfo, txid)); 64 65 Message message = createMessage(producerInfo, destination); 66 message.setPersistent(true); 67 message.setTransactionId(txid); 68 connection.send(message); 69 70 connection.send(createPrepareTransaction(connectionInfo, txid)); 72 } 73 74 assertNoMessagesLeft(connection); 76 connection.request(closeConnectionInfo(connectionInfo)); 77 78 restartBroker(); 80 81 connection = createConnection(); 83 connectionInfo = createConnectionInfo(); 84 sessionInfo = createSessionInfo(connectionInfo); 85 connection.send(connectionInfo); 86 connection.send(sessionInfo); 87 consumerInfo = createConsumerInfo(sessionInfo, destination); 88 connection.send(consumerInfo); 89 90 assertNoMessagesLeft(connection); 92 93 Response response = connection.request( new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER) ); 94 assertNotNull(response); 95 DataArrayResponse dar = (DataArrayResponse) response; 96 assertEquals(4, dar.getData().length); 97 98 for( int i=0; i < dar.getData().length ;i ++ ) { 100 connection.send( createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i]) ); 101 } 102 103 for( int i=0; i < 4 ;i ++ ) { 105 Message m = receiveMessage(connection); 106 assertNotNull(m); 107 } 108 109 assertNoMessagesLeft(connection); 110 } 111 112 public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception { 113 114 ActiveMQDestination destination = createDestination(); 115 116 StubConnection connection = createConnection(); 118 ConnectionInfo connectionInfo = createConnectionInfo(); 119 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 120 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 121 connection.send(connectionInfo); 122 connection.send(sessionInfo); 123 connection.send(producerInfo); 124 125 XATransactionId txid = createXATransaction(sessionInfo); 127 connection.send(createBeginTransaction(connectionInfo, txid)); 128 129 for( int i=0; i < 4; i++) { 130 Message message = createMessage(producerInfo, destination); 131 message.setPersistent(true); 132 message.setTransactionId(txid); 133 connection.send(message); 134 } 135 136 connection.send(createCommitTransaction1Phase(connectionInfo, txid)); 138 connection.request(closeConnectionInfo(connectionInfo)); 139 restartBroker(); 141 142 connection = createConnection(); 144 connectionInfo = createConnectionInfo(); 145 sessionInfo = createSessionInfo(connectionInfo); 146 connection.send(connectionInfo); 147 connection.send(sessionInfo); 148 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 149 connection.send(consumerInfo); 150 151 for( int i=0; i < 4 ;i ++ ) { 152 Message m = receiveMessage(connection); 153 assertNotNull(m); 154 } 155 156 assertNoMessagesLeft(connection); 157 } 158 159 public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception { 160 161 ActiveMQDestination destination = createDestination(); 162 163 StubConnection connection = createConnection(); 165 ConnectionInfo connectionInfo = createConnectionInfo(); 166 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 167 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 168 connection.send(connectionInfo); 169 connection.send(sessionInfo); 170 connection.send(producerInfo); 171 172 for( int i=0; i < 4; i++) { 173 Message message = createMessage(producerInfo, destination); 174 message.setPersistent(true); 175 connection.send(message); 176 } 177 178 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 180 connection.send(consumerInfo); 181 182 XATransactionId txid = createXATransaction(sessionInfo); 184 connection.send(createBeginTransaction(connectionInfo, txid)); 185 for( int i=0; i < 4 ;i ++ ) { 186 Message m = receiveMessage(connection); 187 assertNotNull(m); 188 MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE); 189 ack.setTransactionId(txid); 190 connection.send(ack); 191 } 192 connection.request(createCommitTransaction1Phase(connectionInfo, txid)); 194 195 restartBroker(); 197 198 connection = createConnection(); 200 connectionInfo = createConnectionInfo(); 201 sessionInfo = createSessionInfo(connectionInfo); 202 connection.send(connectionInfo); 203 connection.send(sessionInfo); 204 consumerInfo = createConsumerInfo(sessionInfo, destination); 205 connection.send(consumerInfo); 206 207 assertNoMessagesLeft(connection); 209 210 Message m = receiveMessage(connection); 211 assertNull(m); 212 } 213 214 public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception { 215 216 ActiveMQDestination destination = createDestination(); 217 218 StubConnection connection = createConnection(); 220 ConnectionInfo connectionInfo = createConnectionInfo(); 221 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 222 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 223 connection.send(connectionInfo); 224 connection.send(sessionInfo); 225 connection.send(producerInfo); 226 227 for( int i=0; i < 4; i++) { 228 Message message = createMessage(producerInfo, destination); 229 message.setPersistent(true); 230 connection.send(message); 231 } 232 233 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 235 connection.send(consumerInfo); 236 237 XATransactionId txid = createXATransaction(sessionInfo); 239 connection.send(createBeginTransaction(connectionInfo, txid)); 240 for( int i=0; i < 4 ;i ++ ) { 241 Message m = receiveMessage(connection); 242 assertNotNull(m); 243 MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE); 244 ack.setTransactionId(txid); 245 connection.send(ack); 246 } 247 249 restartBroker(); 251 252 connection = createConnection(); 254 connectionInfo = createConnectionInfo(); 255 sessionInfo = createSessionInfo(connectionInfo); 256 connection.send(connectionInfo); 257 connection.send(sessionInfo); 258 consumerInfo = createConsumerInfo(sessionInfo, destination); 259 connection.send(consumerInfo); 260 261 for( int i=0; i < 4 ;i ++ ) { 263 Message m = receiveMessage(connection); 264 assertNotNull(m); 265 } 266 267 assertNoMessagesLeft(connection); 268 } 269 270 public static Test suite() { 271 return suite(XARecoveryBrokerTest.class); 272 } 273 274 public static void main(String [] args) { 275 junit.textui.TestRunner.run(suite()); 276 } 277 278 protected ActiveMQDestination createDestination() { 279 return new ActiveMQQueue(getClass().getName() + "." + getName()); 280 } 281 282 } 283 | Popular Tags |