1 20 package com.presumo.jms.test; 21 22 import com.presumo.jms.client.JmsTopicConnectionFactory; 23 import com.presumo.mobileagent.AgentRunner; 24 import com.presumo.mobileagent.AgentUtil; 25 26 import com.presumo.util.log.Logger; 27 import com.presumo.util.log.LoggerFactory; 28 29 import java.io.BufferedReader ; 30 import java.io.FileInputStream ; 31 import java.io.InputStreamReader ; 32 33 import java.util.Properties ; 34 35 import javax.jms.JMSException ; 36 import javax.jms.Message ; 37 import javax.jms.MessageListener ; 38 import javax.jms.ObjectMessage ; 39 import javax.jms.Session ; 40 import javax.jms.Topic ; 41 import javax.jms.TopicConnection ; 42 import javax.jms.TopicPublisher ; 43 import javax.jms.TopicSession ; 44 import javax.jms.TopicSubscriber ; 45 46 50 public class PubSubTest implements MessageListener 51 { 52 static final String TEST_RESULTS_TOPIC = "PubSubTestResults"; 53 54 public final static String SERVER_KEY = "server"; 55 public final static String PORT_KEY = "port"; 56 public final static String LOCAL_AGENT_KEY= "runLocalAgent"; 57 public final static String PUB_COUNT_KEY = "publisherCount"; 58 public final static String MSG_COUNT_KEY = "messageCount"; 59 public final static String MSG_SIZE_KEY = "messageSize"; 60 public final static String INTERVAL_KEY = "intervalCount"; 61 public final static String PUB_SLEEP_KEY = "intervalPeriod"; 62 public final static String PERSISTENT_KEY = "persistent"; 63 public final static String SUB_COUNT_KEY = "subscriberCount"; 64 public final static String SUB_MODE_KEY = "subscriptionMode"; 65 66 private String server; 67 private int port; 68 private boolean runLocalAgent; 69 private int publisherCount; 70 private int messageCount; 71 private int messageSize; 72 private int intervalCount; 73 private long intervalPeriod; 74 private boolean persistent; 75 private int subscriberCount; 76 private String subscriberMode; 77 78 private int agentCount; 79 private String [] agentRunners; 80 private PubSubAgent [] runningTests; 81 private PubSubAgent [] completedTests; 82 83 private AgentRunner localAgent; 84 85 TopicConnection connx; 86 87 91 public PubSubTest(Properties props) throws JMSException 92 { 93 logger.entry("PubSubTest", props); 94 95 readProperties(props); 96 97 JmsTopicConnectionFactory factory = new JmsTopicConnectionFactory(); 98 if (server != null) { 99 factory.setHost(server); 100 factory.setPort(port); 101 } 102 connx = factory.createTopicConnection(); 103 connx.start(); 104 if (runLocalAgent) { 105 localAgent = new AgentRunner(connx); 106 } 107 108 PubSubAgent temp = new PubSubAgent(); 109 temp.setConnection(connx); 110 agentRunners = null; 111 112 113 for (int tries=0; tries < 3 && agentRunners == null; ++tries) { 114 System.out.print("Searching for test clients..."); 115 agentRunners = temp.getAvailableRunners(3000); 116 } 117 118 if (agentRunners == null || agentRunners.length == 0) { 119 throw new RuntimeException ("No test clients available to run test."); 120 } 121 System.out.println(" found " + agentRunners.length + "."); 122 123 agentCount = agentRunners.length; 124 runningTests = new PubSubAgent[agentRunners.length]; 125 completedTests = new PubSubAgent[agentRunners.length]; 126 127 for (int i=0; i < runningTests.length; ++i) { 128 runningTests[i] = createTestAgent(i); 129 } 130 131 logger.exit("PubSubTest", this); 132 } 133 134 138 public void start() throws JMSException 139 { 140 logger.entry("start"); 141 142 TopicSession session = connx.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 143 Topic topic = session.createTopic(TEST_RESULTS_TOPIC); 144 TopicSubscriber sub = session.createSubscriber(topic, "", false); 145 sub.setMessageListener(this); 146 147 AgentUtil.sendStartMessage(connx, agentRunners); 148 149 logger.exit("start"); 150 } 151 152 public void stop() throws JMSException 153 { 154 logger.entry("stop"); 155 AgentUtil.sendStopMessage(connx, agentRunners); 156 logger.exit("stop"); 157 } 158 159 public void close() 160 { 161 logger.entry("close"); 162 163 try { 164 connx.close(); 165 if (localAgent != null) { 166 localAgent.stop(); 167 } 168 } catch (JMSException jmsex) { 169 jmsex.printStackTrace(); 170 } 171 172 logger.exit("close"); 173 } 174 175 public boolean isComplete() 176 { 177 logger.entry("isComplete"); 178 179 boolean complete = true; 180 for(int i=0; (i < completedTests.length) && (complete); ++i) { 181 complete = (completedTests[i] != null); 182 } 183 184 logger.exit("isComplete", new Boolean (complete)); 185 return complete; 186 } 187 188 public String getSetupSummary() 189 { 190 logger.entry("getSetupSummary"); 191 192 StringBuffer result = new StringBuffer (); 193 194 if (server == null) { 195 result.append(" No server (intraJVM test) "); 196 } else { 197 result.append(" JMSServer: " + server + ":" + port); 198 } 199 result.append("\n Number of test clients: " + agentCount); 200 result.append("\n Number of subscribers: " + subscriberCount); 201 result.append("\n Number of publishers: " + publisherCount); 202 result.append("\n Messages to be published: " 203 + messageCount * publisherCount); 204 result.append("\n Persistent: " + persistent); 205 result.append("\n Number of messages expected at each subscriber: " 206 + messageCount * publisherCount); 207 result.append("\n"); 208 result.append("\n Total number of messages to be delivered: " + 209 messageCount * publisherCount * subscriberCount); 210 211 logger.exit("getSetupSummary", result); 212 return result.toString(); 213 } 214 215 public String getResultsSummary() 216 { logger.entry("getResultsSummary"); 217 StringBuffer result = new StringBuffer (); 218 219 for (int i=0; i < agentCount; ++i) { 220 result.append("--- Results for test client " + agentRunners[i] + " ----\n"); 221 if (completedTests[i] != null) { 222 result.append(completedTests[i].getResultsSummary()); 223 } else { 224 result.append("\tNo response from test client yet.\n"); 225 } 226 result.append("-------------------------------------------------------------\n"); 227 } 228 229 logger.exit("getResultSummary", result); 230 return result.toString(); 231 } 232 233 234 public void onMessage(Message msg) 235 { 236 logger.entry("onMessage", msg); 237 238 try { 239 if (msg instanceof ObjectMessage ) { 240 ObjectMessage omsg = (ObjectMessage ) msg; 241 Object o = omsg.getObject(); 242 if (o instanceof PubSubAgent) { 243 PubSubAgent agent = (PubSubAgent) o; 244 logger.debug("-----> Agent response collected: " + agent); 245 completedTests[agent.getIndex()] = agent; 246 runningTests[agent.getIndex()] = null; 247 } 248 } 249 } catch (JMSException jmsex) { 250 jmsex.printStackTrace(); 251 } 252 253 logger.exit("onMessage"); 254 } 255 256 260 private void readProperties(Properties props) 261 { 262 server = props.getProperty(SERVER_KEY); 263 port = Integer.parseInt(props.getProperty(PORT_KEY, "2323")); 264 runLocalAgent = Boolean.valueOf(props.getProperty(LOCAL_AGENT_KEY, "true")).booleanValue(); 265 publisherCount = Integer.parseInt(props.getProperty(PUB_COUNT_KEY, "1")); 266 messageCount = Integer.parseInt(props.getProperty(MSG_COUNT_KEY, "10")); 267 messageSize = Integer.parseInt(props.getProperty(MSG_SIZE_KEY, "100")); 268 intervalCount = Integer.parseInt(props.getProperty(INTERVAL_KEY, "2")); 269 intervalPeriod = Integer.parseInt(props.getProperty(PUB_SLEEP_KEY, "1")); 270 persistent = Boolean.valueOf(props.getProperty(PERSISTENT_KEY, "false")).booleanValue(); 271 subscriberCount = Integer.parseInt(props.getProperty(SUB_COUNT_KEY, "5")); 272 subscriberMode = props.getProperty(SUB_MODE_KEY, "all").toLowerCase(); 273 } 274 275 private PubSubAgent createTestAgent(int index) throws JMSException 276 { 277 logger.entry("createTestAgent", new Integer (index)); 278 PubSubAgent agent = new PubSubAgent(index); 279 agent.setConnection(connx); 280 281 int subs = 0; 282 if (subscriberCount <= agentCount) { 283 subs = (index > (agentCount - subscriberCount - 1)) ? 1 : 0; 284 } else { 285 subs = subscriberCount / agentCount; 286 subs = ((subs * index+1) > subscriberCount) ? subscriberCount % subs : subs; 287 } 288 289 int pubs = 0; 290 if (publisherCount <= agentCount) { 291 pubs = (index < publisherCount) ? 1 : 0; 292 } else { 293 pubs = publisherCount / agentCount; 294 pubs = ((pubs * index+1) > publisherCount) ? publisherCount % pubs : pubs; 295 } 296 297 logger.debug("--> pubCount=" + pubs); 298 logger.debug("--> subCount=" + subs); 299 300 agent.setSubscriberCount(subs); 301 agent.setPublisherCount(pubs); 302 agent.setMessageCount(messageCount); 303 agent.setPersistent(persistent); 304 agent.setMessageSize(messageSize); 305 agent.setIntervals(intervalCount); 306 agent.setIntervalPeriod(intervalPeriod); 307 agent.setExpectedMessages(messageCount * publisherCount); 308 agent.setSubscriptionFilter(""); 309 310 agent.moveTo( agentRunners[index] ); 311 312 logger.exit("createTestAgent", agent); 313 return agent; 314 } 315 319 public static void main(String [] args) throws Exception 320 { 321 logger.entry("main", args); 322 323 String propfile = null; 324 if (args.length != 1) { 325 propfile = "default.prop"; 326 } else { 327 propfile = args[0]; 328 } 329 System.out.println("Using "+propfile+" for test case."); 330 331 try { 332 Properties testProp = new Properties (); 336 FileInputStream fis = null; 337 try { 338 fis = new FileInputStream (propfile); 339 testProp.load(fis); 340 } finally { 341 if (fis != null) fis.close(); 342 } 343 344 boolean interactive = Boolean.valueOf 345 (testProp.getProperty("interactive", "true")).booleanValue(); 346 int iterations = 347 Integer.parseInt(testProp.getProperty("testIterations", "1")); 348 349 for (int i=1; i <= iterations; ++i) { 350 PubSubTest test = new PubSubTest(testProp); 351 352 if (iterations > 1) { 353 System.out.println("-Iteration " + i + " ----------------------------------------------------"); 354 } else { 355 System.out.println("----------------------------------------------------------------------"); 356 } 357 System.out.println(test.getSetupSummary()); 358 System.out.println("----------------------------------------------------------------------"); 359 System.out.println("\n"); 360 361 if (interactive) { 362 System.out.println(" Press <ENTER> start the test."); 363 BufferedReader input = new BufferedReader (new InputStreamReader (System.in)); 364 input.readLine(); 365 } else { 366 try { Thread.sleep(1000); } catch(InterruptedException ie) {} 367 } 368 369 test.start(); 370 371 if (interactive) { 372 System.out.println(" Test started."); 373 } 374 375 while (!test.isComplete() ) { 376 try { Thread.sleep(1000); } catch (InterruptedException ie) {} 377 } 378 379 System.out.println("----------------------------------------------------------------------"); 380 System.out.println(test.getResultsSummary()); 381 System.out.println("----------------------------------------------------------------------"); 382 System.out.println("\n\n"); 383 384 test.close(); 385 386 if (interactive) { 387 System.out.println("Press <ENTER> to exit."); 388 BufferedReader input = new 389 BufferedReader (new InputStreamReader (System.in)); 390 input.readLine(); 391 } 392 } 393 } catch (Throwable t) { 394 System.err.println("The following error occured while executing the test:"); 395 t.printStackTrace(); 396 } 397 398 logger.exit("main"); 399 } 400 401 402 404 private static Logger logger = 405 LoggerFactory.getLogger(PubSubTest.class, null); 406 407 409 } 410 | Popular Tags |