KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > web > WebClient


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq.web;
20
21 import java.io.Externalizable JavaDoc;
22 import java.io.IOException JavaDoc;
23 import java.io.ObjectInput JavaDoc;
24 import java.io.ObjectOutput JavaDoc;
25 import java.util.ArrayList JavaDoc;
26 import java.util.HashMap JavaDoc;
27 import java.util.Iterator JavaDoc;
28 import java.util.List JavaDoc;
29 import java.util.Map JavaDoc;
30
31 import javax.jms.ConnectionFactory JavaDoc;
32 import javax.jms.DeliveryMode JavaDoc;
33 import javax.jms.Destination JavaDoc;
34 import javax.jms.JMSException JavaDoc;
35 import javax.jms.Message JavaDoc;
36 import javax.jms.MessageConsumer JavaDoc;
37 import javax.jms.MessageProducer JavaDoc;
38 import javax.jms.Session JavaDoc;
39 import javax.servlet.ServletContext JavaDoc;
40 import javax.servlet.http.HttpServletRequest JavaDoc;
41 import javax.servlet.http.HttpSession JavaDoc;
42 import javax.servlet.http.HttpSessionActivationListener JavaDoc;
43 import javax.servlet.http.HttpSessionBindingEvent JavaDoc;
44 import javax.servlet.http.HttpSessionBindingListener JavaDoc;
45 import javax.servlet.http.HttpSessionEvent JavaDoc;
46
47 import org.apache.activemq.ActiveMQConnection;
48 import org.apache.activemq.ActiveMQConnectionFactory;
49 import org.apache.activemq.ActiveMQSession;
50 import org.apache.activemq.MessageAvailableConsumer;
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53
54 import java.util.concurrent.Semaphore JavaDoc;
55
56 /**
57  * Represents a messaging client used from inside a web container typically
58  * stored inside a HttpSession
59  *
60  * TODO controls to prevent DOS attacks with users requesting many consumers
61  * TODO configure consumers with small prefetch.
62  *
63  * @version $Revision: 1.1.1.1 $
64  */

