KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > resource > adapter > jms > JmsSession


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.resource.adapter.jms;
23
24 import java.io.Serializable JavaDoc;
25 import java.util.HashSet JavaDoc;
26 import java.util.Iterator JavaDoc;
27
28 import javax.jms.BytesMessage JavaDoc;
29 import javax.jms.Destination JavaDoc;
30 import javax.jms.IllegalStateException JavaDoc;
31 import javax.jms.JMSException JavaDoc;
32 import javax.jms.MessageListener JavaDoc;
33 import javax.jms.MapMessage JavaDoc;
34 import javax.jms.Message JavaDoc;
35 import javax.jms.MessageConsumer JavaDoc;
36 import javax.jms.MessageProducer JavaDoc;
37 import javax.jms.ObjectMessage JavaDoc;
38 import javax.jms.Queue JavaDoc;
39 import javax.jms.QueueBrowser JavaDoc;
40 import javax.jms.QueueReceiver JavaDoc;
41 import javax.jms.QueueSender JavaDoc;
42 import javax.jms.QueueSession JavaDoc;
43 import javax.jms.Session JavaDoc;
44 import javax.jms.StreamMessage JavaDoc;
45 import javax.jms.TemporaryQueue JavaDoc;
46 import javax.jms.TemporaryTopic JavaDoc;
47 import javax.jms.TextMessage JavaDoc;
48 import javax.jms.Topic JavaDoc;
49 import javax.jms.TopicPublisher JavaDoc;
50 import javax.jms.TopicSession JavaDoc;
51 import javax.jms.TopicSubscriber JavaDoc;
52 import javax.resource.spi.ConnectionEvent JavaDoc;
53
54 import org.jboss.logging.Logger;
55
56 /**
57  * Adapts the JMS QueueSession and TopicSession API to a JmsManagedConnection.
58  *
59  * @author <a HREF="mailto:peter.antman@tim.se">Peter Antman </a>.
60  * @author <a HREF="mailto:jason@planet57.com">Jason Dillon </a>.
61  * @author <a HREF="mailto:adrian@jboss.com">Adrian Brock</a>
62  * @version $Revision: 38315 $
63  */

