1 18 19 package org.apache.activemq.web; 20 21 import java.io.IOException ; 22 import java.io.PrintWriter ; 23 import java.util.LinkedList ; 24 import java.util.List ; 25 26 import javax.jms.Destination ; 27 import javax.jms.JMSException ; 28 import javax.jms.Message ; 29 import javax.jms.MessageConsumer ; 30 import javax.jms.ObjectMessage ; 31 import javax.jms.TextMessage ; 32 import javax.servlet.ServletConfig ; 33 import javax.servlet.ServletException ; 34 import javax.servlet.http.HttpServletRequest ; 35 import javax.servlet.http.HttpServletResponse ; 36 37 import org.apache.activemq.MessageAvailableConsumer; 38 import org.apache.activemq.MessageAvailableListener; 39 import org.apache.commons.logging.Log; 40 import org.apache.commons.logging.LogFactory; 41 import org.mortbay.util.ajax.Continuation; 42 import org.mortbay.util.ajax.ContinuationSupport; 43 44 54 public class MessageServlet extends MessageServletSupport { 55 private static final Log log = LogFactory.getLog(MessageServlet.class); 56 57 private String readTimeoutParameter = "readTimeout"; 58 private long defaultReadTimeout = -1; 59 private long maximumReadTimeout = 20000; 60 61 public void init() throws ServletException { 62 ServletConfig servletConfig = getServletConfig(); 63 String name = servletConfig.getInitParameter("defaultReadTimeout"); 64 if (name != null) { 65 defaultReadTimeout = asLong(name); 66 } 67 name = servletConfig.getInitParameter("maximumReadTimeout"); 68 if (name != null) { 69 maximumReadTimeout = asLong(name); 70 } 71 } 72 73 81 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException , IOException { 82 try { 84 WebClient client = WebClient.getWebClient(request); 85 86 String text = getPostedMessageBody(request); 87 88 Destination destination = getDestination(client, request); 90 if (destination==null) 91 throw new NoDestinationSuppliedException(); 92 93 if (log.isDebugEnabled()) { 94 log.debug("Sending message to: " + destination + " with text: " + text); 95 } 96 97 TextMessage message = client.getSession().createTextMessage(text); 98 appendParametersToMessage(request, message); 99 boolean persistent = isSendPersistent(request); 100 int priority = getSendPriority(request); 101 long timeToLive = getSendTimeToLive(request); 102 client.send(destination, message); 103 104 response.setHeader("messageID", message.getJMSMessageID()); 106 response.setStatus(HttpServletResponse.SC_OK); 107 } 108 catch (JMSException e) { 109 throw new ServletException ("Could not post JMS message: " + e, e); 110 } 111 } 112 113 117 protected void doDelete(HttpServletRequest request, HttpServletResponse response) throws ServletException , IOException { 118 doMessages(request, response, 1); 119 } 120 121 125 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException , IOException { 126 doMessages(request, response, -1); 127 } 128 129 137 protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException , IOException { 138 139 int messages = 0; 140 try { 141 WebClient client = WebClient.getWebClient(request); 142 Destination destination = getDestination(client, request); 143 if (destination==null) 144 throw new NoDestinationSuppliedException(); 145 long timeout = getReadTimeout(request); 146 boolean ajax = isRicoAjax(request); 147 if (!ajax) 148 maxMessages = 1; 149 150 if (log.isDebugEnabled()) { 151 log.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout); 152 } 153 154 MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination); 155 Continuation continuation = null; 156 Listener listener = null; 157 Message message = null; 158 159 synchronized (consumer) { 160 listener = (Listener) consumer.getAvailableListener(); 162 if (listener == null) { 163 listener = new Listener(consumer); 164 consumer.setAvailableListener(listener); 165 } 166 message = consumer.receiveNoWait(); 168 169 if (message == null) { 172 continuation = ContinuationSupport.getContinuation(request, consumer); 173 174 listener.setContinuation(continuation); 176 177 continuation.suspend(timeout); 180 } 181 182 if (message == null) 184 message = consumer.receiveNoWait(); 185 186 response.setContentType("text/xml"); 188 PrintWriter writer = response.getWriter(); 189 190 if (ajax) 191 writer.println("<ajax-response>"); 192 193 if (message == null) { 195 response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); 197 } 198 else { 199 response.setStatus(HttpServletResponse.SC_OK); 201 String type = getContentType(request); 202 if (type != null) 203 response.setContentType(type); 204 205 while ((maxMessages < 0 || messages < maxMessages) && message != null) { 208 if (ajax) { 210 writer.print("<response type='object' id='"); 211 writer.print(request.getParameter("id")); 212 writer.println("'>"); 213 } 214 else 215 setResponseHeaders(response, message); 217 218 writeMessageResponse(writer, message); 219 220 if (ajax) 221 writer.println("</response>"); 222 223 message = consumer.receiveNoWait(); 225 messages++; 226 } 227 } 228 229 if (ajax) { 230 writer.println("<response type='object' id='poll'><ok/></response>"); 231 writer.println("</ajax-response>"); 232 } 233 } 234 } 235 catch (JMSException e) { 236 throw new ServletException ("Could not post JMS message: " + e, e); 237 } 238 finally { 239 if (log.isDebugEnabled()) { 240 log.debug("Received " + messages + " message(s)"); 241 } 242 } 243 } 244 245 253 protected void doMessagesWithoutContinuation(HttpServletRequest request, HttpServletResponse response, 254 int maxMessages) throws ServletException , IOException { 255 256 int messages = 0; 257 try { 258 WebClient client = WebClient.getWebClient(request); 259 Destination destination = getDestination(client, request); 260 long timeout = getReadTimeout(request); 261 boolean ajax = isRicoAjax(request); 262 if (!ajax) 263 maxMessages = 1; 264 265 if (log.isDebugEnabled()) { 266 log.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout); 267 } 268 269 MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination); 270 Continuation continuation = null; 271 Listener listener = null; 272 Message message = null; 273 274 response.setContentType("text/xml"); 276 PrintWriter writer = response.getWriter(); 277 278 if (ajax) 279 writer.println("<ajax-response>"); 280 281 if (client.getSemaphore().tryAcquire()) { 283 try { 284 message = consumer.receive(timeout); 286 287 if (message == null) { 289 response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); 292 } else { 293 response.setStatus(HttpServletResponse.SC_OK); 296 String type = getContentType(request); 297 if (type != null) 298 response.setContentType(type); 299 300 while ((maxMessages < 0 || messages < maxMessages) && message != null) { 304 if (ajax) { 306 writer.print("<response type='object' id='"); 307 writer.print(request.getParameter("id")); 308 writer.println("'>"); 309 } else 310 setResponseHeaders(response, message); 312 313 writeMessageResponse(writer, message); 314 315 if (ajax) 316 writer.println("</response>"); 317 318 message = consumer.receiveNoWait(); 320 messages++; 321 } 322 } 323 } finally { 324 client.getSemaphore().release(); 325 } 326 } else { 327 response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT); 329 } 330 331 if (ajax) { 332 writer.println("<response type='object' id='poll'><ok/></response>"); 333 writer.println("</ajax-response>"); 334 } 335 336 } catch (JMSException e) { 337 throw new ServletException ("Could not post JMS message: " + e, e); 338 } finally { 339 if (log.isDebugEnabled()) { 340 log.debug("Received " + messages + " message(s)"); 341 } 342 } 343 } 344 345 protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException , IOException { 346 if (message instanceof TextMessage ) { 347 TextMessage textMsg = (TextMessage ) message; 348 String txt = textMsg.getText(); 349 if (txt.startsWith("<?")) { 350 txt = txt.substring(txt.indexOf("?>") + 2); 351 } 352 writer.print(txt); 353 } 354 else if (message instanceof ObjectMessage ) { 355 ObjectMessage objectMsg = (ObjectMessage ) message; 356 Object object = objectMsg.getObject(); 357 writer.print(object.toString()); 358 } 359 } 360 361 protected boolean isRicoAjax(HttpServletRequest request) { 362 String rico = request.getParameter("rico"); 363 return rico != null && rico.equals("true"); 364 } 365 366 protected String getContentType(HttpServletRequest request) { 367 373 String value = request.getParameter("xml"); 374 if (value != null && "true".equalsIgnoreCase(value)) { 375 return "text/xml"; 376 } 377 return null; 378 } 379 380 protected void setResponseHeaders(HttpServletResponse response, Message message) throws JMSException { 381 response.setHeader("destination", message.getJMSDestination().toString()); 382 response.setHeader("id", message.getJMSMessageID()); 383 } 384 385 389 protected long getReadTimeout(HttpServletRequest request) { 390 long answer = defaultReadTimeout; 391 392 String name = request.getParameter(readTimeoutParameter); 393 if (name != null) { 394 answer = asLong(name); 395 } 396 if (answer < 0 || answer > maximumReadTimeout) { 397 answer = maximumReadTimeout; 398 } 399 return answer; 400 } 401 402 405 private class Listener implements MessageAvailableListener { 406 MessageConsumer consumer; 407 Continuation continuation; 408 List queue = new LinkedList (); 409 410 Listener(MessageConsumer consumer) { 411 this.consumer = consumer; 412 } 413 414 public void setContinuation(Continuation continuation) { 415 synchronized (consumer) { 416 this.continuation = continuation; 417 } 418 } 419 420 public void onMessageAvailable(MessageConsumer consumer) { 421 assert this.consumer == consumer; 422 423 synchronized (this.consumer) { 424 if (continuation != null) 425 continuation.resume(); 426 continuation = null; 427 } 428 } 429 } 430 431 } 432 | Popular Tags |