KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > web > MessageListenerServlet


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq.web;
20
21 import java.io.IOException JavaDoc;
22 import java.io.PrintWriter JavaDoc;
23 import java.io.StringWriter JavaDoc;
24 import java.util.HashMap JavaDoc;
25 import java.util.List JavaDoc;
26 import java.util.Map JavaDoc;
27
28 import javax.jms.Destination JavaDoc;
29 import javax.jms.JMSException JavaDoc;
30 import javax.jms.Message JavaDoc;
31 import javax.jms.MessageConsumer JavaDoc;
32 import javax.jms.ObjectMessage JavaDoc;
33 import javax.jms.TextMessage JavaDoc;
34 import javax.servlet.ServletConfig JavaDoc;
35 import javax.servlet.ServletException JavaDoc;
36 import javax.servlet.http.HttpServletRequest JavaDoc;
37 import javax.servlet.http.HttpServletResponse JavaDoc;
38 import javax.servlet.http.HttpSession JavaDoc;
39
40 import org.apache.activemq.MessageAvailableConsumer;
41 import org.apache.activemq.MessageAvailableListener;
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.mortbay.util.ajax.Continuation;
45 import org.mortbay.util.ajax.ContinuationSupport;
46
47 /**
48  * A servlet for sending and receiving messages to/from JMS destinations using
49  * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
50  * destination and whether it is a topic or queue via configuration details on
51  * the servlet or as request parameters. <p/> For reading messages you can
52  * specify a readTimeout parameter to determine how long the servlet should
53  * block for.
54  *
55  * The servlet can be configured with the following init parameters:<dl>
56  * <dt>defaultReadTimeout</dt><dd>The default time in ms to wait for messages.
57  * May be overridden by a request using the 'timeout' parameter</dd>
58  * <dt>maximumReadTimeout</dt><dd>The maximum value a request may specify for the 'timeout' parameter</dd>
59  * <dt>maximumMessages</dt><dd>maximum messages to send per response</dd>
60  * <dt></dt><dd></dd>
61  * </dl>
62  *
63  *
64  * @version $Revision: 1.1.1.1 $
65  */

