KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > web > MessageServlet


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq.web;
20
21 import java.io.IOException JavaDoc;
22 import java.io.PrintWriter JavaDoc;
23 import java.util.LinkedList JavaDoc;
24 import java.util.List JavaDoc;
25
26 import javax.jms.Destination JavaDoc;
27 import javax.jms.JMSException JavaDoc;
28 import javax.jms.Message JavaDoc;
29 import javax.jms.MessageConsumer JavaDoc;
30 import javax.jms.ObjectMessage JavaDoc;
31 import javax.jms.TextMessage JavaDoc;
32 import javax.servlet.ServletConfig JavaDoc;
33 import javax.servlet.ServletException JavaDoc;
34 import javax.servlet.http.HttpServletRequest JavaDoc;
35 import javax.servlet.http.HttpServletResponse JavaDoc;
36
37 import org.apache.activemq.MessageAvailableConsumer;
38 import org.apache.activemq.MessageAvailableListener;
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.mortbay.util.ajax.Continuation;
42 import org.mortbay.util.ajax.ContinuationSupport;
43
44 /**
45  * A servlet for sending and receiving messages to/from JMS destinations using
46  * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
47  * destination and whether it is a topic or queue via configuration details on
48  * the servlet or as request parameters. <p/> For reading messages you can
49  * specify a readTimeout parameter to determine how long the servlet should
50  * block for.
51  *
52  * @version $Revision: 1.1.1.1 $
53  */

