1 21 package com.presumo.jms.test; 22 23 import com.presumo.mobileagent.Agent; 24 import com.presumo.util.log.Logger; 25 import com.presumo.util.log.LoggerFactory; 26 27 import java.io.Serializable ; 28 import java.util.ArrayList ; 29 import java.util.Iterator ; 30 31 import javax.jms.DeliveryMode ; 32 import javax.jms.JMSException ; 33 import javax.jms.Message ; 34 import javax.jms.MessageListener ; 35 import javax.jms.ObjectMessage ; 36 import javax.jms.Session ; 37 import javax.jms.Topic ; 38 import javax.jms.TopicPublisher ; 39 import javax.jms.TopicSession ; 40 import javax.jms.TopicSubscriber ; 41 42 43 48 public class PubSubAgent extends Agent 49 { 50 static final String TEST_TOPIC = "PubSubTest"; 51 static final String TEST_RESULTS_TOPIC = "PubSubTestResults"; 52 53 54 protected int subscriberCount = 5; 55 56 57 protected int publisherCount = 1; 58 59 60 protected int messageCount = 10; 61 62 63 protected boolean persistent = false; 64 65 66 protected int messageSize = 100; 67 68 69 protected int intervals = 2; 70 71 72 protected long intervalPeriod = 100; 73 74 75 protected String subscriptionFilter = null; 76 77 78 protected long expectedMessages = 10; 79 80 81 protected ArrayList subscribers; 82 83 84 protected ArrayList publishers; 85 86 87 private boolean started = false; 88 89 90 private boolean stopped = false; 91 92 93 private int index; 94 95 99 102 public PubSubAgent() 103 { 104 logger.entry("PubSubAgent"); 105 logger.exit("PubSubAgent", this); 106 } 107 108 public PubSubAgent(int index) 109 { 110 logger.entry("PubSubAgent", new Integer (index)); 111 this.index = index; 112 logger.exit("PubSubAgent", this); 113 } 114 115 119 public void setSubscriberCount(int value) { subscriberCount = value; } 120 public void setPublisherCount(int value) { publisherCount = value; } 121 public void setMessageCount(int value) { messageCount = value; } 122 public void setPersistent(boolean value) { persistent = value; } 123 public void setMessageSize(int value) { messageSize = value; } 124 public void setIntervals(int value) { intervals = value; } 125 public void setIntervalPeriod(long value) { intervalPeriod = value; } 126 public void setExpectedMessages(int value) { expectedMessages = value; } 127 public void setSubscriptionFilter(String v){ subscriptionFilter = v; } 128 129 public int getIndex() 130 { 131 return index; 132 } 133 134 public synchronized void startAgent() 135 { 136 logger.entry("startAgent"); 137 started = true; 138 notifyAll(); 139 logger.exit("startAgent"); 140 } 141 142 public synchronized void stopAgent() 143 { 144 logger.entry("stopAgent"); 145 stopped = true; 146 notifyAll(); 147 148 logger.exit("stopAgent"); 149 } 150 151 public synchronized void runAgent() 152 { 153 logger.entry("run"); 154 try { 155 createSubscribers(); 156 createPublishers(); 157 158 while (!started && !stopped) { 160 try { 161 wait(5000); 162 } catch (InterruptedException ie) {} 163 } 164 if (stopped) return; 165 166 startPublishers(); 167 168 while (!stopped) { 170 boolean done = (publishersDone() && subscribersDone()); 171 if (done) { 172 stopped = true; 173 } else { 174 try { 175 wait(1000); 176 } catch (InterruptedException ie) {} 177 } 178 } 179 180 stopPublishers(); 181 stopSubscribers(); 182 183 sendResults(); 184 } catch (JMSException jmsex) { logger.exception(jmsex); } 185 logger.exit("run"); 186 } 187 188 public String getResultsSummary() 189 { 190 logger.entry("getResultsSummary"); 191 192 StringBuffer result = new StringBuffer (); 193 194 Iterator itr = publishers.iterator(); 195 int i=1; 196 while(itr.hasNext()) { 197 PubHelper pub = (PubHelper) itr.next(); 198 result.append("Publisher "); 199 result.append(i++); 200 result.append(": "); 201 result.append(pub.getResultsSummary()); 202 result.append('\n'); 203 } 204 result.append('\n'); 205 i=1; 206 itr = subscribers.iterator(); 207 while(itr.hasNext()) { 208 SubHelper sub = (SubHelper) itr.next(); 209 result.append("Subscriber "); 210 result.append(i++); 211 result.append(": "); 212 213 result.append(sub.getResultsSummary()); 214 result.append('\n'); 215 } 216 217 logger.exit("getResultsSummary"); 218 return result.toString(); 219 } 220 221 222 226 private void sendResults() throws JMSException 227 { 228 logger.entry("sendResults"); 229 230 TopicSession session = connx.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 231 Topic topic = session.createTopic(TEST_RESULTS_TOPIC); 232 TopicPublisher pub = session.createPublisher(topic); 233 ObjectMessage msg = session.createObjectMessage(); 234 msg.setObject(this); 235 pub.publish(msg); 236 237 session.close(); 238 logger.exit("sendResults"); 239 } 240 241 242 private void createSubscribers() throws JMSException 243 { 244 logger.entry("createSubscribers"); 245 246 subscribers = new ArrayList (); 247 248 for (int i=0; i < subscriberCount; ++i) { 249 TopicSession session = connx.createTopicSession(false, 250 Session.DUPS_OK_ACKNOWLEDGE); 251 Topic topic = session.createTopic(TEST_TOPIC); 252 TopicSubscriber sub = session.createSubscriber(topic, subscriptionFilter, false); 253 SubHelper subHelper = new SubHelper(sub, session); 254 subscribers.add(subHelper); 255 } 256 257 logger.exit("createSubscribers"); 258 } 259 260 261 private void createPublishers() throws JMSException 262 { 263 logger.entry("createPublishers"); 264 publishers = new ArrayList (); 265 266 for (int i=0; i < publisherCount; ++i) { 267 TopicSession session = connx.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 268 Topic topic = session.createTopic(TEST_TOPIC); 269 TopicPublisher pub = session.createPublisher(topic); 270 if (persistent) { 271 pub.setDeliveryMode(javax.jms.DeliveryMode.PERSISTENT); 272 } 273 PubHelper pubHelper = new PubHelper(pub, session); 274 publishers.add(pubHelper); 275 } 276 logger.exit("createPublishers"); 277 } 278 279 private void startPublishers() throws JMSException 280 { 281 logger.entry("startPublishers"); 282 Iterator itr = publishers.iterator(); 283 while(itr.hasNext()) { 284 PubHelper pub = (PubHelper) itr.next(); 285 pub.start(); 286 } 287 logger.exit("startPublishers"); 288 } 289 290 private void stopPublishers() throws JMSException 291 { 292 logger.entry("stopPublishers"); 293 294 Iterator itr = publishers.iterator(); 295 while(itr.hasNext()) { 296 PubHelper pub = (PubHelper) itr.next(); 297 if (!pub.isDone()) { 298 pub.stop(); 299 } 300 } 301 302 logger.exit("stopPublishers"); 303 } 304 305 private void stopSubscribers() throws JMSException 306 { 307 logger.entry("stopSubscribers"); 308 309 Iterator itr = subscribers.iterator(); 310 while(itr.hasNext()) { 311 SubHelper sub = (SubHelper) itr.next(); 312 if (!sub.isDone()) { 313 sub.stop(); 314 } 315 } 316 317 logger.exit("stopSubscribers"); 318 } 319 320 321 private boolean publishersDone() 322 { 323 logger.entry("publishersDone"); 324 325 boolean done = true; 326 Iterator itr = publishers.iterator(); 327 while(itr.hasNext() && done) { 328 PubHelper pub = (PubHelper) itr.next(); 329 done = pub.isDone(); 330 } 331 332 logger.exit("publishersDone", new Boolean (done)); 333 return done; 334 } 335 336 337 private boolean subscribersDone() 338 { 339 logger.entry("subscribersDone"); 340 341 boolean done = true; 342 Iterator itr = subscribers.iterator(); 343 while(itr.hasNext() && done) { 344 SubHelper sub = (SubHelper) itr.next(); 345 done = sub.isDone(); 346 } 347 348 logger.exit("subscribersDone", new Boolean (done)); 349 return done; 350 } 351 352 353 354 protected class SubHelper 358 implements Serializable , SubResults, MessageListener 359 { 360 private transient int timecount = 20; 361 private transient long startTime; 362 private transient TopicSession sessionToClose; 363 private transient TopicSubscriber subscriber; 364 365 private int timemod; 366 private volatile long[] timeResults; 367 private volatile int messagesReceived; 368 private volatile boolean done = false; 369 370 373 SubHelper(TopicSubscriber sub, TopicSession session) throws JMSException 374 { 375 logger.entry("SubHelper", sub, session); 376 377 sessionToClose = session; 378 subscriber = sub; 379 sub.setMessageListener(this); 380 if (expectedMessages < timecount) { 381 timecount = (int) expectedMessages; 382 timemod = 1; 383 } 384 else { 385 timemod = (int) expectedMessages/timecount; 386 } 387 timeResults = new long[timecount]; 388 389 logger.exit("SubHelper", this); 390 } 391 392 public String getResultsSummary() 393 { 394 logger.entry("SubHelper-getResultsSummary()"); 395 396 StringBuffer results = new StringBuffer (); 397 results.append(messagesReceived + 398 "/" + expectedMessages); 399 results.append(" msgs, " ); 400 401 if (messagesReceived > expectedMessages) { 402 results.append(" ERROR too many messages received."); 403 } else { 404 logger.debug(" SubHelper-getResultsSummary() " + results); 405 long totalTime = timeResults[((int) (messagesReceived/timemod))-1]; 406 results.append(totalTime); 407 results.append(" milliseconds, "); 408 409 int throughput = messagesReceived; 410 if (totalTime != 0) { 411 throughput = messagesReceived*1000/(int)totalTime; 412 } 413 414 results.append(throughput); 415 results.append(" msgs/second"); 416 } 417 logger.exit("SubHelper-getResultsSummary()", results); 418 return results.toString(); 419 } 420 421 424 public void stop() 425 { 426 logger.entry("SubHelper-stop"); 427 428 try { 429 subscriber.close(); 430 sessionToClose.close(); 431 } catch (JMSException jmsex) { logger.exception(jmsex); } 432 433 logger.exit("SubHelper-stop"); 434 } 435 436 440 public boolean isDone() 441 { 442 logger.entry("SubHelper-isDone"); 443 if (done) { 444 try { 445 subscriber.close(); 446 sessionToClose.close(); 447 } catch (JMSException jmsex) { 448 logger.exception(jmsex); 449 } 450 } 451 logger.exit("SubHelper-isDone", new Boolean (done)); 452 return done; 453 } 454 455 458 public long messagesReceived() 459 { 460 logger.entry("messagesReceived"); 461 logger.exit("messagesReceived", new Long (messagesReceived)); 462 return messagesReceived; 463 } 464 465 468 public void onMessage(Message msg) 469 { 470 logger.entry("onMessage", msg, new Long (messagesReceived)); 471 472 if (messagesReceived == 0) { 473 startTime = System.currentTimeMillis(); 474 } 475 ++messagesReceived; 476 477 if (messagesReceived % timemod == 0) { 478 timeResults[((int) (messagesReceived/timemod))-1] = 479 System.currentTimeMillis() - startTime; 480 } 481 482 if (messagesReceived == expectedMessages) { 483 done = true; 484 } 485 486 logger.exit("onMessage"); 487 } 488 } 489 493 494 495 499 protected class PubHelper 500 implements Runnable , Serializable , PubResults 501 { 502 private volatile long startTime; 503 private volatile long endTime; 504 private volatile int messagesPublished; 505 private volatile boolean done = false; 506 507 private volatile transient boolean stopped = false; 508 private transient TopicPublisher publisher; 509 private transient TopicSession sessionToClose; 510 511 514 protected PubHelper(TopicPublisher pub, TopicSession session) 515 { 516 logger.entry("PubHelper"); 517 sessionToClose = session; 518 publisher = pub; 519 logger.exit("PubHelper", this); 520 } 521 522 523 public String getResultsSummary() 524 { 525 logger.entry("PubHelper-getResultsSummary()"); 526 527 StringBuffer results = new StringBuffer (); 528 results.append(messagesPublished + 529 "/" + messageCount); 530 results.append(" msgs, " ); 531 long totalTime = endTime - startTime; 532 results.append(totalTime); 533 results.append(" milliseconds, "); 534 535 536 int throughput = 0; 537 if (totalTime == 0) { 538 throughput = messagesPublished*1000/(int)totalTime; 539 } 540 results.append(throughput); 541 results.append(" msgs/second"); 542 543 logger.exit("PubHelper-getResultsSummary()", results); 544 return results.toString(); 545 } 546 547 550 public void start() 551 { 552 logger.entry("PubHelper-start"); 553 Thread t = new Thread (this); 554 t.start(); 555 logger.exit("PubHelper-start"); 556 } 557 558 561 public void stop() 562 { 563 logger.entry("PubHelper-stop"); 564 try { 565 publisher.close(); 566 sessionToClose.close(); 567 } catch (JMSException jmsex) { 568 logger.exception(jmsex); 569 } 570 stopped = true; 571 logger.exit("PubHelper-stop"); 572 } 573 574 577 public boolean isDone() 578 { 579 logger.entry("Pubhelper-isDone"); 580 logger.exit("PubHelper-isDone", new Boolean (done)); 581 return done; 582 } 583 584 587 public void run() 588 { 589 logger.entry("PubHelper-run"); 590 591 startTime = System.currentTimeMillis(); 592 593 int intervalMod = messageCount / intervals; 594 595 for (messagesPublished=1; 596 messagesPublished <= messageCount; 597 ++messagesPublished) 598 { 599 if (stopped) break; 600 601 if ((messagesPublished % intervalMod) == 0) { 602 try { 603 Thread.sleep(intervalPeriod); 604 } catch (InterruptedException ie) {} 605 } 606 try { 607 Message msg = createMessage(); 608 logger.debug("PubHelper-run---> publishing message" +messagesPublished); 609 publisher.publish(msg); 610 } catch (JMSException jmsex) { logger.exception(jmsex); } 611 } 612 613 --messagesPublished; 614 615 try { 616 publisher.close(); 617 sessionToClose.close(); 618 } catch (JMSException jmsex) { 619 logger.exception(jmsex); 620 } 621 endTime = System.currentTimeMillis(); 622 done = true; 623 624 logger.exit("PubHelper-run"); 625 } 626 627 private Message createMessage() throws JMSException 628 { 629 logger.entry("PubHelper-createMessage"); 630 Message msg = agentSession.createMessage(); 631 632 logger.exit("PubHelper-createMessage", msg); 633 return msg; 634 } 635 636 } 637 641 642 644 private static Logger logger = 645 LoggerFactory.getLogger(PubSubAgent.class, null); 646 647 649 } 650 | Popular Tags |