KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > usecases > DurableSubscriptionTestSupport


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.usecases;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;

37 /**
38  * @version $Revision: 1.1.1.1 $
39  */

40 abstract public class DurableSubscriptionTestSupport extends TestSupport {
41
42     private Connection JavaDoc connection;
43     private Session JavaDoc session;
44     private TopicSubscriber JavaDoc consumer;
45     private MessageProducer JavaDoc producer;
46     private BrokerService broker;
47     
48     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception JavaDoc {
49         return new ActiveMQConnectionFactory("vm://durable-broker");
50     }
51     
52     protected Connection JavaDoc createConnection() throws Exception JavaDoc {
53         Connection JavaDoc rc = super.createConnection();
54         rc.setClientID(getName());
55         return rc;
56     }
57     
58     protected void setUp() throws Exception JavaDoc {
59         createBroker();
60         super.setUp();
61     }
62     protected void tearDown() throws Exception JavaDoc {
63         super.tearDown();
64         destroyBroker();
65     }
66     protected void restartBroker() throws Exception JavaDoc {
67         destroyBroker();
68         createRestartedBroker(); // retain stored messages
69
}
70     private void createBroker() throws Exception JavaDoc {
71         try {
72             broker = new BrokerService();
73             broker.setBrokerName("durable-broker");
74             broker.setDeleteAllMessagesOnStartup(true);
75             broker.setPersistenceAdapter(createPersistenceAdapter());
76             broker.setPersistent(true);
77             broker.start();
78         } catch (Exception JavaDoc e) {
79             e.printStackTrace();
80         }
81
82         connection = createConnection();
83     }
84     
85     private void createRestartedBroker() throws Exception JavaDoc {
86         try {
87             broker = new BrokerService();
88             broker.setBrokerName("durable-broker");
89             broker.setDeleteAllMessagesOnStartup(false);
90             broker.setPersistenceAdapter(createPersistenceAdapter());
91             broker.setPersistent(true);
92             broker.start();
93             
94         } catch (Exception JavaDoc e) {
95             e.printStackTrace();
96         }
97
98         connection = createConnection();
99     }
100     private void destroyBroker() throws Exception JavaDoc {
101         if( connection != null )
102             connection.close();
103         if( broker!=null )
104             broker.stop();
105     }
106     
107     abstract protected PersistenceAdapter createPersistenceAdapter() throws Exception JavaDoc;
108     
109     
110     public void XtestUnsubscribeSubscription() throws Exception JavaDoc {
111         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
112         Topic JavaDoc topic = session.createTopic("TestTopic");
113         consumer = session.createDurableSubscriber(topic, "sub1");
114         producer = session.createProducer(topic);
115         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
116         connection.start();
117         
118         // Make sure it works when the durable sub is active.
119
producer.send(session.createTextMessage("Msg:1"));
120         assertTextMessageEquals("Msg:1", consumer.receive(5000));
121         
122         // Deactivate the sub.
123
consumer.close();
124         // Send a new message.
125
producer.send(session.createTextMessage("Msg:2"));
126         session.unsubscribe("sub1");
127         
128         // Reopen the connection.
129
connection.close();
130         connection = createConnection();
131         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
132         producer = session.createProducer(topic);
133         connection.start();
134
135         // Activate the sub.
136
consumer = session.createDurableSubscriber(topic, "sub1");
137         producer.send(session.createTextMessage("Msg:3"));
138         
139         // Try to get the message.
140
assertTextMessageEquals("Msg:3", consumer.receive(5000));
141     }
142     
143     public void XtestInactiveDurableSubscriptionTwoConnections() throws Exception JavaDoc {
144         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
145         Topic JavaDoc topic = session.createTopic("TestTopic");
146         consumer = session.createDurableSubscriber(topic, "sub1");
147         producer = session.createProducer(topic);
148         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
149         connection.start();
150         
151         // Make sure it works when the durable sub is active.
152
producer.send(session.createTextMessage("Msg:1"));
153         assertTextMessageEquals("Msg:1", consumer.receive(5000));
154         
155         // Deactivate the sub.
156
consumer.close();
157         
158         // Send a new message.
159
producer.send(session.createTextMessage("Msg:2"));
160         
161         // Reopen the connection.
162
connection.close();
163         connection = createConnection();
164         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
165         connection.start();
166
167         // Activate the sub.
168
consumer = session.createDurableSubscriber(topic, "sub1");
169         
170         // Try to get the message.
171
assertTextMessageEquals("Msg:2", consumer.receive(5000));
172     }
173     
174     public void XtestInactiveDurableSubscriptionBrokerRestart() throws Exception JavaDoc {
175         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
176         Topic JavaDoc topic = session.createTopic("TestTopic");
177         consumer = session.createDurableSubscriber(topic, "sub1");
178         producer = session.createProducer(topic);
179         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
180         connection.start();
181         
182         // Make sure it works when the durable sub is active.
183
producer.send(session.createTextMessage("Msg:1"));
184         assertTextMessageEquals("Msg:1", consumer.receive(5000));
185         
186         // Deactivate the sub.
187
consumer.close();
188         
189         // Send a new message.
190
producer.send(session.createTextMessage("Msg:2"));
191         
192         // Reopen the connection.
193
restartBroker();
194         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
195         connection.start();
196
197         // Activate the sub.
198
consumer = session.createDurableSubscriber(topic, "sub1");
199         
200         // Try to get the message.
201
assertTextMessageEquals("Msg:2", consumer.receive(5000));
202         assertNull(consumer.receive(5000));
203     }
204     
205     public void testDurableSubscriptionPersistsPastBrokerRestart() throws Exception JavaDoc {
206
207         // Create the durable sub.
208
connection.start();
209         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
210
211         // Ensure that consumer will receive messages sent before it was created
212
Topic JavaDoc topic = session.createTopic("TestTopic?consumer.retroactive=true");
213         consumer = session.createDurableSubscriber(topic, "sub1");
214
215         // Restart the broker.
216
restartBroker();
217
218         // Reconnection
219
connection.start();
220         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
221         producer = session.createProducer(topic);
222         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
223         
224         // Make sure it works when the durable sub is active.
225
producer.send(session.createTextMessage("Msg:1"));
226         
227         // Activate the sub.
228
consumer = session.createDurableSubscriber(topic, "sub1");
229         
230         // Send a new message.
231
producer.send(session.createTextMessage("Msg:2"));
232         
233         
234         // Try to get the message.
235
assertTextMessageEquals("Msg:1", consumer.receive(5000));
236         assertTextMessageEquals("Msg:2", consumer.receive(5000));
237         
238         assertNull(consumer.receive(5000));
239     }
240     
241     public void XtestInactiveDurableSubscriptionOneConnection() throws Exception JavaDoc {
242         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
243         Topic JavaDoc topic = session.createTopic("TestTopic");
244         consumer = session.createDurableSubscriber(topic, "sub1");
245         producer = session.createProducer(topic);
246         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
247         connection.start();
248         
249         // Make sure it works when the durable sub is active.
250
producer.send(session.createTextMessage("Msg:1"));
251         assertTextMessageEquals("Msg:1", consumer.receive(5000));
252         
253         // Deactivate the sub.
254
consumer.close();
255         
256         // Send a new message.
257
producer.send(session.createTextMessage("Msg:2"));
258
259         // Activate the sub.
260
consumer = session.createDurableSubscriber(topic, "sub1");
261         
262         // Try to get the message.
263
assertTextMessageEquals("Msg:2", consumer.receive(5000));
264     }
265     
266     public void XtestSelectorChange() throws Exception JavaDoc {
267         session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
268         Topic JavaDoc topic = session.createTopic("TestTopic");
269         consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false);
270         producer = session.createProducer(topic);
271         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
272         connection.start();
273         
274         // Make sure it works when the durable sub is active.
275
TextMessage JavaDoc msg = session.createTextMessage();
276         msg.setText("Msg:1");
277         msg.setStringProperty("color", "blue");
278         producer.send(msg);
279         msg.setText("Msg:2");
280         msg.setStringProperty("color", "red");
281         producer.send(msg);
282         
283         assertTextMessageEquals("Msg:2", consumer.receive(5000));
284         
285         // Change the subscription
286
consumer.close();
287         consumer = session.createDurableSubscriber(topic, "sub1", "color='blue'", false);
288         
289         // Send a new message.
290
msg.setText("Msg:3");
291         msg.setStringProperty("color", "red");
292         producer.send(msg);
293         msg.setText("Msg:4");
294         msg.setStringProperty("color", "blue");
295         producer.send(msg);
296         
297         // Try to get the message.
298
assertTextMessageEquals("Msg:4", consumer.receive(5000));
299     }
300     
301     
302     public void XtestDurableSubWorksInNewSession() throws JMSException JavaDoc {
303
304         // Create the consumer.
305
connection.start();
306         Session JavaDoc session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
307         Topic JavaDoc topic = session.createTopic("topic-"+getName());
308         MessageConsumer JavaDoc consumer = session.createDurableSubscriber(topic, "sub1");
309         // Drain any messages that may allready be in the sub
310
while( consumer.receive(1000)!=null )
311             ;
312
313         // See if the durable sub works in a new session.
314
session.close();
315         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
316
317         // Send a Message that should be added to the durable sub.
318
MessageProducer JavaDoc producer = createProducer(session, topic);
319         producer.send(session.createTextMessage("Message 1"));
320
321         // Activate the durable sub now. And receive the message.
322
consumer = session.createDurableSubscriber(topic, "sub1");
323         Message JavaDoc msg = consumer.receive(1000);
324         assertNotNull(msg);
325         assertEquals( "Message 1", ((TextMessage JavaDoc)msg).getText() );
326
327     }
328     
329     
330     public void XtestDurableSubWorksInNewConnection() throws Exception JavaDoc {
331
332         // Create the consumer.
333
connection.start();
334         Session JavaDoc session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
335         Topic JavaDoc topic = session.createTopic("topic-"+getName());
336         MessageConsumer JavaDoc consumer = session.createDurableSubscriber(topic, "sub1");
337         // Drain any messages that may allready be in the sub
338
while( consumer.receive(1000)!=null )
339             ;
340
341         // See if the durable sub works in a new connection.
342
// The embeded broker shutsdown when his connections are closed.
343
// So we open the new connection before the old one is closed.
344
connection.close();
345         connection = createConnection();
346         connection.start();
347         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
348
349         // Send a Message that should be added to the durable sub.
350
MessageProducer JavaDoc producer = createProducer(session, topic);
351         producer.send(session.createTextMessage("Message 1"));
352
353         // Activate the durable sub now. And receive the message.
354
consumer = session.createDurableSubscriber(topic, "sub1");
355         Message JavaDoc msg = consumer.receive(1000);
356         assertNotNull(msg);
357         assertEquals( "Message 1", ((TextMessage JavaDoc)msg).getText() );
358
359     }
360     
361     private MessageProducer JavaDoc createProducer(Session JavaDoc session, Destination JavaDoc queue) throws JMSException JavaDoc {
362         MessageProducer JavaDoc producer = session.createProducer(queue);
363         producer.setDeliveryMode(getDeliveryMode());
364         return producer;
365    }
366    
367    protected int getDeliveryMode() {
368        return DeliveryMode.PERSISTENT;
369    }
370     private void assertTextMessageEquals(String JavaDoc string, Message JavaDoc message) throws JMSException JavaDoc {
371         assertNotNull("Message was null", message);
372         assertTrue("Message is not a TextMessage", message instanceof TextMessage JavaDoc);
373         assertEquals(string, ((TextMessage JavaDoc)message).getText());
374     }
375     
376 }
377
Popular Tags