KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > test > PubSubAgent


/**
 * This file is part of Presumo.
 *
 * Presumo is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * Presumo is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Presumo; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 *
 * Copyright 2002 Dan Greff
 */

21 package com.presumo.jms.test;
22
23 import com.presumo.mobileagent.Agent;
24 import com.presumo.util.log.Logger;
25 import com.presumo.util.log.LoggerFactory;
26
27 import java.io.Serializable JavaDoc;
28 import java.util.ArrayList JavaDoc;
29 import java.util.Iterator JavaDoc;
30
31 import javax.jms.DeliveryMode JavaDoc;
32 import javax.jms.JMSException JavaDoc;
33 import javax.jms.Message JavaDoc;
34 import javax.jms.MessageListener JavaDoc;
35 import javax.jms.ObjectMessage JavaDoc;
36 import javax.jms.Session JavaDoc;
37 import javax.jms.Topic JavaDoc;
38 import javax.jms.TopicPublisher JavaDoc;
39 import javax.jms.TopicSession JavaDoc;
40 import javax.jms.TopicSubscriber JavaDoc;
41
42
43 /**
44  * This test is under construction.
45  *
46  * @author Dan Greff
47  */

48 public class PubSubAgent extends Agent
49 {
50   static final String JavaDoc TEST_TOPIC = "PubSubTest";
51   static final String JavaDoc TEST_RESULTS_TOPIC = "PubSubTestResults";
52
53   /** Number of subscribers to be created by this agent **/
54   protected int subscriberCount = 5;
55
56   /** Number of publishers to be created by this agent **/
57   protected int publisherCount = 1;
58
59   /** Number of messages to be sent by publishers on this agent **/
60   protected int messageCount = 10;
61
62   /** Indicate whether published messages should be persistent **/
63   protected boolean persistent = false;
64
65   /** Approximate size of messages in bytes to be published **/
66   protected int messageSize = 100;
67
68   /** Number of intervals to publish the messages **/
69   protected int intervals = 2;
70
71   /** Sleep period between each interval **/
72   protected long intervalPeriod = 100;
73
74   /** Subscription filter to be used by the subscribers **/
75   protected String JavaDoc subscriptionFilter = null;
76
77   /** Number of messages each subscriber should expect **/
78   protected long expectedMessages = 10;
79
80   /** SubHelpers created by this agent **/
81   protected ArrayList JavaDoc subscribers;
82
83   /** PubHelpers created by this agent **/
84   protected ArrayList JavaDoc publishers;
85
86   /** Used for start/stop operations **/
87   private boolean started = false;
88
89   /** Used for start/stop operations **/
90   private boolean stopped = false;
91
92   /** Used to identify the agent **/
93   private int index;
94
95     /////////////////////////////////////////////////////////////////////////
96
// Constructors //
97
/////////////////////////////////////////////////////////////////////////
98

99   /**
100    * Default constructor for deserialization
101    */

102   public PubSubAgent()
103   {
104     logger.entry("PubSubAgent");
105     logger.exit("PubSubAgent", this);
106   }
107
108   public PubSubAgent(int index)
109   {
110     logger.entry("PubSubAgent", new Integer JavaDoc(index));
111     this.index = index;
112     logger.exit("PubSubAgent", this);
113   }
114
115     /////////////////////////////////////////////////////////////////////////
116
// Public methods //
117
/////////////////////////////////////////////////////////////////////////
118

119   public void setSubscriberCount(int value) { subscriberCount = value; }
120   public void setPublisherCount(int value) { publisherCount = value; }
121   public void setMessageCount(int value) { messageCount = value; }
122   public void setPersistent(boolean value) { persistent = value; }
123   public void setMessageSize(int value) { messageSize = value; }
124   public void setIntervals(int value) { intervals = value; }
125   public void setIntervalPeriod(long value) { intervalPeriod = value; }
126   public void setExpectedMessages(int value) { expectedMessages = value; }
127   public void setSubscriptionFilter(String JavaDoc v){ subscriptionFilter = v; }
128
129   public int getIndex()
130   {
131     return index;
132   }
133
134   public synchronized void startAgent()
135   {
136     logger.entry("startAgent");
137     started = true;
138     notifyAll();
139     logger.exit("startAgent");
140   }
141
142   public synchronized void stopAgent()
143   {
144     logger.entry("stopAgent");
145     stopped = true;
146     notifyAll();
147
148     logger.exit("stopAgent");
149   }
150
151   public synchronized void runAgent()
152   {
153     logger.entry("run");
154     try {
155       createSubscribers();
156       createPublishers();
157       
158       // Wait for a start or stopped notification
159
while (!started && !stopped) {
160         try {
161           wait(5000);
162         } catch (InterruptedException JavaDoc ie) {}
163       }
164       if (stopped) return;
165
166       startPublishers();
167       
168       // Loop until stopped or all publishers and subscribers are done
169
while (!stopped) {
170         boolean done = (publishersDone() && subscribersDone());
171         if (done) {
172           stopped = true;
173         } else {
174           try {
175             wait(1000);
176           } catch (InterruptedException JavaDoc ie) {}
177         }
178       }
179       
180       stopPublishers();
181       stopSubscribers();
182
183       sendResults();
184     } catch (JMSException JavaDoc jmsex) { logger.exception(jmsex); }
185     logger.exit("run");
186   }
187
188   public String JavaDoc getResultsSummary()
189   {
190     logger.entry("getResultsSummary");
191
192     StringBuffer JavaDoc result = new StringBuffer JavaDoc();
193   
194     Iterator JavaDoc itr = publishers.iterator();
195     int i=1;
196     while(itr.hasNext()) {
197       PubHelper pub = (PubHelper) itr.next();
198       result.append("Publisher ");
199       result.append(i++);
200       result.append(": ");
201       result.append(pub.getResultsSummary());
202       result.append('\n');
203     }
204     result.append('\n');
205     i=1;
206     itr = subscribers.iterator();
207     while(itr.hasNext()) {
208       SubHelper sub = (SubHelper) itr.next();
209       result.append("Subscriber ");
210       result.append(i++);
211       result.append(": ");
212
213       result.append(sub.getResultsSummary());
214       result.append('\n');
215     }
216
217     logger.exit("getResultsSummary");
218     return result.toString();
219   }
220
221
222     /////////////////////////////////////////////////////////////////////////
223
// Private methods //
224
/////////////////////////////////////////////////////////////////////////
225

226   private void sendResults() throws JMSException JavaDoc
227   {
228     logger.entry("sendResults");
229
230     TopicSession JavaDoc session = connx.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
231     Topic JavaDoc topic = session.createTopic(TEST_RESULTS_TOPIC);
232     TopicPublisher JavaDoc pub = session.createPublisher(topic);
233     ObjectMessage JavaDoc msg = session.createObjectMessage();
234     msg.setObject(this);
235     pub.publish(msg);
236     
237     session.close();
238     logger.exit("sendResults");
239   }
240
241
242   private void createSubscribers() throws JMSException JavaDoc
243   {
244     logger.entry("createSubscribers");
245
246     subscribers = new ArrayList JavaDoc();
247
248     for (int i=0; i < subscriberCount; ++i) {
249       TopicSession JavaDoc session = connx.createTopicSession(false,
250                                                       Session.DUPS_OK_ACKNOWLEDGE);
251       Topic JavaDoc topic = session.createTopic(TEST_TOPIC);
252       TopicSubscriber JavaDoc sub = session.createSubscriber(topic, subscriptionFilter, false);
253       SubHelper subHelper = new SubHelper(sub, session);
254       subscribers.add(subHelper);
255     }
256
257     logger.exit("createSubscribers");
258   }
259
260
261   private void createPublishers() throws JMSException JavaDoc
262   {
263     logger.entry("createPublishers");
264     publishers = new ArrayList JavaDoc();
265
266     for (int i=0; i < publisherCount; ++i) {
267       TopicSession JavaDoc session = connx.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
268       Topic JavaDoc topic = session.createTopic(TEST_TOPIC);
269       TopicPublisher JavaDoc pub = session.createPublisher(topic);
270       if (persistent) {
271         pub.setDeliveryMode(javax.jms.DeliveryMode.PERSISTENT);
272       }
273       PubHelper pubHelper = new PubHelper(pub, session);
274       publishers.add(pubHelper);
275     }
276     logger.exit("createPublishers");
277   }
278
279   private void startPublishers() throws JMSException JavaDoc
280   {
281     logger.entry("startPublishers");
282     Iterator JavaDoc itr = publishers.iterator();
283     while(itr.hasNext()) {
284       PubHelper pub = (PubHelper) itr.next();
285       pub.start();
286     }
287     logger.exit("startPublishers");
288   }
289
290   private void stopPublishers() throws JMSException JavaDoc
291   {
292     logger.entry("stopPublishers");
293
294     Iterator JavaDoc itr = publishers.iterator();
295     while(itr.hasNext()) {
296       PubHelper pub = (PubHelper) itr.next();
297       if (!pub.isDone()) {
298         pub.stop();
299       }
300     }
301
302     logger.exit("stopPublishers");
303   }
304
305   private void stopSubscribers() throws JMSException JavaDoc
306   {
307     logger.entry("stopSubscribers");
308
309     Iterator JavaDoc itr = subscribers.iterator();
310     while(itr.hasNext()) {
311       SubHelper sub = (SubHelper) itr.next();
312       if (!sub.isDone()) {
313         sub.stop();
314       }
315     }
316     
317     logger.exit("stopSubscribers");
318   }
319
320
321   private boolean publishersDone()
322   {
323     logger.entry("publishersDone");
324
325     boolean done = true;
326     Iterator JavaDoc itr = publishers.iterator();
327     while(itr.hasNext() && done) {
328       PubHelper pub = (PubHelper) itr.next();
329       done = pub.isDone();
330     }
331
332     logger.exit("publishersDone", new Boolean JavaDoc(done));
333     return done;
334   }
335
336
337   private boolean subscribersDone()
338   {
339     logger.entry("subscribersDone");
340
341     boolean done = true;
342     Iterator JavaDoc itr = subscribers.iterator();
343     while(itr.hasNext() && done) {
344       SubHelper sub = (SubHelper) itr.next();
345       done = sub.isDone();
346     }
347
348     logger.exit("subscribersDone", new Boolean JavaDoc(done));
349     return done;
350   }
351
352
353
354     /////////////////////////////////////////////////////////////////////////
355
// Inner class SubHelper //
356
/////////////////////////////////////////////////////////////////////////
357
protected class SubHelper
358     implements Serializable JavaDoc, SubResults, MessageListener JavaDoc
359   {
360     private transient int timecount = 20;
361     private transient long startTime;
362     private transient TopicSession JavaDoc sessionToClose;
363     private transient TopicSubscriber JavaDoc subscriber;
364
365     private int timemod;
366     private volatile long[] timeResults;
367     private volatile int messagesReceived;
368     private volatile boolean done = false;
369
370     /**
371      * Constructor.
372      */

373     SubHelper(TopicSubscriber JavaDoc sub, TopicSession JavaDoc session) throws JMSException JavaDoc
374     {
375       logger.entry("SubHelper", sub, session);
376
377       sessionToClose = session;
378       subscriber = sub;
379       sub.setMessageListener(this);
380       if (expectedMessages < timecount) {
381         timecount = (int) expectedMessages;
382         timemod = 1;
383       }
384       else {
385         timemod = (int) expectedMessages/timecount;
386       }
387       timeResults = new long[timecount];
388       
389       logger.exit("SubHelper", this);
390     }
391
392     public String JavaDoc getResultsSummary()
393     {
394       logger.entry("SubHelper-getResultsSummary()");
395
396       StringBuffer JavaDoc results = new StringBuffer JavaDoc();
397       results.append(messagesReceived +
398                      "/" + expectedMessages);
399       results.append(" msgs, " );
400
401       if (messagesReceived > expectedMessages) {
402         results.append(" ERROR too many messages received.");
403       } else {
404         logger.debug(" SubHelper-getResultsSummary() " + results);
405         long totalTime = timeResults[((int) (messagesReceived/timemod))-1];
406         results.append(totalTime);
407         results.append(" milliseconds, ");
408       
409         int throughput = messagesReceived;
410         if (totalTime != 0) {
411           throughput = messagesReceived*1000/(int)totalTime;
412         }
413         
414         results.append(throughput);
415         results.append(" msgs/second");
416       }
417       logger.exit("SubHelper-getResultsSummary()", results);
418       return results.toString();
419     }
420
421     /**
422      * Stop this subscriber from receiving messages
423      */

424     public void stop()
425     {
426       logger.entry("SubHelper-stop");
427
428       try {
429         subscriber.close();
430         sessionToClose.close();
431       } catch (JMSException JavaDoc jmsex) { logger.exception(jmsex); }
432
433       logger.exit("SubHelper-stop");
434     }
435
436     /**
437      * Returns true if this Subscriber has received all expected
438      * messages.
439      */

440     public boolean isDone()
441     {
442       logger.entry("SubHelper-isDone");
443       if (done) {
444         try {
445           subscriber.close();
446           sessionToClose.close();
447         } catch (JMSException JavaDoc jmsex) {
448           logger.exception(jmsex);
449         }
450       }
451       logger.exit("SubHelper-isDone", new Boolean JavaDoc(done));
452       return done;
453     }
454
455     /**
456      * Returns the number of messages received by this subscriber.
457      */

458     public long messagesReceived()
459     {
460       logger.entry("messagesReceived");
461       logger.exit("messagesReceived", new Long JavaDoc(messagesReceived));
462       return messagesReceived;
463     }
464
465     /**
466      * Callback from JMS when there is a message for the subscriber.
467      */

468     public void onMessage(Message JavaDoc msg)
469     {
470       logger.entry("onMessage", msg, new Long JavaDoc(messagesReceived));
471
472       if (messagesReceived == 0) {
473         startTime = System.currentTimeMillis();
474       }
475       ++messagesReceived;
476       
477       if (messagesReceived % timemod == 0) {
478         timeResults[((int) (messagesReceived/timemod))-1] =
479           System.currentTimeMillis() - startTime;
480       }
481       
482       if (messagesReceived == expectedMessages) {
483         done = true;
484       }
485       
486       logger.exit("onMessage");
487     }
488   }
489     /////////////////////////////////////////////////////////////////////////
490
// End Inner class SubHelper //
491
/////////////////////////////////////////////////////////////////////////
492

493
494
495     /////////////////////////////////////////////////////////////////////////
496
// Inner class PubHelper //
497
/////////////////////////////////////////////////////////////////////////
498

499   protected class PubHelper
500     implements Runnable JavaDoc, Serializable JavaDoc, PubResults
501   {
502     private volatile long startTime;
503     private volatile long endTime;
504     private volatile int messagesPublished;
505     private volatile boolean done = false;
506
507     private volatile transient boolean stopped = false;
508     private transient TopicPublisher JavaDoc publisher;
509     private transient TopicSession JavaDoc sessionToClose;
510
511     /**
512      * Constructor
513      */

514     protected PubHelper(TopicPublisher JavaDoc pub, TopicSession JavaDoc session)
515     {
516       logger.entry("PubHelper");
517       sessionToClose = session;
518       publisher = pub;
519       logger.exit("PubHelper", this);
520     }
521
522
523     public String JavaDoc getResultsSummary()
524     {
525       logger.entry("PubHelper-getResultsSummary()");
526
527       StringBuffer JavaDoc results = new StringBuffer JavaDoc();
528       results.append(messagesPublished +
529                      "/" + messageCount);
530       results.append(" msgs, " );
531       long totalTime = endTime - startTime;
532       results.append(totalTime);
533       results.append(" milliseconds, ");
534       
535       
536       int throughput = 0;
537       if (totalTime == 0) {
538         throughput = messagesPublished*1000/(int)totalTime;
539       }
540       results.append(throughput);
541       results.append(" msgs/second");
542
543       logger.exit("PubHelper-getResultsSummary()", results);
544       return results.toString();
545     }
546
547     /**
548      * Start the publishes
549      */

550     public void start()
551     {
552       logger.entry("PubHelper-start");
553       Thread JavaDoc t = new Thread JavaDoc(this);
554       t.start();
555       logger.exit("PubHelper-start");
556     }
557
558     /**
559      * Stop the publishes
560      */

561     public void stop()
562     {
563       logger.entry("PubHelper-stop");
564       try {
565         publisher.close();
566         sessionToClose.close();
567       } catch (JMSException JavaDoc jmsex) {
568         logger.exception(jmsex);
569       }
570       stopped = true;
571       logger.exit("PubHelper-stop");
572     }
573
574     /**
575      * @return true if the publisher finished.
576      */

577     public boolean isDone()
578     {
579       logger.entry("Pubhelper-isDone");
580       logger.exit("PubHelper-isDone", new Boolean JavaDoc(done));
581       return done;
582     }
583
584     /**
585      * Runnable implementation to do the publishing
586      */

587     public void run()
588     {
589       logger.entry("PubHelper-run");
590
591       startTime = System.currentTimeMillis();
592       
593       int intervalMod = messageCount / intervals;
594       
595       for (messagesPublished=1;
596            messagesPublished <= messageCount;
597            ++messagesPublished)
598       {
599         if (stopped) break;
600
601         if ((messagesPublished % intervalMod) == 0) {
602           try {
603             Thread.sleep(intervalPeriod);
604           } catch (InterruptedException JavaDoc ie) {}
605         }
606         try {
607           Message JavaDoc msg = createMessage();
608           logger.debug("PubHelper-run---> publishing message" +messagesPublished);
609           publisher.publish(msg);
610         } catch (JMSException JavaDoc jmsex) { logger.exception(jmsex); }
611       }
612
613       --messagesPublished;
614
615       try {
616         publisher.close();
617         sessionToClose.close();
618       } catch (JMSException JavaDoc jmsex) {
619         logger.exception(jmsex);
620       }
621       endTime = System.currentTimeMillis();
622       done = true;
623
624       logger.exit("PubHelper-run");
625     }
626
627     private Message JavaDoc createMessage() throws JMSException JavaDoc
628     {
629       logger.entry("PubHelper-createMessage");
630       Message JavaDoc msg = agentSession.createMessage();
631       
632       logger.exit("PubHelper-createMessage", msg);
633       return msg;
634     }
635
636   }
637     /////////////////////////////////////////////////////////////////////////
638
// End Inner class PubHelper //
639
/////////////////////////////////////////////////////////////////////////
640

641
642   ////////////////////////////// Misc stuff ////////////////////////////////
643

644   private static Logger logger =
645     LoggerFactory.getLogger(PubSubAgent.class, null);
646
647   ///////////////////////////////////////////////////////////////////////////
648

649 }
650
Popular Tags