64 public class JmsSession implements Session JavaDoc, QueueSession JavaDoc, TopicSession JavaDoc
65 {
66    private static final Logger log = Logger.getLogger(JmsSession.class);
67
68    /** The managed connection for this session. */
69    private JmsManagedConnection mc; // = null;
70

71    /** The connection request info */
72    private JmsConnectionRequestInfo info;
73
74    /** The session factory for this session */
75    private JmsSessionFactory sf;
76    
77    /** The message consumers */
78    private HashSet JavaDoc consumers = new HashSet JavaDoc();
79    
80    /** The message producers */
81    private HashSet JavaDoc producers = new HashSet JavaDoc();
82    
83    /** Whether trace is enabled */
84    private boolean trace = log.isTraceEnabled();
85    
86    /**
87     * Construct a <tt>JmsSession</tt>.
88     *
89     * @param mc The managed connection for this session.
90     */

91    public JmsSession(final JmsManagedConnection mc, JmsConnectionRequestInfo info)
92    {
93       this.mc = mc;
94       this.info = info;
95       if (trace)
96          log.trace("new JmsSession " + this + " mc=" + mc + " cri=" + info);
97    }
98
99    public void setJmsSessionFactory(JmsSessionFactory sf)
100    {
101       this.sf = sf;
102    }
103    
104    /**
105     * Ensure that the session is opened.
106     *
107     * @return The session
108     *
109     * @throws IllegalStateException The session is closed
110     */

111    Session JavaDoc getSession() throws JMSException JavaDoc
112    {
113       // ensure that the connection is opened
114
if (mc == null)
115          throw new IllegalStateException JavaDoc("The session is closed");
116       
117       Session JavaDoc session = mc.getSession();
118       if (trace)
119          log.trace("getSession " + session + " for " + this);
120       return session;
121    }
122
123    // ---- Session API
124

125    public BytesMessage JavaDoc createBytesMessage() throws JMSException JavaDoc
126    {
127       Session JavaDoc session = getSession();
128       if (trace)
129          log.trace("createBytesMessage" + session);
130       return session.createBytesMessage();
131    }
132
133    public MapMessage JavaDoc createMapMessage() throws JMSException JavaDoc
134    {
135       Session JavaDoc session = getSession();
136       if (trace)
137          log.trace("createMapMessage" + session);
138       return session.createMapMessage();
139    }
140
141    public Message JavaDoc createMessage() throws JMSException JavaDoc
142    {
143       Session JavaDoc session = getSession();
144       if (trace)
145          log.trace("createMessage" + session);
146       return session.createMessage();
147    }
148
149    public ObjectMessage JavaDoc createObjectMessage() throws JMSException JavaDoc
150    {
151       Session JavaDoc session = getSession();
152       if (trace)
153          log.trace("createObjectMessage" + session);
154       return session.createObjectMessage();
155    }
156
157    public ObjectMessage JavaDoc createObjectMessage(Serializable JavaDoc object) throws JMSException JavaDoc
158    {
159       Session JavaDoc session = getSession();
160       if (trace)
161          log.trace("createObjectMessage(Object)" + session);
162       return session.createObjectMessage(object);
163    }
164
165    public StreamMessage JavaDoc createStreamMessage() throws JMSException JavaDoc
166    {
167       Session JavaDoc session = getSession();
168       if (trace)
169          log.trace("createStreamMessage" + session);
170       return session.createStreamMessage();
171    }
172
173    public TextMessage JavaDoc createTextMessage() throws JMSException JavaDoc
174    {
175       Session JavaDoc session = getSession();
176       if (trace)
177          log.trace("createTextMessage" + session);
178       return session.createTextMessage();
179    }
180
181    public TextMessage JavaDoc createTextMessage(String JavaDoc string) throws JMSException JavaDoc
182    {
183       Session JavaDoc session = getSession();
184       if (trace)
185          log.trace("createTextMessage(String)" + session);
186       return session.createTextMessage(string);
187    }
188
189
190    public boolean getTransacted() throws JMSException JavaDoc
191    {
192       getSession(); // check closed
193
return info.isTransacted();
194    }
195
196    /**
197     * Always throws an Exception.
198     *
199     * @throws IllegalStateException Method not allowed.
200     */

201    public MessageListener JavaDoc getMessageListener() throws JMSException JavaDoc
202    {
203       throw new IllegalStateException JavaDoc("Method not allowed");
204    }
205
206    /**
207     * Always throws an Exception.
208     *
209     * @throws IllegalStateException Method not allowed.
210     */

211    public void setMessageListener(MessageListener JavaDoc listener) throws JMSException JavaDoc
212    {
213       throw new IllegalStateException JavaDoc("Method not allowed");
214    }
215
216    /**
217     * Always throws an Error.
218     *
219     * @throws Error Method not allowed.
220     */

221    public void run()
222    {
223       // should this really throw an Error?
224
throw new Error JavaDoc("Method not allowed");
225    }
226
227    /**
228     * Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the
229     * managed connection.
230     *
231     * @throws JMSException Failed to close session.
232     */

233    public void close() throws JMSException JavaDoc
234    {
235       sf.closeSession(this);
236       closeSession();
237    }
238
239    // FIXME - is this really OK, probably not
240
public void commit() throws JMSException JavaDoc
241    {
242       Session JavaDoc session = getSession();
243       if (info.isTransacted() == false)
244          throw new IllegalStateException JavaDoc("Session is not transacted");
245       if (trace)
246          log.trace("Commit session " + this);
247       session.commit();
248    }
249
250    public void rollback() throws JMSException JavaDoc
251    {
252       Session JavaDoc session = getSession();
253       if (info.isTransacted() == false)
254          throw new IllegalStateException JavaDoc("Session is not transacted");
255       if (trace)
256          log.trace("Rollback session " + this);
257       session.rollback();
258    }
259
260    public void recover() throws JMSException JavaDoc
261    {
262       Session JavaDoc session = getSession();
263       if (info.isTransacted())
264          throw new IllegalStateException JavaDoc("Session is transacted");
265       if (trace)
266          log.trace("Recover session " + this);
267       session.recover();
268    }
269
270    // --- TopicSession API
271

272    public Topic JavaDoc createTopic(String JavaDoc topicName) throws JMSException JavaDoc
273    {
274       Session JavaDoc session = getSession();
275       if (trace)
276          log.trace("createTopic " + session + " topicName=" + topicName);
277       Topic JavaDoc result = session.createTopic(topicName);
278       if (trace)
279          log.trace("createdTopic " + session + " topic=" + result);
280       return result;
281    }
282
283    public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic) throws JMSException JavaDoc
284    {
285       TopicSession JavaDoc session = getTopicSession();
286       if (trace)
287          log.trace("createSubscriber " + session + " topic=" + topic);
288       TopicSubscriber JavaDoc result = session.createSubscriber(topic);
289       result = new JmsTopicSubscriber(result, this);
290       if (trace)
291          log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
292       addConsumer(result);
293       return result;
294    }
295
296    public TopicSubscriber JavaDoc createSubscriber(Topic JavaDoc topic, String JavaDoc messageSelector, boolean noLocal) throws JMSException JavaDoc
297    {
298       TopicSession JavaDoc session = getTopicSession();
299       if (trace)
300          log.trace("createSubscriber " + session + " topic=" + topic + " selector=" + messageSelector + " noLocal=" + noLocal);
301       TopicSubscriber JavaDoc result = session.createSubscriber(topic, messageSelector, noLocal);
302       result = new JmsTopicSubscriber(result, this);
303       if (trace)
304          log.trace("createdSubscriber " + session + " JmsTopicSubscriber=" + result);
305       addConsumer(result);
306       return result;
307    }
308
309    public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name) throws JMSException JavaDoc
310    {
311       TopicSession JavaDoc session = getTopicSession();
312       if (trace)
313          log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name);
314       TopicSubscriber JavaDoc result = session.createDurableSubscriber(topic, name);
315       result = new JmsTopicSubscriber(result, this);
316       if (trace)
317          log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
318       addConsumer(result);
319       return result;
320    }
321
322    public TopicSubscriber JavaDoc createDurableSubscriber(Topic JavaDoc topic, String JavaDoc name, String JavaDoc messageSelector, boolean noLocal)
323          throws JMSException JavaDoc
324    {
325       Session JavaDoc session = getSession();
326       if (trace)
327          log.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name + " selector=" + messageSelector + " noLocal=" + noLocal);
328       TopicSubscriber JavaDoc result = session.createDurableSubscriber(topic, name, messageSelector, noLocal);
329       result = new JmsTopicSubscriber(result, this);
330       if (trace)
331          log.trace("createdDurableSubscriber " + session + " JmsTopicSubscriber=" + result);
332       addConsumer(result);
333       return result;
334    }
335
336    public TopicPublisher JavaDoc createPublisher(Topic JavaDoc topic) throws JMSException JavaDoc
337    {
338       TopicSession JavaDoc session = getTopicSession();
339       if (trace)
340          log.trace("createPublisher " + session + " topic=" + topic);
341       TopicPublisher JavaDoc result = session.createPublisher(topic);
342       if (trace)
343          log.trace("createdPublisher " + session + " publisher=" + result);
344       addProducer(result);
345       return result;
346    }
347
348    public TemporaryTopic JavaDoc createTemporaryTopic() throws JMSException JavaDoc
349    {
350       Session JavaDoc session = getSession();
351       if (trace)
352          log.trace("createTemporaryTopic " + session);
353       TemporaryTopic JavaDoc temp = session.createTemporaryTopic();
354       if (trace)
355          log.trace("createdTemporaryTopic " + session + " temp=" + temp);
356       sf.addTemporaryTopic(temp);
357       return temp;
358    }
359
360    public void unsubscribe(String JavaDoc name) throws JMSException JavaDoc
361    {
362       Session JavaDoc session = getSession();
363       if (trace)
364          log.trace("unsubscribe " + session + " name=" + name);
365       session.unsubscribe(name);
366    }
367
368    //--- QueueSession API
369

