KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > ConsumerTool


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 import javax.jms.Connection JavaDoc;
20 import javax.jms.DeliveryMode JavaDoc;
21 import javax.jms.Destination JavaDoc;
22 import javax.jms.ExceptionListener JavaDoc;
23 import javax.jms.JMSException JavaDoc;
24 import javax.jms.Message JavaDoc;
25 import javax.jms.MessageConsumer JavaDoc;
26 import javax.jms.MessageListener JavaDoc;
27 import javax.jms.MessageProducer JavaDoc;
28 import javax.jms.Session JavaDoc;
29 import javax.jms.TextMessage JavaDoc;
30 import javax.jms.Topic JavaDoc;
31
32 import org.apache.activemq.ActiveMQConnection;
33 import org.apache.activemq.ActiveMQConnectionFactory;
34
35 import java.io.IOException JavaDoc;
36 import java.util.Arrays JavaDoc;
37
38 /**
39  * A simple tool for consuming messages
40  *
41  * @version $Revision: 1.1.1.1 $
42  */

43 public class ConsumerTool implements MessageListener JavaDoc, ExceptionListener JavaDoc {
44
45     private boolean running;
46     
47     private Session JavaDoc session;
48     private Destination JavaDoc destination;
49     private MessageProducer JavaDoc replyProducer;
50     
51     private boolean pauseBeforeShutdown;
52     private boolean verbose = true;
53     private int maxiumMessages = 0;
54     private String JavaDoc subject = "TOOL.DEFAULT";
55     private boolean topic = false;
56     private String JavaDoc user = ActiveMQConnection.DEFAULT_USER;
57     private String JavaDoc password = ActiveMQConnection.DEFAULT_PASSWORD;
58     private String JavaDoc url = ActiveMQConnection.DEFAULT_BROKER_URL;
59     private boolean transacted = false;
60     private boolean durable = false;
61     private String JavaDoc clientId;
62     private int ackMode = Session.AUTO_ACKNOWLEDGE;
63     private String JavaDoc consumerName = "James";
64     private long sleepTime = 0;
65     private long receiveTimeOut = 0;
66
67     public static void main(String JavaDoc[] args) {
68         ConsumerTool consumerTool = new ConsumerTool();
69         String JavaDoc[] unknonwn = CommnadLineSupport.setOptions(consumerTool, args);
70         if (unknonwn.length > 0) {
71             System.out.println("Unknown options: " + Arrays.toString(unknonwn));
72             System.exit(-1);
73         }
74         consumerTool.run();
75     }
76
77     public void run() {
78         try {
79             running = true;
80
81             System.out.println("Connecting to URL: " + url);
82             System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
83             System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");
84
85             ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
86             Connection JavaDoc connection = connectionFactory.createConnection();
87             if (durable && clientId != null && clientId.length()>0 && !"null".equals(clientId) ) {
88                 connection.setClientID(clientId);
89             }
90             connection.setExceptionListener(this);
91             connection.start();
92
93             session = connection.createSession(transacted, ackMode);
94             if (topic) {
95                 destination = session.createTopic(subject);
96             } else {
97                 destination = session.createQueue(subject);
98             }
99
100             replyProducer = session.createProducer(null);
101             replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
102
103             MessageConsumer JavaDoc consumer = null;
104             if (durable && topic) {
105                 consumer = session.createDurableSubscriber((Topic JavaDoc) destination, consumerName);
106             } else {
107                 consumer = session.createConsumer(destination);
108             }
109             
110             if (maxiumMessages > 0) {
111                 consumeMessagesAndClose(connection, session, consumer);
112             } else {
113                 if (receiveTimeOut == 0) {
114                     consumer.setMessageListener(this);
115                 } else {
116                     consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
117                 }
118             }
119             
120         } catch (Exception JavaDoc e) {
121             System.out.println("Caught: " + e);
122             e.printStackTrace();
123         }
124     }
125
126     public void onMessage(Message JavaDoc message) {
127         try {
128             
129             if (message instanceof TextMessage JavaDoc) {
130                 TextMessage JavaDoc txtMsg = (TextMessage JavaDoc) message;
131                 if (verbose) {
132
133                     String JavaDoc msg = txtMsg.getText();
134                     if (msg.length() > 50) {
135                         msg = msg.substring(0, 50) + "...";
136                     }
137
138                     System.out.println("Received: " + msg);
139                 }
140             } else {
141                 if (verbose) {
142                     System.out.println("Received: " + message);
143                 }
144             }
145
146             if (message.getJMSReplyTo() != null) {
147                 replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID()));
148             }
149             
150             if (transacted) {
151                 session.commit();
152             } else if ( ackMode == Session.CLIENT_ACKNOWLEDGE ) {
153                 message.acknowledge();
154             }
155
156         } catch (JMSException JavaDoc e) {
157             System.out.println("Caught: " + e);
158             e.printStackTrace();
159         } finally {
160             if (sleepTime > 0) {
161                 try {
162                     Thread.sleep(sleepTime);
163                 } catch (InterruptedException JavaDoc e) {
164                 }
165             }
166         }
167     }
168
169     synchronized public void onException(JMSException JavaDoc ex) {
170         System.out.println("JMS Exception occured. Shutting down client.");
171         running = false;
172     }
173
174     synchronized boolean isRunning() {
175         return running;
176     }
177
178     protected void consumeMessagesAndClose(Connection JavaDoc connection, Session JavaDoc session, MessageConsumer JavaDoc consumer) throws JMSException JavaDoc,
179             IOException JavaDoc {
180         System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
181
182         for (int i = 0; i < maxiumMessages && isRunning();) {
183             Message JavaDoc message = consumer.receive(1000);
184             if (message != null) {
185                 i++;
186                 onMessage(message);
187             }
188         }
189         System.out.println("Closing connection");
190         consumer.close();
191         session.close();
192         connection.close();
193         if (pauseBeforeShutdown) {
194             System.out.println("Press return to shut down");
195             System.in.read();
196         }
197     }
198
199     protected void consumeMessagesAndClose(Connection JavaDoc connection, Session JavaDoc session, MessageConsumer JavaDoc consumer, long timeout)
200             throws JMSException JavaDoc, IOException JavaDoc {
201         System.out.println("We will consume messages while they continue to be delivered within: " + timeout
202                 + " ms, and then we will shutdown");
203
204         Message JavaDoc message;
205         while ((message = consumer.receive(timeout)) != null) {
206             onMessage(message);
207         }
208
209         System.out.println("Closing connection");
210         consumer.close();
211         session.close();
212         connection.close();
213         if (pauseBeforeShutdown) {
214             System.out.println("Press return to shut down");
215             System.in.read();
216         }
217     }
218
219     public void setAckMode(String JavaDoc ackMode) {
220         if( "CLIENT_ACKNOWLEDGE".equals(ackMode) ) {
221             this.ackMode = Session.CLIENT_ACKNOWLEDGE;
222         }
223         if( "AUTO_ACKNOWLEDGE".equals(ackMode) ) {
224             this.ackMode = Session.AUTO_ACKNOWLEDGE;
225         }
226         if( "DUPS_OK_ACKNOWLEDGE".equals(ackMode) ) {
227             this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
228         }
229         if( "SESSION_TRANSACTED".equals(ackMode) ) {
230             this.ackMode = Session.SESSION_TRANSACTED;
231         }
232     }
233     
234     public void setClientId(String JavaDoc clientID) {
235         this.clientId = clientID;
236     }
237     public void setConsumerName(String JavaDoc consumerName) {
238         this.consumerName = consumerName;
239     }
240     public void setDurable(boolean durable) {
241         this.durable = durable;
242     }
243     public void setMaxiumMessages(int maxiumMessages) {
244         this.maxiumMessages = maxiumMessages;
245     }
246     public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {
247         this.pauseBeforeShutdown = pauseBeforeShutdown;
248     }
249     public void setPassword(String JavaDoc pwd) {
250         this.password = pwd;
251     }
252     public void setReceiveTimeOut(long receiveTimeOut) {
253         this.receiveTimeOut = receiveTimeOut;
254     }
255     public void setSleepTime(long sleepTime) {
256         this.sleepTime = sleepTime;
257     }
258     public void setSubject(String JavaDoc subject) {
259         this.subject = subject;
260     }
261     public void setTopic(boolean topic) {
262         this.topic = topic;
263     }
264     public void setQueue(boolean queue) {
265         this.topic = !queue;
266     }
267     public void setTransacted(boolean transacted) {
268         this.transacted = transacted;
269     }
270     public void setUrl(String JavaDoc url) {
271         this.url = url;
272     }
273     public void setUser(String JavaDoc user) {
274         this.user = user;
275     }
276     public void setVerbose(boolean verbose) {
277         this.verbose = verbose;
278     }
279
280 }
281
Popular Tags