KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > usecases > ReliableReconnectTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.usecases;
19 import java.util.HashMap JavaDoc;
20 import java.net.URI JavaDoc;
21 import javax.jms.Connection JavaDoc;
22 import javax.jms.DeliveryMode JavaDoc;
23 import javax.jms.Destination JavaDoc;
24 import javax.jms.JMSException JavaDoc;
25 import javax.jms.Message JavaDoc;
26 import javax.jms.MessageConsumer JavaDoc;
27 import javax.jms.MessageProducer JavaDoc;
28 import javax.jms.ObjectMessage JavaDoc;
29 import javax.jms.Queue JavaDoc;
30 import javax.jms.QueueConnection JavaDoc;
31 import javax.jms.QueueReceiver JavaDoc;
32 import javax.jms.QueueSender JavaDoc;
33 import javax.jms.QueueSession JavaDoc;
34 import javax.jms.Session JavaDoc;
35 import javax.jms.TextMessage JavaDoc;
36 import javax.jms.Topic JavaDoc;
37 import org.apache.activemq.ActiveMQConnection;
38 import org.apache.activemq.ActiveMQConnectionFactory;
39 import org.apache.activemq.broker.BrokerService;
40 import org.apache.activemq.broker.BrokerFactory;
41 import org.apache.activemq.broker.Broker;
42 import org.apache.activemq.test.TestSupport;
43 import org.apache.activemq.util.IdGenerator;
44 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
45 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
46
47 /**
48  * @version $Revision: 1.1.1.1 $
49  */

50 public class ReliableReconnectTest extends TestSupport {
51     private static final int RECEIVE_TIMEOUT = 10000;
52     protected static final int MESSAGE_COUNT = 100;
53     protected static final String JavaDoc DEFAULT_BROKER_URL = "vm://localhost";
54     private IdGenerator idGen = new IdGenerator();
55     protected int deliveryMode = DeliveryMode.PERSISTENT;
56     protected String JavaDoc consumerClientId;
57     protected Destination JavaDoc destination;
58     protected AtomicBoolean JavaDoc closeBroker = new AtomicBoolean JavaDoc(false);
59     protected AtomicInteger JavaDoc messagesReceived = new AtomicInteger JavaDoc(0);
60     protected BrokerService broker;
61     protected int firstBatch = MESSAGE_COUNT/10;
62
63     public ReliableReconnectTest() {
64     }
65
66     public ReliableReconnectTest(String JavaDoc n) {
67         super(n);
68     }
69
70     protected void setUp() throws Exception JavaDoc {
71         consumerClientId = idGen.generateId();
72         super.setUp();
73         topic = true;
74         destination = createDestination(getClass().getName());
75     }
76
77     public ActiveMQConnectionFactory getConnectionFactory() throws Exception JavaDoc {
78         String JavaDoc url = "failover://" + DEFAULT_BROKER_URL;
79         return new ActiveMQConnectionFactory(url);
80     }
81
82     protected void startBroker() throws JMSException JavaDoc {
83         try {
84             broker = BrokerFactory.createBroker(new URI JavaDoc("broker://()/localhost"));
85             broker.addConnector(DEFAULT_BROKER_URL);
86             broker.start();
87         } catch (Exception JavaDoc e) {
88             e.printStackTrace();
89         }
90     }
91
92     protected Connection JavaDoc createConsumerConnection() throws Exception JavaDoc {
93         Connection JavaDoc consumerConnection = getConnectionFactory().createConnection();
94         consumerConnection.setClientID(consumerClientId);
95         consumerConnection.start();
96         return consumerConnection;
97     }
98
99     protected MessageConsumer JavaDoc createConsumer(Connection JavaDoc con) throws Exception JavaDoc {
100         Session JavaDoc s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
101         return s.createDurableSubscriber((Topic JavaDoc) destination, "TestFred");
102     }
103
104     protected void spawnConsumer() {
105         Thread JavaDoc thread = new Thread JavaDoc(new Runnable JavaDoc() {
106             public void run() {
107                 try {
108                     Connection JavaDoc consumerConnection = createConsumerConnection();
109                     MessageConsumer JavaDoc consumer = createConsumer(consumerConnection);
110                     //consume some messages
111

112                     for (int i = 0;i < firstBatch;i++) {
113                         Message JavaDoc msg = consumer.receive(RECEIVE_TIMEOUT);
114                         if (msg != null) {
115                             //log.info("GOT: " + msg);
116
messagesReceived.incrementAndGet();
117                         }
118                     }
119                     synchronized (closeBroker) {
120                         closeBroker.set(true);
121                         closeBroker.notify();
122                     }
123                     Thread.sleep(2000);
124                     for (int i = firstBatch;i < MESSAGE_COUNT;i++) {
125                         Message JavaDoc msg = consumer.receive(RECEIVE_TIMEOUT);
126                         //log.info("GOT: " + msg);
127
if (msg != null) {
128                             messagesReceived.incrementAndGet();
129                         }
130                     }
131                     consumerConnection.close();
132                     synchronized (messagesReceived) {
133                         messagesReceived.notify();
134                     }
135                 }
136                 catch (Throwable JavaDoc e) {
137                     e.printStackTrace();
138                 }
139             }
140         });
141         thread.start();
142     }
143
144     public void testReconnect() throws Exception JavaDoc {
145         startBroker();
146         //register an interest as a durable subscriber
147
Connection JavaDoc consumerConnection = createConsumerConnection();
148         createConsumer(consumerConnection);
149         consumerConnection.close();
150         //send some messages ...
151
Connection JavaDoc connection = createConnection();
152         connection.setClientID(idGen.generateId());
153         connection.start();
154         Session JavaDoc producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
155         MessageProducer JavaDoc producer = producerSession.createProducer(destination);
156         TextMessage JavaDoc msg = producerSession.createTextMessage();
157         for (int i = 0;i < MESSAGE_COUNT;i++) {
158             msg.setText("msg: " + i);
159             producer.send(msg);
160         }
161         connection.close();
162         spawnConsumer();
163         synchronized (closeBroker) {
164             if (!closeBroker.get()) {
165                 closeBroker.wait();
166             }
167         }
168 // System.err.println("Stopping broker");
169
broker.stop();
170         startBroker();
171 // System.err.println("Started Broker again");
172
synchronized (messagesReceived) {
173             if (messagesReceived.get() < MESSAGE_COUNT) {
174                 messagesReceived.wait(60000);
175             }
176         }
177         //assertTrue(messagesReceived.get() == MESSAGE_COUNT);
178
int count = messagesReceived.get();
179         assertTrue("Not enough messages received: " + count, count > firstBatch);
180     }
181 }
182
Popular Tags