KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > pool > PooledSession


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.activemq.pool;
19
20 import java.io.Serializable JavaDoc;
21 import java.util.Iterator JavaDoc;
22
23 import javax.jms.BytesMessage JavaDoc;
24 import javax.jms.Destination JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.MapMessage JavaDoc;
27 import javax.jms.Message JavaDoc;
28 import javax.jms.MessageConsumer JavaDoc;
29 import javax.jms.MessageListener JavaDoc;
30 import javax.jms.MessageProducer JavaDoc;
31 import javax.jms.ObjectMessage JavaDoc;
32 import javax.jms.Queue JavaDoc;
33 import javax.jms.QueueBrowser JavaDoc;
34 import javax.jms.QueueReceiver JavaDoc;
35 import javax.jms.QueueSender JavaDoc;
36 import javax.jms.QueueSession JavaDoc;
37 import javax.jms.StreamMessage JavaDoc;
38 import javax.jms.TemporaryQueue JavaDoc;
39 import javax.jms.TemporaryTopic JavaDoc;
40 import javax.jms.TextMessage JavaDoc;
41 import javax.jms.Topic JavaDoc;
42 import javax.jms.TopicPublisher JavaDoc;
43 import javax.jms.TopicSession JavaDoc;
44 import javax.jms.TopicSubscriber JavaDoc;
45
46 import org.apache.activemq.ActiveMQMessageProducer;
47 import org.apache.activemq.ActiveMQQueueSender;
48 import org.apache.activemq.ActiveMQSession;
49 import org.apache.activemq.ActiveMQTopicPublisher;
50 import org.apache.activemq.AlreadyClosedException;
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53
54 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
55
56 /**
57  * @version $Revision: 1.1 $
58  */

