1 22 package org.jboss.test.jbossmq.stress; 23 24 import javax.jms.Message ; 25 import javax.jms.BytesMessage ; 26 import javax.jms.JMSException ; 27 28 import junit.framework.TestSuite; 29 import junit.framework.Assert; 30 31 import org.jboss.test.jbossmq.MQBase; 32 33 37 38 public class MassiveTest extends MQBase 39 { 40 static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10 * 1024]; 41 42 public MassiveTest(String name) 43 { 44 super(name); 45 } 46 47 50 public void runMassiveTest() throws Exception 51 { 52 drainTopic(); 54 55 int ic = getIterationCount(); 56 57 IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.BytesMessage .class, 59 "MASSIVE_NR", 60 0, 61 ic); 62 63 TopicWorker sub1 = new TopicWorker(SUBSCRIBER, 64 TRANS_NONE, 65 f1); 66 Thread t1 = new Thread (sub1); 67 t1.start(); 68 69 ByteIntRangeMessageCreator c1 = new ByteIntRangeMessageCreator("MASSIVE_NR", 71 0); 72 TopicWorker pub1 = new TopicWorker(PUBLISHER, 73 TRANS_NONE, 74 c1, 75 ic); 76 pub1.connect(); 77 pub1.publish(); 78 79 Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(), 80 ic, 81 pub1.getMessageHandled()); 82 83 log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes"); 84 try 86 { 87 Thread.sleep(ic * 10); 88 } 89 catch (InterruptedException e) 90 { 91 } 92 log.debug("Woke up"); 93 Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(), 94 ic, 95 sub1.getMessageHandled()); 96 97 sub1.close(); 99 t1.interrupt(); 100 pub1.close(); 101 } 102 103 public void runMassivTestFailingSub() throws Exception 104 { 105 drainTopic(); 107 108 int ic = getIterationCount(); 109 110 IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.BytesMessage .class, 112 "MASSIVE_NR", 113 0, 114 ic); 115 116 TopicWorker sub1 = new TopicWorker(SUBSCRIBER, 117 TRANS_NONE, 118 f1); 119 Thread t1 = new Thread (sub1); 120 t1.start(); 121 122 FailingSubWorker sub2 = new FailingSubWorker(); 124 sub2.setSubscriberAttrs(SUBSCRIBER, 125 TRANS_NONE, 126 f1); 127 Thread tf = new Thread (sub2); 128 tf.start(); 129 130 ByteIntRangeMessageCreator c1 = new ByteIntRangeMessageCreator("MASSIVE_NR", 132 0); 133 TopicWorker pub1 = new TopicWorker(PUBLISHER, 134 TRANS_NONE, 135 c1, 136 ic); 137 pub1.connect(); 138 pub1.publish(); 139 140 Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(), 141 ic, 142 pub1.getMessageHandled()); 143 144 log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes"); 145 try 147 { 148 Thread.sleep(ic * 10); 149 } 150 catch (InterruptedException e) 151 { 152 } 153 log.debug("Woke up"); 154 Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(), 155 ic, 156 sub1.getMessageHandled()); 157 158 sub1.close(); 160 t1.interrupt(); 161 pub1.close(); 162 sub2.setStoped(); 163 tf.interrupt(); 164 tf.interrupt(); 165 sub2.close(); 166 } 167 168 public static junit.framework.Test suite() throws Exception 169 { 170 171 TestSuite suite = new TestSuite(); 172 suite.addTest(new MassiveTest("runMassiveTest")); 173 174 return suite; 176 } 177 178 public static void main(String [] args) 179 { 180 try 181 { 182 MassiveTest t = new MassiveTest("runMassiveTest"); 183 t.setUp(); 184 t.runMassiveTest(); 185 } 186 catch (Exception ex) 187 { 188 System.err.println("Ex: " + ex); 189 ex.printStackTrace(); 190 } 191 192 } 193 194 public class ByteIntRangeMessageCreator extends IntRangeMessageCreator 195 { 196 int start = 0; 197 198 public ByteIntRangeMessageCreator(String property) 199 { 200 super(property); 201 } 202 203 public ByteIntRangeMessageCreator(String property, int start) 204 { 205 super(property); 206 this.start = start; 207 } 208 209 public Message createMessage(int nr) throws JMSException 210 { 211 if (session == null) 212 throw new JMSException ("Session not allowed to be null"); 213 214 BytesMessage msg = session.createBytesMessage(); 215 msg.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD); 216 msg.setStringProperty(property, String.valueOf(start + nr)); 217 return msg; 218 } 219 } 220 221 public class FailingSubWorker extends TopicWorker 222 { 223 int check = 0; 224 225 public void onMessage(Message msg) 227 { 228 check++; 229 if (check > 1) 230 log.warn("Got called while asleep!! " + check); 231 while (!stopRequested) 232 { 233 sleep(2000); 234 } 235 } 236 } 237 } 239 240 241 242 243 244 245 246 247 248 | Popular Tags |