1 22 package org.jboss.test.jbossmq.perf; 23 24 import javax.jms.DeliveryMode ; 25 import javax.jms.Message ; 26 import javax.jms.Queue ; 27 import javax.jms.QueueConnection ; 28 import javax.jms.QueueConnectionFactory ; 29 import javax.jms.QueueReceiver ; 30 import javax.jms.QueueSender ; 31 import javax.jms.QueueSession ; 32 import javax.jms.Session ; 33 import javax.jms.TopicConnection ; 34 import javax.jms.TopicConnectionFactory ; 35 import javax.management.ObjectName ; 36 import javax.naming.Context ; 37 import javax.naming.InitialContext ; 38 39 import org.jboss.logging.Logger; 40 import org.jboss.test.JBossTestCase; 41 42 import EDU.oswego.cs.dl.util.concurrent.Semaphore; 43 44 50 51 public class InvocationLayerStressTest extends JBossTestCase 52 { 53 Context context; 54 QueueConnection queueConnection; 55 TopicConnection topicConnection; 56 static final int WORKER_COUNT = Integer.parseInt(System.getProperty("jbosstest.threadcount", "10")); 57 static final int MESSAGE_COUNT = Integer.parseInt(System.getProperty("jbosstest.iterationcount", "500")); 58 Semaphore exitSemaphore; 59 60 66 public InvocationLayerStressTest(String name) throws Exception 67 { 68 super(name); 69 } 70 71 72 public void createQueue(String name) 73 { 74 try 75 { 76 ObjectName objn = new ObjectName ("jboss.mq:service=DestinationManager"); 77 getServer().invoke(objn, "createQueue", new Object []{name, name}, new String []{String .class.getName(), String .class.getName()}); 78 } 79 catch (Exception e) 80 { 81 e.printStackTrace(); 82 } 83 } 84 85 public void createTopic(String name) 86 { 87 try 88 { 89 ObjectName objn = new ObjectName ("jboss.mq:service=DestinationManager"); 90 getServer().invoke(objn, "createTopic", new Object []{name, name}, new String []{String .class.getName(), String .class.getName()}); 91 } 92 catch (Exception e) 93 { 94 e.printStackTrace(); 95 } 96 } 97 98 public void deleteQueue(String name) 99 { 100 try 101 { 102 ObjectName objn = new ObjectName ("jboss.mq:service=DestinationManager"); 103 getServer().invoke(objn, "destroyQueue", new Object []{name}, new String []{String .class.getName()}); 104 } 105 catch (Exception e) 106 { 107 e.printStackTrace(); 108 } 109 } 110 111 public void deleteTopic(String name) 112 { 113 try 114 { 115 ObjectName objn = new ObjectName ("jboss.mq:service=DestinationManager"); 116 getServer().invoke(objn, "destroyTopic", new Object []{name}, new String []{String .class.getName()}); 117 } 118 catch (Exception e) 119 { 120 e.printStackTrace(); 121 } 122 } 123 124 protected void connect(String queueLoc, String topicLoc) throws Exception 125 { 126 context = new InitialContext (); 127 QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup(queueLoc); 128 queueConnection = queueFactory.createQueueConnection(); 129 130 TopicConnectionFactory topicFactory = (TopicConnectionFactory ) context.lookup(topicLoc); 131 topicConnection = topicFactory.createTopicConnection(); 132 } 133 134 protected void disconnect() throws Exception 135 { 136 queueConnection.close(); 137 topicConnection.close(); 138 } 139 140 141 class QueueWorker extends Thread 142 { 143 String queueName; 144 Throwable exception; 145 Object signal = new Object (); 146 Logger log = Logger.getLogger(QueueWorker.class); 147 148 QueueWorker(String queueName, String ilType) 149 { 150 super(queueName); 151 this.queueName = queueName; 152 this.log = Logger.getLogger("QueueWorker."+queueName+"."+ilType); 153 } 154 155 public void run() 156 { 157 log.info("QueueWorker Running: " + queueName); 158 159 try 160 { 161 work(); 162 } 163 catch (Throwable e) 164 { 165 exception = e; 166 log.error("Exception:", e); 167 } 168 169 log.debug("Notifying main thread: "); 171 exitSemaphore.release(); 172 173 log.info("QueueWorker Done: " + queueName); 174 } 175 176 void work() throws Exception 177 { 178 createQueue(queueName); 179 QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); 180 Queue queue = (Queue ) context.lookup(queueName); 181 182 QueueSender sender = session.createSender(queue); 184 sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 185 Message message = session.createTextMessage("Test Message"); 186 for (int i = 0; i < MESSAGE_COUNT; i++) 187 { 188 sender.send(message); 189 log.debug("Sent message " + i + " to queue :" + queueName); 190 } 191 192 QueueReceiver receiver = session.createReceiver(queue); 194 for (int i = 0; i < MESSAGE_COUNT; i++) 195 { 196 message = receiver.receive(5000); 197 log.debug("Received message " + i + " from queue :" + queueName); 198 if( message == null ) 199 fail("Received of msg timedout"); 200 } 201 session.close(); 202 deleteQueue(queueName); 203 } 204 205 } 206 } 207 | Popular Tags |