59 public class PooledSession implements TopicSession JavaDoc, QueueSession JavaDoc {
60     private static final transient Log log = LogFactory.getLog(PooledSession.class);
61
62     private ActiveMQSession session;
63     private SessionPool sessionPool;
64     private ActiveMQMessageProducer messageProducer;
65     private ActiveMQQueueSender queueSender;
66     private ActiveMQTopicPublisher topicPublisher;
67     private boolean transactional = true;
68     private boolean ignoreClose = false;
69     
70     private final CopyOnWriteArrayList JavaDoc consumers = new CopyOnWriteArrayList JavaDoc();
71     private final CopyOnWriteArrayList JavaDoc browsers = new CopyOnWriteArrayList JavaDoc();
72
73
74     public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
75         this.session = aSession;
76         this.sessionPool = sessionPool;
77         this.transactional = session.isTransacted();
78     }
79
80     protected boolean isIgnoreClose() {
81         return ignoreClose;
82     }
83
84     protected void setIgnoreClose(boolean ignoreClose) {
85         this.ignoreClose = ignoreClose;
86     }
87
88     public void close() throws JMSException JavaDoc {
89         if (!ignoreClose) {
90             // TODO a cleaner way to reset??
91

92             // lets reset the session
93
getSession().setMessageListener(null);
94             
95             // Close any consumers and browsers that may have been created.
96
for (Iterator JavaDoc iter = consumers.iterator(); iter.hasNext();) {
97                 MessageConsumer JavaDoc consumer = (MessageConsumer JavaDoc) iter.next();
98                 consumer.close();
99             }
100             consumers.clear();
101             
102             for (Iterator JavaDoc iter = browsers.iterator(); iter.hasNext();) {
103                 QueueBrowser JavaDoc browser = (QueueBrowser JavaDoc) iter.next();
104                 browser.close();
105             }
106             browsers.clear();
107     
108             // maybe do a rollback?
109
if (transactional) {
110                 try {
111                     getSession().rollback();
112                 }
113                 catch (JMSException JavaDoc e) {
114                     log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
115     
116                     // lets close the session and not put the session back into the pool
117
try {
118                         session.close();
119                     }
120                     catch (JMSException JavaDoc e1) {
121                         log.trace("Ignoring exception as discarding session: " + e1, e1);
122                     }
123                     session = null;
124                     return;
125                 }
126             }
127     
128             sessionPool.returnSession(this);
129         }
130     }
131
132     public void commit() throws JMSException JavaDoc {
133         getSession().commit();
134     }
135
136     public BytesMessage JavaDoc createBytesMessage() throws JMSException JavaDoc {
137         return getSession().createBytesMessage();
138     }
139
140     public MapMessage JavaDoc createMapMessage() throws JMSException JavaDoc {
141         return getSession().createMapMessage();
142     }
143
144     public Message JavaDoc createMessage() throws JMSException JavaDoc {
145         return getSession().createMessage();
146     }
147
148     public ObjectMessage JavaDoc createObjectMessage() throws JMSException JavaDoc {
149         return getSession().createObjectMessage();
150     }
151
152     public ObjectMessage JavaDoc createObjectMessage(Serializable JavaDoc serializable) throws JMSException JavaDoc {
153         return getSession().createObjectMessage(serializable);
154     }
155
156     public Queue JavaDoc createQueue(String JavaDoc s) throws JMSException JavaDoc {
157         return getSession().createQueue(s);
158     }
159
160     public StreamMessage JavaDoc createStreamMessage() throws JMSException JavaDoc {
161         return getSession().createStreamMessage();
162     }
163
164     public TemporaryQueue JavaDoc createTemporaryQueue() throws JMSException JavaDoc {
165         return getSession().createTemporaryQueue();
166     }
167
168     public TemporaryTopic JavaDoc createTemporaryTopic() throws JMSException JavaDoc {
169         return getSession().createTemporaryTopic();
170     }
171
172     public void unsubscribe(String JavaDoc s) throws JMSException JavaDoc {
173         getSession().unsubscribe(s);
174     }
175
176     public TextMessage JavaDoc createTextMessage() throws JMSException JavaDoc {
177         return getSession().createTextMessage();
178     }
179
180     public TextMessage JavaDoc createTextMessage(String JavaDoc s) throws JMSException JavaDoc {
181         return getSession().createTextMessage(s);
182     }
183
184     public Topic JavaDoc createTopic(String JavaDoc s) throws JMSException JavaDoc {
185         return getSession().createTopic(s);
186     }
187
188     public int getAcknowledgeMode() throws JMSException JavaDoc {
189         return getSession().getAcknowledgeMode();
190     }
191
192     public boolean getTransacted() throws JMSException JavaDoc {
193         return getSession().getTransacted();
194     }
195
196     public void recover() throws JMSException JavaDoc {
197         getSession().recover();
198     }
199
200     public void rollback() throws JMSException JavaDoc {
201         getSession().rollback();
202     }
203
204     public void run() {
205         if (session != null) {
206             session.run();
207         }
208     }
209
210
211     // Consumer related methods
212
//-------------------------------------------------------------------------
213
public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue) throws JMSException JavaDoc {
214         return addQueueBrowser(getSession().createBrowser(queue));
215     }
216
217     public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue, String JavaDoc selector) throws JMSException JavaDoc {
218         return addQueueBrowser(getSession().createBrowser(queue, selector));
219     }
220
221     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination) throws JMSException JavaDoc {
222         return addConsumer(getSession().createConsumer(destination));
223     }
224
225     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc selector) throws JMSException JavaDoc {
226         return addConsumer(getSession().createConsumer(destination, selector));
227     }
228
229     public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc selector, boolean noLocal) throws JMSException JavaDoc {
230         return addConsumer(getSession().createConsumer(destination, selector, noLocal));
231     }
232
233     public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc selector) throws JMSException JavaDoc {
234         return addTopicSubscriber(getSession().createDurableSubscriber(topic, selector));
235     }
236
237
238     public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name, String JavaDoc selector, boolean noLocal) throws JMSException JavaDoc {
239         return addTopicSubscriber(getSession().createDurableSubscriber(topic, name, selector, noLocal));
240     }
241
242     public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc {
243         return getSession().getMessageListener();
244     }
245
246     public void setMessageListener(MessageListener JavaDoc messageListener) throws JMSException JavaDoc {
247         getSession().setMessageListener(messageListener);
248     }
249
250     public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic) throws JMSException JavaDoc {
251         return addTopicSubscriber(getSession().createSubscriber(topic));
252     }
253
254     public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic, String JavaDoc selector, boolean local) throws JMSException JavaDoc {
255         return addTopicSubscriber(getSession().createSubscriber(topic, selector, local));
256     }
257
258     public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue) throws JMSException JavaDoc {
259         return addQueueReceiver(getSession().createReceiver(queue));
260     }
261
262     public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue, String JavaDoc selector) throws JMSException JavaDoc {
263         return addQueueReceiver(getSession().createReceiver(queue, selector));
264     }
265
266
267     // Producer related methods
268
//-------------------------------------------------------------------------
269
public MessageProducer JavaDoc createProducer(Destination JavaDoc destination) throws JMSException JavaDoc {
270         return new PooledProducer(getMessageProducer(), destination);
271     }
272
273     public QueueSender JavaDoc createSender(Queue JavaDoc queue) throws JMSException JavaDoc {
274         return new PooledQueueSender(getQueueSender(), queue);
275     }
276
277     public TopicPublisher JavaDoc createPublisher(Topic JavaDoc topic) throws JMSException JavaDoc {
278         return new PooledTopicPublisher(getTopicPublisher(), topic);
279     }
280
281     // Implementation methods
282
//-------------------------------------------------------------------------
283
protected ActiveMQSession getSession() throws AlreadyClosedException {
284         if (session == null) {
285             throw new AlreadyClosedException("The session has already been closed");
286         }
287         return session;
288     }
289
290     public ActiveMQMessageProducer getMessageProducer() throws JMSException JavaDoc {
291         if (messageProducer == null) {
292             messageProducer = (ActiveMQMessageProducer) getSession().createProducer(null);
293         }
294         return messageProducer;
295     }
296
297     public ActiveMQQueueSender getQueueSender() throws JMSException JavaDoc {
298         if (queueSender == null) {
299             queueSender = (ActiveMQQueueSender) getSession().createSender(null);
300         }
301         return queueSender;
302     }
303
304     public ActiveMQTopicPublisher getTopicPublisher() throws JMSException JavaDoc {
305         if (topicPublisher == null) {
306             topicPublisher = (ActiveMQTopicPublisher) getSession().createPublisher(null);
307         }
308         return topicPublisher;
309     }
310
311     private QueueBrowser JavaDoc addQueueBrowser(QueueBrowser JavaDoc browser) {
312         browsers.add(browser);
313         return browser;
314     }
315     private MessageConsumer JavaDoc addConsumer(MessageConsumer JavaDoc consumer) {
316         consumers.add(consumer);
317         return consumer;
318     }
319     private TopicSubscriber JavaDoc addTopicSubscriber(TopicSubscriber JavaDoc subscriber) {
320         consumers.add(subscriber);
321         return subscriber;
322     }
323     private QueueReceiver JavaDoc addQueueReceiver(QueueReceiver JavaDoc receiver) {
324         consumers.add(receiver);
325         return receiver;
326     }
327
328     public String JavaDoc toString() {
329         return "PooledSession { "+session+" }";
330     }
331 }
332
Popular Tags