1 22 package org.jboss.test.jbossmessaging.perf; 23 24 import java.util.ArrayList ; 25 import java.util.Iterator ; 26 import javax.jms.Message ; 27 import javax.jms.MessageListener ; 28 import javax.jms.QueueConnection ; 29 import javax.jms.QueueConnectionFactory ; 30 import javax.jms.QueueReceiver ; 31 import javax.jms.QueueSender ; 32 import javax.jms.QueueSession ; 33 import javax.jms.Session ; 34 import javax.jms.TemporaryQueue ; 35 import javax.jms.Topic ; 36 import javax.jms.TopicConnection ; 37 import javax.jms.TopicConnectionFactory ; 38 import javax.jms.TopicPublisher ; 39 import javax.jms.TopicSession ; 40 import javax.jms.TopicSubscriber ; 41 import javax.jms.Queue ; 42 import javax.naming.Context ; 43 44 import org.jboss.test.jbossmessaging.JMSTestCase; 45 46 54 public class SendReplyPerfStressTestCase extends JMSTestCase 55 { 56 static String TOPIC_FACTORY = "ConnectionFactory"; 58 static String QUEUE_FACTORY = "ConnectionFactory"; 59 60 static String TEST_QUEUE = "queue/testQueue"; 61 static String TEST_TOPIC = "topic/testTopic"; 62 63 static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10]; 64 65 static Context context; 67 static QueueConnection queueConnection; 68 static TopicConnection topicConnection; 69 70 public SendReplyPerfStressTestCase(String name) throws Exception 71 { 72 super(name); 73 } 74 75 80 public static void main(String [] args) 81 { 82 83 String newArgs[] = {"org.jboss.test.jbossmessaging.perf.SendReplyPerfStressTestCase"}; 84 junit.swingui.TestRunner.main(newArgs); 85 } 86 87 public static class State 88 { 89 public int expected; 90 public int finished = 0; 91 public ArrayList errors = new ArrayList (); 92 public State(int expected) 93 { 94 this.expected = expected; 95 } 96 public synchronized void addError(Throwable t) 97 { 98 errors.add(t); 99 } 100 public synchronized void finished() 101 { 102 ++finished; 103 if (finished == expected) 104 notifyAll(); 105 } 106 public synchronized void waitForFinish() throws Exception 107 { 108 if (finished == expected) 109 return; 110 wait(); 111 } 112 } 113 114 public static class MessageQueueSender 115 implements Runnable 116 { 117 State state; 118 public MessageQueueSender(State state) 119 { 120 this.state = state; 121 } 122 123 public void run() 124 { 125 try 126 { 127 Queue queue = (Queue )context.lookup(TEST_QUEUE); 128 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 129 TemporaryQueue temp = session.createTemporaryQueue(); 130 Message message = session.createTextMessage(); 131 message.setJMSReplyTo(temp); 132 133 QueueSender sender = session.createSender(queue); 134 sender.send(message); 135 136 QueueReceiver receiver = session.createReceiver(temp); 137 receiver.receive(); 138 receiver.close(); 139 temp.delete(); 140 141 session.close(); 142 } 143 catch (Throwable t) 144 { 145 state.addError(t); 146 } 147 finally 148 { 149 state.finished(); 150 } 151 } 152 } 153 154 public static class MessageTopicSender 155 implements Runnable 156 { 157 State state; 158 public MessageTopicSender(State state) 159 { 160 this.state = state; 161 } 162 163 public void run() 164 { 165 try 166 { 167 Topic topic = (Topic )context.lookup(TEST_TOPIC); 168 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 169 Message message = session.createTextMessage(); 170 171 QueueSession qsession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 172 TemporaryQueue temp = qsession.createTemporaryQueue(); 173 message.setJMSReplyTo(temp); 174 175 TopicPublisher publisher = session.createPublisher(topic); 176 publisher.publish(message); 177 178 QueueReceiver receiver = qsession.createReceiver(temp); 179 receiver.receive(); 180 receiver.close(); 181 182 session.close(); 183 } 184 catch (Throwable t) 185 { 186 state.addError(t); 187 } 188 finally 189 { 190 state.finished(); 191 } 192 } 193 } 194 195 public static class MessageReplier 196 implements MessageListener 197 { 198 State state; 199 public MessageReplier(State state) 200 { 201 this.state = state; 202 } 203 public void onMessage(Message message) 204 { 205 try 206 { 207 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 208 Queue replyQueue = session.createQueue(((Queue )message.getJMSReplyTo()).getQueueName()); 209 QueueSender sender = session.createSender(replyQueue); 210 sender.send(message); 211 sender.close(); 212 session.close(); 213 } 214 catch (Throwable t) 215 { 216 state.addError(t); 217 } 218 } 219 } 220 221 public void testSendReplyQueue() throws Exception 222 { 223 drainQueue(); 224 225 State state = new State(getThreadCount()); 227 MessageReplier replier = new MessageReplier(state); 228 Thread [] threads = new Thread [getThreadCount()]; 229 for (int i = 0; i < threads.length; ++i) 230 threads[i] = new Thread (new MessageQueueSender(state)); 231 232 Queue queue = (Queue )context.lookup(TEST_QUEUE); 234 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 235 QueueReceiver receiver = session.createReceiver(queue); 236 receiver.setMessageListener(replier); 237 queueConnection.start(); 238 239 for (int i = 0; i < threads.length; ++i) 241 threads[i].start(); 242 243 state.waitForFinish(); 245 246 for (Iterator i = state.errors.iterator(); i.hasNext();) 248 getLog().error("Error", (Throwable ) i.next()); 249 if (state.errors.size() > 0) 250 throw new RuntimeException ("Test failed with " + state.errors.size() + " errors"); 251 } 252 253 public void testSendReplyTopic() throws Exception 254 { 255 State state = new State(getThreadCount()); 257 MessageReplier replier = new MessageReplier(state); 258 259 Thread [] threads = new Thread [getThreadCount()]; 260 for (int i = 0; i < threads.length; ++i) 261 threads[i] = new Thread (new MessageTopicSender(state)); 262 263 264 Topic topic = (Topic )context.lookup(TEST_TOPIC); 266 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 267 TopicSubscriber subscriber = session.createSubscriber(topic); 268 subscriber.setMessageListener(replier); 269 topicConnection.start(); 270 queueConnection.start(); 271 272 for (int i = 0; i < threads.length; ++i) 274 threads[i].start(); 275 276 state.waitForFinish(); 278 279 for (Iterator i = state.errors.iterator(); i.hasNext();) 281 getLog().error("Error", (Throwable ) i.next()); 282 if (state.errors.size() > 0) 283 throw new RuntimeException ("Test failed with " + state.errors.size() + " errors"); 284 } 285 286 protected void setUp() throws Exception 287 { 288 super.setUp() ; 290 291 getLog().info("Starting test: " + getName()); 292 293 context = getInitialContext(); 294 295 QueueConnectionFactory queueFactory = (QueueConnectionFactory )context.lookup(QUEUE_FACTORY); 296 queueConnection = queueFactory.createQueueConnection(); 297 298 TopicConnectionFactory topicFactory = (TopicConnectionFactory )context.lookup(TOPIC_FACTORY); 299 topicConnection = topicFactory.createTopicConnection(); 300 301 getLog().debug("Connection to JMS provider established."); 302 } 303 304 protected void tearDown() throws Exception 305 { 306 getLog().info("Ended test: " + getName()); 307 queueConnection.close(); 308 topicConnection.close(); 309 310 super.tearDown() ; 312 } 313 314 private void drainQueue() throws Exception 315 { 316 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 317 Queue queue = (Queue )context.lookup(TEST_QUEUE); 318 319 QueueReceiver receiver = session.createReceiver(queue); 320 queueConnection.start(); 321 Message message = receiver.receive(50); 322 int c = 0; 323 while (message != null) 324 { 325 message = receiver.receive(50); 326 c++; 327 } 328 329 if (c != 0) 330 getLog().debug(" Drained " + c + " messages from the queue"); 331 session.close(); 332 queueConnection.stop(); 333 334 } 335 336 public static junit.framework.Test suite() throws Exception 337 { 338 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 339 String resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ; 340 341 return getDeploySetup(SendReplyPerfStressTestCase.class, 342 loader.getResource(resourceName).toString()); 343 } 344 345 } 346 | Popular Tags |