KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > resource > adapter > jms > JmsSessionFactoryImpl


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.resource.adapter.jms;
23
24 import java.util.HashSet JavaDoc;
25 import java.util.Iterator JavaDoc;
26
27 import javax.jms.ConnectionConsumer JavaDoc;
28 import javax.jms.ConnectionMetaData JavaDoc;
29 import javax.jms.Destination JavaDoc;
30 import javax.jms.ExceptionListener JavaDoc;
31 import javax.jms.IllegalStateException JavaDoc;
32 import javax.jms.JMSException JavaDoc;
33 import javax.jms.Queue JavaDoc;
34 import javax.jms.QueueSession JavaDoc;
35 import javax.jms.ServerSessionPool JavaDoc;
36 import javax.jms.Session JavaDoc;
37 import javax.jms.TemporaryQueue JavaDoc;
38 import javax.jms.TemporaryTopic JavaDoc;
39 import javax.jms.Topic JavaDoc;
40 import javax.jms.TopicSession JavaDoc;
41 import javax.naming.Reference JavaDoc;
42 import javax.resource.Referenceable JavaDoc;
43 import javax.resource.ResourceException JavaDoc;
44 import javax.resource.spi.ConnectionManager JavaDoc;
45 import javax.resource.spi.ManagedConnectionFactory JavaDoc;
46
47 import org.jboss.logging.Logger;
48
49 /**
50  * Implements the JMS Connection API and produces {@link JmsSession} objects.
51  *
52  * @author <a HREF="mailto:peter.antman@tim.se">Peter Antman</a>.
53  * @author <a HREF="mailto:jason@planet57.com">Jason Dillon</a>
54  * @author <a HREF="mailto:adrian@jboss.com">Adrian Brock</a>
55  * @version <tt>$Revision: 38315 $</tt>
56  */

