KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > axis > transport > jms > TopicConnector


1 /*
2  * The Apache Software License, Version 1.1
3  *
4  *
5  * Copyright (c) 2001, 2002 The Apache Software Foundation. All rights
6  * reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Apache Software Foundation (http://www.apache.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Axis" and "Apache Software Foundation" must
28  * not be used to endorse or promote products derived from this
29  * software without prior written permission. For written
30  * permission, please contact apache@apache.org.
31  *
32  * 5. Products derived from this software may not be called "Apache",
33  * nor may "Apache" appear in their name, without prior written
34  * permission of the Apache Software Foundation.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
40  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
41  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
42  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
43  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
44  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
45  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
46  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
47  * SUCH DAMAGE.
48  * ====================================================================
49  *
50  * This software consists of voluntary contributions made by many
51  * individuals on behalf of the Apache Software Foundation. For more
52  * information on the Apache Software Foundation, please see
53  * <http://www.apache.org/>.
54  */

55
56 package org.jboss.axis.transport.jms;
57
58 import org.jboss.axis.components.jms.JMSVendorAdapter;
59
60 import javax.jms.Connection JavaDoc;
61 import javax.jms.ConnectionFactory JavaDoc;
62 import javax.jms.Destination JavaDoc;
63 import javax.jms.JMSException JavaDoc;
64 import javax.jms.Message JavaDoc;
65 import javax.jms.MessageConsumer JavaDoc;
66 import javax.jms.MessageListener JavaDoc;
67 import javax.jms.Session JavaDoc;
68 import javax.jms.TemporaryTopic JavaDoc;
69 import javax.jms.Topic JavaDoc;
70 import javax.jms.TopicConnection JavaDoc;
71 import javax.jms.TopicConnectionFactory JavaDoc;
72 import javax.jms.TopicPublisher JavaDoc;
73 import javax.jms.TopicSession JavaDoc;
74 import javax.jms.TopicSubscriber JavaDoc;
75 import java.util.HashMap JavaDoc;
76
77 /**
78  * TopicConnector is a concrete JMSConnector subclass that specifically handles
79  * connections to topics (pub-sub domain).
80  *
81  * @author Jaime Meritt (jmeritt@sonicsoftware.com)
82  * @author Richard Chung (rchung@sonicsoftware.com)
83  * @author Dave Chappell (chappell@sonicsoftware.com)
84  */

