1 22 package org.jboss.test.jbossmq.perf; 23 import java.util.ArrayList ; 24 import java.util.Iterator ; 25 import javax.jms.Message ; 26 import javax.jms.MessageListener ; 27 import javax.jms.QueueConnection ; 28 import javax.jms.QueueConnectionFactory ; 29 import javax.jms.QueueReceiver ; 30 import javax.jms.QueueSender ; 31 import javax.jms.QueueSession ; 32 import javax.jms.Session ; 33 import javax.jms.TemporaryQueue ; 34 import javax.jms.Topic ; 35 import javax.jms.TopicConnection ; 36 import javax.jms.TopicConnectionFactory ; 37 import javax.jms.TopicPublisher ; 38 import javax.jms.TopicSession ; 39 import javax.jms.TopicSubscriber ; 40 import javax.jms.Queue ; 41 import javax.naming.Context ; 42 43 import org.jboss.test.JBossTestCase; 44 45 51 public class SendReplyPerfStressTestCase extends JBossTestCase 52 { 53 static String TOPIC_FACTORY = "ConnectionFactory"; 55 static String QUEUE_FACTORY = "ConnectionFactory"; 56 57 static String TEST_QUEUE = "queue/testQueue"; 58 static String TEST_TOPIC = "topic/testTopic"; 59 60 static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10]; 61 62 static Context context; 64 static QueueConnection queueConnection; 65 static TopicConnection topicConnection; 66 67 public SendReplyPerfStressTestCase(String name) throws Exception 68 { 69 super(name); 70 } 71 72 77 public static void main(String [] args) 78 { 79 80 String newArgs[] = {"org.jboss.test.jbossmq.perf.SendReplyPerfStressTestCase"}; 81 junit.swingui.TestRunner.main(newArgs); 82 } 83 84 public static class State 85 { 86 public int expected; 87 public int finished = 0; 88 public ArrayList errors = new ArrayList (); 89 public State(int expected) 90 { 91 this.expected = expected; 92 } 93 public synchronized void addError(Throwable t) 94 { 95 errors.add(t); 96 } 97 public synchronized void finished() 98 { 99 ++finished; 100 if (finished == expected) 101 notifyAll(); 102 } 103 public synchronized void waitForFinish() throws Exception 104 { 105 if (finished == expected) 106 return; 107 wait(); 108 } 109 } 110 111 public static class MessageQueueSender 112 implements Runnable 113 { 114 State state; 115 public MessageQueueSender(State state) 116 { 117 this.state = state; 118 } 119 120 public void run() 121 { 122 try 123 { 124 Queue queue = (Queue )context.lookup(TEST_QUEUE); 125 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 126 TemporaryQueue temp = session.createTemporaryQueue(); 127 Message message = session.createTextMessage(); 128 message.setJMSReplyTo(temp); 129 130 QueueSender sender = session.createSender(queue); 131 sender.send(message); 132 133 QueueReceiver receiver = session.createReceiver(temp); 134 receiver.receive(); 135 receiver.close(); 136 temp.delete(); 137 138 session.close(); 139 } 140 catch (Throwable t) 141 { 142 state.addError(t); 143 } 144 finally 145 { 146 state.finished(); 147 } 148 } 149 } 150 151 public static class MessageTopicSender 152 implements Runnable 153 { 154 State state; 155 public MessageTopicSender(State state) 156 { 157 this.state = state; 158 } 159 160 public void run() 161 { 162 try 163 { 164 Topic topic = (Topic )context.lookup(TEST_TOPIC); 165 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 166 Message message = session.createTextMessage(); 167 168 QueueSession qsession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 169 TemporaryQueue temp = qsession.createTemporaryQueue(); 170 message.setJMSReplyTo(temp); 171 172 TopicPublisher publisher = session.createPublisher(topic); 173 publisher.publish(message); 174 175 QueueReceiver receiver = qsession.createReceiver(temp); 176 receiver.receive(); 177 receiver.close(); 178 179 session.close(); 180 } 181 catch (Throwable t) 182 { 183 state.addError(t); 184 } 185 finally 186 { 187 state.finished(); 188 } 189 } 190 } 191 192 public static class MessageReplier 193 implements MessageListener 194 { 195 State state; 196 public MessageReplier(State state) 197 { 198 this.state = state; 199 } 200 public void onMessage(Message message) 201 { 202 try 203 { 204 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 205 Queue replyQueue = session.createQueue(((Queue )message.getJMSReplyTo()).getQueueName()); 206 QueueSender sender = session.createSender(replyQueue); 207 sender.send(message); 208 sender.close(); 209 session.close(); 210 } 211 catch (Throwable t) 212 { 213 state.addError(t); 214 } 215 } 216 } 217 218 public void testSendReplyQueue() throws Exception 219 { 220 drainQueue(); 221 222 State state = new State(getThreadCount()); 224 MessageReplier replier = new MessageReplier(state); 225 Thread [] threads = new Thread [getThreadCount()]; 226 for (int i = 0; i < threads.length; ++i) 227 threads[i] = new Thread (new MessageQueueSender(state)); 228 229 Queue queue = (Queue )context.lookup(TEST_QUEUE); 231 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 232 QueueReceiver receiver = session.createReceiver(queue); 233 receiver.setMessageListener(replier); 234 queueConnection.start(); 235 236 for (int i = 0; i < threads.length; ++i) 238 threads[i].start(); 239 240 state.waitForFinish(); 242 243 for (Iterator i = state.errors.iterator(); i.hasNext();) 245 getLog().error("Error", (Throwable ) i.next()); 246 if (state.errors.size() > 0) 247 throw new RuntimeException ("Test failed with " + state.errors.size() + " errors"); 248 } 249 250 public void testSendReplyTopic() throws Exception 251 { 252 State state = new State(getThreadCount()); 254 MessageReplier replier = new MessageReplier(state); 255 256 Thread [] threads = new Thread [getThreadCount()]; 257 for (int i = 0; i < threads.length; ++i) 258 threads[i] = new Thread (new MessageTopicSender(state)); 259 260 261 Topic topic = (Topic )context.lookup(TEST_TOPIC); 263 TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 264 TopicSubscriber subscriber = session.createSubscriber(topic); 265 subscriber.setMessageListener(replier); 266 topicConnection.start(); 267 queueConnection.start(); 268 269 for (int i = 0; i < threads.length; ++i) 271 threads[i].start(); 272 273 state.waitForFinish(); 275 276 for (Iterator i = state.errors.iterator(); i.hasNext();) 278 getLog().error("Error", (Throwable ) i.next()); 279 if (state.errors.size() > 0) 280 throw new RuntimeException ("Test failed with " + state.errors.size() + " errors"); 281 } 282 283 protected void setUp() throws Exception 284 { 285 getLog().info("Starting test: " + getName()); 286 287 context = getInitialContext(); 288 289 QueueConnectionFactory queueFactory = (QueueConnectionFactory )context.lookup(QUEUE_FACTORY); 290 queueConnection = queueFactory.createQueueConnection(); 291 292 TopicConnectionFactory topicFactory = (TopicConnectionFactory )context.lookup(TOPIC_FACTORY); 293 topicConnection = topicFactory.createTopicConnection(); 294 295 getLog().debug("Connection to JBossMQ established."); 296 } 297 298 protected void tearDown() throws Exception 299 { 300 getLog().info("Ended test: " + getName()); 301 queueConnection.close(); 302 topicConnection.close(); 303 } 304 305 private void drainQueue() throws Exception 306 { 307 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 308 Queue queue = (Queue )context.lookup(TEST_QUEUE); 309 310 QueueReceiver receiver = session.createReceiver(queue); 311 queueConnection.start(); 312 Message message = receiver.receive(50); 313 int c = 0; 314 while (message != null) 315 { 316 message = receiver.receive(50); 317 c++; 318 } 319 320 if (c != 0) 321 getLog().debug(" Drained " + c + " messages from the queue"); 322 session.close(); 323 queueConnection.stop(); 324 325 } 326 327 public static junit.framework.Test suite() throws Exception 328 { 329 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 330 return getDeploySetup(SendReplyPerfStressTestCase.class, 331 loader.getResource("messaging/test-destinations-service.xml").toString()); 332 } 333 334 } 335 | Popular Tags |