KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > tool > ConsumerTool


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.tool;
19
20 import javax.jms.Connection JavaDoc;
21 import javax.jms.JMSException JavaDoc;
22 import javax.jms.Message JavaDoc;
23 import javax.jms.MessageConsumer JavaDoc;
24 import javax.jms.MessageListener JavaDoc;
25 import javax.jms.Session JavaDoc;
26 import javax.jms.TextMessage JavaDoc;
27 import javax.jms.Topic JavaDoc;
28 import java.io.IOException JavaDoc;
29
30 /**
31  * A simple tool for consuming messages
32  *
33  * @version $Revision$
34  */

35 public class ConsumerTool extends ToolSupport implements MessageListener JavaDoc {
36
37     protected int count = 0;
38     protected int dumpCount = 10;
39     protected boolean verbose = true;
40     protected int maxiumMessages = 0;
41     private boolean pauseBeforeShutdown;
42
43
44     public static void main(String JavaDoc[] args) {
45         ConsumerTool tool = new ConsumerTool();
46         if (args.length > 0) {
47             tool.url = args[0];
48         }
49         if (args.length > 1) {
50             tool.topic = args[1].equalsIgnoreCase("true");
51         }
52         if (args.length > 2) {
53             tool.subject = args[2];
54         }
55         if (args.length > 3) {
56             tool.durable = args[3].equalsIgnoreCase("true");
57         }
58         if (args.length > 4) {
59             tool.maxiumMessages = Integer.parseInt(args[4]);
60         }
61         tool.run();
62     }
63
64     public void run() {
65         try {
66             System.out.println("Connecting to URL: " + url);
67             System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
68             System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
69
70             Connection JavaDoc connection = createConnection();
71             Session JavaDoc session = createSession(connection);
72             MessageConsumer JavaDoc consumer = null;
73             if (durable && topic) {
74                 consumer = session.createDurableSubscriber((Topic JavaDoc) destination, consumerName);
75             }
76             else {
77                 consumer = session.createConsumer(destination);
78             }
79             if (maxiumMessages <= 0) {
80                 consumer.setMessageListener(this);
81             }
82             connection.start();
83
84             if (maxiumMessages > 0) {
85                 consumeMessagesAndClose(connection, session, consumer);
86             }
87         }
88         catch (Exception JavaDoc e) {
89             System.out.println("Caught: " + e);
90             e.printStackTrace();
91         }
92     }
93
94     public void onMessage(Message JavaDoc message) {
95         try {
96             if (message instanceof TextMessage JavaDoc) {
97                 TextMessage JavaDoc txtMsg = (TextMessage JavaDoc) message;
98                 if (verbose) {
99                     
100                     String JavaDoc msg = txtMsg.getText();
101                     if( msg.length() > 50 )
102                         msg = msg.substring(0, 50)+"...";
103                     
104                     System.out.println("Received: " + msg);
105                 }
106             }
107             else {
108                 if (verbose) {
109                     System.out.println("Received: " + message);
110                 }
111             }
112             /*
113             if (++count % dumpCount == 0) {
114                 dumpStats(connection);
115             }
116             */

117         }
118         catch (JMSException JavaDoc e) {
119             System.out.println("Caught: " + e);
120             e.printStackTrace();
121         }
122     }
123
124
125     protected void consumeMessagesAndClose(Connection JavaDoc connection, Session JavaDoc session, MessageConsumer JavaDoc consumer) throws JMSException JavaDoc, IOException JavaDoc {
126         System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
127
128         for (int i = 0; i < maxiumMessages; i++) {
129             Message JavaDoc message = consumer.receive();
130             onMessage(message);
131         }
132         System.out.println("Closing connection");
133         consumer.close();
134         session.close();
135         connection.close();
136         if (pauseBeforeShutdown) {
137             System.out.println("Press return to shut down");
138             System.in.read();
139         }
140     }
141 }
142
Popular Tags