KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > presumo > jms > test > PubSubTest


1 /**
2  * This file is part of Presumo.
3  *
4  * Presumo is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * Presumo is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with Presumo; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17  *
18  * Copyright (c) 2002 Dan Greff
19  */

20 package com.presumo.jms.test;
21
22 import com.presumo.jms.client.JmsTopicConnectionFactory;
23 import com.presumo.mobileagent.AgentRunner;
24 import com.presumo.mobileagent.AgentUtil;
25
26 import com.presumo.util.log.Logger;
27 import com.presumo.util.log.LoggerFactory;
28
29 import java.io.BufferedReader JavaDoc;
30 import java.io.FileInputStream JavaDoc;
31 import java.io.InputStreamReader JavaDoc;
32
33 import java.util.Properties JavaDoc;
34
35 import javax.jms.JMSException JavaDoc;
36 import javax.jms.Message JavaDoc;
37 import javax.jms.MessageListener JavaDoc;
38 import javax.jms.ObjectMessage JavaDoc;
39 import javax.jms.Session JavaDoc;
40 import javax.jms.Topic JavaDoc;
41 import javax.jms.TopicConnection JavaDoc;
42 import javax.jms.TopicPublisher JavaDoc;
43 import javax.jms.TopicSession JavaDoc;
44 import javax.jms.TopicSubscriber JavaDoc;
45
46 /**
47  *
48  * @author Dan Greff
49  */

