1 22 package org.jboss.test.messagedriven.support; 23 24 import java.util.ArrayList ; 25 import java.util.HashMap ; 26 import java.util.Properties ; 27 28 import javax.jms.Connection ; 29 import javax.jms.ConnectionFactory ; 30 import javax.jms.Destination ; 31 import javax.jms.Message ; 32 import javax.jms.MessageProducer ; 33 import javax.jms.Session ; 34 import javax.management.ObjectName ; 35 36 import org.jboss.mx.util.ObjectNameFactory; 37 import org.jboss.naming.Util; 38 import org.jboss.test.JBossTestCase; 39 import org.jboss.test.messagedriven.mbeans.TestMessageDrivenManagementMBean; 40 41 47 public abstract class BasicMessageDrivenUnitTest extends JBossTestCase 48 { 49 protected static final long WAIT_TIME = 5000L; 50 protected static final long REPEATED_WAIT = 4; 51 52 protected static final ObjectName testQueue = ObjectNameFactory.create("jboss.mq.destination:service=Queue,name=testQueue"); 53 protected static final Properties testQueueProps = new Properties (); 54 55 protected static final ObjectName testTopic = ObjectNameFactory.create("jboss.mq.destination:service=Topic,name=testTopic"); 56 protected static final Properties testTopicProps = new Properties (); 57 58 protected static final ObjectName testDurableTopic = ObjectNameFactory.create("jboss.mq.destination:service=Topic,name=testDurableTopic"); 59 protected static final Properties testDurableTopicProps = new Properties (); 60 61 static 62 { 63 testQueueProps.put("destination", "queue/testQueue"); 64 testQueueProps.put("destinationType", "javax.jms.Queue"); 65 66 testTopicProps.put("destination", "topic/testTopic"); 67 testTopicProps.put("destinationType", "javax.jms.Topic"); 68 69 testDurableTopicProps.put("destination", "topic/testDurableTopic"); 70 testDurableTopicProps.put("destinationType", "javax.jms.Topic"); 71 testDurableTopicProps.put("durability", "Durable"); 73 testDurableTopicProps.put("subscriptionName", "messagedriven"); 74 testDurableTopicProps.put("user", "john"); 75 testDurableTopicProps.put("password", "needle"); 76 } 77 78 protected Thread thread; 79 protected boolean running = false; 80 81 protected String mdbjar = "testmessagedriven.jar"; 82 protected String mbeansar = "testmessagedriven.sar"; 83 84 protected ObjectName jmxDestination = ObjectNameFactory.create("does:not=exist"); 85 protected ObjectName dlqJMXDestination = ObjectNameFactory.create("jboss.mq.destination:service=Queue,name=DLQ"); 86 protected String connectionFactoryJNDI = "ConnectionFactory"; 87 protected Destination destination; 88 protected Destination dlqDestination; 89 protected Properties defaultProps; 90 protected Properties props; 91 92 protected Connection connection; 93 protected Session session; 94 protected HashMap producers = new HashMap (); 95 protected ArrayList messages = new ArrayList (); 96 97 public BasicMessageDrivenUnitTest(String name, ObjectName jmxDestination, Properties defaultProps) 98 { 99 super(name); 100 this.jmxDestination = jmxDestination; 101 this.defaultProps = defaultProps; 102 } 103 104 public void runTest(Operation[] ops, Properties props) throws Exception 105 { 106 startTest(props); 107 try 108 { 109 for (int i = 0; i < ops.length; ++i) 110 ops[i].run(); 111 } 112 finally 113 { 114 stopTest(); 115 } 116 } 117 118 public String getMDBDeployment() 119 { 120 return mdbjar; 121 } 122 123 public ObjectName getJMXDestination() 124 { 125 return jmxDestination; 126 } 127 128 public ObjectName getDLQJMXDestination() 129 { 130 return dlqJMXDestination; 131 } 132 133 public Destination getDestination() throws Exception 134 { 135 if (destination != null) 136 return destination; 137 String jndiName = (String ) getAttribute(getJMXDestination(), "JNDIName"); 138 destination = (Destination ) lookup(jndiName, Destination .class); 139 return destination; 140 } 141 142 public Destination getDLQDestination() throws Exception 143 { 144 if (dlqDestination != null) 145 return dlqDestination; 146 String jndiName = (String ) getAttribute(getDLQJMXDestination(), "JNDIName"); 147 dlqDestination = (Destination ) lookup(jndiName, Destination .class); 148 return dlqDestination; 149 } 150 151 public MessageProducer getMessageProducer() throws Exception 152 { 153 return getMessageProducer(getDestination()); 154 } 155 156 public MessageProducer getMessageProducer(Destination destination) throws Exception 157 { 158 MessageProducer producer = (MessageProducer ) producers.get(destination); 159 if (producer == null) 160 producer = getSession().createProducer(destination); 161 return producer; 162 } 163 164 public Session getSession() throws Exception 165 { 166 if (session != null) 167 return session; 168 169 return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); 170 } 171 172 public Connection getConnection() throws Exception 173 { 174 if (connection != null) 175 return connection; 176 177 ConnectionFactory factory = (ConnectionFactory ) lookup(connectionFactoryJNDI, ConnectionFactory .class); 178 connection = factory.createConnection(); 179 return connection; 180 } 181 182 public Connection getConnection(String user, String password) throws Exception 183 { 184 if (connection != null) 185 return connection; 186 187 ConnectionFactory factory = (ConnectionFactory ) lookup(connectionFactoryJNDI, ConnectionFactory .class); 188 connection = factory.createConnection(user, password); 189 return connection; 190 } 191 192 public Message getTestMessage() throws Exception 193 { 194 return getSession().createMessage(); 195 } 196 197 protected void setUp() throws Exception 198 { 199 if ("testServerFound".equals(getName())) 200 return; 201 deploy(mbeansar); 202 } 203 204 protected void tearDown() throws Exception 205 { 206 if ("testServerFound".equals(getName())) 207 return; 208 try 209 { 210 undeploy(mbeansar); 211 } 212 catch (Throwable t) 213 { 214 getLog().error("Error undeploying: " + mbeansar, t); 215 } 216 } 217 218 protected void startTest(Properties props) throws Exception 219 { 220 this.props = props; 221 clearMessages(getJMXDestination()); 222 clearMessages(getDLQJMXDestination()); 223 tidyup(props); 224 initProperties(props); 225 deploy(getMDBDeployment()); 226 try 227 { 228 startReceiverThread(); 229 } 230 catch (Exception e) 231 { 232 undeploy(getMDBDeployment()); 233 throw e; 234 } 235 } 236 237 protected void stopTest() 238 { 239 if (connection != null) 240 { 241 try 242 { 243 connection.close(); 244 } 245 catch (Exception ignored) 246 { 247 } 248 connection = null; 249 } 250 stopReceiverThread(); 251 try 252 { 253 undeploy(getMDBDeployment()); 254 } 255 catch (Throwable t) 256 { 257 getLog().error("Error undeploying: " + getMDBDeployment(), t); 258 } 259 try 260 { 261 clearMessages(getJMXDestination()); 262 tidyup(props); 263 } 264 catch (Throwable t) 265 { 266 getLog().error("Error clearing messages: " + getJMXDestination(), t); 267 } 268 try 269 { 270 clearMessages(getDLQJMXDestination()); 271 } 272 catch (Throwable t) 273 { 274 getLog().error("Error clearing messages: " + getDLQJMXDestination(), t); 275 } 276 } 277 278 protected void clearMessages(ObjectName name) throws Exception 279 { 280 if (name != null) 281 { 282 getLog().info("Clearing messages " + name); 283 getServer().invoke(name, "removeAllMessages", new Object [0], new String [0]); 284 } 285 } 286 287 protected void tidyup(Properties props) throws Exception 288 { 289 String name = props.getProperty("subscriptionName"); 290 if (name != null) 291 { 292 String user = props.getProperty("user"); 293 if (user != null) 294 { 295 String password = props.getProperty("password"); 296 getConnection(user, password); 297 } 298 else 299 getConnection(); 300 try 301 { 302 Session session = getSession(); 303 try 304 { 305 session.unsubscribe(name); 306 } 307 catch (Throwable t) 308 { 309 log.debug("Unsubscribe failed: ", t); 310 } 311 } 312 finally 313 { 314 try 315 { 316 connection.close(); 317 } 318 catch (Exception ignored) 319 { 320 } 321 connection = null; 322 } 323 } 324 } 325 326 protected void activate(ObjectName name) throws Exception 327 { 328 getServer().invoke(name, "startDelivery", new Object [0], new String [0]); 329 } 330 331 protected void deactivate(ObjectName name) throws Exception 332 { 333 getServer().invoke(name, "stopDelivery", new Object [0], new String [0]); 334 } 335 336 protected void initProperties(Properties props) throws Exception 337 { 338 getLog().info("Init properties " + props); 339 getServer().invoke(TestMessageDrivenManagementMBean.OBJECT_NAME, "initProperties", new Object [] { props }, new String [] { Properties .class.getName() }); 340 } 341 342 protected void waitMessages(int expected, long wait) throws Exception 343 { 344 synchronized (this) 345 { 346 if (wait != 0) 347 wait(wait); 348 349 for (int i = 0; i < REPEATED_WAIT && messages.size() < expected; ++i) 350 wait(WAIT_TIME); 351 } 352 } 353 354 protected ArrayList getMessages() throws Exception 355 { 356 synchronized (this) 357 { 358 return new ArrayList (messages); 359 } 360 } 361 362 protected void startReceiverThread() 363 { 364 synchronized (this) 365 { 366 thread = new Thread (new ReceiverRunnable(), getClass().getName()); 367 thread.start(); 368 running = true; 369 } 370 } 371 372 protected void stopReceiverThread() 373 { 374 synchronized (this) 375 { 376 running = false; 377 while (thread != null) 378 { 379 try 380 { 381 this.notifyAll(); 382 this.wait(); 383 } 384 catch (Throwable t) 385 { 386 getLog().error("Error waiting for receiver thread to stop " + thread, t); 387 } 388 } 389 } 390 } 391 392 protected Object getAttribute(ObjectName name, String attribute) throws Exception 393 { 394 return getServer().getAttribute(name, attribute); 395 } 396 397 protected Object lookup(String jndiName, Class clazz) throws Exception 398 { 399 return Util.lookup(getInitialContext(), jndiName, clazz); 400 } 401 402 public class ReceiverRunnable implements Runnable 403 { 404 public void run() 405 { 406 try 407 { 408 while (true) 409 { 410 ArrayList result = (ArrayList ) getAttribute(TestMessageDrivenManagementMBean.OBJECT_NAME, "Messages"); 411 synchronized (BasicMessageDrivenUnitTest.this) 412 { 413 if (running == false) 414 break; 415 if (result.size() > 0) 416 { 417 messages.addAll(result); 418 BasicMessageDrivenUnitTest.this.notifyAll(); 419 } 420 BasicMessageDrivenUnitTest.this.wait(WAIT_TIME); 421 } 422 } 423 } 424 catch (Throwable t) 425 { 426 getLog().error("Error in receiver thread " + thread, t); 427 } 428 429 synchronized (BasicMessageDrivenUnitTest.this) 430 { 431 thread = null; 432 BasicMessageDrivenUnitTest.this.notifyAll(); 433 } 434 } 435 } 436 } 437 | Popular Tags |