KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > BrokerTestSupport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20 import java.io.ByteArrayOutputStream JavaDoc;
21 import java.io.DataOutputStream JavaDoc;
22 import java.io.File JavaDoc;
23 import java.io.IOException JavaDoc;
24 import java.net.URI JavaDoc;
25 import java.util.ArrayList JavaDoc;
26 import java.util.Iterator JavaDoc;
27
28 import javax.jms.DeliveryMode JavaDoc;
29 import javax.jms.MessageNotWriteableException JavaDoc;
30
31 import org.apache.activemq.CombinationTestSupport;
32 import org.apache.activemq.broker.region.RegionBroker;
33 import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
34 import org.apache.activemq.broker.region.policy.PolicyEntry;
35 import org.apache.activemq.broker.region.policy.PolicyMap;
36 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
37 import org.apache.activemq.command.ActiveMQDestination;
38 import org.apache.activemq.command.ActiveMQTextMessage;
39 import org.apache.activemq.command.ConnectionId;
40 import org.apache.activemq.command.ConnectionInfo;
41 import org.apache.activemq.command.ConsumerInfo;
42 import org.apache.activemq.command.DestinationInfo;
43 import org.apache.activemq.command.LocalTransactionId;
44 import org.apache.activemq.command.Message;
45 import org.apache.activemq.command.MessageAck;
46 import org.apache.activemq.command.MessageDispatch;
47 import org.apache.activemq.command.MessageId;
48 import org.apache.activemq.command.ProducerInfo;
49 import org.apache.activemq.command.RemoveInfo;
50 import org.apache.activemq.command.SessionInfo;
51 import org.apache.activemq.command.TransactionId;
52 import org.apache.activemq.command.TransactionInfo;
53 import org.apache.activemq.command.XATransactionId;
54 import org.apache.activemq.memory.UsageManager;
55 import org.apache.activemq.store.PersistenceAdapter;
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58
59 import java.util.concurrent.TimeUnit JavaDoc;
60
61 public class BrokerTestSupport extends CombinationTestSupport {
62     
63     protected static final Log log = LogFactory.getLog(BrokerTestSupport.class);
64     
65     /**
66      * Setting this to false makes the test run faster but they may be less accurate.
67      */

68     public static boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
69
70     protected RegionBroker regionBroker;
71     protected BrokerService broker;
72     protected long idGenerator=0;
73     protected int msgIdGenerator=0;
74     protected int txGenerator=0;
75     protected int tempDestGenerator=0;
76     protected PersistenceAdapter persistenceAdapter;
77
78     protected int MAX_WAIT = 4000;
79
80     protected UsageManager memoryManager;
81
82
83     protected void setUp() throws Exception JavaDoc {
84         super.setUp();
85         broker = createBroker();
86         PolicyMap policyMap = new PolicyMap();
87         policyMap.setDefaultEntry(getDefaultPolicy());
88         broker.setDestinationPolicy(policyMap);
89         broker.start();
90     }
91     
92     protected PolicyEntry getDefaultPolicy() {
93         PolicyEntry policy = new PolicyEntry();
94         policy.setDispatchPolicy(new RoundRobinDispatchPolicy());
95         policy.setSubscriptionRecoveryPolicy(new FixedCountSubscriptionRecoveryPolicy());
96         return policy;
97     }
98     
99   
100     protected BrokerService createBroker() throws Exception JavaDoc {
101         BrokerService broker = BrokerFactory.createBroker(new URI JavaDoc("broker:()/localhost?persistent=false"));
102         return broker;
103     }
104     
105     protected void tearDown() throws Exception JavaDoc {
106         broker.stop();
107         broker=null;
108         regionBroker=null;
109         persistenceAdapter=null;
110         memoryManager=null;
111         super.tearDown();
112     }
113     
114     protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception JavaDoc {
115         ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
116         info.setBrowser(false);
117         info.setDestination(destination);
118         info.setPrefetchSize(1000);
119         info.setDispatchAsync(false);
120         return info;
121     }
122     
123     protected RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) {
124         return consumerInfo.createRemoveCommand();
125     }
126
127     protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception JavaDoc {
128         ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
129         return info;
130     }
131
132     protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception JavaDoc {
133         SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
134         return info;
135     }
136
137     protected ConnectionInfo createConnectionInfo() throws Exception JavaDoc {
138         ConnectionInfo info = new ConnectionInfo();
139         info.setConnectionId(new ConnectionId("connection:"+(++idGenerator)));
140         info.setClientId( info.getConnectionId().getValue() );
141         return info;
142     }
143     
144     protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
145         ActiveMQTextMessage message = new ActiveMQTextMessage();
146         message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
147         message.setDestination(destination);
148         message.setPersistent(false);
149         try {
150             message.setText("Test Message Payload.");
151         } catch (MessageNotWriteableException JavaDoc e) {
152         }
153         return message;
154     }
155
156     protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
157         MessageAck ack = new MessageAck();
158         ack.setAckType(ackType);
159         ack.setConsumerId(consumerInfo.getConsumerId());
160         ack.setDestination( msg.getDestination() );
161         ack.setLastMessageId( msg.getMessageId() );
162         ack.setMessageCount(count);
163         return ack;
164     }
165
166     protected void gc() {
167         regionBroker.gc();
168     }
169
170     protected void profilerPause(String JavaDoc prompt) throws IOException JavaDoc {
171         if( System.getProperty("profiler")!=null ) {
172             System.out.println();
173             System.out.println(prompt+"> Press enter to continue: ");
174             while( System.in.read()!='\n' ) {
175             }
176             System.out.println(prompt+"> Done.");
177         }
178     }
179
180     protected RemoveInfo closeConnectionInfo(ConnectionInfo info) {
181         return info.createRemoveCommand();
182     }
183
184     protected RemoveInfo closeSessionInfo(SessionInfo info) {
185         return info.createRemoveCommand();
186     }
187
188     protected RemoveInfo closeProducerInfo(ProducerInfo info) {
189         return info.createRemoveCommand();
190     }
191
192     protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
193         Message message = createMessage(producerInfo, destination);
194         message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
195         return message;
196     }
197
198     protected LocalTransactionId createLocalTransaction(SessionInfo info) {
199         LocalTransactionId id = new LocalTransactionId(info.getSessionId().getParentId(), ++txGenerator);
200         return id;
201     }
202     
203     protected XATransactionId createXATransaction(SessionInfo info) throws IOException JavaDoc {
204         long id = txGenerator;
205         ByteArrayOutputStream JavaDoc baos = new ByteArrayOutputStream JavaDoc();
206         DataOutputStream JavaDoc os = new DataOutputStream JavaDoc(baos);
207         os.writeLong(++txGenerator);
208         os.close();
209         byte[] bs = baos.toByteArray();
210         
211         XATransactionId xid = new XATransactionId();
212         xid.setBranchQualifier(bs);
213         xid.setGlobalTransactionId(bs);
214         xid.setFormatId(55);
215         return xid;
216     }
217
218     protected TransactionInfo createBeginTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
219         TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.BEGIN);
220         return info;
221     }
222     
223     protected TransactionInfo createPrepareTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
224         TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.PREPARE);
225         return info;
226     }
227
228     protected TransactionInfo createCommitTransaction1Phase(ConnectionInfo connectionInfo, TransactionId txid) {
229         TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.COMMIT_ONE_PHASE);
230         return info;
231     }
232     
233     protected TransactionInfo createCommitTransaction2Phase(ConnectionInfo connectionInfo, TransactionId txid) {
234         TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.COMMIT_TWO_PHASE);
235         return info;
236     }
237     
238     protected TransactionInfo createRollbackTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
239         TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.ROLLBACK);
240         return info;
241     }
242
243     protected int countMessagesInQueue(StubConnection connection, ConnectionInfo connectionInfo, ActiveMQDestination destination) throws Exception JavaDoc {
244         
245         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
246         connection.send(sessionInfo);
247         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
248         consumerInfo.setPrefetchSize(1);
249         consumerInfo.setBrowser(true);
250         connection.send(consumerInfo);
251     
252         ArrayList JavaDoc skipped = new ArrayList JavaDoc();
253         
254         // Now get the messages.
255
Object JavaDoc m = connection.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS);
256         int i=0;
257         while( m!=null ) {
258             if( m instanceof MessageDispatch && ((MessageDispatch)m).getConsumerId().equals(consumerInfo.getConsumerId()) ) {
259                 MessageDispatch md = (MessageDispatch) m;
260                 if( md.getMessage()!=null ) {
261                     i++;
262                     connection.send(createAck(consumerInfo, md.getMessage(), 1, MessageAck.STANDARD_ACK_TYPE));
263                 } else {
264                     break;
265                 }
266             } else {
267                 skipped.add(m);
268             }
269             m = connection.getDispatchQueue().poll(MAX_WAIT, TimeUnit.MILLISECONDS);
270         }
271         
272         for (Iterator JavaDoc iter = skipped.iterator(); iter.hasNext();) {
273             connection.getDispatchQueue().put(iter.next());
274         }
275         
276         connection.send(closeSessionInfo(sessionInfo));
277         return i;
278         
279     }
280
281     protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) {
282         DestinationInfo info = new DestinationInfo();
283         info.setConnectionId(connectionInfo.getConnectionId());
284         info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
285         info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId()+":"+(++tempDestGenerator), destinationType));
286         return info;
287     }
288     
289     protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception JavaDoc {
290         if( (destinationType & ActiveMQDestination.TEMP_MASK)!=0 ) {
291             DestinationInfo info = createTempDestinationInfo(connectionInfo1, destinationType);
292             connection.send(info);
293             return info.getDestination();
294         } else {
295             return ActiveMQDestination.createDestination("TEST", destinationType);
296         }
297     }
298     
299     
300     protected DestinationInfo closeDestinationInfo(DestinationInfo info) {
301         info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
302         info.setTimeout(0);
303         return info;
304     }
305     
306     public static void recursiveDelete(File JavaDoc f) {
307         if( f.isDirectory() ) {
308             File JavaDoc[] files = f.listFiles();
309             for (int i = 0; i < files.length; i++) {
310                 recursiveDelete(files[i]);
311             }
312         }
313         f.delete();
314     }
315
316     protected StubConnection createConnection() throws Exception JavaDoc {
317         return new StubConnection(broker);
318     }
319
320     /**
321      * @param connection
322      * @return
323      * @throws InterruptedException
324      */

325     public Message receiveMessage(StubConnection connection) throws InterruptedException JavaDoc {
326         return receiveMessage(connection, MAX_WAIT);
327     }
328     
329     public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException JavaDoc {
330         while( true ) {
331             Object JavaDoc o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
332             
333             if( o == null )
334                 return null;
335             
336             if( o instanceof MessageDispatch ) {
337                 
338                 MessageDispatch dispatch = (MessageDispatch)o;
339                 if( dispatch.getMessage()==null )
340                     return null;
341                 
342                 dispatch.setMessage(dispatch.getMessage().copy());
343                 dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
344                 return dispatch.getMessage();
345             }
346         }
347     };
348
349     protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException JavaDoc {
350         long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : MAX_WAIT;
351         while( true ) {
352             Object JavaDoc o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS);
353             if( o == null )
354                 return;
355             if( o instanceof MessageDispatch && ((MessageDispatch)o).getMessage()!=null ) {
356                 fail("Received a message.");
357             }
358         }
359     }
360
361
362 }
363
Popular Tags