50 public class PubSubTest implements MessageListener JavaDoc
51 {
52   static final String JavaDoc TEST_RESULTS_TOPIC = "PubSubTestResults";
53
54   public final static String JavaDoc SERVER_KEY = "server";
55   public final static String JavaDoc PORT_KEY = "port";
56   public final static String JavaDoc LOCAL_AGENT_KEY= "runLocalAgent";
57   public final static String JavaDoc PUB_COUNT_KEY = "publisherCount";
58   public final static String JavaDoc MSG_COUNT_KEY = "messageCount";
59   public final static String JavaDoc MSG_SIZE_KEY = "messageSize";
60   public final static String JavaDoc INTERVAL_KEY = "intervalCount";
61   public final static String JavaDoc PUB_SLEEP_KEY = "intervalPeriod";
62   public final static String JavaDoc PERSISTENT_KEY = "persistent";
63   public final static String JavaDoc SUB_COUNT_KEY = "subscriberCount";
64   public final static String JavaDoc SUB_MODE_KEY = "subscriptionMode";
65   
66   private String JavaDoc server;
67   private int port;
68   private boolean runLocalAgent;
69   private int publisherCount;
70   private int messageCount;
71   private int messageSize;
72   private int intervalCount;
73   private long intervalPeriod;
74   private boolean persistent;
75   private int subscriberCount;
76   private String JavaDoc subscriberMode;
77
78   private int agentCount;
79   private String JavaDoc [] agentRunners;
80   private PubSubAgent [] runningTests;
81   private PubSubAgent [] completedTests;
82
83   private AgentRunner localAgent;
84
85   TopicConnection JavaDoc connx;
86
87     /////////////////////////////////////////////////////////////////////////
88
// Constructors //
89
/////////////////////////////////////////////////////////////////////////
90

91   public PubSubTest(Properties JavaDoc props) throws JMSException JavaDoc
92   {
93     logger.entry("PubSubTest", props);
94
95     readProperties(props);
96
97     JmsTopicConnectionFactory factory = new JmsTopicConnectionFactory();
98     if (server != null) {
99       factory.setHost(server);
100       factory.setPort(port);
101     }
102     connx = factory.createTopicConnection();
103     connx.start();
104     if (runLocalAgent) {
105       localAgent = new AgentRunner(connx);
106     }
107
108     PubSubAgent temp = new PubSubAgent();
109     temp.setConnection(connx);
110     agentRunners = null;
111
112     
113     for (int tries=0; tries < 3 && agentRunners == null; ++tries) {
114       System.out.print("Searching for test clients...");
115       agentRunners = temp.getAvailableRunners(3000);
116     }
117
118     if (agentRunners == null || agentRunners.length == 0) {
119       throw new RuntimeException JavaDoc("No test clients available to run test.");
120     }
121     System.out.println(" found " + agentRunners.length + ".");
122     
123     agentCount = agentRunners.length;
124     runningTests = new PubSubAgent[agentRunners.length];
125     completedTests = new PubSubAgent[agentRunners.length];
126     
127     for (int i=0; i < runningTests.length; ++i) {
128       runningTests[i] = createTestAgent(i);
129     }
130
131     logger.exit("PubSubTest", this);
132   }
133
134     /////////////////////////////////////////////////////////////////////////
135
// Public Methods //
136
/////////////////////////////////////////////////////////////////////////
137

138   public void start() throws JMSException JavaDoc
139   {
140     logger.entry("start");
141
142     TopicSession JavaDoc session = connx.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
143     Topic JavaDoc topic = session.createTopic(TEST_RESULTS_TOPIC);
144     TopicSubscriber JavaDoc sub = session.createSubscriber(topic, "", false);
145     sub.setMessageListener(this);
146     
147     AgentUtil.sendStartMessage(connx, agentRunners);
148
149     logger.exit("start");
150   }
151
152   public void stop() throws JMSException JavaDoc
153   {
154     logger.entry("stop");
155     AgentUtil.sendStopMessage(connx, agentRunners);
156     logger.exit("stop");
157   }
158
159   public void close()
160   {
161     logger.entry("close");
162
163     try {
164       connx.close();
165       if (localAgent != null) {
166         localAgent.stop();
167       }
168     } catch (JMSException JavaDoc jmsex) {
169       jmsex.printStackTrace();
170     }
171
172     logger.exit("close");
173   }
174
175   public boolean isComplete()
176   {
177     logger.entry("isComplete");
178
179     boolean complete = true;
180     for(int i=0; (i < completedTests.length) && (complete); ++i) {
181       complete = (completedTests[i] != null);
182     }
183
184     logger.exit("isComplete", new Boolean JavaDoc(complete));
185     return complete;
186   }
187
188   public String JavaDoc getSetupSummary()
189   {
190     logger.entry("getSetupSummary");
191
192     StringBuffer JavaDoc result = new StringBuffer JavaDoc();
193
194     if (server == null) {
195       result.append(" No server (intraJVM test) ");
196     } else {
197       result.append(" JMSServer: " + server + ":" + port);
198     }
199     result.append("\n Number of test clients: " + agentCount);
200     result.append("\n Number of subscribers: " + subscriberCount);
201     result.append("\n Number of publishers: " + publisherCount);
202     result.append("\n Messages to be published: "
203                   + messageCount * publisherCount);
204     result.append("\n Persistent: " + persistent);
205     result.append("\n Number of messages expected at each subscriber: "
206                   + messageCount * publisherCount);
207     result.append("\n");
208     result.append("\n Total number of messages to be delivered: " +
209                   messageCount * publisherCount * subscriberCount);
210
211     logger.exit("getSetupSummary", result);
212     return result.toString();
213   }
214
215   public String JavaDoc getResultsSummary()
216   { logger.entry("getResultsSummary");
217     StringBuffer JavaDoc result = new StringBuffer JavaDoc();
218     
219     for (int i=0; i < agentCount; ++i) {
220       result.append("--- Results for test client " + agentRunners[i] + " ----\n");
221       if (completedTests[i] != null) {
222         result.append(completedTests[i].getResultsSummary());
223       } else {
224         result.append("\tNo response from test client yet.\n");
225       }
226       result.append("-------------------------------------------------------------\n");
227     }
228
229     logger.exit("getResultSummary", result);
230     return result.toString();
231   }
232
233
234   public void onMessage(Message JavaDoc msg)
235   {
236     logger.entry("onMessage", msg);
237
238     try {
239       if (msg instanceof ObjectMessage JavaDoc) {
240         ObjectMessage JavaDoc omsg = (ObjectMessage JavaDoc) msg;
241         Object JavaDoc o = omsg.getObject();
242         if (o instanceof PubSubAgent) {
243           PubSubAgent agent = (PubSubAgent) o;
244           logger.debug("-----> Agent response collected: " + agent);
245           completedTests[agent.getIndex()] = agent;
246           runningTests[agent.getIndex()] = null;
247         }
248       }
249     } catch (JMSException JavaDoc jmsex) {
250       jmsex.printStackTrace();
251     }
252
253     logger.exit("onMessage");
254   }
255
256     /////////////////////////////////////////////////////////////////////////
257
// Private methods //
258
/////////////////////////////////////////////////////////////////////////
259

260   private void readProperties(Properties JavaDoc props)
261   {
262     server = props.getProperty(SERVER_KEY);
263     port = Integer.parseInt(props.getProperty(PORT_KEY, "2323"));
264     runLocalAgent = Boolean.valueOf(props.getProperty(LOCAL_AGENT_KEY, "true")).booleanValue();
265     publisherCount = Integer.parseInt(props.getProperty(PUB_COUNT_KEY, "1"));
266     messageCount = Integer.parseInt(props.getProperty(MSG_COUNT_KEY, "10"));
267     messageSize = Integer.parseInt(props.getProperty(MSG_SIZE_KEY, "100"));
268     intervalCount = Integer.parseInt(props.getProperty(INTERVAL_KEY, "2"));
269     intervalPeriod = Integer.parseInt(props.getProperty(PUB_SLEEP_KEY, "1"));
270     persistent = Boolean.valueOf(props.getProperty(PERSISTENT_KEY, "false")).booleanValue();
271     subscriberCount = Integer.parseInt(props.getProperty(SUB_COUNT_KEY, "5"));
272     subscriberMode = props.getProperty(SUB_MODE_KEY, "all").toLowerCase();
273   }
274
275   private PubSubAgent createTestAgent(int index) throws JMSException JavaDoc
276   {
277     logger.entry("createTestAgent", new Integer JavaDoc(index));
278     PubSubAgent agent = new PubSubAgent(index);
279     agent.setConnection(connx);
280
281     int subs = 0;
282     if (subscriberCount <= agentCount) {
283       subs = (index > (agentCount - subscriberCount - 1)) ? 1 : 0;
284     } else {
285       subs = subscriberCount / agentCount;
286       subs = ((subs * index+1) > subscriberCount) ? subscriberCount % subs : subs;
287     }
288
289     int pubs = 0;
290     if (publisherCount <= agentCount) {
291       pubs = (index < publisherCount) ? 1 : 0;
292     } else {
293       pubs = publisherCount / agentCount;
294       pubs = ((pubs * index+1) > publisherCount) ? publisherCount % pubs : pubs;
295     }
296
297     logger.debug("--> pubCount=" + pubs);
298     logger.debug("--> subCount=" + subs);
299
300     agent.setSubscriberCount(subs);
301     agent.setPublisherCount(pubs);
302     agent.setMessageCount(messageCount);
303     agent.setPersistent(persistent);
304     agent.setMessageSize(messageSize);
305     agent.setIntervals(intervalCount);
306     agent.setIntervalPeriod(intervalPeriod);
307     agent.setExpectedMessages(messageCount * publisherCount);
308     agent.setSubscriptionFilter("");
309
310     agent.moveTo( agentRunners[index] );
311
312     logger.exit("createTestAgent", agent);
313     return agent;
314   }
315     /////////////////////////////////////////////////////////////////////////
316
// Main //
317
/////////////////////////////////////////////////////////////////////////
318

319   public static void main(String JavaDoc [] args) throws Exception JavaDoc
320   {
321     logger.entry("main", args);
322
323     String JavaDoc propfile = null;
324     if (args.length != 1) {
325       propfile = "default.prop";
326     } else {
327       propfile = args[0];
328     }
329     System.out.println("Using "+propfile+" for test case.");
330         
331     try {
332       //
333
// Read in the test properties
334
//
335
Properties JavaDoc testProp = new Properties JavaDoc();
336       FileInputStream JavaDoc fis = null;
337       try {
338         fis = new FileInputStream JavaDoc(propfile);
339         testProp.load(fis);
340       } finally {
341         if (fis != null) fis.close();
342       }
343
344       boolean interactive = Boolean.valueOf
345         (testProp.getProperty("interactive", "true")).booleanValue();
346       int iterations =
347         Integer.parseInt(testProp.getProperty("testIterations", "1"));
348
349       for (int i=1; i <= iterations; ++i) {
350         PubSubTest test = new PubSubTest(testProp);
351
352         if (iterations > 1) {
353           System.out.println("-Iteration " + i + " ----------------------------------------------------");
354         } else {
355           System.out.println("----------------------------------------------------------------------");
356         }
357           System.out.println(test.getSetupSummary());
358           System.out.println("----------------------------------------------------------------------");
359           System.out.println("\n");
360           
361           if (interactive) {
362             System.out.println(" Press <ENTER> start the test.");
363             BufferedReader JavaDoc input = new BufferedReader JavaDoc(new InputStreamReader JavaDoc(System.in));
364             input.readLine();
365           } else {
366             try { Thread.sleep(1000); } catch(InterruptedException JavaDoc ie) {}
367           }
368           
369           test.start();
370           
371           if (interactive) {
372             System.out.println(" Test started.");
373           }
374           
375           while (!test.isComplete() ) {
376             try { Thread.sleep(1000); } catch (InterruptedException JavaDoc ie) {}
377           }
378           
379           System.out.println("----------------------------------------------------------------------");
380           System.out.println(test.getResultsSummary());
381           System.out.println("----------------------------------------------------------------------");
382           System.out.println("\n\n");
383           
384           test.close();
385           
386           if (interactive) {
387             System.out.println("Press <ENTER> to exit.");
388             BufferedReader JavaDoc input = new
389               BufferedReader JavaDoc(new InputStreamReader JavaDoc(System.in));
390             input.readLine();
391           }
392       }
393     } catch (Throwable JavaDoc t) {
394       System.err.println("The following error occured while executing the test:");
395       t.printStackTrace();
396     }
397
398     logger.exit("main");
399   }
400
401
402   ////////////////////////////// Misc stuff ////////////////////////////////
403

404   private static Logger logger =
405     LoggerFactory.getLogger(PubSubTest.class, null);
406
407   ///////////////////////////////////////////////////////////////////////////
408

409 }
410
Popular Tags