KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > TopicListener


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 import java.util.Arrays JavaDoc;
19
20 import javax.jms.Connection JavaDoc;
21 import javax.jms.JMSException JavaDoc;
22 import javax.jms.Message JavaDoc;
23 import javax.jms.MessageConsumer JavaDoc;
24 import javax.jms.MessageListener JavaDoc;
25 import javax.jms.MessageProducer JavaDoc;
26 import javax.jms.Session JavaDoc;
27 import javax.jms.TextMessage JavaDoc;
28 import javax.jms.Topic JavaDoc;
29
30 import org.apache.activemq.ActiveMQConnectionFactory;
31
32 /**
33  * Use in conjunction with TopicPublisher to test the performance of ActiveMQ Topics.
34  */

35 public class TopicListener implements MessageListener JavaDoc {
36     
37     private Connection JavaDoc connection;
38     private MessageProducer JavaDoc producer;
39     private Session JavaDoc session;
40     private int count;
41     private long start;
42     private Topic JavaDoc topic;
43     private Topic JavaDoc control;
44     
45 // private String url="tcp://localhost:61616?jms.dispatchAsync=true&jms.useAsyncSend=true&jms.optimizeAcknowledge=true&jms.disableTimeStampsByDefault=true&jms.optimizedMessageDispatch=true&wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false";
46
private String JavaDoc url="tcp://localhost:61616";
47
48     public static void main(String JavaDoc[] argv) throws Exception JavaDoc {
49         TopicListener l = new TopicListener();
50         String JavaDoc[] unknonwn = CommnadLineSupport.setOptions(l, argv);
51         if (unknonwn.length > 0) {
52             System.out.println("Unknown options: " + Arrays.toString(unknonwn));
53             System.exit(-1);
54         }
55         l.run();
56     }
57     
58     public void run() throws JMSException JavaDoc {
59         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
60         connection = factory.createConnection();
61         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
62         topic = session.createTopic("topictest.messages");
63         control = session.createTopic("topictest.control");
64
65         MessageConsumer JavaDoc consumer = session.createConsumer(topic);
66         consumer.setMessageListener(this);
67
68         connection.start();
69
70         producer = session.createProducer(control);
71         System.out.println("Waiting for messages...");
72     }
73     
74     private static boolean checkText(Message JavaDoc m, String JavaDoc s)
75     {
76         try
77         {
78             return m instanceof TextMessage JavaDoc && ((TextMessage JavaDoc) m).getText().equals(s);
79         }
80         catch (JMSException JavaDoc e)
81         {
82             e.printStackTrace(System.out);
83             return false;
84         }
85     }
86
87
88     public void onMessage(Message JavaDoc message) {
89         if ( checkText(message, "SHUTDOWN") ) {
90             
91             try {
92                 connection.close();
93             } catch (Exception JavaDoc e) {
94                 e.printStackTrace(System.out);
95             }
96             
97         } else if (checkText(message, "REPORT")) {
98             // send a report:
99
try {
100                 long time = (System.currentTimeMillis() - start);
101                 String JavaDoc msg = "Received " + count + " in " + time + "ms";
102                 producer.send(session.createTextMessage(msg));
103             } catch (Exception JavaDoc e) {
104                 e.printStackTrace(System.out);
105             }
106             count = 0;
107             
108         } else {
109             
110             if (count==0) {
111                 start = System.currentTimeMillis();
112             }
113             
114             if (++count % 1000 == 0)
115                 System.out.println("Received " + count + " messages.");
116         }
117     }
118
119     public void setUrl(String JavaDoc url) {
120         this.url = url;
121     }
122
123 }
124
Popular Tags