66 public class MessageListenerServlet extends MessageServletSupport {
67     private static final Log log = LogFactory.getLog(MessageListenerServlet.class);
68
69     private String JavaDoc readTimeoutParameter = "timeout";
70
71     private long defaultReadTimeout = -1;
72
73     private long maximumReadTimeout = 25000;
74
75     private int maximumMessages = 100;
76     
77     public void init() throws ServletException JavaDoc {
78         ServletConfig JavaDoc servletConfig = getServletConfig();
79         String JavaDoc name = servletConfig.getInitParameter("defaultReadTimeout");
80         if (name != null) {
81             defaultReadTimeout = asLong(name);
82         }
83         name = servletConfig.getInitParameter("maximumReadTimeout");
84         if (name != null) {
85             maximumReadTimeout = asLong(name);
86         }
87         name = servletConfig.getInitParameter("maximumMessages");
88         if (name != null) {
89             maximumMessages = (int) asLong(name);
90         }
91     }
92
93     /**
94      * Sends a message to a destination or manage subscriptions.
95      *
96      * If the the content type of the POST is <code>application/x-www-form-urlencoded</code>, then the form parameters
97      * "destination", "message" and "type" are used to pass a message or a subscription. If multiple messages
98      * or subscriptions are passed in a single post, then additional parameters are shortened to "dN", "mN" and "tN"
99      * where N is an index starting from 1. The type is either "send", "listen" or "unlisten". For send types,
100      * the message is the text of the TextMessage, otherwise it is the ID to be used for the subscription.
101      *
102      * If the content type is not <code>application/x-www-form-urlencoded</code>, then the body of the post is
103      * sent as the message to a destination that is derived from a query parameter, the URL or the default destination.
104      *
105      * @param request
106      * @param response
107      * @throws ServletException
108      * @throws IOException
109      */

110     protected void doPost(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws ServletException JavaDoc, IOException JavaDoc {
111         
112         // lets turn the HTTP post into a JMS Message
113

114         WebClient client = WebClient.getWebClient(request);
115         String JavaDoc message_ids="";
116         
117         synchronized (client) {
118             
119             if (log.isDebugEnabled()) {
120                 log.debug("POST client="+client+" session="+request.getSession().getId()+" info="+request.getPathInfo()+" contentType="+request.getContentType());
121             // dump(request.getParameterMap());
122
}
123             
124             int messages=0;
125             
126             // loop until no more messages
127
while (true)
128             {
129                 // Get the message parameters. Multiple messages are encoded with more compact parameter names.
130
String JavaDoc destination_name = request.getParameter(messages==0?"destination":("d"+messages));
131                 String JavaDoc message = request.getParameter(messages==0?"message":("m"+messages));
132                 String JavaDoc type = request.getParameter(messages==0?"type":("t"+messages));
133                 
134                 if (destination_name==null || message==null || type==null)
135                     break;
136                 
137                 try {
138                     Destination JavaDoc destination=getDestination(client,request,destination_name);
139                     
140                     if (log.isDebugEnabled()) {
141                         log.debug(messages+" destination="+destination_name+" message="+message+" type="+type);
142                         log.debug(destination+" is a "+destination.getClass().getName());
143                     }
144                     
145                     messages++;
146                     
147                     if ("listen".equals(type))
148                     {
149                         Listener listener = getListener(request);
150                         Map JavaDoc consumerIdMap = getConsumerIdMap(request);
151                         client.closeConsumer(destination); // drop any existing consumer.
152
MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
153                         
154                         consumer.setAvailableListener(listener);
155                         consumerIdMap.put(consumer, message);
156                         if (log.isDebugEnabled()) {
157                             log.debug("Subscribed: "+consumer+" to "+destination+" id="+message);
158                         }
159                     }
160                     else if ("unlisten".equals(type))
161                     {
162                         Map JavaDoc consumerIdMap = getConsumerIdMap(request);
163                         MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
164                         
165                         consumer.setAvailableListener(null);
166                         consumerIdMap.remove(consumer);
167                         client.closeConsumer(destination);
168                         if (log.isDebugEnabled()) {
169                             log.debug("Unsubscribed: "+consumer);
170                         }
171                     }
172                     else if ("send".equals(type))
173                     {
174                         TextMessage JavaDoc text = client.getSession().createTextMessage(message);
175                         appendParametersToMessage(request, text);
176
177                         client.send(destination, text);
178                         message_ids+=text.getJMSMessageID()+"\n";
179                         if (log.isDebugEnabled()) {
180                             log.debug("Sent "+message+" to "+destination);
181                         }
182                     }
183                     else
184                         log.warn("unknown type "+type);
185                     
186                 }
187                 catch (JMSException JavaDoc e) {
188                     log.warn("jms", e);
189                 }
190             }
191         }
192             
193         if ("true".equals(request.getParameter("poll")))
194         {
195             try
196             {
197                 // TODO return message IDs
198
doMessages(client,request,response);
199             }
200             catch (JMSException JavaDoc e)
201             {
202                 throw new ServletException JavaDoc("JMS problem: " + e, e);
203             }
204         }
205         else
206         {
207             // handle simple POST of a message
208
if (request.getContentLength()!=0 &&
209                (request.getContentType()==null || !request.getContentType().toLowerCase().startsWith("application/x-www-form-urlencoded")))
210             {
211                 try {
212                     Destination JavaDoc destination=getDestination(client, request);
213                     String JavaDoc body = getPostedMessageBody(request);
214                     TextMessage JavaDoc message = client.getSession().createTextMessage(body );
215                     appendParametersToMessage(request, message);
216
217                     client.send(destination, message);
218                     if (log.isDebugEnabled()) {
219                         log.debug("Sent to destination: " + destination + " body: " + body);
220                     }
221                     message_ids+=message.getJMSMessageID()+"\n";
222                 }
223                 catch (JMSException JavaDoc e) {
224                     throw new ServletException JavaDoc(e);
225                 }
226             }
227             
228             response.setContentType("text/plain");
229             response.setHeader("Cache-Control", "no-cache");
230             response.getWriter().print(message_ids);
231         }
232     }
233
234     /**
235      * Supports a HTTP DELETE to be equivlanent of consuming a singe message
236      * from a queue
237      */

238     protected void doGet(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws ServletException JavaDoc, IOException JavaDoc {
239         try {
240             WebClient client = WebClient.getWebClient(request);
241             if (log.isDebugEnabled()) {
242                 log.debug("GET client="+client+" session="+request.getSession().getId()+" uri="+request.getRequestURI()+" query="+request.getQueryString());
243             }
244             
245             doMessages(client, request, response);
246         }
247         catch (JMSException JavaDoc e) {
248             throw new ServletException JavaDoc("JMS problem: " + e, e);
249         }
250     }
251
252
253     /**
254      * Reads a message from a destination up to some specific timeout period
255      *
256      * @param client
257      * The webclient
258      * @param request
259      * @param response
260      * @throws ServletException
261      * @throws IOException
262      */

263     protected void doMessages(WebClient client, HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws JMSException JavaDoc, IOException JavaDoc {
264
265         int messages = 0;
266         // This is a poll for any messages
267

268         long timeout = getReadTimeout(request);
269         if (log.isDebugEnabled()) {
270             log.debug("doMessage timeout="+timeout);
271         }
272         
273         Continuation continuation = ContinuationSupport.getContinuation(request, client);
274         Listener listener = getListener(request);
275         if (listener!=null && continuation!=null && !continuation.isPending())
276             listener.access();
277
278         Message JavaDoc message = null;
279         synchronized (client) {
280
281             List JavaDoc consumers = client.getConsumers();
282             MessageAvailableConsumer consumer = null;
283
284             // Look for a message that is ready to go
285
for (int i = 0; message == null && i < consumers.size(); i++) {
286                 consumer = (MessageAvailableConsumer) consumers.get(i);
287                 if (consumer.getAvailableListener() == null)
288                     continue;
289
290                 // Look for any available messages
291
message = consumer.receiveNoWait();
292                 if (log.isDebugEnabled()) {
293                     log.debug("received "+message+" from "+consumer);
294                 }
295             }
296
297             // Get an existing Continuation or create a new one if there are no
298
// messages
299

300             if (message == null) {
301                 // register this continuation with our listener.
302
listener.setContinuation(continuation);
303
304                 // Get the continuation object (may wait and/or retry
305
// request here).
306
continuation.suspend(timeout);
307             }
308             listener.setContinuation(null);
309
310             // prepare the responds
311
response.setContentType("text/xml");
312             response.setHeader("Cache-Control", "no-cache");
313
314             StringWriter JavaDoc swriter = new StringWriter JavaDoc();
315             PrintWriter JavaDoc writer = new PrintWriter JavaDoc(swriter);
316
317             Map JavaDoc consumerIdMap = getConsumerIdMap(request);
318             response.setStatus(HttpServletResponse.SC_OK);
319             writer.println("<ajax-response>");
320
321             // Send any message we already have
322
if (message != null) {
323                 String JavaDoc id = (String JavaDoc) consumerIdMap.get(consumer);
324                 writer.print("<response id='");
325                 writer.print(id);
326                 writer.print("'>");
327                 writeMessageResponse(writer, message);
328                 writer.println("</response>");
329                 messages++;
330             }
331
332             // Send the rest of the messages
333
for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
334                 consumer = (MessageAvailableConsumer) consumers.get(i);
335                 if (consumer.getAvailableListener() == null)
336                     continue;
337
338                 // Look for any available messages
339
message = consumer.receiveNoWait();
340                 while (message != null && messages < maximumMessages) {
341                     String JavaDoc id = (String JavaDoc) consumerIdMap.get(consumer);
342                     writer.print("<response id='");
343                     writer.print(id);
344                     writer.print("'>");
345                     writeMessageResponse(writer, message);
346                     writer.println("</response>");
347                     messages++;
348                     message = consumer.receiveNoWait();
349                 }
350             }
351
352             // Add poll message
353
// writer.println("<response type='object' id='amqPoll'><ok/></response>");
354

355             writer.print("</ajax-response>");
356
357             writer.flush();
358             String JavaDoc m = swriter.toString();
359             // System.err.println(m);
360
response.getWriter().println(m);
361         }
362
363     }
364
365     protected void writeMessageResponse(PrintWriter JavaDoc writer, Message JavaDoc message) throws JMSException JavaDoc, IOException JavaDoc {
366         if (message instanceof TextMessage JavaDoc) {
367             TextMessage JavaDoc textMsg = (TextMessage JavaDoc) message;
368             String JavaDoc txt = textMsg.getText();
369             if (txt.startsWith("<?")) {
370                 txt = txt.substring(txt.indexOf("?>") + 2);
371             }
372             writer.print(txt);
373         } else if (message instanceof ObjectMessage JavaDoc) {
374             ObjectMessage JavaDoc objectMsg = (ObjectMessage JavaDoc) message;
375             Object JavaDoc object = objectMsg.getObject();
376             writer.print(object.toString());
377         }
378     }
379
380     protected Listener getListener(HttpServletRequest JavaDoc request) {
381         HttpSession JavaDoc session = request.getSession();
382         Listener listener = (Listener) session.getAttribute("mls.listener");
383         if (listener == null) {
384             listener = new Listener(WebClient.getWebClient(request));
385             session.setAttribute("mls.listener", listener);
386         }
387         return listener;
388     }
389
390     protected Map JavaDoc getConsumerIdMap(HttpServletRequest JavaDoc request) {
391         HttpSession JavaDoc session = request.getSession(true);
392         Map JavaDoc map = (Map JavaDoc) session.getAttribute("mls.consumerIdMap");
393         if (map == null) {
394             map = new HashMap JavaDoc();
395             session.setAttribute("mls.consumerIdMap", map);
396         }
397         return map;
398     }
399
400     protected boolean isRicoAjax(HttpServletRequest JavaDoc request) {
401         String JavaDoc rico = request.getParameter("rico");
402         return rico != null && rico.equals("true");
403     }
404
405     /**
406      * @return the timeout value for read requests which is always >= 0 and <=
407      * maximumReadTimeout to avoid DoS attacks
408      */

409     protected long getReadTimeout(HttpServletRequest JavaDoc request) {
410         long answer = defaultReadTimeout;
411
412         String JavaDoc name = request.getParameter(readTimeoutParameter);
413         if (name != null) {
414             answer = asLong(name);
415         }
416         if (answer < 0 || answer > maximumReadTimeout) {
417             answer = maximumReadTimeout;
418         }
419         return answer;
420     }
421
422     /*
423      * Listen for available messages and wakeup any continuations.
424      */

425     private class Listener implements MessageAvailableListener {
426         WebClient client;
427         long lastAccess;
428         Continuation continuation;
429
430         Listener(WebClient client) {
431             this.client = client;
432         }
433
434         public void access()
435         {
436             lastAccess=System.currentTimeMillis();
437         }
438         
439         synchronized public void setContinuation(Continuation continuation) {
440             this.continuation = continuation;
441         }
442
443         synchronized public void onMessageAvailable(MessageConsumer JavaDoc consumer) {
444             if (log.isDebugEnabled()) {
445                 log.debug("message for "+consumer+"continuation="+continuation);
446             }
447             if (continuation != null)
448                 continuation.resume();
449             else if (System.currentTimeMillis()-lastAccess>2*maximumReadTimeout)
450             {
451                 new Thread JavaDoc() {
452                     public void run() {
453                         client.closeConsumers();
454                     };
455                 }.start();
456             }
457             continuation = null;
458         }
459
460     }
461 }
462
Popular Tags