1 22 package org.jboss.test.jbossmq.test; 23 24 import java.util.Random ; 25 26 import javax.jms.JMSException ; 27 import javax.jms.Message ; 28 import javax.jms.MessageConsumer ; 29 import javax.jms.MessageListener ; 30 import javax.jms.MessageProducer ; 31 import javax.jms.Queue ; 32 import javax.jms.QueueConnection ; 33 import javax.jms.QueueConnectionFactory ; 34 import javax.jms.QueueSession ; 35 import javax.jms.Session ; 36 import javax.naming.Context ; 37 38 import junit.framework.Test; 39 40 import org.jboss.test.JBossTestCase; 41 42 48 public class SessionCloseStressTestCase extends JBossTestCase 49 { 50 static String QUEUE_FACTORY = "ConnectionFactory"; 51 static String QUEUE = "queue/testQueue"; 52 53 QueueConnection queueConnection; 54 Queue queue; 55 56 public SessionCloseStressTestCase(String name) throws Exception 57 { 58 super(name); 59 } 60 61 public abstract class TestRunnable implements Runnable 62 { 63 public Throwable error = null; 64 65 public abstract void doRun() throws Exception ; 66 67 public void run() 68 { 69 try 70 { 71 doRun(); 72 } 73 catch (Throwable t) 74 { 75 log.error("Error in " + Thread.currentThread(), t); 76 error = t; 77 } 78 } 79 } 80 81 public class SessionRunnable extends TestRunnable 82 { 83 MessageConsumer consumer; 84 85 int received = 0; 86 87 public void doRun() throws Exception 88 { 89 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 90 MessageProducer producer = session.createProducer(queue); 91 for (int i = 0; i < getIterationCount(); ++i) 92 { 93 Message message = session.createTextMessage("" + i); 94 producer.send(message); 95 } 96 producer.close(); 97 consumer = session.createConsumer(queue); 98 waitForMessages(); 99 session.close(); 100 } 101 102 public synchronized MessageConsumer getConsumer() throws Exception 103 { 104 while (true) 105 { 106 if (consumer != null) 107 return consumer; 108 wait(); 109 } 110 } 111 112 public synchronized void incReceived() 113 { 114 ++received; 115 notifyAll(); 116 } 117 118 public synchronized void waitForMessages() throws Exception 119 { 120 notifyAll(); 121 int target = new Random ().nextInt(getIterationCount()); 122 while (received < target) 123 wait(); 124 } 125 } 126 127 public class ReceiverRunnable extends TestRunnable 128 { 129 SessionRunnable sessionRunnable; 130 131 public ReceiverRunnable(SessionRunnable sessionRunnable) 132 { 133 this.sessionRunnable = sessionRunnable; 134 } 135 136 public void doRun() throws Exception 137 { 138 MessageConsumer consumer = sessionRunnable.getConsumer(); 139 try 140 { 141 while (true) 142 { 143 consumer.receive(); 144 sessionRunnable.incReceived(); 145 } 146 } 147 catch (JMSException expected) 148 { 149 if (expected.getMessage().indexOf("closed") == -1) 150 throw expected; 151 } 152 } 153 } 154 155 public class ReceiverNoWaitRunnable extends TestRunnable 156 { 157 SessionRunnable sessionRunnable; 158 159 public ReceiverNoWaitRunnable(SessionRunnable sessionRunnable) 160 { 161 this.sessionRunnable = sessionRunnable; 162 } 163 164 public void doRun() throws Exception 165 { 166 MessageConsumer consumer = sessionRunnable.getConsumer(); 167 try 168 { 169 while (true) 170 { 171 if (consumer.receiveNoWait() != null) 172 sessionRunnable.incReceived(); 173 } 174 } 175 catch (JMSException expected) 176 { 177 if (expected.getMessage().indexOf("closed") == -1) 178 throw expected; 179 } 180 } 181 } 182 183 public class ReceiverMessageListenerRunnable extends TestRunnable implements MessageListener 184 { 185 SessionRunnable sessionRunnable; 186 187 public ReceiverMessageListenerRunnable(SessionRunnable sessionRunnable) 188 { 189 this.sessionRunnable = sessionRunnable; 190 } 191 192 public void onMessage(Message message) 193 { 194 sessionRunnable.incReceived(); 195 } 196 197 public void doRun() throws Exception 198 { 199 MessageConsumer consumer = sessionRunnable.getConsumer(); 200 try 201 { 202 consumer.setMessageListener(this); 203 } 204 catch (JMSException expected) 205 { 206 if (expected.getMessage().indexOf("closed") == -1) 207 throw expected; 208 } 209 } 210 } 211 212 public void testSessionCloseCompetesWithReceive() throws Exception 213 { 214 connect(); 215 try 216 { 217 for (int i = 0; i < getThreadCount(); ++i) 218 { 219 SessionRunnable sessionRunnable = new SessionRunnable(); 220 Thread sessionThread = new Thread (sessionRunnable, "Session"); 221 Thread consumerThread = new Thread (new ReceiverRunnable(sessionRunnable), "Consumer"); 222 consumerThread.start(); 223 sessionThread.start(); 224 sessionThread.join(); 225 consumerThread.join(); 226 assertNull(sessionRunnable.error); 227 228 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 230 MessageConsumer consumer = session.createConsumer(queue); 231 while (consumer.receiveNoWait() != null); 232 session.close(); 233 } 234 } 235 finally 236 { 237 disconnect(); 238 } 239 } 240 241 public void testSessionCloseCompetesWithReceiveNoWait() throws Exception 242 { 243 connect(); 244 try 245 { 246 for (int i = 0; i < getThreadCount(); ++i) 247 { 248 SessionRunnable sessionRunnable = new SessionRunnable(); 249 Thread sessionThread = new Thread (sessionRunnable, "Session"); 250 Thread consumerThread = new Thread (new ReceiverNoWaitRunnable(sessionRunnable), "Consumer"); 251 consumerThread.start(); 252 sessionThread.start(); 253 sessionThread.join(); 254 consumerThread.join(); 255 assertNull(sessionRunnable.error); 256 257 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 259 MessageConsumer consumer = session.createConsumer(queue); 260 while (consumer.receiveNoWait() != null); 261 session.close(); 262 } 263 } 264 finally 265 { 266 disconnect(); 267 } 268 } 269 270 public void testSessionCloseCompetesWithMessageListener() throws Exception 271 { 272 connect(); 273 try 274 { 275 for (int i = 0; i < getThreadCount(); ++i) 276 { 277 SessionRunnable sessionRunnable = new SessionRunnable(); 278 Thread sessionThread = new Thread (sessionRunnable, "Session"); 279 Thread consumerThread = new Thread (new ReceiverMessageListenerRunnable(sessionRunnable), "Consumer"); 280 consumerThread.start(); 281 sessionThread.start(); 282 sessionThread.join(); 283 consumerThread.join(); 284 assertNull(sessionRunnable.error); 285 286 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 288 MessageConsumer consumer = session.createConsumer(queue); 289 while (consumer.receiveNoWait() != null); 290 session.close(); 291 } 292 } 293 finally 294 { 295 disconnect(); 296 } 297 } 298 299 protected void connect() throws Exception 300 { 301 Context context = getInitialContext(); 302 queue = (Queue ) context.lookup(QUEUE); 303 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 304 queueConnection = queueFactory.createQueueConnection(); 305 queueConnection.start(); 306 307 getLog().debug("Connection established."); 308 } 309 310 protected void disconnect() 311 { 312 try 313 { 314 if (queueConnection != null) 315 queueConnection.close(); 316 } 317 catch (Exception ignored) 318 { 319 } 320 321 getLog().debug("Connection closed."); 322 } 323 324 public static Test suite() throws Exception 325 { 326 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 327 return getDeploySetup(SessionCloseStressTestCase.class, 328 loader.getResource("messaging/test-destinations-service.xml").toString()); 329 } 330 } 331 | Popular Tags |