85 public class TopicConnector extends JMSConnector
86 {
87    public TopicConnector(TopicConnectionFactory JavaDoc factory,
88                          int numRetries,
89                          int numSessions,
90                          long connectRetryInterval,
91                          long interactRetryInterval,
92                          long timeoutTime,
93                          boolean allowReceive,
94                          String JavaDoc clientID,
95                          String JavaDoc username,
96                          String JavaDoc password,
97                          JMSVendorAdapter adapter)
98            throws JMSException JavaDoc
99    {
100       super(factory, numRetries, numSessions, connectRetryInterval,
101               interactRetryInterval, timeoutTime, allowReceive,
102               clientID, username, password, adapter);
103    }
104
105    protected Connection JavaDoc internalConnect(ConnectionFactory JavaDoc connectionFactory,
106                                         String JavaDoc username, String JavaDoc password)
107            throws JMSException JavaDoc
108    {
109       TopicConnectionFactory JavaDoc tcf = (TopicConnectionFactory JavaDoc)connectionFactory;
110       if (username == null)
111          return tcf.createTopicConnection();
112
113       return tcf.createTopicConnection(username, password);
114    }
115
116
117    protected SyncConnection createSyncConnection(ConnectionFactory JavaDoc factory,
118                                                  Connection JavaDoc connection,
119                                                  int numSessions,
120                                                  String JavaDoc threadName,
121                                                  String JavaDoc clientID,
122                                                  String JavaDoc username,
123                                                  String JavaDoc password)
124            throws JMSException JavaDoc
125    {
126       return new TopicSyncConnection((TopicConnectionFactory JavaDoc)factory,
127               (TopicConnection JavaDoc)connection, numSessions,
128               threadName, clientID, username, password);
129    }
130
131    protected AsyncConnection createAsyncConnection(ConnectionFactory JavaDoc factory,
132                                                    Connection JavaDoc connection,
133                                                    String JavaDoc threadName,
134                                                    String JavaDoc clientID,
135                                                    String JavaDoc username,
136                                                    String JavaDoc password)
137            throws JMSException JavaDoc
138    {
139       return new TopicAsyncConnection((TopicConnectionFactory JavaDoc)factory,
140               (TopicConnection JavaDoc)connection, threadName,
141               clientID, username, password);
142    }
143
144    public JMSEndpoint createEndpoint(String JavaDoc destination)
145    {
146       return new TopicEndpoint(destination);
147    }
148
149    /**
150     * Create an endpoint for a queue destination.
151     *
152     * @param destination
153     * @return
154     * @throws JMSException
155     */

156    public JMSEndpoint createEndpoint(Destination JavaDoc destination)
157            throws JMSException JavaDoc
158    {
159       if (!(destination instanceof Topic JavaDoc))
160          throw new IllegalArgumentException JavaDoc("The input be a topic for this connector");
161       return new TopicDestinationEndpoint((Topic JavaDoc)destination);
162    }
163
164    private TopicSession JavaDoc createTopicSession(TopicConnection JavaDoc connection, int ackMode)
165            throws JMSException JavaDoc
166    {
167       return connection.createTopicSession(false,
168               ackMode);
169    }
170
171    private Topic JavaDoc createTopic(TopicSession JavaDoc session, String JavaDoc subject)
172            throws Exception JavaDoc
173    {
174       return m_adapter.getTopic(session, subject);
175    }
176
177    private TopicSubscriber JavaDoc createSubscriber(TopicSession JavaDoc session,
178                                             TopicSubscription subscription)
179            throws Exception JavaDoc
180    {
181       if (subscription.isDurable())
182          return createDurableSubscriber(session,
183                  (Topic JavaDoc)subscription.m_endpoint.getDestination(session),
184                  subscription.m_subscriptionName,
185                  subscription.m_messageSelector,
186                  subscription.m_noLocal);
187       else
188          return createSubscriber(session,
189                  (Topic JavaDoc)subscription.m_endpoint.getDestination(session),
190                  subscription.m_messageSelector,
191                  subscription.m_noLocal);
192    }
193
194    private TopicSubscriber JavaDoc createDurableSubscriber(TopicSession JavaDoc session,
195                                                    Topic JavaDoc topic,
196                                                    String JavaDoc subscriptionName,
197                                                    String JavaDoc messageSelector,
198                                                    boolean noLocal)
199            throws JMSException JavaDoc
200    {
201       return session.createDurableSubscriber(topic, subscriptionName,
202               messageSelector, noLocal);
203    }
204
205    private TopicSubscriber JavaDoc createSubscriber(TopicSession JavaDoc session,
206                                             Topic JavaDoc topic,
207                                             String JavaDoc messageSelector,
208                                             boolean noLocal)
209            throws JMSException JavaDoc
210    {
211       return session.createSubscriber(topic, messageSelector, noLocal);
212    }
213
214
215    private final class TopicAsyncConnection extends AsyncConnection
216    {
217
218       TopicAsyncConnection(TopicConnectionFactory JavaDoc connectionFactory,
219                            TopicConnection JavaDoc connection,
220                            String JavaDoc threadName,
221                            String JavaDoc clientID,
222                            String JavaDoc username,
223                            String JavaDoc password)
224
225               throws JMSException JavaDoc
226       {
227          super(connectionFactory, connection, threadName,
228                  clientID, username, password);
229       }
230
231       protected ListenerSession createListenerSession(javax.jms.Connection JavaDoc connection,
232                                                       Subscription subscription)
233               throws Exception JavaDoc
234       {
235          TopicSession JavaDoc session = createTopicSession((TopicConnection JavaDoc)connection,
236                  subscription.m_ackMode);
237          TopicSubscriber JavaDoc subscriber = createSubscriber(session,
238                  (TopicSubscription)subscription);
239          return new TopicListenerSession(session, subscriber,
240                  (TopicSubscription)subscription);
241       }
242
243       private final class TopicListenerSession extends ListenerSession
244       {
245
246          TopicListenerSession(TopicSession JavaDoc session,
247                               TopicSubscriber JavaDoc subscriber,
248                               TopicSubscription subscription)
249                  throws Exception JavaDoc
250          {
251             super(session, subscriber, subscription);
252          }
253
254          void cleanup()
255          {
256             try
257             {
258                m_consumer.close();
259             }
260             catch (Exception JavaDoc ignore)
261             {
262             }
263             try
264             {
265                TopicSubscription sub = (TopicSubscription)m_subscription;
266                if (sub.isDurable() && sub.m_unsubscribe)
267                {
268                   ((TopicSession JavaDoc)m_session).unsubscribe(sub.m_subscriptionName);
269                }
270             }
271             catch (Exception JavaDoc ignore)
272             {
273             }
274             try
275             {
276                m_session.close();
277             }
278             catch (Exception JavaDoc ignore)
279             {
280             }
281
282          }
283       }
284    }
285
286    private final class TopicSyncConnection extends SyncConnection
287    {
288       TopicSyncConnection(TopicConnectionFactory JavaDoc connectionFactory,
289                           TopicConnection JavaDoc connection,
290                           int numSessions,
291                           String JavaDoc threadName,
292                           String JavaDoc clientID,
293                           String JavaDoc username,
294                           String JavaDoc password)
295
296               throws JMSException JavaDoc
297       {
298          super(connectionFactory, connection, numSessions, threadName,
299                  clientID, username, password);
300       }
301
302       protected SendSession createSendSession(javax.jms.Connection JavaDoc connection)
303               throws JMSException JavaDoc
304       {
305          TopicSession JavaDoc session = createTopicSession((TopicConnection JavaDoc)connection,
306                  JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
307          TopicPublisher JavaDoc publisher = session.createPublisher(null);
308          return new TopicSendSession(session, publisher);
309       }
310
311       private final class TopicSendSession extends SendSession
312       {
313          TopicSendSession(TopicSession JavaDoc session,
314                           TopicPublisher JavaDoc publisher)
315                  throws JMSException JavaDoc
316          {
317             super(session, publisher);
318          }
319
320
321          protected MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination)
322                  throws JMSException JavaDoc
323          {
324             return createSubscriber((TopicSession JavaDoc)m_session, (Topic JavaDoc)destination,
325                     null, JMSConstants.DEFAULT_NO_LOCAL);
326          }
327
328          protected void deleteTemporaryDestination(Destination JavaDoc destination)
329                  throws JMSException JavaDoc
330          {
331             ((TemporaryTopic JavaDoc)destination).delete();
332          }
333
334
335          protected Destination JavaDoc createTemporaryDestination()
336                  throws JMSException JavaDoc
337          {
338             return ((TopicSession JavaDoc)m_session).createTemporaryTopic();
339          }
340
341          protected void send(Destination JavaDoc destination, Message JavaDoc message,
342                              int deliveryMode, int priority, long timeToLive)
343                  throws JMSException JavaDoc
344          {
345             ((TopicPublisher JavaDoc)m_producer).publish((Topic JavaDoc)destination, message,
346                     deliveryMode, priority, timeToLive);
347          }
348
349       }
350    }
351
352
353    private class TopicEndpoint
354            extends JMSEndpoint
355    {
356       String JavaDoc m_topicName;
357
358       TopicEndpoint(String JavaDoc topicName)
359       {
360          super(TopicConnector.this);
361          m_topicName = topicName;
362       }
363
364       Destination JavaDoc getDestination(Session JavaDoc session)
365               throws Exception JavaDoc
366       {
367          return createTopic((TopicSession JavaDoc)session, m_topicName);
368       }
369
370       protected Subscription createSubscription(MessageListener JavaDoc listener,
371                                                 HashMap JavaDoc properties)
372       {
373          return new TopicSubscription(listener, this, properties);
374       }
375
376       public String JavaDoc toString()
377       {
378          StringBuffer JavaDoc buffer = new StringBuffer JavaDoc("TopicEndpoint:");
379          buffer.append(m_topicName);
380          return buffer.toString();
381       }
382
383       public boolean equals(Object JavaDoc object)
384       {
385          if (!super.equals(object))
386             return false;
387
388          if (!(object instanceof TopicEndpoint))
389             return false;
390
391          return m_topicName.equals(((TopicEndpoint)object).m_topicName);
392       }
393    }
394
395    private final class TopicSubscription extends Subscription
396    {
397       String JavaDoc m_subscriptionName;
398       boolean m_unsubscribe;
399       boolean m_noLocal;
400
401       TopicSubscription(MessageListener JavaDoc listener,
402                         JMSEndpoint endpoint,
403                         HashMap JavaDoc properties)
404       {
405          super(listener, endpoint, properties);
406          m_subscriptionName = MapUtils.removeStringProperty(properties,
407                  JMSConstants.SUBSCRIPTION_NAME,
408                  null);
409          m_unsubscribe = MapUtils.removeBooleanProperty(properties,
410                  JMSConstants.UNSUBSCRIBE,
411                  JMSConstants.DEFAULT_UNSUBSCRIBE);
412          m_noLocal = MapUtils.removeBooleanProperty(properties,
413                  JMSConstants.NO_LOCAL,
414                  JMSConstants.DEFAULT_NO_LOCAL);
415       }
416
417       boolean isDurable()
418       {
419          return m_subscriptionName != null;
420       }
421
422       public boolean equals(Object JavaDoc obj)
423       {
424          if (!super.equals(obj))
425             return false;
426          if (!(obj instanceof TopicSubscription))
427             return false;
428
429          TopicSubscription other = (TopicSubscription)obj;
430          if (other.m_unsubscribe != m_unsubscribe || other.m_noLocal != m_noLocal)
431             return false;
432
433          if (isDurable())
434          {
435             return other.isDurable() && other.m_subscriptionName.equals(m_subscriptionName);
436          }
437          else if (other.isDurable())
438             return false;
439          else
440             return true;
441       }
442
443       public String JavaDoc toString()
444       {
445          StringBuffer JavaDoc buffer = new StringBuffer JavaDoc(super.toString());
446          buffer.append(":").append(m_noLocal).append(":").append(m_unsubscribe);
447          if (isDurable())
448          {
449             buffer.append(":");
450             buffer.append(m_subscriptionName);
451          }
452          return buffer.toString();
453       }
454
455    }
456
457    private final class TopicDestinationEndpoint
458            extends TopicEndpoint
459    {
460       Topic JavaDoc m_topic;
461
462       TopicDestinationEndpoint(Topic JavaDoc topic)
463               throws JMSException JavaDoc
464       {
465          super(topic.getTopicName());
466          m_topic = topic;
467       }
468
469       Destination JavaDoc getDestination(Session JavaDoc session)
470       {
471          return m_topic;
472       }
473
474    }
475
476
477 }
Popular Tags