370    public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue) throws JMSException JavaDoc
371    {
372       Session JavaDoc session = getSession();
373       if (trace)
374          log.trace("createBrowser " + session + " queue=" + queue);
375       QueueBrowser JavaDoc result = session.createBrowser(queue);
376       if (trace)
377          log.trace("createdBrowser " + session + " browser=" + result);
378       return result;
379    }
380
381    public QueueBrowser JavaDoc createBrowser(Queue JavaDoc queue, String JavaDoc messageSelector) throws JMSException JavaDoc
382    {
383       Session JavaDoc session = getSession();
384       if (trace)
385          log.trace("createBrowser " + session + " queue=" + queue + " selector=" + messageSelector);
386       QueueBrowser JavaDoc result = session.createBrowser(queue, messageSelector);
387       if (trace)
388          log.trace("createdBrowser " + session + " browser=" + result);
389       return result;
390    }
391
392    public Queue JavaDoc createQueue(String JavaDoc queueName) throws JMSException JavaDoc
393    {
394       Session JavaDoc session = getSession();
395       if (trace)
396          log.trace("createQueue " + session + " queueName=" + queueName);
397       Queue JavaDoc result = session.createQueue(queueName);
398       if (trace)
399          log.trace("createdQueue " + session + " queue=" + result);
400       return result;
401    }
402
403    public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue) throws JMSException JavaDoc
404    {
405       QueueSession JavaDoc session = getQueueSession();
406       if (trace)
407          log.trace("createReceiver " + session + " queue=" + queue);
408       QueueReceiver JavaDoc result = session.createReceiver(queue);
409       result = new JmsQueueReceiver(result, this);
410       if (trace)
411          log.trace("createdReceiver " + session + " receiver=" + result);
412       addConsumer(result);
413       return result;
414    }
415
416    public QueueReceiver JavaDoc createReceiver(Queue JavaDoc queue, String JavaDoc messageSelector) throws JMSException JavaDoc
417    {
418       QueueSession JavaDoc session = getQueueSession();
419       if (trace)
420          log.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
421       QueueReceiver JavaDoc result = session.createReceiver(queue, messageSelector);
422       result = new JmsQueueReceiver(result, this);
423       if (trace)
424          log.trace("createdReceiver " + session + " receiver=" + result);
425       addConsumer(result);
426       return result;
427    }
428
429    public QueueSender JavaDoc createSender(Queue JavaDoc queue) throws JMSException JavaDoc
430    {
431       QueueSession JavaDoc session = getQueueSession();
432       if (trace)
433          log.trace("createSender " + session + " queue=" + queue);
434       QueueSender JavaDoc result = session.createSender(queue);
435       if (trace)
436          log.trace("createdSender " + session + " sender=" + result);
437       addProducer(result);
438       return result;
439    }
440
441    public TemporaryQueue JavaDoc createTemporaryQueue() throws JMSException JavaDoc
442    {
443       Session JavaDoc session = getSession();
444       if (trace)
445          log.trace("createTemporaryQueue " + session);
446       TemporaryQueue JavaDoc temp = session.createTemporaryQueue();
447       if (trace)
448          log.trace("createdTemporaryQueue " + session + " temp=" + temp);
449       sf.addTemporaryQueue(temp);
450       return temp;
451    }
452
453    // -- JMS 1.1
454

