KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > bugs > JmsDurableTopicSlowReceiveTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.bugs;
16
17 import java.io.File JavaDoc;
18 import java.util.Properties JavaDoc;
19 import javax.jms.BytesMessage JavaDoc;
20 import javax.jms.Connection JavaDoc;
21 import javax.jms.Destination JavaDoc;
22 import javax.jms.Message JavaDoc;
23 import javax.jms.MessageConsumer JavaDoc;
24 import javax.jms.MessageProducer JavaDoc;
25 import javax.jms.Session JavaDoc;
26 import javax.jms.Topic JavaDoc;
27 import javax.jms.TopicSubscriber JavaDoc;
28 import org.apache.activemq.ActiveMQConnectionFactory;
29 import org.apache.activemq.broker.BrokerService;
30 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
31 import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
32 import org.apache.activemq.test.JmsTopicSendReceiveTest;
33
34 /**
35  * @version $Revision: 1.5 $
36  */

37 public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest{
38
39     private static final org.apache.commons.logging.Log log=org.apache.commons.logging.LogFactory
40             .getLog(JmsDurableTopicSlowReceiveTest.class);
41     protected Connection JavaDoc connection2;
42     protected Session JavaDoc session2;
43     protected Session JavaDoc consumeSession2;
44     protected MessageConsumer JavaDoc consumer2;
45     protected MessageProducer JavaDoc producer2;
46     protected Destination JavaDoc consumerDestination2;
47     BrokerService broker;
48     final int NMSG=100;
49     final int MSIZE=256000;
50     private Connection JavaDoc connection3;
51     private Session JavaDoc consumeSession3;
52     private TopicSubscriber JavaDoc consumer3;
53     private final String JavaDoc countProperyName = "count";
54
55     /**
56      * Set up a durable suscriber test.
57      *
58      * @see junit.framework.TestCase#setUp()
59      */

60     protected void setUp() throws Exception JavaDoc{
61         this.durable=true;
62         broker=createBroker();
63         super.setUp();
64     }
65
66     protected void tearDown() throws Exception JavaDoc{
67         super.tearDown();
68         broker.stop();
69     }
70
71     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception JavaDoc{
72         ActiveMQConnectionFactory result=new ActiveMQConnectionFactory("vm://localhost?async=false");
73         Properties JavaDoc props=new Properties JavaDoc();
74         props.put("prefetchPolicy.durableTopicPrefetch","5");
75         props.put("prefetchPolicy.optimizeDurableTopicPrefetch","5");
76         result.setProperties(props);
77         return result;
78     }
79
80     protected BrokerService createBroker() throws Exception JavaDoc{
81         BrokerService answer=new BrokerService();
82         configureBroker(answer);
83         answer.start();
84         return answer;
85     }
86
87     protected void configureBroker(BrokerService answer) throws Exception JavaDoc{
88         answer.setDeleteAllMessagesOnStartup(true);
89     }
90
91     /**
92      * Test if all the messages sent are being received.
93      *
94      * @throws Exception
95      */

96     public void testSlowReceiver() throws Exception JavaDoc{
97         connection2=createConnection();
98         connection2.setClientID("test");
99         connection2.start();
100         consumeSession2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE);
101         session2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE);
102         consumerDestination2=session2.createTopic(getConsumerSubject()+"2");
103         consumer2=consumeSession2.createDurableSubscriber((Topic JavaDoc)consumerDestination2,getName());
104       
105         consumer2.close();
106         connection2.close();
107         new Thread JavaDoc(new Runnable JavaDoc(){
108
109             public void run(){
110                 try{
111                     int count = 0;
112                     for(int loop=0;loop<4;loop++){
113                         connection2=createConnection();
114                         connection2.start();
115                         session2=connection2.createSession(false,Session.AUTO_ACKNOWLEDGE);
116                         producer2=session2.createProducer(null);
117                         producer2.setDeliveryMode(deliveryMode);
118                         Thread.sleep(1000);
119                         for(int i=0;i<NMSG/4;i++){
120                             BytesMessage JavaDoc message=session2.createBytesMessage();
121                             message.writeBytes(new byte[MSIZE]);
122                             message.setStringProperty("test","test");
123                             message.setIntProperty(countProperyName,count);
124                             message.setJMSType("test");
125                             producer2.send(consumerDestination2,message);
126                             Thread.sleep(50);
127                             if(verbose){
128                                 System.out.println("Sent("+loop+"): "+i);
129                             }
130                             count++;
131                         }
132                         producer2.close();
133                         connection2.stop();
134                         connection2.close();
135                     }
136                 }catch(Throwable JavaDoc e){
137                     e.printStackTrace();
138                 }
139             }
140         },"SENDER Thread").start();
141         connection3=createConnection();
142         connection3.setClientID("test");
143         connection3.start();
144         consumeSession3=connection3.createSession(false,Session.CLIENT_ACKNOWLEDGE);
145         consumer3=consumeSession3.createDurableSubscriber((Topic JavaDoc)consumerDestination2,getName());
146         connection3.close();
147         int count =0;
148         for(int loop=0;loop<4;++loop){
149             connection3=createConnection();
150             connection3.setClientID("test");
151             connection3.start();
152             consumeSession3=connection3.createSession(false,Session.CLIENT_ACKNOWLEDGE);
153             consumer3=consumeSession3.createDurableSubscriber((Topic JavaDoc)consumerDestination2,getName());
154             Message JavaDoc msg=null;
155             int i;
156             for(i=0;i<NMSG/4;i++){
157                 msg=consumer3.receive(10000);
158                 if(msg==null)
159                     break;
160                 if(verbose) {
161                     System.out.println("Received("+loop+"): "+i + " count = " + msg.getIntProperty(countProperyName));
162                 }
163                 assertNotNull(msg);
164                 assertEquals(msg.getJMSType(),"test");
165                 assertEquals(msg.getStringProperty("test"),"test");
166                 assertEquals("Messages received out of order",count,msg.getIntProperty(countProperyName));
167                 Thread.sleep(500);
168                 msg.acknowledge();
169                 count++;
170             }
171             consumer3.close();
172             assertEquals("Receiver "+loop,NMSG/4,i);
173             connection3.close();
174         }
175     }
176 }
177
Popular Tags