57 public class JmsSessionFactoryImpl
58    implements JmsSessionFactory, Referenceable JavaDoc
59 {
60    private static final Logger log = Logger.getLogger(JmsSessionFactoryImpl.class);
61
62    /** Are we closed? */
63    private boolean closed = false;
64
65    /** Whether trace is enabled */
66    private boolean trace = log.isTraceEnabled();
67    
68    private Reference JavaDoc reference;
69
70    // Used from JmsConnectionFactory
71
private String JavaDoc userName;
72    private String JavaDoc password;
73    private String JavaDoc clientID;
74    private int type;
75
76    /* Whether we are started */
77    private boolean started = false;
78    
79    /** JmsRa own factory */
80    private JmsManagedConnectionFactory mcf;
81
82    /** Hook to the appserver */
83    private ConnectionManager JavaDoc cm;
84
85    /** The sessions */
86    private HashSet JavaDoc sessions = new HashSet JavaDoc();
87
88    /** The temporary queues */
89    private HashSet JavaDoc tempQueues = new HashSet JavaDoc();
90
91    /** The temporary topics */
92    private HashSet JavaDoc tempTopics = new HashSet JavaDoc();
93    
94    public JmsSessionFactoryImpl(final ManagedConnectionFactory JavaDoc mcf,
95                                 final ConnectionManager JavaDoc cm,
96                                 final int type)
97    {
98       this.mcf = (JmsManagedConnectionFactory) mcf;
99       this.cm = cm;
100       
101       if (cm == null)
102          // This is standalone usage, no appserver
103
this.cm = new JmsConnectionManager();
104       else
105          this.cm = cm;
106
107       this.type = type;
108
109       if (trace)
110          log.trace("mcf=" + mcf + ", cm=" + cm + ", type=" + type);
111    }
112
113    public void setReference(final Reference JavaDoc reference)
114    {
115       this.reference = reference;
116    }
117     
118    public Reference JavaDoc getReference()
119    {
120       return reference;
121    }
122     
123    // --- API for JmsConnectionFactoryImpl
124

125    public void setUserName(final String JavaDoc name)
126    {
127       userName = name;
128    }
129     
130    public void setPassword(final String JavaDoc password)
131    {
132       this.password = password;
133    }
134
135    //---- QueueConnection ---
136

137    public QueueSession JavaDoc createQueueSession(final boolean transacted,
138                                           final int acknowledgeMode)
139       throws JMSException JavaDoc
140    {
141       checkClosed();
142       if (type == JmsConnectionFactory.TOPIC)
143          throw new IllegalStateException JavaDoc("Can not get a queue session from a topic connection");
144       return allocateConnection(transacted, acknowledgeMode, type);
145    }
146     
147    public ConnectionConsumer JavaDoc createConnectionConsumer
148       (Queue JavaDoc queue,
149        String JavaDoc messageSelector,
150        ServerSessionPool JavaDoc sessionPool,
151        int maxMessages)
152       throws JMSException JavaDoc
153    {
154       throw new IllegalStateException JavaDoc(ISE);
155    }
156     
157    //--- TopicConnection ---
158

159    public TopicSession JavaDoc createTopicSession(final boolean transacted,
160                                           final int acknowledgeMode)
161       throws JMSException JavaDoc
162    {
163       checkClosed();
164       if (type == JmsConnectionFactory.QUEUE)
165          throw new IllegalStateException JavaDoc("Can not get a topic session from a queue connection");
166       return allocateConnection(transacted, acknowledgeMode, type);
167    }
168
169    public ConnectionConsumer JavaDoc createConnectionConsumer
170       (Topic JavaDoc topic,
171        String JavaDoc messageSelector,
172        ServerSessionPool JavaDoc sessionPool,
173        int maxMessages)
174       throws JMSException JavaDoc
175    {
176       throw new IllegalStateException JavaDoc(ISE);
177    }
178
179    public ConnectionConsumer JavaDoc createDurableConnectionConsumer(
180       Topic JavaDoc topic,
181       String JavaDoc subscriptionName,
182       String JavaDoc messageSelector,
183       ServerSessionPool JavaDoc sessionPool,
184       int maxMessages)
185       throws JMSException JavaDoc
186    {
187       throw new IllegalStateException JavaDoc(ISE);
188    }
189    
190    //--- All the Connection methods
191

192    public String JavaDoc getClientID() throws JMSException JavaDoc
193    {
194       checkClosed();
195       return clientID;
196    }
197     
198    public void setClientID(String JavaDoc cID) throws JMSException JavaDoc
199    {
200       if (mcf.isStrict())
201          throw new IllegalStateException JavaDoc(ISE);
202       
203       checkClosed();
204       if (clientID != null)
205          throw new IllegalStateException JavaDoc("Cannot change client id");
206       clientID = cID;
207    }
208     
209    public ConnectionMetaData JavaDoc getMetaData() throws JMSException JavaDoc
210    {
211       checkClosed();
212       return mcf.getMetaData();
213    }
214     
215    public ExceptionListener JavaDoc getExceptionListener() throws JMSException JavaDoc
216    {
217       throw new IllegalStateException JavaDoc(ISE);
218    }
219     
220    public void setExceptionListener(ExceptionListener JavaDoc listener)
221       throws JMSException JavaDoc
222    {
223       throw new IllegalStateException JavaDoc(ISE);
224    }
225     
226    public void start() throws JMSException JavaDoc
227    {
228       checkClosed();
229       if (trace)
230          log.trace("start() " + this);
231       synchronized (sessions)
232       {
233          if (started)
234             return;
235          started = true;
236          for (Iterator JavaDoc i = sessions.iterator(); i.hasNext();)
237          {
238             JmsSession session = (JmsSession) i.next();
239             session.start();
240          }
241       }
242    }
243     
244    public void stop() throws JMSException JavaDoc
245    {
246       if (mcf.isStrict())
247          throw new IllegalStateException JavaDoc(ISE);
248       checkClosed();
249       if (trace)
250          log.trace("stop() " + this);
251       synchronized (sessions)
252       {
253          if (started == false)
254             return;
255          started = true;
256          for (Iterator JavaDoc i = sessions.iterator(); i.hasNext();)
257          {
258             JmsSession session = (JmsSession) i.next();
259             session.stop();
260          }
261       }
262    }
263
264    public void close() throws JMSException JavaDoc
265    {
266       if (closed)
267          return;
268       closed = true;
269
270       if (trace)
271          log.trace("close() " + this);
272       
273       synchronized (sessions)
274       {
275          for (Iterator JavaDoc i = sessions.iterator(); i.hasNext();)
276          {
277             JmsSession session = (JmsSession) i.next();
278             try
279             {
280                session.closeSession();
281             }
282             catch (Throwable JavaDoc t)
283             {
284                log.trace("Error closing session", t);
285             }
286             i.remove();
287          }
288       }
289       
290       synchronized (tempQueues)
291       {
292          for (Iterator JavaDoc i = tempQueues.iterator(); i.hasNext();)
293          {
294             TemporaryQueue JavaDoc temp = (TemporaryQueue JavaDoc) i.next();
295             try
296             {
297                if (trace)
298                   log.trace("Closing temporary queue " + temp + " for " + this);
299                temp.delete();
300             }
301             catch (Throwable JavaDoc t)
302             {
303                log.trace("Error deleting temporary queue", t);
304             }
305             i.remove();
306          }
307       }
308       
309       synchronized (tempTopics)
310       {
311          for (Iterator JavaDoc i = tempTopics.iterator(); i.hasNext();)
312          {
313             TemporaryTopic JavaDoc temp = (TemporaryTopic JavaDoc) i.next();
314             try
315             {
316                if (trace)
317                   log.trace("Closing temporary topic " + temp + " for " + this);
318                temp.delete();
319             }
320             catch (Throwable JavaDoc t)
321             {
322                log.trace("Error deleting temporary queue", t);
323             }
324             i.remove();
325          }
326       }
327    }
328
329    public void closeSession(JmsSession session) throws JMSException JavaDoc
330    {
331       synchronized (sessions)
332       {
333          sessions.remove(session);
334       }
335    }
336    
337    public void addTemporaryQueue(TemporaryQueue JavaDoc temp)
338    {
339       synchronized(tempQueues)
340       {
341          tempQueues.add(temp);
342       }
343    }
344    
345    public void addTemporaryTopic(TemporaryTopic JavaDoc temp)
346    {
347       synchronized(tempTopics)
348       {
349          tempTopics.add(temp);
350       }
351    }
352    
353    // -- JMS 1.1
354

355    public ConnectionConsumer JavaDoc createConnectionConsumer(Destination JavaDoc destination, ServerSessionPool JavaDoc pool, int maxMessages) throws JMSException JavaDoc
356    {
357       throw new IllegalStateException JavaDoc(ISE);
358    }
359
360    public ConnectionConsumer JavaDoc createConnectionConsumer(Destination JavaDoc destination, String JavaDoc name, ServerSessionPool JavaDoc pool, int maxMessages) throws JMSException JavaDoc
361    {
362       throw new IllegalStateException JavaDoc(ISE);
363    }
364
365    public Session JavaDoc createSession(boolean transacted, int acknowledgeMode)
366       throws JMSException JavaDoc
367    {
368       checkClosed();
369       return allocateConnection(transacted, acknowledgeMode, type);
370    }
371
372    protected JmsSession allocateConnection(boolean transacted, int acknowledgeMode, int sessionType) throws JMSException JavaDoc
373    {
374       try
375       {
376          synchronized (sessions)
377          {
378             if (mcf.isStrict() && sessions.isEmpty() == false)
379                throw new IllegalStateException JavaDoc("Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
380             if (transacted)
381                acknowledgeMode = Session.SESSION_TRANSACTED;
382             JmsConnectionRequestInfo info = new JmsConnectionRequestInfo(transacted, acknowledgeMode, sessionType);
383             info.setUserName(userName);
384             info.setPassword(password);
385             info.setClientID(clientID);
386
387             if (trace)
388                log.trace("Allocating session for " + this + " with request info=" + info);
389             JmsSession session = (JmsSession) cm.allocateConnection(mcf, info);
390             if (trace)
391                log.trace("Allocated " + this + " session=" + session);
392             session.setJmsSessionFactory(this);
393             if (started)
394                session.start();
395             sessions.add(session);
396             return session;
397          }
398       }
399       catch (ResourceException JavaDoc e)
400       {
401          log.error("could not create session", e);
402          
403          JMSException JavaDoc je = new JMSException JavaDoc
404             ("Could not create a session: " + e);
405          je.setLinkedException(e);
406          throw je;
407       }
408    }
409
410    protected void checkClosed() throws IllegalStateException JavaDoc
411    {
412       if (closed)
413          throw new IllegalStateException JavaDoc("The connection is closed");
414    }
415 }
416
Popular Tags