1 22 package org.jboss.test.jbossmq.stress; 23 24 import junit.framework.TestSuite; 25 import junit.framework.Assert; 26 27 import org.jboss.test.jbossmq.MQBase; 28 29 35 36 public class DurableSubscriberTest extends MQBase 37 { 38 39 public DurableSubscriberTest(String name) 40 { 41 super(name); 42 } 43 44 50 public void runDurableSubscriberPartOne() throws Exception 51 { 52 try 53 { 54 drainTopic(); 56 57 int ic = getIterationCount(); 58 59 IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.Message .class, 61 "DURABLE_NR", 62 0, 63 ic / 2); 64 65 TopicWorker sub1 = new TopicWorker(SUBSCRIBER, 66 TRANS_NONE, 67 f1); 68 sub1.setDurable("john", "needle", "sub2"); 69 Thread t1 = new Thread (sub1); 70 t1.start(); 71 72 IntRangeMessageCreator c1 = new IntRangeMessageCreator("DURABLE_NR", 74 0); 75 TopicWorker pub1 = new TopicWorker(PUBLISHER, 76 TRANS_NONE, 77 c1, 78 ic / 2); 79 pub1.connect(); 80 pub1.publish(); 81 82 Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(), 83 ic / 2, 84 pub1.getMessageHandled()); 85 86 log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes"); 88 sleep(ic * 10); 89 90 91 Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(), 92 ic / 2, 93 sub1.getMessageHandled()); 94 95 sub1.close(); 97 t1.interrupt(); 98 99 pub1.publish(ic / 2); 101 Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(), ic, 102 pub1.getMessageHandled()); 103 104 pub1.close(); 105 } 106 catch (Throwable t) 107 { 108 log.error("Error in test: " + t, t); 109 throw new Exception (t.getMessage()); 110 } 111 } 112 113 117 public void runDurableSubscriberPartTwo() throws Exception 118 { 119 try 120 { 121 int ic = getIterationCount(); 122 IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.Message .class, 124 "DURABLE_NR", 125 0, 126 ic / 2); 127 128 TopicWorker sub1 = new TopicWorker(SUBSCRIBER, 129 TRANS_NONE, 130 f1); 131 sub1.setDurable("john", "needle", "sub2"); 132 133 Thread t2 = new Thread (sub1); 135 t2.start(); 136 137 log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes"); 138 sleep(ic * 10); 139 Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(), ic / 2, 140 sub1.getMessageHandled()); 141 142 sub1.unsubscribe(); 144 sub1.close(); 145 t2.interrupt(); 146 147 } 148 catch (Throwable t) 149 { 150 log.error("Error in test: " + t, t); 151 throw new Exception (t.getMessage()); 152 } 153 } 154 155 public void testDurableSubscriber() throws Exception 156 { 157 runDurableSubscriberPartOne(); 158 runDurableSubscriberPartTwo(); 159 } 160 161 public void runGoodClient() throws Exception 162 { 163 TopicWorker sub1 = new TopicWorker(CONNECTOR, 164 TRANS_NONE, 165 null); 166 sub1.setDurable("john", "needle", "sub2"); 167 Thread t1 = new Thread (sub1); 168 t1.start(); 169 try 170 { 171 Thread.sleep(2000); 172 } 173 catch (InterruptedException e) 174 { 175 } 176 t1.interrupt(); 178 sub1.close(); 179 Assert.assertNull("Error in connecting durable sub", sub1.getException()); 180 181 } 182 183 187 public void runBadClient() throws Exception 188 { 189 TopicWorker sub1 = new TopicWorker(CONNECTOR, 190 TRANS_NONE, 191 null); 192 sub1.setDurable("john", "needle", "sub2"); 193 Thread t1 = new Thread (sub1); 194 t1.start(); 195 try 196 { 197 Thread.sleep(2000); 198 } 199 catch (InterruptedException e) 200 { 201 } 202 t1.interrupt(); 204 Assert.assertNull("Error in connecting durable sub", sub1.getException()); 206 } 207 208 public static junit.framework.Test suite() throws Exception 209 { 210 211 TestSuite suite = new TestSuite(); 212 suite.addTest(new DurableSubscriberTest("runGoodClient")); 213 suite.addTest(new DurableSubscriberTest("testDurableSubscriber")); 214 215 return suite; 217 } 218 219 public static void main(String [] args) 220 { 221 222 } 223 224 } | Popular Tags |