1 18 package org.apache.activemq.broker; 19 20 import java.io.ByteArrayOutputStream ; 21 import java.io.DataOutputStream ; 22 import java.io.File ; 23 import java.io.IOException ; 24 import java.net.URI ; 25 import java.util.ArrayList ; 26 import java.util.Iterator ; 27 28 import javax.jms.DeliveryMode ; 29 import javax.jms.MessageNotWriteableException ; 30 31 import org.apache.activemq.CombinationTestSupport; 32 import org.apache.activemq.broker.region.RegionBroker; 33 import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy; 34 import org.apache.activemq.broker.region.policy.PolicyEntry; 35 import org.apache.activemq.broker.region.policy.PolicyMap; 36 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; 37 import org.apache.activemq.command.ActiveMQDestination; 38 import org.apache.activemq.command.ActiveMQTextMessage; 39 import org.apache.activemq.command.ConnectionId; 40 import org.apache.activemq.command.ConnectionInfo; 41 import org.apache.activemq.command.ConsumerInfo; 42 import org.apache.activemq.command.DestinationInfo; 43 import org.apache.activemq.command.LocalTransactionId; 44 import org.apache.activemq.command.Message; 45 import org.apache.activemq.command.MessageAck; 46 import org.apache.activemq.command.MessageDispatch; 47 import org.apache.activemq.command.MessageId; 48 import org.apache.activemq.command.ProducerInfo; 49 import org.apache.activemq.command.RemoveInfo; 50 import org.apache.activemq.command.SessionInfo; 51 import org.apache.activemq.command.TransactionId; 52 import org.apache.activemq.command.TransactionInfo; 53 import org.apache.activemq.command.XATransactionId; 54 import org.apache.activemq.memory.UsageManager; 55 import org.apache.activemq.store.PersistenceAdapter; 56 import org.apache.commons.logging.Log; 57 import org.apache.commons.logging.LogFactory; 58 59 import java.util.concurrent.TimeUnit ; 60 61 public class BrokerTestSupport extends CombinationTestSupport { 62 63 protected static final Log log = LogFactory.getLog(BrokerTestSupport.class); 64 65 68 public static boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true"); 69 70 protected RegionBroker regionBroker; 71 protected BrokerService broker; 72 protected long idGenerator=0; 73 protected int msgIdGenerator=0; 74 protected int txGenerator=0; 75 protected int tempDestGenerator=0; 76 protected PersistenceAdapter persistenceAdapter; 77 78 protected int MAX_WAIT = 4000; 79 80 protected UsageManager memoryManager; 81 82 83 protected void setUp() throws Exception { 84 super.setUp(); 85 broker = createBroker(); 86 PolicyMap policyMap = new PolicyMap(); 87 policyMap.setDefaultEntry(getDefaultPolicy()); 88 broker.setDestinationPolicy(policyMap); 89 broker.start(); 90 } 91 92 protected PolicyEntry getDefaultPolicy() { 93 PolicyEntry policy = new PolicyEntry(); 94 policy.setDispatchPolicy(new RoundRobinDispatchPolicy()); 95 policy.setSubscriptionRecoveryPolicy(new FixedCountSubscriptionRecoveryPolicy()); 96 return policy; 97 } 98 99 100 protected BrokerService createBroker() throws Exception { 101 BrokerService broker = BrokerFactory.createBroker(new URI ("broker:()/localhost?persistent=false")); 102 return broker; 103 } 104 105 protected void tearDown() throws Exception { 106 broker.stop(); 107 broker=null; 108 regionBroker=null; 109 persistenceAdapter=null; 110 memoryManager=null; 111 super.tearDown(); 112 } 113 114 protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception { 115 ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); 116 info.setBrowser(false); 117 info.setDestination(destination); 118 info.setPrefetchSize(1000); 119 info.setDispatchAsync(false); 120 return info; 121 } 122 123 protected RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) { 124 return consumerInfo.createRemoveCommand(); 125 } 126 127 protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception { 128 ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator); 129 return info; 130 } 131 132 protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception { 133 SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator); 134 return info; 135 } 136 137 protected ConnectionInfo createConnectionInfo() throws Exception { 138 ConnectionInfo info = new ConnectionInfo(); 139 info.setConnectionId(new ConnectionId("connection:"+(++idGenerator))); 140 info.setClientId( info.getConnectionId().getValue() ); 141 return info; 142 } 143 144 protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) { 145 ActiveMQTextMessage message = new ActiveMQTextMessage(); 146 message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator)); 147 message.setDestination(destination); 148 message.setPersistent(false); 149 try { 150 message.setText("Test Message Payload."); 151 } catch (MessageNotWriteableException e) { 152 } 153 return message; 154 } 155 156 protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) { 157 MessageAck ack = new MessageAck(); 158 ack.setAckType(ackType); 159 ack.setConsumerId(consumerInfo.getConsumerId()); 160 ack.setDestination( msg.getDestination() ); 161 ack.setLastMessageId( msg.getMessageId() ); 162 ack.setMessageCount(count); 163 return ack; 164 } 165 166 protected void gc() { 167 regionBroker.gc(); 168 } 169 170 protected void profilerPause(String prompt) throws IOException { 171 if( System.getProperty("profiler")!=null ) { 172 System.out.println(); 173 System.out.println(prompt+"> Press enter to continue: "); 174 while( System.in.read()!='\n' ) { 175 } 176 System.out.println(prompt+"> Done."); 177 } 178 } 179 180 protected RemoveInfo closeConnectionInfo(ConnectionInfo info) { 181 return info.createRemoveCommand(); 182 } 183 184 protected RemoveInfo closeSessionInfo(SessionInfo info) { 185 return info.createRemoveCommand(); 186 } 187 188 protected RemoveInfo closeProducerInfo(ProducerInfo info) { 189 return info.createRemoveCommand(); 190 } 191 192 protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) { 193 Message message = createMessage(producerInfo, destination); 194 message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT); 195 return message; 196 } 197 198 protected LocalTransactionId createLocalTransaction(SessionInfo info) { 199 LocalTransactionId id = new LocalTransactionId(info.getSessionId().getParentId(), ++txGenerator); 200 return id; 201 } 202 203 protected XATransactionId createXATransaction(SessionInfo info) throws IOException { 204 long id = txGenerator; 205 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 206 DataOutputStream os = new DataOutputStream (baos); 207 os.writeLong(++txGenerator); 208 os.close(); 209 byte[] bs = baos.toByteArray(); 210 211 XATransactionId xid = new XATransactionId(); 212 xid.setBranchQualifier(bs); 213 xid.setGlobalTransactionId(bs); 214 xid.setFormatId(55); 215 return xid; 216 } 217 218 protected TransactionInfo createBeginTransaction(ConnectionInfo connectionInfo, TransactionId txid) { 219 TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.BEGIN); 220 return info; 221 } 222 223 protected TransactionInfo createPrepareTransaction(ConnectionInfo connectionInfo, TransactionId txid) { 224 TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.PREPARE); 225 return info; 226 } 227 228 protected TransactionInfo createCommitTransaction1Phase(ConnectionInfo connectionInfo, TransactionId txid) { 229 TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.COMMIT_ONE_PHASE); 230 return info; 231 } 232 233 protected TransactionInfo createCommitTransaction2Phase(ConnectionInfo connectionInfo, TransactionId txid) { 234 TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.COMMIT_TWO_PHASE); 235 return info; 236 } 237 238 protected TransactionInfo createRollbackTransaction(ConnectionInfo connectionInfo, TransactionId txid) { 239 TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.ROLLBACK); 240 return info; 241 } 242 243 protected int countMessagesInQueue(StubConnection connection, ConnectionInfo connectionInfo, ActiveMQDestination destination) throws Exception { 244 245 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 246 connection.send(sessionInfo); 247 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 248 consumerInfo.setPrefetchSize(1); 249 consumerInfo.setBrowser(true); 250 connection.send(consumerInfo); 251 252 ArrayList skipped = new ArrayList (); 253 254 Object m = connection.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS); 256 int i=0; 257 while( m!=null ) { 258 if( m instanceof MessageDispatch && ((MessageDispatch)m).getConsumerId().equals(consumerInfo.getConsumerId()) ) { 259 MessageDispatch md = (MessageDispatch) m; 260 if( md.getMessage()!=null ) { 261 i++; 262 connection.send(createAck(consumerInfo, md.getMessage(), 1, MessageAck.STANDARD_ACK_TYPE)); 263 } else { 264 break; 265 } 266 } else { 267 skipped.add(m); 268 } 269 m = connection.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS); 270 } 271 272 for (Iterator iter = skipped.iterator(); iter.hasNext();) { 273 connection.getDispatchQueue().put(iter.next()); 274 } 275 276 connection.send(closeSessionInfo(sessionInfo)); 277 return i; 278 279 } 280 281 protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) { 282 DestinationInfo info = new DestinationInfo(); 283 info.setConnectionId(connectionInfo.getConnectionId()); 284 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 285 info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId()+":"+(++tempDestGenerator), destinationType)); 286 return info; 287 } 288 289 protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception { 290 if( (destinationType & ActiveMQDestination.TEMP_MASK)!=0 ) { 291 DestinationInfo info = createTempDestinationInfo(connectionInfo1, destinationType); 292 connection.send(info); 293 return info.getDestination(); 294 } else { 295 return ActiveMQDestination.createDestination("TEST", destinationType); 296 } 297 } 298 299 300 protected DestinationInfo closeDestinationInfo(DestinationInfo info) { 301 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 302 info.setTimeout(0); 303 return info; 304 } 305 306 public static void recursiveDelete(File f) { 307 if( f.isDirectory() ) { 308 File [] files = f.listFiles(); 309 for (int i = 0; i < files.length; i++) { 310 recursiveDelete(files[i]); 311 } 312 } 313 f.delete(); 314 } 315 316 protected StubConnection createConnection() throws Exception { 317 return new StubConnection(broker); 318 } 319 320 325 public Message receiveMessage(StubConnection connection) throws InterruptedException { 326 return receiveMessage(connection, MAX_WAIT); 327 } 328 329 public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException { 330 while( true ) { 331 Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS); 332 333 if( o == null ) 334 return null; 335 336 if( o instanceof MessageDispatch ) { 337 338 MessageDispatch dispatch = (MessageDispatch)o; 339 if( dispatch.getMessage()==null ) 340 return null; 341 342 dispatch.setMessage(dispatch.getMessage().copy()); 343 dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter()); 344 return dispatch.getMessage(); 345 } 346 } 347 }; 348 349 protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException { 350 long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : MAX_WAIT; 351 while( true ) { 352 Object o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS); 353 if( o == null ) 354 return; 355 if( o instanceof MessageDispatch && ((MessageDispatch)o).getMessage()!=null ) { 356 fail("Received a message."); 357 } 358 } 359 } 360 361 362 } 363 | Popular Tags |