1 21 package com.presumo.jms.test; 22 23 import com.presumo.jms.client.JmsTopicConnectionFactory; 24 25 import javax.jms.Topic ; 26 import javax.jms.TopicConnection ; 27 import javax.jms.TopicSession ; 28 import javax.jms.TopicSubscriber ; 29 import javax.jms.TopicPublisher ; 30 import javax.jms.Message ; 31 import javax.jms.JMSException ; 32 import javax.jms.MessageListener ; 33 import javax.jms.Session ; 34 35 import java.util.*; 36 import java.io.*; 37 38 44 public class Demo 45 { 46 public final static String FILTER_KEY = "subfilter"; 47 public final static String EXPECTED_MSGS_KEY = "expectedMsgs"; 48 public final static String NUM_SUB_KEY = "numOfSubscribers"; 49 public final static String NUM_MSGS_KEY = "numOfMsgs"; 50 public final static String INTERATIONS_KEY = "iterations"; 51 public final static String SLEEP_KEY = "sleep"; 52 public final static String SERVER_KEY = "server"; 53 public final static String PORT_KEY = "port"; 54 55 56 57 private Vector subscribers; 58 private Vector finishedSubscribers; 59 60 private String server; 61 private int port; 62 63 private String subfilter; 64 private int expectedMsgs; 65 private int numOfSubscribers; 66 private int numOfMsgs; 67 private int iterations; 68 private int sleepInterval; 69 70 private TopicConnection connection; 71 private TopicPublisher publisher; 72 private TopicSession session; 73 74 private String topicName = "demotopic"; 75 76 Demo(Properties testProp) throws Exception 77 { 78 readProperties(testProp); 79 JmsTopicConnectionFactory factory = new JmsTopicConnectionFactory(); 80 if (server != null) { 81 factory.setHost(server); 82 factory.setPort(port); 83 } 84 connection = factory.createTopicConnection(); 85 86 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 87 Topic topic = session.createTopic(topicName); 88 publisher = session.createPublisher(topic); 89 90 TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 91 subscribers = new Vector(); 92 finishedSubscribers = new Vector(); 93 for (int i=0; i < numOfSubscribers; i++) { 94 SubscriberHelper helper = new SubscriberHelper("Subscriber " + i, expectedMsgs, this); 95 TopicSubscriber sub = subSession.createSubscriber(topic, subfilter, false); 96 sub.setMessageListener(helper); 97 subscribers.add(helper); 98 System.out.println("Subscriber " + i + " created."); 99 } 100 } 101 102 private void readProperties(Properties props) throws Exception 103 { 104 subfilter = props.getProperty(FILTER_KEY, null); 105 expectedMsgs = Integer.parseInt(props.getProperty(EXPECTED_MSGS_KEY)); 106 numOfSubscribers = Integer.parseInt(props.getProperty(NUM_SUB_KEY)); 107 numOfMsgs = Integer.parseInt(props.getProperty(NUM_MSGS_KEY)); 108 iterations = Integer.parseInt(props.getProperty(INTERATIONS_KEY)); 109 sleepInterval = Integer.parseInt(props.getProperty(SLEEP_KEY)); 110 server = props.getProperty(SERVER_KEY); 111 if (server != null) { 112 port = Integer.parseInt(props.getProperty(PORT_KEY)); 113 } 114 } 115 116 117 120 void start() throws Exception 121 { 122 connection.start(); 123 int totalcount=0; 124 for (int i=0; i < iterations; i++) { 125 long startTime = System.currentTimeMillis(); 126 127 for (int j=0; j < numOfMsgs; j++) { 128 totalcount++; 129 Message msg = session.createMessage(); 130 131 if (totalcount % 2 == 0) 132 msg.setBooleanProperty("EVEN", true); 133 else 134 msg.setBooleanProperty("EVEN", false); 135 msg.setIntProperty("MsgNumber", totalcount); 136 137 publisher.setDeliveryMode(javax.jms.DeliveryMode.PERSISTENT); 138 publisher.publish(msg); 139 } 140 long finishTime = System.currentTimeMillis(); 141 long time = finishTime - startTime; 142 System.out.println("Published " + numOfMsgs +" messages in " + time +" millseconds."); 143 try { 144 Thread.sleep(sleepInterval); 145 } catch (InterruptedException ie) {} 146 } 147 148 try { 149 Thread.sleep(5000); 150 } catch (InterruptedException ie) {} 151 } 152 153 156 synchronized boolean isComplete() 157 { 158 return subscribers.size() == 0; 159 } 160 161 164 void close() throws Exception 165 { 166 connection.close(); 167 } 168 169 172 synchronized void subscriberFinished(SubscriberHelper helper) 173 { 174 boolean removed = subscribers.removeElement(helper); 175 176 if (removed) { 177 finishedSubscribers.add(helper); 178 } 179 180 } 181 182 185 synchronized void printStats() 186 { 187 for(int i=0; i < finishedSubscribers.size(); i++) 188 { 189 SubscriberHelper helper = (SubscriberHelper)finishedSubscribers.get(i); 190 System.out.println(helper.report); 191 } 192 } 193 194 private class SubscriberHelper implements MessageListener 198 { 199 private String id; 200 private int expectedMsgs; 201 private long startTime; 202 private int messagesReceived = 0; 203 private Demo callback; 204 String report; 205 206 SubscriberHelper(String id, int expectedMsgs, Demo callback) 207 { 208 this.id = id; 209 this.expectedMsgs = expectedMsgs; 210 this.callback = callback; 211 } 212 213 214 public void onMessage(Message message) 215 { 216 ++messagesReceived; 217 if (messagesReceived == 1) { 218 startTime = System.currentTimeMillis(); 219 } 220 if (messagesReceived == expectedMsgs) { 221 long time = System.currentTimeMillis() - startTime; 222 223 report = "Received "+ expectedMsgs+" messages in " + time +" milliseconds."; 224 report = id + ": " + report; 225 226 callback.subscriberFinished(this); 227 } 229 230 if (messagesReceived % 500 == 0) 231 System.out.println(">>>>>>>>>>>>Received Message "+ messagesReceived +":" + message); 232 } 233 } 234 235 public static void main(String [] args) throws Exception 236 { 237 if (args.length != 1) { 238 System.err.println("Usage: java Demo <demo.properties file>"); 239 System.exit(-1); 240 } 241 242 243 try { 244 Properties testProp = new Properties(); 248 FileInputStream fis = null; 249 try { 250 fis = new FileInputStream(args[0]); 251 testProp.load(fis); 252 } finally { 253 if (fis != null) fis.close(); 254 } 255 256 Demo demo = new Demo(testProp); 257 258 System.out.println("Hit enter to start the demo."); 259 BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); 260 input.readLine(); 261 262 System.out.println("Starting publishes (if any) in 3 seconds..."); 263 try { Thread.sleep(3000); } catch (InterruptedException ie) {} 264 265 demo.start(); 266 267 while (!demo.isComplete() ) { 268 try { Thread.sleep(1000); } catch (InterruptedException ie) {} 269 } 270 271 demo.printStats(); 272 demo.close(); 273 274 } catch (Throwable t) { 275 System.err.println("The following error occured while executing the demo:"); 276 t.printStackTrace(); 277 } 278 } 279 280 } 281 | Popular Tags |