1 22 package org.jboss.test.jbossmq.stress; 23 24 import javax.jms.JMSException ; 25 26 import junit.framework.Assert; 27 28 import org.jboss.test.jbossmq.MQBase; 29 30 37 38 public class MultipleDurableSubscribers extends MQBase 39 { 40 class PiggyBackWorker extends TopicWorker 41 { 42 public PiggyBackWorker(TopicWorker worker) 43 { 44 super(); 45 connection = worker.connection; 46 destination = worker.destination; 47 session = worker.session; 48 } 49 50 public void connect() 51 { 52 log.debug("In null connect"); 53 } 55 56 public void subscribe() throws JMSException 57 { 58 super.subscribe(); 59 log.debug("Message consumer set up " + consumer); 60 } 61 } 62 63 public MultipleDurableSubscribers(String name) 64 { 65 super(name); 66 } 67 68 76 public void runDurableSubscriberPartOne() throws Exception 77 { 78 try 79 { 80 drainTopic(); 82 83 int ic = getIterationCount(); 84 85 IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.Message .class, 87 "DURABLE_NR", 88 0, 89 ic / 2); 90 91 TopicWorker sub1 = new TopicWorker(SUBSCRIBER, 92 TRANS_NONE, 93 f1); 94 sub1.setDurable("john", "needle", "sub1"); 95 Thread t1 = new Thread (sub1); 96 t1.start(); 97 98 log.debug("Sub1 set up"); 99 sleep(5000L); 100 TopicWorker sub2 = new PiggyBackWorker(sub1); 101 sub2.setSubscriberAttrs(SUBSCRIBER, 102 TRANS_NONE, 103 f1); 104 sub2.setDurable("john", "needle", "sub2"); 105 Thread t2 = new Thread (sub2); 106 t2.start(); 107 log.debug("Sub2 setup"); 108 109 IntRangeMessageCreator c1 = new IntRangeMessageCreator("DURABLE_NR", 111 0); 112 TopicWorker pub1 = new TopicWorker(PUBLISHER, 113 TRANS_NONE, 114 c1, 115 ic / 2); 116 pub1.connect(); 117 pub1.publish(); 118 119 Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(), 120 ic / 2, 121 pub1.getMessageHandled()); 122 123 log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes"); 125 sleep(ic * 10); 126 127 128 Assert.assertEquals("Subscriber1 did not get correct number of messages " + sub1.getMessageHandled(), 129 ic / 2, 130 sub1.getMessageHandled()); 131 Assert.assertEquals("Subscriber2 did not get correct number of messages " + sub1.getMessageHandled(), 132 ic / 2, 133 sub2.getMessageHandled()); 134 135 sub1.close(); 137 t1.interrupt(); 138 sub2.close(); 139 t2.interrupt(); 140 141 pub1.publish(ic / 2); 143 Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(), ic, 144 pub1.getMessageHandled()); 145 146 pub1.close(); 147 } 148 catch (Throwable t) 149 { 150 log.error("Error in test: " + t, t); 151 throw new Exception (t.getMessage()); 152 } 153 } 154 155 159 public void runDurableSubscriberPartTwo() throws Exception 160 { 161 try 162 { 163 int ic = getIterationCount(); 164 IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.Message .class, 166 "DURABLE_NR", 167 0, 168 ic / 2); 169 170 TopicWorker sub1 = new TopicWorker(SUBSCRIBER, 171 TRANS_NONE, 172 f1); 173 sub1.setDurable("john", "needle", "sub1"); 174 175 Thread t1 = new Thread (sub1); 177 t1.start(); 178 sleep(5000L); 179 TopicWorker sub2 = new PiggyBackWorker(sub1); 180 sub2.setSubscriberAttrs(SUBSCRIBER, 181 TRANS_NONE, 182 f1); 183 sub2.setDurable("john", "needle", "sub2"); 184 Thread t2 = new Thread (sub2); 185 t2.start(); 186 187 log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes"); 188 sleep(ic * 10); 189 Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(), ic / 2, 190 sub1.getMessageHandled()); 191 Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(), ic / 2, 192 sub2.getMessageHandled()); 193 194 sub1.unsubscribe(); 196 sub2.unsubscribe(); 197 sub1.close(); 198 t1.interrupt(); 199 sub2.close(); 200 t2.interrupt(); 201 } 202 catch (Throwable t) 203 { 204 log.error("Error in test: " + t, t); 205 throw new Exception (t.getMessage()); 206 } 207 } 208 209 public void testDurableSubscriber() throws Exception 210 { 211 runDurableSubscriberPartOne(); 212 runDurableSubscriberPartTwo(); 213 } 214 215 public static void main(String [] args) 216 { 217 218 } 219 220 } | Popular Tags |