1 package com.ubermq.jms.client.test; 2 3 import EDU.oswego.cs.dl.util.concurrent.*; 4 import com.ubermq.jms.client.*; 5 import com.ubermq.jms.common.datagram.impl.*; 6 import com.ubermq.kernel.*; 7 import com.ubermq.kernel.overflow.*; 8 import java.io.*; 9 import java.util.*; 10 import javax.jms.*; 11 import junit.framework.*; 12 13 16 public class DurableTestCase 17 extends TestCase 18 { 19 public static TestSuite suite() { 20 return new TestSuite(DurableTestCase.class); 21 } 22 23 public DurableTestCase(String sz) { 24 super(sz); 25 } 26 27 private static final int RECV_TIMEOUT = 1000; 28 private static final int SEND_TIMEOUT = 200; 29 30 private TopicConnectionFactory f; 31 private TopicConnection tc1, tc2; 32 private TopicSession ts_client, ts_auto; 33 private Topic theTopic, theTopic2, durableTopic; 34 35 public void setUp() 36 throws Exception 37 { 38 Thread.sleep(5000); 39 40 f = new UnicastConnectionFactory("localhost", 3999); 42 tc1 = f.createTopicConnection(); 43 tc2 = f.createTopicConnection(); 44 ts_client = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 45 ts_auto = tc2.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 46 47 tc1.start(); 48 tc2.start(); 49 50 theTopic = ts_client.createTopic(THE_TOPIC); 51 theTopic2 = ts_client.createTopic(THE_TOPIC2); 52 durableTopic = ts_client.createTopic(DURABLE_TOPIC); 53 } 54 55 public static final String THE_TOPIC = "TheTopic"; 57 public static final String THE_TOPIC2 = "TheTopic2"; 58 public static final String DURABLE_TOPIC = "DuraTopic"; 59 public static final String AB_TOPIC = "Active-Backup"; 60 61 public void testDurable() 62 throws Exception 63 { 64 ts_client.unsubscribe("durable-1"); 65 ts_client.unsubscribe("durable-2"); 66 67 TopicPublisher p = ts_auto.createPublisher(durableTopic); 68 TopicSubscriber s = ts_client.createDurableSubscriber(durableTopic, "durable-1"); 69 s.close(); 70 71 RegressionTestCase.sendExactly(ts_auto, p, durableTopic, 25); 73 74 for (int i = 0; i < 5; i++) 75 { 76 TopicConnection newC = f.createTopicConnection(); 77 TopicSession mySession = newC.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 78 TopicSubscriber sub = 79 mySession.createDurableSubscriber(durableTopic, "durable-1"); 80 newC.start(); 81 82 if (i == 4) RegressionTestCase.receiveExactly(sub, 5); 83 else { 84 RegressionTestCase.receiveAtLeast(sub, 5); 86 87 while(sub.receiveNoWait() != null); 89 } 90 91 sub.close(); 92 mySession.close(); 93 newC.close(); 94 } 95 96 RegressionTestCase.sendExactly(ts_auto, p, durableTopic, 25); 98 99 for (int i = 0; i < 5; i++) 100 { 101 TopicSubscriber sub = 102 ts_client.createDurableSubscriber(durableTopic, "durable-1"); 103 104 if (i == 4) RegressionTestCase.receiveExactly(sub, 5); 105 else RegressionTestCase.receiveAtLeast(sub, 5); 106 107 sub.close(); 108 } 109 110 ts_client.unsubscribe("durable-1"); 112 113 s = ts_client.createDurableSubscriber(durableTopic, "durable-2"); 115 RegressionTestCase.sendExactly(ts_auto, p, durableTopic, 15); 116 RegressionTestCase.receiveExactly(s, 15); 117 s.close(); 118 ts_client.unsubscribe("durable-2"); 119 120 RegressionTestCase.sendExactly(ts_auto, p, durableTopic, 15); 123 s = ts_client.createDurableSubscriber(durableTopic, "durable-2"); 124 RegressionTestCase.receiveExactly(s, 0); 125 s.close(); 126 ts_client.unsubscribe("durable-2"); 127 } 128 129 public void testDurableOutOfOrderAcks() 130 throws Exception 131 { 132 TopicPublisher p = ts_auto.createPublisher(durableTopic); 133 134 TopicSubscriber s = ts_client.createDurableSubscriber(durableTopic, "durable-2"); 136 RegressionTestCase.sendExactly(ts_auto, p, durableTopic, 15); 137 Stack stack = new Stack(); 138 Message m = null; 139 while((m = s.receiveNoWait()) != null) { 140 stack.push(m); 141 } 142 while(!stack.empty()) { 143 ((Message)stack.pop()).acknowledge(); 144 } 145 s.close(); 146 147 s = ts_client.createDurableSubscriber(durableTopic, "durable-2"); 150 Assert.assertTrue(s.receiveNoWait() == null); 151 ts_client.unsubscribe("durable-2"); 152 } 153 154 public static final class IncrementOutputNode 155 implements DatagramSink 156 { 157 private final SynchronizedInt i; 158 private boolean open; 159 160 public IncrementOutputNode(SynchronizedInt i) 161 { 162 this.i = i; 163 this.open = true; 164 } 165 166 public void output(com.ubermq.kernel.IDatagram d, 167 com.ubermq.kernel.IOverflowHandler h) 168 throws IOException 169 { 170 if (!isOpen()) 171 throw new IOException(); 172 173 i.increment(); 174 } 175 176 180 public boolean isOpen() 181 { 182 return open; 183 } 184 185 public void setOpen(boolean f) 186 { 187 this.open = f; 188 } 189 } 190 191 public static final String AB_SUBSCRIPTION = "ActiveBackupDurable"; 192 193 public void testActiveBackup() 194 throws Exception 195 { 196 ts_client.unsubscribe(AB_SUBSCRIPTION); 197 198 TopicConnection tc1 = f.createTopicConnection(), 199 tc2 = f.createTopicConnection(); 200 201 TopicSession ts1 = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE), 202 ts2 = tc2.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 203 204 TopicSubscriber active = ts1.createDurableSubscriber(ts_client.createTopic(AB_TOPIC), 205 AB_SUBSCRIPTION); 206 TopicSubscriber backup = ts2.createDurableSubscriber(ts_client.createTopic(AB_TOPIC), 207 AB_SUBSCRIPTION); 208 209 tc1.start(); 210 tc2.start(); 211 212 RegressionTestCase.sendExactly(ts_auto, 214 ts_auto.createPublisher(ts_auto.createTopic(AB_TOPIC)), 215 ts_auto.createTopic(AB_TOPIC), 216 25); 217 218 for (int i = 0; i < 15; i++) 220 { 221 Message m = active.receive(RECV_TIMEOUT); 222 Assert.assertNotNull(m); 223 m.acknowledge(); 224 } 225 226 active.receive(RECV_TIMEOUT); 228 active.close(); 229 ts1.close(); 230 tc1.close(); 231 232 for (int i = 0; i < 10; i++) 234 { 235 Message m = backup.receive(RECV_TIMEOUT); 236 Assert.assertNotNull(m); 237 m.acknowledge(); 238 } 239 Assert.assertNull(backup.receiveNoWait()); 240 241 tc1 = f.createTopicConnection(); 243 ts1 = tc1.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 244 active = ts1.createDurableSubscriber(ts1.createTopic(AB_TOPIC), 245 AB_SUBSCRIPTION); 246 tc1.start(); 247 248 RegressionTestCase.sendExactly(ts_auto, 250 ts_auto.createPublisher(ts_auto.createTopic(AB_TOPIC)), 251 ts_auto.createTopic(AB_TOPIC), 252 25); 253 254 for (int i = 0; i < 25; i++) 256 { 257 Message m = backup.receive(RECV_TIMEOUT); 258 Assert.assertNotNull(m); 259 m.acknowledge(); 260 } 261 Assert.assertNull(active.receiveNoWait()); 262 263 backup.close(); 265 ts2.close(); 266 tc2.close(); 267 RegressionTestCase.sendExactly(ts_auto, 268 ts_auto.createPublisher(ts_auto.createTopic(AB_TOPIC)), 269 ts_auto.createTopic(AB_TOPIC), 270 25); 271 272 RegressionTestCase.receiveExactly(active, 25); 274 275 active.close(); 277 278 try { 280 ts1.createPublisher(ts_client.createTopic(AB_TOPIC)).publish( 281 ts1.createTextMessage("howdy"), 282 com.ubermq.jms.client.DeliveryMode.GUARANTEED_PROCESSING, 283 Message.DEFAULT_PRIORITY, 284 Message.DEFAULT_TIME_TO_LIVE); 285 Assert.assertTrue(false); 286 } catch(JMSUndeliverableException ue) { 287 Assert.assertTrue(true); 288 } 289 290 ts1.unsubscribe(AB_SUBSCRIPTION); 292 ts1.close(); 293 tc1.close(); 294 295 } 296 297 public void testGuaranteedProcessing() 298 throws Exception 299 { 300 TopicConnection tc1 = f.createTopicConnection(), 301 tc2 = f.createTopicConnection(); 302 303 TopicSession ts1 = tc1.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE), 304 ts2 = tc2.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE); 305 306 Topic theTopic = ts1.createTopic("org.my-service.guaranteed"); 307 308 TopicSubscriber service = ts1.createDurableSubscriber(theTopic, 310 "My-Service"); 311 312 tc1.start(); 314 tc2.start(); 315 316 TextMessage m = ts1.createTextMessage("howdy"); 318 319 TopicPublisher tp = ts2.createPublisher(theTopic); 321 tp.publish(m, 322 com.ubermq.jms.client.DeliveryMode.GUARANTEED_PROCESSING, 323 Message.DEFAULT_PRIORITY, 324 Message.DEFAULT_TIME_TO_LIVE); 325 326 Assert.assertEquals(m.getText(), 328 ((TextMessage)service.receive(RECV_TIMEOUT)).getText()); 329 330 service.close(); 332 333 try { 335 tp.publish(m, 336 com.ubermq.jms.client.DeliveryMode.GUARANTEED_PROCESSING, 337 Message.DEFAULT_PRIORITY, 338 Message.DEFAULT_TIME_TO_LIVE); 339 Assert.assertTrue(false); 340 } catch(JMSUndeliverableException ue) { 341 Assert.assertTrue(true); 342 } 343 344 ts1.unsubscribe("My-Service"); 346 347 TopicSubscriber sub = ts1.createSubscriber(theTopic); 350 try { 351 tp.publish(m, 352 com.ubermq.jms.client.DeliveryMode.GUARANTEED_PROCESSING, 353 Message.DEFAULT_PRIORITY, 354 Message.DEFAULT_TIME_TO_LIVE); 355 Assert.assertTrue(true); 356 } catch(JMSUndeliverableException ue) { 357 Assert.assertTrue(false); 358 } 359 Assert.assertEquals(m.getText(), 360 ((TextMessage)sub.receive(RECV_TIMEOUT)).getText()); 361 362 tp.close(); 364 ts1.close(); 365 ts2.close(); 366 tc1.close(); 367 tc2.close(); 368 } 369 } 370 | Popular Tags |