54 public class MessageServlet extends MessageServletSupport {
55     private static final Log log = LogFactory.getLog(MessageServlet.class);
56
57     private String JavaDoc readTimeoutParameter = "readTimeout";
58     private long defaultReadTimeout = -1;
59     private long maximumReadTimeout = 20000;
60
61     public void init() throws ServletException JavaDoc {
62         ServletConfig JavaDoc servletConfig = getServletConfig();
63         String JavaDoc name = servletConfig.getInitParameter("defaultReadTimeout");
64         if (name != null) {
65             defaultReadTimeout = asLong(name);
66         }
67         name = servletConfig.getInitParameter("maximumReadTimeout");
68         if (name != null) {
69             maximumReadTimeout = asLong(name);
70         }
71     }
72
73     /**
74      * Sends a message to a destination
75      *
76      * @param request
77      * @param response
78      * @throws ServletException
79      * @throws IOException
80      */

81     protected void doPost(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws ServletException JavaDoc, IOException JavaDoc {
82         // lets turn the HTTP post into a JMS Message
83
try {
84             WebClient client = WebClient.getWebClient(request);
85
86             String JavaDoc text = getPostedMessageBody(request);
87
88             // lets create the destination from the URI?
89
Destination JavaDoc destination = getDestination(client, request);
90             if (destination==null)
91                 throw new NoDestinationSuppliedException();
92
93             if (log.isDebugEnabled()) {
94                 log.debug("Sending message to: " + destination + " with text: " + text);
95             }
96
97             TextMessage JavaDoc message = client.getSession().createTextMessage(text);
98             appendParametersToMessage(request, message);
99             boolean persistent = isSendPersistent(request);
100             int priority = getSendPriority(request);
101             long timeToLive = getSendTimeToLive(request);
102             client.send(destination, message);
103
104             // lets return a unique URI for reliable messaging
105
response.setHeader("messageID", message.getJMSMessageID());
106             response.setStatus(HttpServletResponse.SC_OK);
107         }
108         catch (JMSException JavaDoc e) {
109             throw new ServletException JavaDoc("Could not post JMS message: " + e, e);
110         }
111     }
112
113     /**
114      * Supports a HTTP DELETE to be equivlanent of consuming a singe message
115      * from a queue
116      */

117     protected void doDelete(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws ServletException JavaDoc, IOException JavaDoc {
118         doMessages(request, response, 1);
119     }
120
121     /**
122      * Supports a HTTP DELETE to be equivlanent of consuming a singe message
123      * from a queue
124      */

125     protected void doGet(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response) throws ServletException JavaDoc, IOException JavaDoc {
126         doMessages(request, response, -1);
127     }
128
129     /**
130      * Reads a message from a destination up to some specific timeout period
131      *
132      * @param request
133      * @param response
134      * @throws ServletException
135      * @throws IOException
136      */

137     protected void doMessages(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response, int maxMessages) throws ServletException JavaDoc, IOException JavaDoc {
138
139         int messages = 0;
140         try {
141             WebClient client = WebClient.getWebClient(request);
142             Destination JavaDoc destination = getDestination(client, request);
143             if (destination==null)
144                 throw new NoDestinationSuppliedException();
145             long timeout = getReadTimeout(request);
146             boolean ajax = isRicoAjax(request);
147             if (!ajax)
148                 maxMessages = 1;
149
150             if (log.isDebugEnabled()) {
151                 log.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
152             }
153
154             MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
155             Continuation continuation = null;
156             Listener listener = null;
157             Message JavaDoc message = null;
158
159             synchronized (consumer) {
160                 // Fetch the listeners
161
listener = (Listener) consumer.getAvailableListener();
162                 if (listener == null) {
163                     listener = new Listener(consumer);
164                     consumer.setAvailableListener(listener);
165                 }
166                 // Look for any available messages
167
message = consumer.receiveNoWait();
168
169                 // Get an existing Continuation or create a new one if there are
170
// no events.
171
if (message == null) {
172                     continuation = ContinuationSupport.getContinuation(request, consumer);
173
174                     // register this continuation with our listener.
175
listener.setContinuation(continuation);
176
177                     // Get the continuation object (may wait and/or retry
178
// request here).
179
continuation.suspend(timeout);
180                 }
181
182                 // Try again now
183
if (message == null)
184                     message = consumer.receiveNoWait();
185
186                 // write a responds
187
response.setContentType("text/xml");
188                 PrintWriter JavaDoc writer = response.getWriter();
189
190                 if (ajax)
191                     writer.println("<ajax-response>");
192
193                 // handle any message(s)
194
if (message == null) {
195                     // No messages so OK response of for ajax else no content.
196
response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
197                 }
198                 else {
199                     // We have at least one message so set up the response
200
response.setStatus(HttpServletResponse.SC_OK);
201                     String JavaDoc type = getContentType(request);
202                     if (type != null)
203                         response.setContentType(type);
204
205                     // send a response for each available message (up to max
206
// messages)
207
while ((maxMessages < 0 || messages < maxMessages) && message != null) {
208                         // System.err.println("message["+messages+"]="+message);
209
if (ajax) {
210                             writer.print("<response type='object' id='");
211                             writer.print(request.getParameter("id"));
212                             writer.println("'>");
213                         }
214                         else
215                             // only ever 1 message for non ajax!
216
setResponseHeaders(response, message);
217
218                         writeMessageResponse(writer, message);
219
220                         if (ajax)
221                             writer.println("</response>");
222
223                         // look for next message
224
message = consumer.receiveNoWait();
225                         messages++;
226                     }
227                 }
228
229                 if (ajax) {
230                     writer.println("<response type='object' id='poll'><ok/></response>");
231                     writer.println("</ajax-response>");
232                 }
233             }
234         }
235         catch (JMSException JavaDoc e) {
236             throw new ServletException JavaDoc("Could not post JMS message: " + e, e);
237         }
238         finally {
239             if (log.isDebugEnabled()) {
240                 log.debug("Received " + messages + " message(s)");
241             }
242         }
243     }
244
245     /**
246      * Reads a message from a destination up to some specific timeout period
247      *
248      * @param request
249      * @param response
250      * @throws ServletException
251      * @throws IOException
252      */

253     protected void doMessagesWithoutContinuation(HttpServletRequest JavaDoc request, HttpServletResponse JavaDoc response,
254             int maxMessages) throws ServletException JavaDoc, IOException JavaDoc {
255
256         int messages = 0;
257         try {
258             WebClient client = WebClient.getWebClient(request);
259             Destination JavaDoc destination = getDestination(client, request);
260             long timeout = getReadTimeout(request);
261             boolean ajax = isRicoAjax(request);
262             if (!ajax)
263                 maxMessages = 1;
264
265             if (log.isDebugEnabled()) {
266                 log.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
267             }
268
269             MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
270             Continuation continuation = null;
271             Listener listener = null;
272             Message JavaDoc message = null;
273
274             // write a responds
275
response.setContentType("text/xml");
276             PrintWriter JavaDoc writer = response.getWriter();
277
278             if (ajax)
279                 writer.println("<ajax-response>");
280
281             // Only one client thread at a time should poll for messages.
282
if (client.getSemaphore().tryAcquire()) {
283                 try {
284                     // Look for any available messages
285
message = consumer.receive(timeout);
286
287                     // handle any message(s)
288
if (message == null) {
289                         // No messages so OK response of for ajax else no
290
// content.
291
response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
292                     } else {
293                         // We have at least one message so set up the
294
// response
295
response.setStatus(HttpServletResponse.SC_OK);
296                         String JavaDoc type = getContentType(request);
297                         if (type != null)
298                             response.setContentType(type);
299
300                         // send a response for each available message (up to
301
// max
302
// messages)
303
while ((maxMessages < 0 || messages < maxMessages) && message != null) {
304                             // System.err.println("message["+messages+"]="+message);
305
if (ajax) {
306                                 writer.print("<response type='object' id='");
307                                 writer.print(request.getParameter("id"));
308                                 writer.println("'>");
309                             } else
310                                 // only ever 1 message for non ajax!
311
setResponseHeaders(response, message);
312
313                             writeMessageResponse(writer, message);
314
315                             if (ajax)
316                                 writer.println("</response>");
317
318                             // look for next message
319
message = consumer.receiveNoWait();
320                             messages++;
321                         }
322                     }
323                 } finally {
324                     client.getSemaphore().release();
325                 }
326             } else {
327                 // Client is using us in another thread.
328
response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
329             }
330
331             if (ajax) {
332                 writer.println("<response type='object' id='poll'><ok/></response>");
333                 writer.println("</ajax-response>");
334             }
335
336         } catch (JMSException JavaDoc e) {
337             throw new ServletException JavaDoc("Could not post JMS message: " + e, e);
338         } finally {
339             if (log.isDebugEnabled()) {
340                 log.debug("Received " + messages + " message(s)");
341             }
342         }
343     }
344
345     protected void writeMessageResponse(PrintWriter JavaDoc writer, Message JavaDoc message) throws JMSException JavaDoc, IOException JavaDoc {
346         if (message instanceof TextMessage JavaDoc) {
347             TextMessage JavaDoc textMsg = (TextMessage JavaDoc) message;
348             String JavaDoc txt = textMsg.getText();
349             if (txt.startsWith("<?")) {
350                 txt = txt.substring(txt.indexOf("?>") + 2);
351             }
352             writer.print(txt);
353         }
354         else if (message instanceof ObjectMessage JavaDoc) {
355             ObjectMessage JavaDoc objectMsg = (ObjectMessage JavaDoc) message;
356             Object JavaDoc object = objectMsg.getObject();
357             writer.print(object.toString());
358         }
359     }
360
361     protected boolean isRicoAjax(HttpServletRequest JavaDoc request) {
362         String JavaDoc rico = request.getParameter("rico");
363         return rico != null && rico.equals("true");
364     }
365
366     protected String JavaDoc getContentType(HttpServletRequest JavaDoc request) {
367         /*
368          * log("Params: " + request.getParameterMap()); Enumeration iter =
369          * request.getHeaderNames(); while (iter.hasMoreElements()) { String
370          * name = (String) iter.nextElement(); log("Header: " + name + " = " +
371          * request.getHeader(name)); }
372          */

373         String JavaDoc value = request.getParameter("xml");
374         if (value != null && "true".equalsIgnoreCase(value)) {
375             return "text/xml";
376         }
377         return null;
378     }
379
380     protected void setResponseHeaders(HttpServletResponse JavaDoc response, Message JavaDoc message) throws JMSException JavaDoc {
381         response.setHeader("destination", message.getJMSDestination().toString());
382         response.setHeader("id", message.getJMSMessageID());
383     }
384
385     /**
386      * @return the timeout value for read requests which is always >= 0 and <=
387      * maximumReadTimeout to avoid DoS attacks
388      */

389     protected long getReadTimeout(HttpServletRequest JavaDoc request) {
390         long answer = defaultReadTimeout;
391
392         String JavaDoc name = request.getParameter(readTimeoutParameter);
393         if (name != null) {
394             answer = asLong(name);
395         }
396         if (answer < 0 || answer > maximumReadTimeout) {
397             answer = maximumReadTimeout;
398         }
399         return answer;
400     }
401
402     /*
403      * Listen for available messages and wakeup any continuations.
404      */

405     private class Listener implements MessageAvailableListener {
406         MessageConsumer JavaDoc consumer;
407         Continuation continuation;
408         List JavaDoc queue = new LinkedList JavaDoc();
409
410         Listener(MessageConsumer JavaDoc consumer) {
411             this.consumer = consumer;
412         }
413
414         public void setContinuation(Continuation continuation) {
415             synchronized (consumer) {
416                 this.continuation = continuation;
417             }
418         }
419
420         public void onMessageAvailable(MessageConsumer JavaDoc consumer) {
421             assert this.consumer == consumer;
422
423             synchronized (this.consumer) {
424                 if (continuation != null)
425                     continuation.resume();
426                 continuation = null;
427             }
428         }
429     }
430
431 }
432
Popular Tags