1 22 package org.jboss.test.jbossmq.test; 23 24 import javax.jms.MessageConsumer ; 25 import javax.jms.MessageProducer ; 26 import javax.jms.ObjectMessage ; 27 import javax.jms.Queue ; 28 import javax.jms.QueueConnection ; 29 import javax.jms.QueueConnectionFactory ; 30 import javax.jms.QueueSession ; 31 import javax.jms.Session ; 32 import javax.naming.Context ; 33 34 import org.jboss.test.JBossTestCase; 35 36 42 public abstract class ReceiversImplStressTest extends JBossTestCase 43 { 44 static String QUEUE_FACTORY = "ConnectionFactory"; 45 static String QUEUE = "queue/ReceiversImpl"; 46 static int messages = 0; 47 48 QueueConnection queueConnection; 49 Queue queue; 50 51 public ReceiversImplStressTest(String name) throws Exception 52 { 53 super(name); 54 messages = getThreadCount() * getBeanCount(); 55 } 56 57 public abstract class TestRunnable implements Runnable 58 { 59 public Throwable error = null; 60 61 public abstract void doRun() throws Exception ; 62 63 public void run() 64 { 65 try 66 { 67 doRun(); 68 } 69 catch (Throwable t) 70 { 71 log.error("Error in " + Thread.currentThread(), t); 72 error = t; 73 } 74 } 75 } 76 77 public class ReceiverRunnable extends TestRunnable 78 { 79 int integer; 80 81 MessageConsumer consumer; 82 83 public ReceiverRunnable(int integer) throws Exception 84 { 85 this.integer = integer; 86 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 87 consumer = session.createConsumer(queue); 88 } 89 90 public void doRun() throws Exception 91 { 92 int count = getBeanCount(); 93 while (count > 0) 94 { 95 consumer.receive(); 96 --count; 97 } 98 } 99 } 100 101 public class ReceiverNoWaitRunnable extends TestRunnable 102 { 103 int integer; 104 105 MessageConsumer consumer; 106 107 public ReceiverNoWaitRunnable(int integer) throws Exception 108 { 109 this.integer = integer; 110 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 111 consumer = session.createConsumer(queue); 112 } 113 114 public void doRun() throws Exception 115 { 116 int count = getBeanCount(); 117 while (count > 0) 118 { 119 if (consumer.receiveNoWait() != null) 120 --count; 121 } 122 } 123 } 124 125 public void testReceiversImplReceive() throws Exception 126 { 127 connect(); 128 try 129 { 130 ReceiverRunnable[] receivers = new ReceiverRunnable[getThreadCount()]; 131 Thread [] consumerThreads = new Thread [getThreadCount()]; 132 for (int i = 0; i < consumerThreads.length; ++i) 133 { 134 receivers[i] = new ReceiverRunnable(i); 135 consumerThreads[i] = new Thread (receivers[i], "Consumer" + i); 136 } 137 for (int i = 0; i < consumerThreads.length; ++i) 138 consumerThreads[i].start(); 139 140 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 141 MessageProducer producer = session.createProducer(queue); 142 for (int i = 0; i < messages; ++i) 143 { 144 ObjectMessage message = session.createObjectMessage(new Integer (i)); 145 producer.send(message); 146 } 147 148 149 for (int i = 0; i < consumerThreads.length; ++i) 150 consumerThreads[i].join(); 151 for (int i = 0; i < consumerThreads.length; ++i) 152 assertNull(receivers[i].error); 153 154 MessageConsumer consumer = session.createConsumer(queue); 156 while (consumer.receiveNoWait() != null); 157 } 158 finally 159 { 160 disconnect(); 161 } 162 } 163 164 public void testReceiversImplReceiveNoWait() throws Exception 165 { 166 connect(); 167 try 168 { 169 ReceiverNoWaitRunnable[] receivers = new ReceiverNoWaitRunnable[getThreadCount()]; 170 Thread [] consumerThreads = new Thread [getThreadCount()]; 171 for (int i = 0; i < consumerThreads.length; ++i) 172 { 173 receivers[i] = new ReceiverNoWaitRunnable(i); 174 consumerThreads[i] = new Thread (receivers[i], "Consumer" + i); 175 } 176 for (int i = 0; i < consumerThreads.length; ++i) 177 consumerThreads[i].start(); 178 179 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 180 MessageProducer producer = session.createProducer(queue); 181 for (int i = 0; i < messages; ++i) 182 { 183 ObjectMessage message = session.createObjectMessage(new Integer (i)); 184 producer.send(message); 185 } 186 187 188 for (int i = 0; i < consumerThreads.length; ++i) 189 consumerThreads[i].join(); 190 for (int i = 0; i < consumerThreads.length; ++i) 191 assertNull(receivers[i].error); 192 193 MessageConsumer consumer = session.createConsumer(queue); 195 while (consumer.receiveNoWait() != null); 196 } 197 finally 198 { 199 disconnect(); 200 } 201 } 202 203 protected void connect() throws Exception 204 { 205 Context context = getInitialContext(); 206 queue = (Queue ) context.lookup(QUEUE); 207 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup(QUEUE_FACTORY); 208 queueConnection = queueFactory.createQueueConnection(); 209 queueConnection.start(); 210 211 getLog().debug("Connection established."); 212 } 213 214 protected void disconnect() 215 { 216 try 217 { 218 if (queueConnection != null) 219 queueConnection.close(); 220 } 221 catch (Exception ignored) 222 { 223 } 224 225 getLog().debug("Connection closed."); 226 } 227 } 228 | Popular Tags |