KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > cursors > CursorSupport


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

14
15 package org.apache.activemq.broker.region.cursors;
16
17 import java.util.ArrayList JavaDoc;
18 import java.util.List JavaDoc;
19 import java.util.Properties JavaDoc;
20 import java.util.concurrent.CountDownLatch JavaDoc;
21 import java.util.concurrent.TimeUnit JavaDoc;
22 import javax.jms.Connection JavaDoc;
23 import javax.jms.ConnectionFactory JavaDoc;
24 import javax.jms.Destination JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Message JavaDoc;
27 import javax.jms.MessageConsumer JavaDoc;
28 import javax.jms.MessageListener JavaDoc;
29 import javax.jms.MessageProducer JavaDoc;
30 import javax.jms.Session JavaDoc;
31 import javax.jms.TextMessage JavaDoc;
32 import junit.framework.TestCase;
33 import org.apache.activemq.ActiveMQConnectionFactory;
34 import org.apache.activemq.broker.BrokerService;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37
38 /**
39  * @version $Revision: 1.3 $
40  */

41 public abstract class CursorSupport extends TestCase{
42
43     protected static final Log log=LogFactory.getLog(CursorSupport.class);
44     protected static final int MESSAGE_COUNT=500;
45     protected static final int PREFETCH_SIZE=50;
46     protected BrokerService broker;
47     protected String JavaDoc bindAddress="tcp://localhost:60706";
48
49     protected abstract Destination JavaDoc getDestination(Session JavaDoc session) throws JMSException JavaDoc;
50
51     protected abstract MessageConsumer JavaDoc getConsumer(Connection JavaDoc connection) throws Exception JavaDoc;
52
53     protected abstract void configureBroker(BrokerService answer) throws Exception JavaDoc;
54
55     public void testSendFirstThenConsume() throws Exception JavaDoc{
56         ConnectionFactory JavaDoc factory=createConnectionFactory();
57         Connection JavaDoc consumerConnection=getConsumerConnection(factory);
58         MessageConsumer JavaDoc consumer=getConsumer(consumerConnection);
59         consumerConnection.close();
60         Connection JavaDoc producerConnection=factory.createConnection();
61         producerConnection.start();
62         Session JavaDoc session=producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
63         MessageProducer JavaDoc producer=session.createProducer(getDestination(session));
64         List JavaDoc senderList=new ArrayList JavaDoc();
65         for(int i=0;i<MESSAGE_COUNT;i++){
66             Message JavaDoc msg=session.createTextMessage("test"+i);
67             senderList.add(msg);
68             producer.send(msg);
69         }
70         producerConnection.close();
71         // now consume the messages
72
consumerConnection=getConsumerConnection(factory);
73         // create durable subs
74
consumer=getConsumer(consumerConnection);
75         List JavaDoc consumerList=new ArrayList JavaDoc();
76         for(int i=0;i<MESSAGE_COUNT;i++){
77             Message JavaDoc msg=consumer.receive();
78             consumerList.add(msg);
79         }
80         assertEquals(senderList,consumerList);
81         consumerConnection.close();
82     }
83
84     public void testSendWhilstConsume() throws Exception JavaDoc{
85         ConnectionFactory JavaDoc factory=createConnectionFactory();
86         Connection JavaDoc consumerConnection=getConsumerConnection(factory);
87         // create durable subs
88
MessageConsumer JavaDoc consumer=getConsumer(consumerConnection);
89         consumerConnection.close();
90         Connection JavaDoc producerConnection=factory.createConnection();
91         producerConnection.start();
92         Session JavaDoc session=producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
93         MessageProducer JavaDoc producer=session.createProducer(getDestination(session));
94         List JavaDoc senderList=new ArrayList JavaDoc();
95         for(int i=0;i<MESSAGE_COUNT/10;i++){
96             TextMessage JavaDoc msg=session.createTextMessage("test"+i);
97             senderList.add(msg);
98             producer.send(msg);
99         }
100         // now consume the messages
101
consumerConnection=getConsumerConnection(factory);
102         // create durable subs
103
consumer=getConsumer(consumerConnection);
104         final List JavaDoc consumerList=new ArrayList JavaDoc();
105         final CountDownLatch JavaDoc latch=new CountDownLatch JavaDoc(1);
106         consumer.setMessageListener(new MessageListener JavaDoc(){
107
108             public void onMessage(Message JavaDoc msg){
109                 try{
110                     // sleep to act as a slow consumer
111
// which will force a mix of direct and polled dispatching
112
// using the cursor on the broker
113
Thread.sleep(50);
114                 }catch(Exception JavaDoc e){
115                     // TODO Auto-generated catch block
116
e.printStackTrace();
117                 }
118                 consumerList.add(msg);
119                 if(consumerList.size()==MESSAGE_COUNT){
120                     latch.countDown();
121                 }
122             }
123         });
124         for(int i=MESSAGE_COUNT/10;i<MESSAGE_COUNT;i++){
125             TextMessage JavaDoc msg=session.createTextMessage("test"+i);
126             senderList.add(msg);
127             producer.send(msg);
128         }
129         latch.await(300000,TimeUnit.MILLISECONDS);
130         producerConnection.close();
131         consumerConnection.close();
132         assertEquals("Still dipatching - count down latch not sprung",latch.getCount(),0);
133         assertEquals("cosumerList - expected: "+MESSAGE_COUNT+" but was: "+consumerList.size(),consumerList.size(),
134                 senderList.size());
135         for (int i =0; i < senderList.size(); i++) {
136             Message JavaDoc sent = (Message JavaDoc)senderList.get(i);
137             Message JavaDoc consumed = (Message JavaDoc)consumerList.get(i);
138             if (!sent.equals(consumed)) {
139                System.err.println("BAD MATCH AT POS " + i);
140                System.err.println(sent);
141                System.err.println(consumed);
142                /*
143                System.err.println("\n\n\n\n\n");
144                for (int j = 0; j < consumerList.size(); j++) {
145                    System.err.println(consumerList.get(j));
146                }
147                */

148             }
149             assertEquals("This should be the same at pos " + i + " in the list",sent.getJMSMessageID(),consumed.getJMSMessageID());
150         }
151     }
152
153     protected Connection JavaDoc getConsumerConnection(ConnectionFactory JavaDoc fac) throws JMSException JavaDoc{
154         Connection JavaDoc connection=fac.createConnection();
155         connection.setClientID("testConsumer");
156         connection.start();
157         return connection;
158     }
159
160     protected void setUp() throws Exception JavaDoc{
161         if(broker==null){
162             broker=createBroker();
163         }
164         super.setUp();
165     }
166
167     protected void tearDown() throws Exception JavaDoc{
168         super.tearDown();
169         if(broker!=null){
170             broker.stop();
171         }
172     }
173
174     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception JavaDoc{
175         ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress);
176         Properties JavaDoc props=new Properties JavaDoc();
177         props.setProperty("prefetchPolicy.durableTopicPrefetch",""+PREFETCH_SIZE);
178         props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch",""+PREFETCH_SIZE);
179         props.setProperty("prefetchPolicy.queuePrefetch",""+PREFETCH_SIZE);
180         cf.setProperties(props);
181         return cf;
182     }
183
184     protected BrokerService createBroker() throws Exception JavaDoc{
185         BrokerService answer=new BrokerService();
186         configureBroker(answer);
187         answer.start();
188         return answer;
189     }
190 }
191
Popular Tags