// KickJava - Java API By Example, From Geeks To Geeks.

// Java > Open Source Codes > TopicPublisher


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

24 /**
25  * Use in conjunction with TopicListener to test the performance of ActiveMQ Topics.
26  */

27 public class TopicPublisher implements MessageListener
28 {
29     private final Object JavaDoc mutex = new Object JavaDoc();
30     private Connection connection;
31     private Session session;
32     private MessageProducer publisher;
33     private Topic topic;
34     private Topic control;
35     
36 // private String url="tcp://localhost:61616?jms.dispatchAsync=true&jms.useAsyncSend=true&jms.optimizeAcknowledge=true&jms.disableTimeStampsByDefault=true&jms.optimizedMessageDispatch=true&wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false";
37
private String JavaDoc url="tcp://localhost:61616";
38     private int size=256;
39     private int subscribers=1;
40     private int remaining;
41     private int messages=10000;
42     private long delay;
43     private int batch=40;
44     
45     private byte[] payload;
46     private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
47
48     public static void main(String JavaDoc[] argv) throws Exception JavaDoc
49     {
50         TopicPublisher p = new TopicPublisher();
51         String JavaDoc[] unknonwn = CommnadLineSupport.setOptions(p, argv);
52         if (unknonwn.length > 0) {
53             System.out.println("Unknown options: " + Arrays.toString(unknonwn));
54             System.exit(-1);
55         }
56         p.run();
57     }
58
59     private void run() throws Exception JavaDoc
60     {
61         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
62         connection = factory.createConnection();
63         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
64         topic = session.createTopic("topictest.messages");
65         control = session.createTopic("topictest.control");
66         
67         publisher = session.createProducer(topic);
68         publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
69         
70         payload = new byte[size];
71         for(int i = 0; i < size; i++)
72         {
73             payload[i] = (byte) DATA[i % DATA.length];
74         }
75
76         session.createConsumer(control).setMessageListener(this);
77         connection.start();
78
79         long[] times = new long[batch];
80         for(int i = 0; i < batch; i++)
81         {
82             if(i > 0) Thread.sleep(delay*1000);
83             times[i] = batch(messages);
84             System.out.println("Batch " + (i+1) + " of " + batch + " completed in " + times[i] + " ms.");
85         }
86
87         long min = min(times);
88         long max = max(times);
89         System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
90
91         //request shutdown
92
publisher.send(session.createTextMessage("SHUTDOWN"));
93
94         connection.stop();
95         connection.close();
96     }
97
98     private long batch(int msgCount) throws Exception JavaDoc
99     {
100         long start = System.currentTimeMillis();
101         remaining=subscribers;
102         publish();
103         waitForCompletion();
104         return System.currentTimeMillis() - start;
105     }
106
107     private void publish() throws Exception JavaDoc
108     {
109
110         //send events
111
BytesMessage msg = session.createBytesMessage();
112         msg.writeBytes(payload);
113         for (int i = 0; i < messages; i++)
114         {
115             publisher.send(msg);
116             if ((i + 1) % 1000 == 0)
117             {
118                 System.out.println("Sent " + (i + 1) + " messages");
119             }
120         }
121
122         //request report
123
publisher.send(session.createTextMessage("REPORT"));
124     }
125
126     private void waitForCompletion() throws Exception JavaDoc
127     {
128         System.out.println("Waiting for completion...");
129         synchronized (mutex)
130         {
131             while (remaining > 0)
132             {
133                 mutex.wait();
134             }
135         }
136     }
137
138
139     public void onMessage(Message message)
140     {
141         synchronized (mutex)
142         {
143             System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");
144             if (remaining == 0)
145             {
146                 mutex.notify();
147             }
148         }
149     }
150     
151     Object JavaDoc getReport(Message m)
152     {
153         try
154         {
155             return ((TextMessage) m).getText();
156         }
157         catch (JMSException e)
158         {
159             e.printStackTrace(System.out);
160             return e.toString();
161         }
162     }
163
164     static long min(long[] times)
165     {
166         long min = times.length > 0 ? times[0] : 0;
167         for(int i = 0; i < times.length; i++)
168         {
169             min = Math.min(min, times[i]);
170         }
171         return min;
172     }
173
174     static long max(long[] times)
175     {
176         long max = times.length > 0 ? times[0] : 0;
177         for(int i = 0; i < times.length; i++)
178         {
179             max = Math.max(max, times[i]);
180         }
181         return max;
182     }
183
184     static long avg(long[] times, long min, long max)
185     {
186         long sum = 0;
187         for(int i = 0; i < times.length; i++)
188         {
189             sum += times[i];
190         }
191         sum -= min;
192         sum -= max;
193         return (sum / times.length - 2);
194     }
195
196     public void setBatch(int batch) {
197         this.batch = batch;
198     }
199     public void setDelay(long delay) {
200         this.delay = delay;
201     }
202     public void setMessages(int messages) {
203         this.messages = messages;
204     }
205     public void setSize(int size) {
206         this.size = size;
207     }
208     public void setSubscribers(int subscribers) {
209         this.subscribers = subscribers;
210     }
211     public void setUrl(String JavaDoc url) {
212         this.url = url;
213     }
214 }
// Popular Tags