1 18 19 package org.apache.activemq.web; 20 21 import java.io.Externalizable ; 22 import java.io.IOException ; 23 import java.io.ObjectInput ; 24 import java.io.ObjectOutput ; 25 import java.util.ArrayList ; 26 import java.util.HashMap ; 27 import java.util.Iterator ; 28 import java.util.List ; 29 import java.util.Map ; 30 31 import javax.jms.ConnectionFactory ; 32 import javax.jms.DeliveryMode ; 33 import javax.jms.Destination ; 34 import javax.jms.JMSException ; 35 import javax.jms.Message ; 36 import javax.jms.MessageConsumer ; 37 import javax.jms.MessageProducer ; 38 import javax.jms.Session ; 39 import javax.servlet.ServletContext ; 40 import javax.servlet.http.HttpServletRequest ; 41 import javax.servlet.http.HttpSession ; 42 import javax.servlet.http.HttpSessionActivationListener ; 43 import javax.servlet.http.HttpSessionBindingEvent ; 44 import javax.servlet.http.HttpSessionBindingListener ; 45 import javax.servlet.http.HttpSessionEvent ; 46 47 import org.apache.activemq.ActiveMQConnection; 48 import org.apache.activemq.ActiveMQConnectionFactory; 49 import org.apache.activemq.ActiveMQSession; 50 import org.apache.activemq.MessageAvailableConsumer; 51 import org.apache.commons.logging.Log; 52 import org.apache.commons.logging.LogFactory; 53 54 import java.util.concurrent.Semaphore ; 55 56 65 public class WebClient implements HttpSessionActivationListener , HttpSessionBindingListener , Externalizable { 66 public static final String webClientAttribute = "org.apache.activemq.webclient"; 67 public static final String connectionFactoryAttribute = "org.apache.activemq.connectionFactory"; 68 69 public static final String connectionFactoryPrefetchParam = "org.apache.activemq.connectionFactory.prefetch"; 70 public static final String connectionFactoryOptimizeAckParam = "org.apache.activemq.connectionFactory.optimizeAck"; 71 public static final String brokerUrlInitParam = "org.apache.activemq.brokerURL"; 72 73 private static final Log log = LogFactory.getLog(WebClient.class); 74 75 private static transient ConnectionFactory factory; 76 77 private transient Map consumers = new HashMap (); 78 private transient ActiveMQConnection connection; 79 private transient ActiveMQSession session; 80 private transient MessageProducer producer; 81 private int deliveryMode = DeliveryMode.NON_PERSISTENT; 82 83 private final Semaphore semaphore = new Semaphore (1); 84 85 86 93 public static WebClient getWebClient(HttpServletRequest request) { 94 HttpSession session = request.getSession(true); 95 WebClient client = getWebClient(session); 96 if (client == null || client.isClosed()) { 97 client = WebClient.createWebClient(request); 98 session.setAttribute(webClientAttribute, client); 99 } 100 101 return client; 102 } 103 107 public static WebClient getWebClient(HttpSession session) { 108 return (WebClient) session.getAttribute(webClientAttribute); 109 } 110 111 public static void initContext(ServletContext context) { 112 initConnectionFactory(context); 113 } 114 115 public WebClient() { 116 if (factory == null) 117 throw new IllegalStateException ("initContext(ServletContext) not called"); 118 } 119 120 public int getDeliveryMode() { 121 return deliveryMode; 122 } 123 124 public void setDeliveryMode(int deliveryMode) { 125 this.deliveryMode = deliveryMode; 126 } 127 128 public synchronized void closeConsumers() { 129 for (Iterator it = consumers.values().iterator(); it.hasNext();) { 130 MessageConsumer consumer = (MessageConsumer ) it.next(); 131 it.remove(); 132 try { 133 consumer.setMessageListener(null); 134 if (consumer instanceof MessageAvailableConsumer) 135 ((MessageAvailableConsumer) consumer).setAvailableListener(null); 136 consumer.close(); 137 } 138 catch (JMSException e) { 139 log.debug("caught exception closing consumer",e); 140 } 141 } 142 } 143 144 public synchronized void close() { 145 try { 146 closeConsumers(); 147 if (connection != null) 148 connection.close(); 149 } 150 catch (JMSException e) { 151 log.debug("caught exception closing consumer",e); 152 } 153 finally { 154 producer = null; 155 session = null; 156 connection = null; 157 if (consumers != null) 158 consumers.clear(); 159 consumers = null; 160 } 161 } 162 163 public boolean isClosed() { 164 return consumers == null; 165 } 166 167 public void writeExternal(ObjectOutput out) throws IOException { 168 if (consumers != null) { 169 out.write(consumers.size()); 170 Iterator i = consumers.keySet().iterator(); 171 while (i.hasNext()) 172 out.writeObject(i.next().toString()); 173 } 174 else 175 out.write(-1); 176 177 } 178 179 public void readExternal(ObjectInput in) throws IOException , ClassNotFoundException { 180 int size = in.readInt(); 181 if (size >= 0) { 182 consumers = new HashMap (); 183 for (int i = 0; i < size; i++) { 184 String destinationName = in.readObject().toString(); 185 186 try { 187 Destination destination = destinationName.startsWith("topic://") ? (Destination ) getSession().createTopic(destinationName) 188 : (Destination ) getSession().createQueue(destinationName); 189 consumers.put(destination, getConsumer(destination, true)); 190 } 191 catch (JMSException e) { 192 log.debug("Caought Exception ",e); 193 IOException ex = new IOException (e.getMessage()); 194 ex.initCause(e.getCause() != null ? e.getCause() : e); 195 throw ex; 196 197 } 198 } 199 } 200 } 201 202 public void send(Destination destination, Message message) throws JMSException { 203 getProducer().send(destination, message); 204 if (log.isDebugEnabled()) { 205 log.debug("Sent! to destination: " + destination + " message: " + message); 206 } 207 } 208 209 public void send(Destination destination, Message message, boolean persistent, int priority, int timeToLive) throws JMSException { 210 int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; 211 getProducer().send(destination, message, deliveryMode, priority, timeToLive); 212 if (log.isDebugEnabled()) { 213 log.debug("Sent! to destination: " + destination + " message: " + message); 214 } 215 } 216 217 public Session getSession() throws JMSException { 218 if (session == null) { 219 session = createSession(); 220 } 221 return session; 222 } 223 224 public ActiveMQConnection getConnection() throws JMSException { 225 if (connection == null) { 226 connection = (ActiveMQConnection) factory.createConnection(); 227 connection.start(); 228 } 229 return connection; 230 } 231 232 public static synchronized void initConnectionFactory(ServletContext servletContext) { 233 if (factory == null) 234 factory = (ConnectionFactory ) servletContext.getAttribute(connectionFactoryAttribute); 235 if (factory == null) { 236 String brokerURL = servletContext.getInitParameter(brokerUrlInitParam); 237 238 239 log.debug("Value of: " + brokerUrlInitParam + " is: " + brokerURL); 240 241 if (brokerURL == null) { 242 brokerURL = "vm://localhost"; 243 } 244 245 ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL); 246 247 if (servletContext.getInitParameter(connectionFactoryPrefetchParam) != null) { 249 int prefetch = Integer.valueOf(servletContext.getInitParameter(connectionFactoryPrefetchParam)).intValue(); 250 amqfactory.getPrefetchPolicy().setAll(prefetch); 251 } 252 253 if (servletContext.getInitParameter(connectionFactoryOptimizeAckParam) != null) { 255 boolean optimizeAck = Boolean.valueOf(servletContext.getInitParameter(connectionFactoryOptimizeAckParam)).booleanValue(); 256 amqfactory.setOptimizeAcknowledge(optimizeAck); 257 } 258 259 factory = amqfactory; 260 261 servletContext.setAttribute(connectionFactoryAttribute, factory); 262 } 263 } 264 265 public synchronized MessageProducer getProducer() throws JMSException { 266 if (producer == null) { 267 producer = getSession().createProducer(null); 268 producer.setDeliveryMode(deliveryMode); 269 } 270 return producer; 271 } 272 273 public void setProducer(MessageProducer producer) { 274 this.producer = producer; 275 } 276 277 public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException { 278 return getConsumer(destination, true); 279 } 280 281 public synchronized MessageConsumer getConsumer(Destination destination, boolean create) throws JMSException { 282 MessageConsumer consumer = (MessageConsumer ) consumers.get(destination); 283 if (create && consumer == null) { 284 consumer = getSession().createConsumer(destination); 285 consumers.put(destination, consumer); 286 } 287 return consumer; 288 } 289 290 public synchronized void closeConsumer(Destination destination) throws JMSException { 291 MessageConsumer consumer = (MessageConsumer ) consumers.get(destination); 292 if (consumer != null) { 293 consumers.remove(destination); 294 consumer.setMessageListener(null); 295 if (consumer instanceof MessageAvailableConsumer) 296 ((MessageAvailableConsumer) consumer).setAvailableListener(null); 297 consumer.close(); 298 } 299 } 300 301 public synchronized List getConsumers() { 302 return new ArrayList (consumers.values()); 303 } 304 305 protected ActiveMQSession createSession() throws JMSException { 306 return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); 307 } 308 309 public Semaphore getSemaphore() { 310 return semaphore; 311 } 312 313 public void sessionWillPassivate(HttpSessionEvent event) { 314 close(); 315 } 316 317 public void sessionDidActivate(HttpSessionEvent event) { 318 } 319 320 public void valueBound(HttpSessionBindingEvent event) { 321 } 322 323 public void valueUnbound(HttpSessionBindingEvent event) { 324 close(); 325 } 326 327 protected static WebClient createWebClient(HttpServletRequest request) { 328 return new WebClient(); 329 } 330 331 } 332 | Popular Tags |