1 18 19 package org.apache.activemq.web; 20 21 import java.io.IOException ; 22 import java.io.PrintWriter ; 23 import java.io.StringWriter ; 24 import java.util.HashMap ; 25 import java.util.List ; 26 import java.util.Map ; 27 28 import javax.jms.Destination ; 29 import javax.jms.JMSException ; 30 import javax.jms.Message ; 31 import javax.jms.MessageConsumer ; 32 import javax.jms.ObjectMessage ; 33 import javax.jms.TextMessage ; 34 import javax.servlet.ServletConfig ; 35 import javax.servlet.ServletException ; 36 import javax.servlet.http.HttpServletRequest ; 37 import javax.servlet.http.HttpServletResponse ; 38 import javax.servlet.http.HttpSession ; 39 40 import org.apache.activemq.MessageAvailableConsumer; 41 import org.apache.activemq.MessageAvailableListener; 42 import org.apache.commons.logging.Log; 43 import org.apache.commons.logging.LogFactory; 44 import org.mortbay.util.ajax.Continuation; 45 import org.mortbay.util.ajax.ContinuationSupport; 46 47 66 public class MessageListenerServlet extends MessageServletSupport { 67 private static final Log log = LogFactory.getLog(MessageListenerServlet.class); 68 69 private String readTimeoutParameter = "timeout"; 70 71 private long defaultReadTimeout = -1; 72 73 private long maximumReadTimeout = 25000; 74 75 private int maximumMessages = 100; 76 77 public void init() throws ServletException { 78 ServletConfig servletConfig = getServletConfig(); 79 String name = servletConfig.getInitParameter("defaultReadTimeout"); 80 if (name != null) { 81 defaultReadTimeout = asLong(name); 82 } 83 name = servletConfig.getInitParameter("maximumReadTimeout"); 84 if (name != null) { 85 maximumReadTimeout = asLong(name); 86 } 87 name = servletConfig.getInitParameter("maximumMessages"); 88 if (name != null) { 89 maximumMessages = (int) asLong(name); 90 } 91 } 92 93 110 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException , IOException { 111 112 114 WebClient client = WebClient.getWebClient(request); 115 String message_ids=""; 116 117 synchronized (client) { 118 119 if (log.isDebugEnabled()) { 120 log.debug("POST client="+client+" session="+request.getSession().getId()+" info="+request.getPathInfo()+" contentType="+request.getContentType()); 121 } 123 124 int messages=0; 125 126 while (true) 128 { 129 String destination_name = request.getParameter(messages==0?"destination":("d"+messages)); 131 String message = request.getParameter(messages==0?"message":("m"+messages)); 132 String type = request.getParameter(messages==0?"type":("t"+messages)); 133 134 if (destination_name==null || message==null || type==null) 135 break; 136 137 try { 138 Destination destination=getDestination(client,request,destination_name); 139 140 if (log.isDebugEnabled()) { 141 log.debug(messages+" destination="+destination_name+" message="+message+" type="+type); 142 log.debug(destination+" is a "+destination.getClass().getName()); 143 } 144 145 messages++; 146 147 if ("listen".equals(type)) 148 { 149 Listener listener = getListener(request); 150 Map consumerIdMap = getConsumerIdMap(request); 151 client.closeConsumer(destination); MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination); 153 154 consumer.setAvailableListener(listener); 155 consumerIdMap.put(consumer, message); 156 if (log.isDebugEnabled()) { 157 log.debug("Subscribed: "+consumer+" to "+destination+" id="+message); 158 } 159 } 160 else if ("unlisten".equals(type)) 161 { 162 Map consumerIdMap = getConsumerIdMap(request); 163 MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination); 164 165 consumer.setAvailableListener(null); 166 consumerIdMap.remove(consumer); 167 client.closeConsumer(destination); 168 if (log.isDebugEnabled()) { 169 log.debug("Unsubscribed: "+consumer); 170 } 171 } 172 else if ("send".equals(type)) 173 { 174 TextMessage text = client.getSession().createTextMessage(message); 175 appendParametersToMessage(request, text); 176 177 client.send(destination, text); 178 message_ids+=text.getJMSMessageID()+"\n"; 179 if (log.isDebugEnabled()) { 180 log.debug("Sent "+message+" to "+destination); 181 } 182 } 183 else 184 log.warn("unknown type "+type); 185 186 } 187 catch (JMSException e) { 188 log.warn("jms", e); 189 } 190 } 191 } 192 193 if ("true".equals(request.getParameter("poll"))) 194 { 195 try 196 { 197 doMessages(client,request,response); 199 } 200 catch (JMSException e) 201 { 202 throw new ServletException ("JMS problem: " + e, e); 203 } 204 } 205 else 206 { 207 if (request.getContentLength()!=0 && 209 (request.getContentType()==null || !request.getContentType().toLowerCase().startsWith("application/x-www-form-urlencoded"))) 210 { 211 try { 212 Destination destination=getDestination(client, request); 213 String body = getPostedMessageBody(request); 214 TextMessage message = client.getSession().createTextMessage(body ); 215 appendParametersToMessage(request, message); 216 217 client.send(destination, message); 218 if (log.isDebugEnabled()) { 219 log.debug("Sent to destination: " + destination + " body: " + body); 220 } 221 message_ids+=message.getJMSMessageID()+"\n"; 222 } 223 catch (JMSException e) { 224 throw new ServletException (e); 225 } 226 } 227 228 response.setContentType("text/plain"); 229 response.setHeader("Cache-Control", "no-cache"); 230 response.getWriter().print(message_ids); 231 } 232 } 233 234 238 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException , IOException { 239 try { 240 WebClient client = WebClient.getWebClient(request); 241 if (log.isDebugEnabled()) { 242 log.debug("GET client="+client+" session="+request.getSession().getId()+" uri="+request.getRequestURI()+" query="+request.getQueryString()); 243 } 244 245 doMessages(client, request, response); 246 } 247 catch (JMSException e) { 248 throw new ServletException ("JMS problem: " + e, e); 249 } 250 } 251 252 253 263 protected void doMessages(WebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException , IOException { 264 265 int messages = 0; 266 268 long timeout = getReadTimeout(request); 269 if (log.isDebugEnabled()) { 270 log.debug("doMessage timeout="+timeout); 271 } 272 273 Continuation continuation = ContinuationSupport.getContinuation(request, client); 274 Listener listener = getListener(request); 275 if (listener!=null && continuation!=null && !continuation.isPending()) 276 listener.access(); 277 278 Message message = null; 279 synchronized (client) { 280 281 List consumers = client.getConsumers(); 282 MessageAvailableConsumer consumer = null; 283 284 for (int i = 0; message == null && i < consumers.size(); i++) { 286 consumer = (MessageAvailableConsumer) consumers.get(i); 287 if (consumer.getAvailableListener() == null) 288 continue; 289 290 message = consumer.receiveNoWait(); 292 if (log.isDebugEnabled()) { 293 log.debug("received "+message+" from "+consumer); 294 } 295 } 296 297 300 if (message == null) { 301 listener.setContinuation(continuation); 303 304 continuation.suspend(timeout); 307 } 308 listener.setContinuation(null); 309 310 response.setContentType("text/xml"); 312 response.setHeader("Cache-Control", "no-cache"); 313 314 StringWriter swriter = new StringWriter (); 315 PrintWriter writer = new PrintWriter (swriter); 316 317 Map consumerIdMap = getConsumerIdMap(request); 318 response.setStatus(HttpServletResponse.SC_OK); 319 writer.println("<ajax-response>"); 320 321 if (message != null) { 323 String id = (String ) consumerIdMap.get(consumer); 324 writer.print("<response id='"); 325 writer.print(id); 326 writer.print("'>"); 327 writeMessageResponse(writer, message); 328 writer.println("</response>"); 329 messages++; 330 } 331 332 for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) { 334 consumer = (MessageAvailableConsumer) consumers.get(i); 335 if (consumer.getAvailableListener() == null) 336 continue; 337 338 message = consumer.receiveNoWait(); 340 while (message != null && messages < maximumMessages) { 341 String id = (String ) consumerIdMap.get(consumer); 342 writer.print("<response id='"); 343 writer.print(id); 344 writer.print("'>"); 345 writeMessageResponse(writer, message); 346 writer.println("</response>"); 347 messages++; 348 message = consumer.receiveNoWait(); 349 } 350 } 351 352 355 writer.print("</ajax-response>"); 356 357 writer.flush(); 358 String m = swriter.toString(); 359 response.getWriter().println(m); 361 } 362 363 } 364 365 protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException , IOException { 366 if (message instanceof TextMessage ) { 367 TextMessage textMsg = (TextMessage ) message; 368 String txt = textMsg.getText(); 369 if (txt.startsWith("<?")) { 370 txt = txt.substring(txt.indexOf("?>") + 2); 371 } 372 writer.print(txt); 373 } else if (message instanceof ObjectMessage ) { 374 ObjectMessage objectMsg = (ObjectMessage ) message; 375 Object object = objectMsg.getObject(); 376 writer.print(object.toString()); 377 } 378 } 379 380 protected Listener getListener(HttpServletRequest request) { 381 HttpSession session = request.getSession(); 382 Listener listener = (Listener) session.getAttribute("mls.listener"); 383 if (listener == null) { 384 listener = new Listener(WebClient.getWebClient(request)); 385 session.setAttribute("mls.listener", listener); 386 } 387 return listener; 388 } 389 390 protected Map getConsumerIdMap(HttpServletRequest request) { 391 HttpSession session = request.getSession(true); 392 Map map = (Map ) session.getAttribute("mls.consumerIdMap"); 393 if (map == null) { 394 map = new HashMap (); 395 session.setAttribute("mls.consumerIdMap", map); 396 } 397 return map; 398 } 399 400 protected boolean isRicoAjax(HttpServletRequest request) { 401 String rico = request.getParameter("rico"); 402 return rico != null && rico.equals("true"); 403 } 404 405 409 protected long getReadTimeout(HttpServletRequest request) { 410 long answer = defaultReadTimeout; 411 412 String name = request.getParameter(readTimeoutParameter); 413 if (name != null) { 414 answer = asLong(name); 415 } 416 if (answer < 0 || answer > maximumReadTimeout) { 417 answer = maximumReadTimeout; 418 } 419 return answer; 420 } 421 422 425 private class Listener implements MessageAvailableListener { 426 WebClient client; 427 long lastAccess; 428 Continuation continuation; 429 430 Listener(WebClient client) { 431 this.client = client; 432 } 433 434 public void access() 435 { 436 lastAccess=System.currentTimeMillis(); 437 } 438 439 synchronized public void setContinuation(Continuation continuation) { 440 this.continuation = continuation; 441 } 442 443 synchronized public void onMessageAvailable(MessageConsumer consumer) { 444 if (log.isDebugEnabled()) { 445 log.debug("message for "+consumer+"continuation="+continuation); 446 } 447 if (continuation != null) 448 continuation.resume(); 449 else if (System.currentTimeMillis()-lastAccess>2*maximumReadTimeout) 450 { 451 new Thread () { 452 public void run() { 453 client.closeConsumers(); 454 }; 455 }.start(); 456 } 457 continuation = null; 458 } 459 460 } 461 } 462 | Popular Tags |