1 22 package org.jboss.test.jbossmq.test; 23 24 import javax.jms.Message ; 25 import javax.jms.MessageListener ; 26 import javax.jms.QueueConnection ; 27 import javax.jms.QueueConnectionFactory ; 28 import javax.jms.QueueReceiver ; 29 import javax.jms.QueueSender ; 30 import javax.jms.QueueSession ; 31 import javax.jms.Session ; 32 import javax.jms.TemporaryQueue ; 33 import javax.naming.Context ; 34 35 import org.jboss.test.JBossTestCase; 36 37 43 public class ConcurrentDeliveryUnitTestCase extends JBossTestCase 44 { 45 static String QUEUE_FACTORY = "ConnectionFactory"; 46 47 QueueConnection queueConnection; 48 49 int completed = 0; 50 boolean inDelivery = false; 51 boolean concurrent = false; 52 53 public ConcurrentDeliveryUnitTestCase(String name) throws Exception 54 { 55 super(name); 56 } 57 58 public void testConcurrentDelivery() throws Exception 59 { 60 connect(); 61 try 62 { 63 MyMessageListener messageListener = new MyMessageListener(); 64 65 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 66 TemporaryQueue queue1 = session.createTemporaryQueue(); 67 QueueSender sender1 = session.createSender(queue1); 68 QueueReceiver receiver1 = session.createReceiver(queue1); 69 receiver1.setMessageListener(messageListener); 70 TemporaryQueue queue2 = session.createTemporaryQueue(); 71 QueueSender sender2 = session.createSender(queue2); 72 QueueReceiver receiver2 = session.createReceiver(queue2); 73 receiver2.setMessageListener(messageListener); 74 Message message = session.createMessage(); 75 queueConnection.start(); 76 77 sender1.send(message); 78 sender2.send(message); 79 80 synchronized (messageListener) 81 { 82 while (completed < 2) 83 { 84 getLog().debug("Waiting for completion " + completed); 85 messageListener.wait(); 86 } 87 } 88 getLog().debug("Completed"); 89 90 if (concurrent) 91 fail("Concurrent delivery"); 92 } 93 finally 94 { 95 disconnect(); 96 } 97 } 98 99 protected void connect() throws Exception 100 { 101 Context context = getInitialContext(); 102 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 103 queueConnection = queueFactory.createQueueConnection(); 104 105 getLog().debug("Connection established."); 106 } 107 108 protected void disconnect() 109 { 110 try 111 { 112 if (queueConnection != null) 113 queueConnection.close(); 114 } 115 catch (Throwable ignored) 116 { 117 getLog().warn("Ignored", ignored); 118 } 119 120 getLog().debug("Connection closed."); 121 } 122 123 public class MyMessageListener implements MessageListener 124 { 125 public void onMessage(Message message) 126 { 127 synchronized (this) 128 { 129 if (inDelivery) 130 concurrent = true; 131 inDelivery = true; 132 getLog().debug("In delivery " + message); 133 } 134 135 try 136 { 137 Thread.sleep(10000); 138 } 139 catch (Throwable ignored) 140 { 141 getLog().warn("Ignored ", ignored); 142 } 143 144 synchronized (this) 145 { 146 inDelivery = false; 147 ++completed; 148 notifyAll(); 149 getLog().debug("Completed " + message); 150 } 151 } 152 } 153 } 154 155 | Popular Tags |