KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ubermq > jms > client > test > DurableTestCase


1 package com.ubermq.jms.client.test;
2
3 import EDU.oswego.cs.dl.util.concurrent.*;
4 import com.ubermq.jms.client.*;
5 import com.ubermq.jms.common.datagram.impl.*;
6 import com.ubermq.kernel.*;
7 import com.ubermq.kernel.overflow.*;
8 import java.io.*;
9 import java.util.*;
10 import javax.jms.*;
11 import junit.framework.*;
12
13 /**
14  * A JUnit test case that exercises durable subscriptions exclusively.
15  */

16 public class DurableTestCase
17     extends TestCase
18 {
19     public static TestSuite suite() {
20         return new TestSuite(DurableTestCase.class);
21     }
22
23     public DurableTestCase(String JavaDoc sz) {
24         super(sz);
25     }
26
27     private static final int RECV_TIMEOUT = 1000;
28     private static final int SEND_TIMEOUT = 200;
29
30     private TopicConnectionFactory f;
31     private TopicConnection tc1, tc2;
32     private TopicSession ts_client, ts_auto;
33     private Topic theTopic, theTopic2, durableTopic;
34
35     public void setUp()
36         throws Exception JavaDoc
37     {
38         Thread.sleep(5000);
39
40         // connect
41
f = new UnicastConnectionFactory("localhost", 3999);
42         tc1 = f.createTopicConnection();
43         tc2 = f.createTopicConnection();
44         ts_client = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
45         ts_auto = tc2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
46
47         tc1.start();
48         tc2.start();
49
50         theTopic = ts_client.createTopic(THE_TOPIC);
51         theTopic2 = ts_client.createTopic(THE_TOPIC2);
52         durableTopic = ts_client.createTopic(DURABLE_TOPIC);
53     }
54
55     // a topic used for general testing.
56
public static final String JavaDoc THE_TOPIC = "TheTopic";
57     public static final String JavaDoc THE_TOPIC2 = "TheTopic2";
58     public static final String JavaDoc DURABLE_TOPIC = "DuraTopic";
59     public static final String JavaDoc AB_TOPIC = "Active-Backup";
60
61     public void testDurable()
62         throws Exception JavaDoc
63     {
64         ts_client.unsubscribe("durable-1");
65         ts_client.unsubscribe("durable-2");
66
67         TopicPublisher p = ts_auto.createPublisher(durableTopic);
68         TopicSubscriber s = ts_client.createDurableSubscriber(durableTopic, "durable-1");
69         s.close();
70
71         // send 25 while we're away, and collect 5 each time we reconnect
72
RegressionTestCase.sendExactly(ts_auto, p, durableTopic, 25);
73
74         for (int i = 0; i < 5; i++)
75         {
76             TopicConnection newC = f.createTopicConnection();
77             TopicSession mySession = newC.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
78             TopicSubscriber sub =
79                 mySession.createDurableSubscriber(durableTopic, "durable-1");
80             newC.start();
81
82             if (i == 4) RegressionTestCase.receiveExactly(sub, 5);
83             else {
84                 // receive and ACK 5 of them.
85
RegressionTestCase.receiveAtLeast(sub, 5);
86
87                 // receive as many more as we have, but leave them unACK'd.
88
while(sub.receiveNoWait() != null);
89             }
90
91             sub.close();
92             mySession.close();
93             newC.close();
94         }
95
96         // do it again, but use only one connection this time
97
RegressionTestCase.sendExactly(ts_auto, p, durableTopic, 25);
98
99         for (int i = 0; i < 5; i++)
100         {
101             TopicSubscriber sub =
102                 ts_client.createDurableSubscriber(durableTopic, "durable-1");
103
104             if (i == 4) RegressionTestCase.receiveExactly(sub, 5);
105             else RegressionTestCase.receiveAtLeast(sub, 5);
106
107             sub.close();
108         }
109
110         // unsubscribe please
111
ts_client.unsubscribe("durable-1");
112
113         // connected case of the durable
114
s = ts_client.createDurableSubscriber(durableTopic, "durable-2");
115         RegressionTestCase.sendExactly(ts_auto, p, durableTopic, 15);
116         RegressionTestCase.receiveExactly(s, 15);
117         s.close();
118         ts_client.unsubscribe("durable-2");
119
120         // disconnected and durable is unsubscribed, the messages
121
// should go nowhere
122
RegressionTestCase.sendExactly(ts_auto, p, durableTopic, 15);
123         s = ts_client.createDurableSubscriber(durableTopic, "durable-2");
124         RegressionTestCase.receiveExactly(s, 0);
125         s.close();
126         ts_client.unsubscribe("durable-2");
127     }
128
129     public void testDurableOutOfOrderAcks()
130         throws Exception JavaDoc
131     {
132         TopicPublisher p = ts_auto.createPublisher(durableTopic);
133
134         // test acking in reverse order
135
TopicSubscriber s = ts_client.createDurableSubscriber(durableTopic, "durable-2");
136         RegressionTestCase.sendExactly(ts_auto, p, durableTopic, 15);
137         Stack stack = new Stack();
138         Message m = null;
139         while((m = s.receiveNoWait()) != null) {
140             stack.push(m);
141         }
142         while(!stack.empty()) {
143             ((Message)stack.pop()).acknowledge();
144         }
145         s.close();
146
147         // when i come back, i should not get any messages
148
// even though i ack'ed backwards.
149
s = ts_client.createDurableSubscriber(durableTopic, "durable-2");
150         Assert.assertTrue(s.receiveNoWait() == null);
151         ts_client.unsubscribe("durable-2");
152     }
153
154     public static final class IncrementOutputNode
155         implements DatagramSink
156     {
157         private final SynchronizedInt i;
158         private boolean open;
159
160         public IncrementOutputNode(SynchronizedInt i)
161         {
162             this.i = i;
163             this.open = true;
164         }
165
166         public void output(com.ubermq.kernel.IDatagram d,
167                            com.ubermq.kernel.IOverflowHandler h)
168             throws IOException
169         {
170             if (!isOpen())
171                 throw new IOException();
172
173             i.increment();
174         }
175
176         /**
177          * Determines whether the destination node is available for output.
178          * @return true if the output dest node is available
179          */

180         public boolean isOpen()
181         {
182             return open;
183         }
184
185         public void setOpen(boolean f)
186         {
187             this.open = f;
188         }
189     }
190
191     public static final String JavaDoc AB_SUBSCRIPTION = "ActiveBackupDurable";
192
193     public void testActiveBackup()
194         throws Exception JavaDoc
195     {
196         ts_client.unsubscribe(AB_SUBSCRIPTION);
197
198         TopicConnection tc1 = f.createTopicConnection(),
199             tc2 = f.createTopicConnection();
200
201         TopicSession ts1 = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE),
202             ts2 = tc2.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
203
204         TopicSubscriber active = ts1.createDurableSubscriber(ts_client.createTopic(AB_TOPIC),
205                                                              AB_SUBSCRIPTION);
206         TopicSubscriber backup = ts2.createDurableSubscriber(ts_client.createTopic(AB_TOPIC),
207                                                              AB_SUBSCRIPTION);
208
209         tc1.start();
210         tc2.start();
211
212         // send em
213
RegressionTestCase.sendExactly(ts_auto,
214                                        ts_auto.createPublisher(ts_auto.createTopic(AB_TOPIC)),
215                                        ts_auto.createTopic(AB_TOPIC),
216                                        25);
217
218         // now receive 15 of them w/ active, then fail
219
for (int i = 0; i < 15; i++)
220         {
221             Message m = active.receive(RECV_TIMEOUT);
222             Assert.assertNotNull(m);
223             m.acknowledge();
224         }
225
226         // get another one and fail during processing
227
active.receive(RECV_TIMEOUT);
228         active.close();
229         ts1.close();
230         tc1.close();
231
232         // now get the remaining 10
233
for (int i = 0; i < 10; i++)
234         {
235             Message m = backup.receive(RECV_TIMEOUT);
236             Assert.assertNotNull(m);
237             m.acknowledge();
238         }
239         Assert.assertNull(backup.receiveNoWait());
240
241         // bring back the active, and it should now serve as the backup
242
tc1 = f.createTopicConnection();
243         ts1 = tc1.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
244         active = ts1.createDurableSubscriber(ts1.createTopic(AB_TOPIC),
245                                              AB_SUBSCRIPTION);
246         tc1.start();
247
248         // send some
249
RegressionTestCase.sendExactly(ts_auto,
250                                        ts_auto.createPublisher(ts_auto.createTopic(AB_TOPIC)),
251                                        ts_auto.createTopic(AB_TOPIC),
252                                        25);
253
254         // backup should get all 25, and active 0
255
for (int i = 0; i < 25; i++)
256         {
257             Message m = backup.receive(RECV_TIMEOUT);
258             Assert.assertNotNull(m);
259             m.acknowledge();
260         }
261         Assert.assertNull(active.receiveNoWait());
262
263         // now, close backup. active is back again.
264
backup.close();
265         ts2.close();
266         tc2.close();
267         RegressionTestCase.sendExactly(ts_auto,
268                                        ts_auto.createPublisher(ts_auto.createTopic(AB_TOPIC)),
269                                        ts_auto.createTopic(AB_TOPIC),
270                                        25);
271
272         // now active should get all of the messages we just sent
273
RegressionTestCase.receiveExactly(active, 25);
274
275         // clsoe active. both subscribers are away
276
active.close();
277
278         // a guaranteed send to the durable topic should fail.
279
try {
280             ts1.createPublisher(ts_client.createTopic(AB_TOPIC)).publish(
281                 ts1.createTextMessage("howdy"),
282                 com.ubermq.jms.client.DeliveryMode.GUARANTEED_PROCESSING,
283                 Message.DEFAULT_PRIORITY,
284                 Message.DEFAULT_TIME_TO_LIVE);
285             Assert.assertTrue(false);
286         } catch(JMSUndeliverableException ue) {
287             Assert.assertTrue(true);
288         }
289
290         // bye
291
ts1.unsubscribe(AB_SUBSCRIPTION);
292         ts1.close();
293         tc1.close();
294
295     }
296
297     public void testGuaranteedProcessing()
298         throws Exception JavaDoc
299     {
300         TopicConnection tc1 = f.createTopicConnection(),
301             tc2 = f.createTopicConnection();
302
303         TopicSession ts1 = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE),
304             ts2 = tc2.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
305
306         Topic theTopic = ts1.createTopic("org.my-service.guaranteed");
307
308         // make the durable
309
TopicSubscriber service = ts1.createDurableSubscriber(theTopic,
310                                                               "My-Service");
311
312         // start em
313
tc1.start();
314         tc2.start();
315
316         // a message
317
TextMessage m = ts1.createTextMessage("howdy");
318
319         // publish a message with guaranteed delivery mode.
320
TopicPublisher tp = ts2.createPublisher(theTopic);
321         tp.publish(m,
322                    com.ubermq.jms.client.DeliveryMode.GUARANTEED_PROCESSING,
323                    Message.DEFAULT_PRIORITY,
324                    Message.DEFAULT_TIME_TO_LIVE);
325
326         // ok, the durable should be around to get it.
327
Assert.assertEquals(m.getText(),
328                                 ((TextMessage)service.receive(RECV_TIMEOUT)).getText());
329
330         // now have the durable go away
331
service.close();
332
333         // now resend the message, it should fail.
334
try {
335             tp.publish(m,
336                        com.ubermq.jms.client.DeliveryMode.GUARANTEED_PROCESSING,
337                        Message.DEFAULT_PRIORITY,
338                        Message.DEFAULT_TIME_TO_LIVE);
339             Assert.assertTrue(false);
340         } catch(JMSUndeliverableException ue) {
341             Assert.assertTrue(true);
342         }
343
344         // now unsub the service
345
ts1.unsubscribe("My-Service");
346
347         // try to publish again. should work now because i unsub'd
348
// the service.
349
TopicSubscriber sub = ts1.createSubscriber(theTopic);
350         try {
351             tp.publish(m,
352                        com.ubermq.jms.client.DeliveryMode.GUARANTEED_PROCESSING,
353                        Message.DEFAULT_PRIORITY,
354                        Message.DEFAULT_TIME_TO_LIVE);
355             Assert.assertTrue(true);
356         } catch(JMSUndeliverableException ue) {
357             Assert.assertTrue(false);
358         }
359         Assert.assertEquals(m.getText(),
360                                 ((TextMessage)sub.receive(RECV_TIMEOUT)).getText());
361
362         // bye
363
tp.close();
364         ts1.close();
365         ts2.close();
366         tc1.close();
367         tc2.close();
368     }
369 }
370
Popular Tags