65 public class WebClient implements HttpSessionActivationListener JavaDoc, HttpSessionBindingListener JavaDoc, Externalizable JavaDoc {
66     public static final String JavaDoc webClientAttribute = "org.apache.activemq.webclient";
67     public static final String JavaDoc connectionFactoryAttribute = "org.apache.activemq.connectionFactory";
68
69     public static final String JavaDoc connectionFactoryPrefetchParam = "org.apache.activemq.connectionFactory.prefetch";
70     public static final String JavaDoc connectionFactoryOptimizeAckParam = "org.apache.activemq.connectionFactory.optimizeAck";
71     public static final String JavaDoc brokerUrlInitParam = "org.apache.activemq.brokerURL";
72
73     private static final Log log = LogFactory.getLog(WebClient.class);
74
75     private static transient ConnectionFactory JavaDoc factory;
76
77     private transient Map JavaDoc consumers = new HashMap JavaDoc();
78     private transient ActiveMQConnection connection;
79     private transient ActiveMQSession session;
80     private transient MessageProducer JavaDoc producer;
81     private int deliveryMode = DeliveryMode.NON_PERSISTENT;
82
83     private final Semaphore JavaDoc semaphore = new Semaphore JavaDoc(1);
84
85
86     /**
87      * Helper method to get the client for the current session, lazily creating
88      * a client if there is none currently
89      *
90      * @param request is the current HTTP request
91      * @return the current client or a newly creates
92      */

93     public static WebClient getWebClient(HttpServletRequest JavaDoc request) {
94         HttpSession JavaDoc session = request.getSession(true);
95         WebClient client = getWebClient(session);
96         if (client == null || client.isClosed()) {
97             client = WebClient.createWebClient(request);
98             session.setAttribute(webClientAttribute, client);
99         }
100
101         return client;
102     }
103     /**
104      * @return the web client for the current HTTP session or null if there is
105      * not a web client created yet
106      */

107     public static WebClient getWebClient(HttpSession JavaDoc session) {
108         return (WebClient) session.getAttribute(webClientAttribute);
109     }
110
111     public static void initContext(ServletContext JavaDoc context) {
112         initConnectionFactory(context);
113     }
114
115     public WebClient() {
116         if (factory == null)
117             throw new IllegalStateException JavaDoc("initContext(ServletContext) not called");
118     }
119
120     public int getDeliveryMode() {
121         return deliveryMode;
122     }
123
124     public void setDeliveryMode(int deliveryMode) {
125         this.deliveryMode = deliveryMode;
126     }
127
128     public synchronized void closeConsumers() {
129         for (Iterator JavaDoc it = consumers.values().iterator(); it.hasNext();) {
130             MessageConsumer JavaDoc consumer = (MessageConsumer JavaDoc) it.next();
131             it.remove();
132             try {
133                 consumer.setMessageListener(null);
134                 if (consumer instanceof MessageAvailableConsumer)
135                     ((MessageAvailableConsumer) consumer).setAvailableListener(null);
136                 consumer.close();
137             }
138             catch (JMSException JavaDoc e) {
139                 log.debug("caught exception closing consumer",e);
140             }
141         }
142     }
143
144     public synchronized void close() {
145         try {
146             closeConsumers();
147             if (connection != null)
148                 connection.close();
149         }
150         catch (JMSException JavaDoc e) {
151             log.debug("caught exception closing consumer",e);
152         }
153         finally {
154             producer = null;
155             session = null;
156             connection = null;
157             if (consumers != null)
158                 consumers.clear();
159             consumers = null;
160         }
161     }
162
163     public boolean isClosed() {
164         return consumers == null;
165     }
166
167     public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
168         if (consumers != null) {
169             out.write(consumers.size());
170             Iterator JavaDoc i = consumers.keySet().iterator();
171             while (i.hasNext())
172                 out.writeObject(i.next().toString());
173         }
174         else
175             out.write(-1);
176
177     }
178
179     public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
180         int size = in.readInt();
181         if (size >= 0) {
182             consumers = new HashMap JavaDoc();
183             for (int i = 0; i < size; i++) {
184                 String JavaDoc destinationName = in.readObject().toString();
185
186                 try {
187                     Destination JavaDoc destination = destinationName.startsWith("topic://") ? (Destination JavaDoc) getSession().createTopic(destinationName)
188                             : (Destination JavaDoc) getSession().createQueue(destinationName);
189                     consumers.put(destination, getConsumer(destination, true));
190                 }
191                 catch (JMSException JavaDoc e) {
192                     log.debug("Caought Exception ",e);
193                     IOException JavaDoc ex = new IOException JavaDoc(e.getMessage());
194                     ex.initCause(e.getCause() != null ? e.getCause() : e);
195                     throw ex;
196                     
197                 }
198             }
199         }
200     }
201
202     public void send(Destination JavaDoc destination, Message JavaDoc message) throws JMSException JavaDoc {
203         getProducer().send(destination, message);
204         if (log.isDebugEnabled()) {
205             log.debug("Sent! to destination: " + destination + " message: " + message);
206         }
207     }
208
209     public void send(Destination JavaDoc destination, Message JavaDoc message, boolean persistent, int priority, int timeToLive) throws JMSException JavaDoc {
210         int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
211         getProducer().send(destination, message, deliveryMode, priority, timeToLive);
212         if (log.isDebugEnabled()) {
213             log.debug("Sent! to destination: " + destination + " message: " + message);
214         }
215     }
216
217     public Session JavaDoc getSession() throws JMSException JavaDoc {
218         if (session == null) {
219             session = createSession();
220         }
221         return session;
222     }
223
224     public ActiveMQConnection getConnection() throws JMSException JavaDoc {
225         if (connection == null) {
226             connection = (ActiveMQConnection) factory.createConnection();
227             connection.start();
228         }
229         return connection;
230     }
231
232     public static synchronized void initConnectionFactory(ServletContext JavaDoc servletContext) {
233         if (factory == null)
234             factory = (ConnectionFactory JavaDoc) servletContext.getAttribute(connectionFactoryAttribute);
235         if (factory == null) {
236             String JavaDoc brokerURL = servletContext.getInitParameter(brokerUrlInitParam);
237
238
239             log.debug("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
240
241             if (brokerURL == null) {
242                 brokerURL = "vm://localhost";
243             }
244
245             ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL);
246
247             // Set prefetch policy for factory
248
if (servletContext.getInitParameter(connectionFactoryPrefetchParam) != null) {
249                 int prefetch = Integer.valueOf(servletContext.getInitParameter(connectionFactoryPrefetchParam)).intValue();
250                 amqfactory.getPrefetchPolicy().setAll(prefetch);
251             }
252
253             // Set optimize acknowledge setting
254
if (servletContext.getInitParameter(connectionFactoryOptimizeAckParam) != null) {
255                 boolean optimizeAck = Boolean.valueOf(servletContext.getInitParameter(connectionFactoryOptimizeAckParam)).booleanValue();
256                 amqfactory.setOptimizeAcknowledge(optimizeAck);
257             }
258
259             factory = amqfactory;
260
261             servletContext.setAttribute(connectionFactoryAttribute, factory);
262         }
263     }
264
265     public synchronized MessageProducer JavaDoc getProducer() throws JMSException JavaDoc {
266         if (producer == null) {
267             producer = getSession().createProducer(null);
268             producer.setDeliveryMode(deliveryMode);
269         }
270         return producer;
271     }
272
273     public void setProducer(MessageProducer JavaDoc producer) {
274         this.producer = producer;
275     }
276
277     public synchronized MessageConsumer JavaDoc getConsumer(Destination JavaDoc destination) throws JMSException JavaDoc {
278         return getConsumer(destination, true);
279     }
280
281     public synchronized MessageConsumer JavaDoc getConsumer(Destination JavaDoc destination, boolean create) throws JMSException JavaDoc {
282         MessageConsumer JavaDoc consumer = (MessageConsumer JavaDoc) consumers.get(destination);
283         if (create && consumer == null) {
284             consumer = getSession().createConsumer(destination);
285             consumers.put(destination, consumer);
286         }
287         return consumer;
288     }
289
290     public synchronized void closeConsumer(Destination JavaDoc destination) throws JMSException JavaDoc {
291         MessageConsumer JavaDoc consumer = (MessageConsumer JavaDoc) consumers.get(destination);
292         if (consumer != null) {
293             consumers.remove(destination);
294             consumer.setMessageListener(null);
295             if (consumer instanceof MessageAvailableConsumer)
296                 ((MessageAvailableConsumer) consumer).setAvailableListener(null);
297             consumer.close();
298         }
299     }
300
301     public synchronized List JavaDoc getConsumers() {
302         return new ArrayList JavaDoc(consumers.values());
303     }
304
305     protected ActiveMQSession createSession() throws JMSException JavaDoc {
306         return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
307     }
308
309     public Semaphore JavaDoc getSemaphore() {
310         return semaphore;
311     }
312
313     public void sessionWillPassivate(HttpSessionEvent JavaDoc event) {
314         close();
315     }
316
317     public void sessionDidActivate(HttpSessionEvent JavaDoc event) {
318     }
319
320     public void valueBound(HttpSessionBindingEvent JavaDoc event) {
321     }
322
323     public void valueUnbound(HttpSessionBindingEvent JavaDoc event) {
324         close();
325     }
326
327     protected static WebClient createWebClient(HttpServletRequest JavaDoc request) {
328         return new WebClient();
329     }
330
331 }
332
Popular Tags