KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > tool > JmsConsumerClient


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.tool;
19
20 import org.apache.commons.logging.LogFactory;
21 import org.apache.commons.logging.Log;
22 import org.apache.activemq.tool.properties.JmsConsumerProperties;
23 import org.apache.activemq.tool.properties.JmsClientProperties;
24
25 import javax.jms.MessageConsumer JavaDoc;
26 import javax.jms.JMSException JavaDoc;
27 import javax.jms.ConnectionFactory JavaDoc;
28 import javax.jms.Connection JavaDoc;
29 import javax.jms.Destination JavaDoc;
30 import javax.jms.Message JavaDoc;
31 import javax.jms.MessageListener JavaDoc;
32 import javax.jms.Topic JavaDoc;
33
34 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
35
36 public class JmsConsumerClient extends AbstractJmsMeasurableClient {
37     private static final Log log = LogFactory.getLog(JmsConsumerClient.class);
38
39     protected MessageConsumer JavaDoc jmsConsumer;
40     protected JmsConsumerProperties client;
41
42     public JmsConsumerClient(ConnectionFactory JavaDoc factory) {
43         this(new JmsConsumerProperties(), factory);
44     }
45
46     public JmsConsumerClient(JmsConsumerProperties clientProps, ConnectionFactory JavaDoc factory) {
47         super(factory);
48         client = clientProps;
49     }
50
51     public void receiveMessages() throws JMSException JavaDoc {
52         if (client.isAsyncRecv()) {
53             if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) {
54                 receiveAsyncTimeBasedMessages(client.getRecvDuration());
55             } else {
56                 receiveAsyncCountBasedMessages(client.getRecvCount());
57             }
58         } else {
59             if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) {
60                 receiveSyncTimeBasedMessages(client.getRecvDuration());
61             } else {
62                 receiveSyncCountBasedMessages(client.getRecvCount());
63             }
64         }
65     }
66
67     public void receiveMessages(int destCount) throws JMSException JavaDoc {
68         this.destCount = destCount;
69         receiveMessages();
70     }
71
72     public void receiveMessages(int destIndex, int destCount) throws JMSException JavaDoc {
73         this.destIndex = destIndex;
74         receiveMessages(destCount);
75     }
76
77     public void receiveSyncTimeBasedMessages(long duration) throws JMSException JavaDoc {
78         if (getJmsConsumer() == null) {
79             createJmsConsumer();
80         }
81
82         try {
83             getConnection().start();
84
85             log.info("Starting to synchronously receive messages for " + duration + " ms...");
86             long endTime = System.currentTimeMillis() + duration;
87             while (System.currentTimeMillis() < endTime) {
88                 getJmsConsumer().receive();
89                 incThroughput();
90             }
91         } finally {
92             if (client.isDurable() && client.isUnsubscribe()) {
93                 log.info("Unsubscribing durable subscriber: " + getClientName());
94                 getJmsConsumer().close();
95                 getSession().unsubscribe(getClientName());
96             }
97             getConnection().close();
98         }
99     }
100
101     public void receiveSyncCountBasedMessages(long count) throws JMSException JavaDoc {
102         if (getJmsConsumer() == null) {
103             createJmsConsumer();
104         }
105
106         try {
107             getConnection().start();
108             log.info("Starting to synchronously receive " + count + " messages...");
109
110             int recvCount = 0;
111             while (recvCount < count) {
112                 getJmsConsumer().receive();
113                 incThroughput();
114                 recvCount++;
115             }
116         } finally {
117             if (client.isDurable() && client.isUnsubscribe()) {
118                 log.info("Unsubscribing durable subscriber: " + getClientName());
119                 getJmsConsumer().close();
120                 getSession().unsubscribe(getClientName());
121             }
122             getConnection().close();
123         }
124     }
125
126     public void receiveAsyncTimeBasedMessages(long duration) throws JMSException JavaDoc {
127         if (getJmsConsumer() == null) {
128             createJmsConsumer();
129         }
130
131         getJmsConsumer().setMessageListener(new MessageListener JavaDoc() {
132             public void onMessage(Message JavaDoc msg) {
133                 incThroughput();
134             }
135         });
136
137         try {
138             getConnection().start();
139             log.info("Starting to asynchronously receive messages for " + duration + " ms...");
140             try {
141                 Thread.sleep(duration);
142             } catch (InterruptedException JavaDoc e) {
143                 throw new JMSException JavaDoc("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage());
144             }
145         } finally {
146             if (client.isDurable() && client.isUnsubscribe()) {
147                 log.info("Unsubscribing durable subscriber: " + getClientName());
148                 getJmsConsumer().close();
149                 getSession().unsubscribe(getClientName());
150             }
151             getConnection().close();
152         }
153     }
154
155     public void receiveAsyncCountBasedMessages(long count) throws JMSException JavaDoc {
156         if (getJmsConsumer() == null) {
157             createJmsConsumer();
158         }
159
160         final AtomicInteger JavaDoc recvCount = new AtomicInteger JavaDoc(0);
161         getJmsConsumer().setMessageListener(new MessageListener JavaDoc() {
162             public void onMessage(Message JavaDoc msg) {
163                 incThroughput();
164                 recvCount.incrementAndGet();
165                 recvCount.notify();
166             }
167         });
168
169         try {
170             getConnection().start();
171             log.info("Starting to asynchronously receive " + client.getRecvCount() + " messages...");
172             try {
173                 while (recvCount.get() < count) {
174                     recvCount.wait();
175                 }
176             } catch (InterruptedException JavaDoc e) {
177                 throw new JMSException JavaDoc("JMS consumer thread wait has been interrupted. Message: " + e.getMessage());
178             }
179         } finally {
180             if (client.isDurable() && client.isUnsubscribe()) {
181                 log.info("Unsubscribing durable subscriber: " + getClientName());
182                 getJmsConsumer().close();
183                 getSession().unsubscribe(getClientName());
184             }
185             getConnection().close();
186         }
187     }
188
189     public MessageConsumer JavaDoc createJmsConsumer() throws JMSException JavaDoc {
190         Destination JavaDoc[] dest = createDestination(destIndex, destCount);
191         return createJmsConsumer(dest[0]);
192     }
193
194     public MessageConsumer JavaDoc createJmsConsumer(Destination JavaDoc dest) throws JMSException JavaDoc {
195         if (client.isDurable()) {
196             String JavaDoc clientName = getClientName();
197             if (clientName == null) {
198                 clientName = "JmsConsumer";
199                 setClientName(clientName);
200             }
201             log.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
202             jmsConsumer = getSession().createDurableSubscriber((Topic JavaDoc) dest, clientName);
203         } else {
204             log.info("Creating non-durable consumer to: " + dest.toString());
205             jmsConsumer = getSession().createConsumer(dest);
206         }
207         return jmsConsumer;
208     }
209
210     public MessageConsumer JavaDoc createJmsConsumer(Destination JavaDoc dest, String JavaDoc selector, boolean noLocal) throws JMSException JavaDoc {
211         if (client.isDurable()) {
212             String JavaDoc clientName = getClientName();
213             if (clientName == null) {
214                 clientName = "JmsConsumer";
215                 setClientName(clientName);
216             }
217             log.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
218             jmsConsumer = getSession().createDurableSubscriber((Topic JavaDoc) dest, clientName, selector, noLocal);
219         } else {
220             log.info("Creating non-durable consumer to: " + dest.toString());
221             jmsConsumer = getSession().createConsumer(dest, selector, noLocal);
222         }
223         return jmsConsumer;
224     }
225
226     public MessageConsumer JavaDoc getJmsConsumer() {
227         return jmsConsumer;
228     }
229
230     public JmsClientProperties getClient() {
231         return client;
232     }
233
234     public void setClient(JmsClientProperties clientProps) {
235         client = (JmsConsumerProperties)clientProps;
236     }
237 }
238
Popular Tags