1 22 package org.jboss.test.jbossmessaging.test; 23 24 import java.util.Random ; 25 26 import javax.jms.JMSException ; 27 import javax.jms.Message ; 28 import javax.jms.MessageConsumer ; 29 import javax.jms.MessageListener ; 30 import javax.jms.MessageProducer ; 31 import javax.jms.Queue ; 32 import javax.jms.QueueConnection ; 33 import javax.jms.QueueConnectionFactory ; 34 import javax.jms.QueueSession ; 35 import javax.jms.Session ; 36 import javax.naming.Context ; 37 38 import junit.framework.Test; 39 40 import org.jboss.test.jbossmessaging.JMSTestCase; 41 42 49 public class SessionCloseStressTestCase extends JMSTestCase 50 { 51 static String QUEUE_FACTORY = "ConnectionFactory"; 52 static String QUEUE = "queue/testQueue"; 53 54 QueueConnection queueConnection; 55 Queue queue; 56 57 public SessionCloseStressTestCase(String name) throws Exception 58 { 59 super(name); 60 } 61 62 public abstract class TestRunnable implements Runnable 63 { 64 public Throwable error = null; 65 66 public abstract void doRun() throws Exception ; 67 68 public void run() 69 { 70 try 71 { 72 doRun(); 73 } 74 catch (Throwable t) 75 { 76 log.error("Error in " + Thread.currentThread(), t); 77 error = t; 78 } 79 } 80 } 81 82 public class SessionRunnable extends TestRunnable 83 { 84 MessageConsumer consumer; 85 86 int received = 0; 87 88 public void doRun() throws Exception 89 { 90 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 91 MessageProducer producer = session.createProducer(queue); 92 for (int i = 0; i < getIterationCount(); ++i) 93 { 94 Message message = session.createTextMessage("" + i); 95 producer.send(message); 96 } 97 producer.close(); 98 consumer = session.createConsumer(queue); 99 waitForMessages(); 100 session.close(); 101 } 102 103 public synchronized MessageConsumer getConsumer() throws Exception 104 { 105 while (true) 106 { 107 if (consumer != null) 108 return consumer; 109 wait(); 110 } 111 } 112 113 public synchronized void incReceived() 114 { 115 ++received; 116 notifyAll(); 117 } 118 119 public synchronized void waitForMessages() throws Exception 120 { 121 notifyAll(); 122 int target = new Random ().nextInt(getIterationCount()); 123 while (received < target) 124 wait(); 125 } 126 } 127 128 public class ReceiverRunnable extends TestRunnable 129 { 130 SessionRunnable sessionRunnable; 131 132 public ReceiverRunnable(SessionRunnable sessionRunnable) 133 { 134 this.sessionRunnable = sessionRunnable; 135 } 136 137 public void doRun() throws Exception 138 { 139 MessageConsumer consumer = sessionRunnable.getConsumer(); 140 try 141 { 142 while (true) 143 { 144 consumer.receive(); 145 sessionRunnable.incReceived(); 146 } 147 } 148 catch (JMSException expected) 149 { 150 if (expected.getMessage().indexOf("closed") == -1) 151 throw expected; 152 } 153 } 154 } 155 156 public class ReceiverNoWaitRunnable extends TestRunnable 157 { 158 SessionRunnable sessionRunnable; 159 160 public ReceiverNoWaitRunnable(SessionRunnable sessionRunnable) 161 { 162 this.sessionRunnable = sessionRunnable; 163 } 164 165 public void doRun() throws Exception 166 { 167 MessageConsumer consumer = sessionRunnable.getConsumer(); 168 try 169 { 170 while (true) 171 { 172 if (consumer.receiveNoWait() != null) 173 sessionRunnable.incReceived(); 174 } 175 } 176 catch (JMSException expected) 177 { 178 if (expected.getMessage().indexOf("closed") == -1) 179 throw expected; 180 } 181 } 182 } 183 184 public class ReceiverMessageListenerRunnable extends TestRunnable implements MessageListener 185 { 186 SessionRunnable sessionRunnable; 187 188 public ReceiverMessageListenerRunnable(SessionRunnable sessionRunnable) 189 { 190 this.sessionRunnable = sessionRunnable; 191 } 192 193 public void onMessage(Message message) 194 { 195 sessionRunnable.incReceived(); 196 } 197 198 public void doRun() throws Exception 199 { 200 MessageConsumer consumer = sessionRunnable.getConsumer(); 201 try 202 { 203 consumer.setMessageListener(this); 204 } 205 catch (JMSException expected) 206 { 207 if (expected.getMessage().indexOf("closed") == -1) 208 throw expected; 209 } 210 } 211 } 212 213 public void testSessionCloseCompetesWithReceive() throws Exception 214 { 215 connect(); 216 try 217 { 218 for (int i = 0; i < getThreadCount(); ++i) 219 { 220 SessionRunnable sessionRunnable = new SessionRunnable(); 221 Thread sessionThread = new Thread (sessionRunnable, "Session"); 222 Thread consumerThread = new Thread (new ReceiverRunnable(sessionRunnable), "Consumer"); 223 consumerThread.start(); 224 sessionThread.start(); 225 sessionThread.join(); 226 consumerThread.join(); 227 assertNull(sessionRunnable.error); 228 229 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 231 MessageConsumer consumer = session.createConsumer(queue); 232 while (consumer.receiveNoWait() != null); 233 session.close(); 234 } 235 } 236 finally 237 { 238 disconnect(); 239 } 240 } 241 242 public void testSessionCloseCompetesWithReceiveNoWait() throws Exception 243 { 244 connect(); 245 try 246 { 247 for (int i = 0; i < getThreadCount(); ++i) 248 { 249 SessionRunnable sessionRunnable = new SessionRunnable(); 250 Thread sessionThread = new Thread (sessionRunnable, "Session"); 251 Thread consumerThread = new Thread (new ReceiverNoWaitRunnable(sessionRunnable), "Consumer"); 252 consumerThread.start(); 253 sessionThread.start(); 254 sessionThread.join(); 255 consumerThread.join(); 256 assertNull(sessionRunnable.error); 257 258 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 260 MessageConsumer consumer = session.createConsumer(queue); 261 while (consumer.receiveNoWait() != null); 262 session.close(); 263 } 264 } 265 finally 266 { 267 disconnect(); 268 } 269 } 270 271 public void testSessionCloseCompetesWithMessageListener() throws Exception 272 { 273 connect(); 274 try 275 { 276 for (int i = 0; i < getThreadCount(); ++i) 277 { 278 SessionRunnable sessionRunnable = new SessionRunnable(); 279 Thread sessionThread = new Thread (sessionRunnable, "Session"); 280 Thread consumerThread = new Thread (new ReceiverMessageListenerRunnable(sessionRunnable), "Consumer"); 281 consumerThread.start(); 282 sessionThread.start(); 283 sessionThread.join(); 284 consumerThread.join(); 285 assertNull(sessionRunnable.error); 286 287 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 289 MessageConsumer consumer = session.createConsumer(queue); 290 while (consumer.receiveNoWait() != null); 291 session.close(); 292 } 293 } 294 finally 295 { 296 disconnect(); 297 } 298 } 299 300 protected void connect() throws Exception 301 { 302 Context context = getInitialContext(); 303 queue = (Queue ) context.lookup(QUEUE); 304 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 305 queueConnection = queueFactory.createQueueConnection(); 306 queueConnection.start(); 307 308 getLog().debug("Connection established."); 309 } 310 311 protected void disconnect() 312 { 313 try 314 { 315 if (queueConnection != null) 316 queueConnection.close(); 317 } 318 catch (Exception ignored) 319 { 320 } 321 322 getLog().debug("Connection closed."); 323 } 324 325 public static Test suite() throws Exception 326 { 327 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 328 String resourceName = getJMSResourceRelativePathname("tests-destinations-service.xml") ; 329 330 return getDeploySetup(SessionCloseStressTestCase.class, 331 loader.getResource(resourceName).toString()); 332 } 333 } 334 | Popular Tags |