455    public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination) throws JMSException JavaDoc
456    {
457       Session JavaDoc session = getSession();
458       if (trace)
459          log.trace("createConsumer " + session + " dest=" + destination);
460       MessageConsumer JavaDoc result = session.createConsumer(destination);
461       result = new JmsMessageConsumer(result, this);
462       if (trace)
463          log.trace("createdConsumer " + session + " consumer=" + result);
464       addConsumer(result);
465       return result;
466    }
467
468    public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc messageSelector) throws JMSException JavaDoc
469    {
470       Session JavaDoc session = getSession();
471       if (trace)
472          log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector);
473       MessageConsumer JavaDoc result = session.createConsumer(destination, messageSelector);
474       result = new JmsMessageConsumer(result, this);
475       if (trace)
476          log.trace("createdConsumer " + session + " consumer=" + result);
477       addConsumer(result);
478       return result;
479    }
480
481    public MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination, String JavaDoc messageSelector, boolean noLocal)
482          throws JMSException JavaDoc
483    {
484       Session JavaDoc session = getSession();
485       if (trace)
486          log.trace("createConsumer " + session + " dest=" + destination + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
487       MessageConsumer JavaDoc result = session.createConsumer(destination, messageSelector, noLocal);
488       result = new JmsMessageConsumer(result, this);
489       if (trace)
490          log.trace("createdConsumer " + session + " consumer=" + result);
491       addConsumer(result);
492       return result;
493    }
494
495    public MessageProducer JavaDoc createProducer(Destination JavaDoc destination) throws JMSException JavaDoc
496    {
497       Session JavaDoc session = getSession();
498       if (trace)
499          log.trace("createProducer " + session + " dest=" + destination);
500       MessageProducer JavaDoc result = getSession().createProducer(destination);
501       if (trace)
502          log.trace("createdProducer " + session + " producer=" + result);
503       addProducer(result);
504       return result;
505    }
506
507    public int getAcknowledgeMode() throws JMSException JavaDoc
508    {
509       getSession(); // check closed
510
return info.getAcknowledgeMode();
511    }
512
513    // --- JmsManagedConnection api
514

515    void setManagedConnection(final JmsManagedConnection mc)
516    {
517       if (this.mc != null)
518          this.mc.removeHandle(this);
519       this.mc = mc;
520    }
521
522    void destroy()
523    {
524       mc = null;
525    }
526
527    void start() throws JMSException JavaDoc
528    {
529       if (mc != null)
530          mc.start();
531    }
532
533    void stop() throws JMSException JavaDoc
534    {
535       if (mc != null)
536          mc.stop();
537    }
538
539    void checkStrict() throws JMSException JavaDoc
540    {
541       if (mc != null && mc.getManagedConnectionFactory().isStrict())
542          throw new IllegalStateException JavaDoc(JmsSessionFactory.ISE);
543    }
544    
545    void closeSession() throws JMSException JavaDoc
546    {
547       if (mc != null)
548       {
549          log.trace("Closing session");
550
551          try
552          {
553             mc.stop();
554          }
555          catch (Throwable JavaDoc t)
556          {
557             log.trace("Error stopping managed connection", t);
558          }
559          
560          synchronized (consumers)
561          {
562             for (Iterator JavaDoc i = consumers.iterator(); i.hasNext();)
563             {
564                JmsMessageConsumer consumer = (JmsMessageConsumer) i.next();
565                try
566                {
567                   consumer.closeConsumer();
568                }
569                catch (Throwable JavaDoc t)
570                {
571                   log.trace("Error closing consumer", t);
572                }
573                i.remove();
574             }
575          }
576
577          synchronized (producers)
578          {
579             for (Iterator JavaDoc i = producers.iterator(); i.hasNext();)
580             {
581                MessageProducer JavaDoc producer = (MessageProducer JavaDoc) i.next();
582                try
583                {
584                   producer.close();
585                }
586                catch (Throwable JavaDoc t)
587                {
588                   log.trace("Error closing producer", t);
589                }
590                i.remove();
591             }
592          }
593          
594          mc.removeHandle(this);
595          ConnectionEvent JavaDoc ev = new ConnectionEvent JavaDoc(mc, ConnectionEvent.CONNECTION_CLOSED);
596          ev.setConnectionHandle(this);
597          mc.sendEvent(ev);
598          mc = null;
599       }
600    }
601    
602    void addConsumer(MessageConsumer JavaDoc consumer)
603    {
604       synchronized (consumers)
605       {
606          consumers.add(consumer);
607       }
608    }
609    
610    void removeConsumer(MessageConsumer JavaDoc consumer)
611    {
612       synchronized (consumers)
613       {
614          consumers.remove(consumer);
615       }
616    }
617    
618    void addProducer(MessageProducer JavaDoc producer)
619    {
620       synchronized (producers)
621       {
622          producers.add(producer);
623       }
624    }
625    
626    void removeProducer(MessageProducer JavaDoc producer)
627    {
628       synchronized (producers)
629       {
630          producers.remove(producer);
631       }
632    }
633    
634    QueueSession JavaDoc getQueueSession() throws JMSException JavaDoc
635    {
636       return (QueueSession JavaDoc) getSession();
637    }
638    
639    TopicSession JavaDoc getTopicSession() throws JMSException JavaDoc
640    {
641       return (TopicSession JavaDoc) getSession();
642    }
643 }
644
Popular Tags