1 18 package org.apache.activemq.pool; 19 20 import java.io.Serializable ; 21 import java.util.Iterator ; 22 23 import javax.jms.BytesMessage ; 24 import javax.jms.Destination ; 25 import javax.jms.JMSException ; 26 import javax.jms.MapMessage ; 27 import javax.jms.Message ; 28 import javax.jms.MessageConsumer ; 29 import javax.jms.MessageListener ; 30 import javax.jms.MessageProducer ; 31 import javax.jms.ObjectMessage ; 32 import javax.jms.Queue ; 33 import javax.jms.QueueBrowser ; 34 import javax.jms.QueueReceiver ; 35 import javax.jms.QueueSender ; 36 import javax.jms.QueueSession ; 37 import javax.jms.StreamMessage ; 38 import javax.jms.TemporaryQueue ; 39 import javax.jms.TemporaryTopic ; 40 import javax.jms.TextMessage ; 41 import javax.jms.Topic ; 42 import javax.jms.TopicPublisher ; 43 import javax.jms.TopicSession ; 44 import javax.jms.TopicSubscriber ; 45 46 import org.apache.activemq.ActiveMQMessageProducer; 47 import org.apache.activemq.ActiveMQQueueSender; 48 import org.apache.activemq.ActiveMQSession; 49 import org.apache.activemq.ActiveMQTopicPublisher; 50 import org.apache.activemq.AlreadyClosedException; 51 import org.apache.commons.logging.Log; 52 import org.apache.commons.logging.LogFactory; 53 54 import java.util.concurrent.CopyOnWriteArrayList ; 55 56 59 public class PooledSession implements TopicSession , QueueSession { 60 private static final transient Log log = LogFactory.getLog(PooledSession.class); 61 62 private ActiveMQSession session; 63 private SessionPool sessionPool; 64 private ActiveMQMessageProducer messageProducer; 65 private ActiveMQQueueSender queueSender; 66 private ActiveMQTopicPublisher topicPublisher; 67 private boolean transactional = true; 68 private boolean ignoreClose = false; 69 70 private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList (); 71 private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList (); 72 73 74 public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) { 75 this.session = aSession; 76 this.sessionPool = sessionPool; 77 this.transactional = session.isTransacted(); 78 } 79 80 protected boolean isIgnoreClose() { 81 return ignoreClose; 82 } 83 84 protected void setIgnoreClose(boolean ignoreClose) { 85 this.ignoreClose = ignoreClose; 86 } 87 88 public void close() throws JMSException { 89 if (!ignoreClose) { 90 92 getSession().setMessageListener(null); 94 95 for (Iterator iter = consumers.iterator(); iter.hasNext();) { 97 MessageConsumer consumer = (MessageConsumer ) iter.next(); 98 consumer.close(); 99 } 100 consumers.clear(); 101 102 for (Iterator iter = browsers.iterator(); iter.hasNext();) { 103 QueueBrowser browser = (QueueBrowser ) iter.next(); 104 browser.close(); 105 } 106 browsers.clear(); 107 108 if (transactional) { 110 try { 111 getSession().rollback(); 112 } 113 catch (JMSException e) { 114 log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e); 115 116 try { 118 session.close(); 119 } 120 catch (JMSException e1) { 121 log.trace("Ignoring exception as discarding session: " + e1, e1); 122 } 123 session = null; 124 return; 125 } 126 } 127 128 sessionPool.returnSession(this); 129 } 130 } 131 132 public void commit() throws JMSException { 133 getSession().commit(); 134 } 135 136 public BytesMessage createBytesMessage() throws JMSException { 137 return getSession().createBytesMessage(); 138 } 139 140 public MapMessage createMapMessage() throws JMSException { 141 return getSession().createMapMessage(); 142 } 143 144 public Message createMessage() throws JMSException { 145 return getSession().createMessage(); 146 } 147 148 public ObjectMessage createObjectMessage() throws JMSException { 149 return getSession().createObjectMessage(); 150 } 151 152 public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { 153 return getSession().createObjectMessage(serializable); 154 } 155 156 public Queue createQueue(String s) throws JMSException { 157 return getSession().createQueue(s); 158 } 159 160 public StreamMessage createStreamMessage() throws JMSException { 161 return getSession().createStreamMessage(); 162 } 163 164 public TemporaryQueue createTemporaryQueue() throws JMSException { 165 return getSession().createTemporaryQueue(); 166 } 167 168 public TemporaryTopic createTemporaryTopic() throws JMSException { 169 return getSession().createTemporaryTopic(); 170 } 171 172 public void unsubscribe(String s) throws JMSException { 173 getSession().unsubscribe(s); 174 } 175 176 public TextMessage createTextMessage() throws JMSException { 177 return getSession().createTextMessage(); 178 } 179 180 public TextMessage createTextMessage(String s) throws JMSException { 181 return getSession().createTextMessage(s); 182 } 183 184 public Topic createTopic(String s) throws JMSException { 185 return getSession().createTopic(s); 186 } 187 188 public int getAcknowledgeMode() throws JMSException { 189 return getSession().getAcknowledgeMode(); 190 } 191 192 public boolean getTransacted() throws JMSException { 193 return getSession().getTransacted(); 194 } 195 196 public void recover() throws JMSException { 197 getSession().recover(); 198 } 199 200 public void rollback() throws JMSException { 201 getSession().rollback(); 202 } 203 204 public void run() { 205 if (session != null) { 206 session.run(); 207 } 208 } 209 210 211 public QueueBrowser createBrowser(Queue queue) throws JMSException { 214 return addQueueBrowser(getSession().createBrowser(queue)); 215 } 216 217 public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { 218 return addQueueBrowser(getSession().createBrowser(queue, selector)); 219 } 220 221 public MessageConsumer createConsumer(Destination destination) throws JMSException { 222 return addConsumer(getSession().createConsumer(destination)); 223 } 224 225 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { 226 return addConsumer(getSession().createConsumer(destination, selector)); 227 } 228 229 public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { 230 return addConsumer(getSession().createConsumer(destination, selector, noLocal)); 231 } 232 233 public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException { 234 return addTopicSubscriber(getSession().createDurableSubscriber(topic, selector)); 235 } 236 237 238 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { 239 return addTopicSubscriber(getSession().createDurableSubscriber(topic, name, selector, noLocal)); 240 } 241 242 public MessageListener getMessageListener() throws JMSException { 243 return getSession().getMessageListener(); 244 } 245 246 public void setMessageListener(MessageListener messageListener) throws JMSException { 247 getSession().setMessageListener(messageListener); 248 } 249 250 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 251 return addTopicSubscriber(getSession().createSubscriber(topic)); 252 } 253 254 public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { 255 return addTopicSubscriber(getSession().createSubscriber(topic, selector, local)); 256 } 257 258 public QueueReceiver createReceiver(Queue queue) throws JMSException { 259 return addQueueReceiver(getSession().createReceiver(queue)); 260 } 261 262 public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { 263 return addQueueReceiver(getSession().createReceiver(queue, selector)); 264 } 265 266 267 public MessageProducer createProducer(Destination destination) throws JMSException { 270 return new PooledProducer(getMessageProducer(), destination); 271 } 272 273 public QueueSender createSender(Queue queue) throws JMSException { 274 return new PooledQueueSender(getQueueSender(), queue); 275 } 276 277 public TopicPublisher createPublisher(Topic topic) throws JMSException { 278 return new PooledTopicPublisher(getTopicPublisher(), topic); 279 } 280 281 protected ActiveMQSession getSession() throws AlreadyClosedException { 284 if (session == null) { 285 throw new AlreadyClosedException("The session has already been closed"); 286 } 287 return session; 288 } 289 290 public ActiveMQMessageProducer getMessageProducer() throws JMSException { 291 if (messageProducer == null) { 292 messageProducer = (ActiveMQMessageProducer) getSession().createProducer(null); 293 } 294 return messageProducer; 295 } 296 297 public ActiveMQQueueSender getQueueSender() throws JMSException { 298 if (queueSender == null) { 299 queueSender = (ActiveMQQueueSender) getSession().createSender(null); 300 } 301 return queueSender; 302 } 303 304 public ActiveMQTopicPublisher getTopicPublisher() throws JMSException { 305 if (topicPublisher == null) { 306 topicPublisher = (ActiveMQTopicPublisher) getSession().createPublisher(null); 307 } 308 return topicPublisher; 309 } 310 311 private QueueBrowser addQueueBrowser(QueueBrowser browser) { 312 browsers.add(browser); 313 return browser; 314 } 315 private MessageConsumer addConsumer(MessageConsumer consumer) { 316 consumers.add(consumer); 317 return consumer; 318 } 319 private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) { 320 consumers.add(subscriber); 321 return subscriber; 322 } 323 private QueueReceiver addQueueReceiver(QueueReceiver receiver) { 324 consumers.add(receiver); 325 return receiver; 326 } 327 328 public String toString() { 329 return "PooledSession { "+session+" }"; 330 } 331 } 332 | Popular Tags |