KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > axis > transport > jms > TopicConnector


1 /*
2  * Copyright 2001, 2002,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.axis.transport.jms;
18
19 import org.apache.axis.components.jms.JMSVendorAdapter;
20
21 import javax.jms.Connection JavaDoc;
22 import javax.jms.ConnectionFactory JavaDoc;
23 import javax.jms.Destination JavaDoc;
24 import javax.jms.JMSException JavaDoc;
25 import javax.jms.Message JavaDoc;
26 import javax.jms.MessageConsumer JavaDoc;
27 import javax.jms.MessageListener JavaDoc;
28 import javax.jms.Session JavaDoc;
29 import javax.jms.TemporaryTopic JavaDoc;
30 import javax.jms.Topic JavaDoc;
31 import javax.jms.TopicConnection JavaDoc;
32 import javax.jms.TopicConnectionFactory JavaDoc;
33 import javax.jms.TopicPublisher JavaDoc;
34 import javax.jms.TopicSession JavaDoc;
35 import javax.jms.TopicSubscriber JavaDoc;
36 import java.util.HashMap JavaDoc;
37
38 /**
39  * TopicConnector is a concrete JMSConnector subclass that specifically handles
40  * connections to topics (pub-sub domain).
41  *
42  * @author Jaime Meritt (jmeritt@sonicsoftware.com)
43  * @author Richard Chung (rchung@sonicsoftware.com)
44  * @author Dave Chappell (chappell@sonicsoftware.com)
45  */

46 public class TopicConnector extends JMSConnector
47 {
48     public TopicConnector(TopicConnectionFactory JavaDoc factory,
49                           int numRetries,
50                           int numSessions,
51                           long connectRetryInterval,
52                           long interactRetryInterval,
53                           long timeoutTime,
54                           boolean allowReceive,
55                           String JavaDoc clientID,
56                           String JavaDoc username,
57                           String JavaDoc password,
58                           JMSVendorAdapter adapter,
59                           JMSURLHelper jmsurl)
60         throws JMSException JavaDoc
61     {
62         super(factory, numRetries, numSessions, connectRetryInterval,
63               interactRetryInterval, timeoutTime, allowReceive,
64               clientID, username, password, adapter, jmsurl);
65     }
66
67     protected Connection JavaDoc internalConnect(ConnectionFactory JavaDoc connectionFactory,
68                                          String JavaDoc username, String JavaDoc password)
69         throws JMSException JavaDoc
70     {
71         TopicConnectionFactory JavaDoc tcf = (TopicConnectionFactory JavaDoc)connectionFactory;
72         if(username == null)
73             return tcf.createTopicConnection();
74
75         return tcf.createTopicConnection(username, password);
76     }
77
78
79     protected SyncConnection createSyncConnection(ConnectionFactory JavaDoc factory,
80                                                   Connection JavaDoc connection,
81                                                   int numSessions,
82                                                   String JavaDoc threadName,
83                                                   String JavaDoc clientID,
84                                                   String JavaDoc username,
85                                                   String JavaDoc password)
86         throws JMSException JavaDoc
87     {
88         return new TopicSyncConnection((TopicConnectionFactory JavaDoc)factory,
89                                        (TopicConnection JavaDoc)connection, numSessions,
90                                        threadName, clientID, username, password);
91     }
92
93     protected AsyncConnection createAsyncConnection(ConnectionFactory JavaDoc factory,
94                                                     Connection JavaDoc connection,
95                                                     String JavaDoc threadName,
96                                                     String JavaDoc clientID,
97                                                     String JavaDoc username,
98                                                     String JavaDoc password)
99         throws JMSException JavaDoc
100     {
101         return new TopicAsyncConnection((TopicConnectionFactory JavaDoc)factory,
102                                         (TopicConnection JavaDoc)connection, threadName,
103                                         clientID, username, password);
104     }
105
106     public JMSEndpoint createEndpoint(String JavaDoc destination)
107     {
108         return new TopicEndpoint(destination);
109     }
110
111     /**
112      * Create an endpoint for a queue destination.
113      *
114      * @param destination
115      * @return
116      * @throws JMSException
117      */

118     public JMSEndpoint createEndpoint(Destination JavaDoc destination)
119         throws JMSException JavaDoc
120     {
121         if(!(destination instanceof Topic JavaDoc))
122             throw new IllegalArgumentException JavaDoc("The input be a topic for this connector");
123         return new TopicDestinationEndpoint((Topic JavaDoc)destination);
124     }
125
126     private TopicSession JavaDoc createTopicSession(TopicConnection JavaDoc connection, int ackMode)
127         throws JMSException JavaDoc
128     {
129         return connection.createTopicSession(false,
130                                              ackMode);
131     }
132
133     private Topic JavaDoc createTopic(TopicSession JavaDoc session, String JavaDoc subject)
134         throws Exception JavaDoc
135     {
136         return m_adapter.getTopic(session, subject);
137     }
138
139     private TopicSubscriber JavaDoc createSubscriber(TopicSession JavaDoc session,
140                                              TopicSubscription subscription)
141         throws Exception JavaDoc
142     {
143         if(subscription.isDurable())
144             return createDurableSubscriber(session,
145                         (Topic JavaDoc)subscription.m_endpoint.getDestination(session),
146                         subscription.m_subscriptionName,
147                         subscription.m_messageSelector,
148                         subscription.m_noLocal);
149         else
150             return createSubscriber(session,
151                         (Topic JavaDoc)subscription.m_endpoint.getDestination(session),
152                         subscription.m_messageSelector,
153                         subscription.m_noLocal);
154     }
155
156     private TopicSubscriber JavaDoc createDurableSubscriber(TopicSession JavaDoc session,
157                                                     Topic JavaDoc topic,
158                                                     String JavaDoc subscriptionName,
159                                                     String JavaDoc messageSelector,
160                                                     boolean noLocal)
161         throws JMSException JavaDoc
162     {
163         return session.createDurableSubscriber(topic, subscriptionName,
164                                                messageSelector, noLocal);
165     }
166
167     private TopicSubscriber JavaDoc createSubscriber(TopicSession JavaDoc session,
168                                              Topic JavaDoc topic,
169                                              String JavaDoc messageSelector,
170                                              boolean noLocal)
171         throws JMSException JavaDoc
172     {
173         return session.createSubscriber(topic, messageSelector, noLocal);
174     }
175
176
177
178
179     private final class TopicAsyncConnection extends AsyncConnection
180     {
181
182         TopicAsyncConnection(TopicConnectionFactory JavaDoc connectionFactory,
183                              TopicConnection JavaDoc connection,
184                              String JavaDoc threadName,
185                              String JavaDoc clientID,
186                              String JavaDoc username,
187                              String JavaDoc password)
188
189             throws JMSException JavaDoc
190         {
191             super(connectionFactory, connection, threadName,
192                   clientID, username, password);
193         }
194
195         protected ListenerSession createListenerSession(javax.jms.Connection JavaDoc connection,
196                                                         Subscription subscription)
197             throws Exception JavaDoc
198         {
199             TopicSession JavaDoc session = createTopicSession((TopicConnection JavaDoc)connection,
200                                                       subscription.m_ackMode);
201             TopicSubscriber JavaDoc subscriber = createSubscriber(session,
202                                                 (TopicSubscription)subscription);
203             return new TopicListenerSession(session, subscriber,
204                                                 (TopicSubscription)subscription);
205         }
206
207         private final class TopicListenerSession extends ListenerSession
208         {
209
210             TopicListenerSession(TopicSession JavaDoc session,
211                                  TopicSubscriber JavaDoc subscriber,
212                                  TopicSubscription subscription)
213                 throws Exception JavaDoc
214             {
215                 super(session, subscriber, subscription);
216             }
217
218             void cleanup()
219             {
220                 try{m_consumer.close();}catch(Exception JavaDoc ignore){}
221                 try
222                 {
223                     TopicSubscription sub = (TopicSubscription)m_subscription;
224                     if(sub.isDurable() && sub.m_unsubscribe)
225                     {
226                         ((TopicSession JavaDoc)m_session).unsubscribe(sub.m_subscriptionName);
227                     }
228                 }
229                 catch(Exception JavaDoc ignore){}
230                 try{m_session.close();}catch(Exception JavaDoc ignore){}
231
232             }
233         }
234     }
235
236     private final class TopicSyncConnection extends SyncConnection
237     {
238         TopicSyncConnection(TopicConnectionFactory JavaDoc connectionFactory,
239                             TopicConnection JavaDoc connection,
240                             int numSessions,
241                             String JavaDoc threadName,
242                             String JavaDoc clientID,
243                             String JavaDoc username,
244                             String JavaDoc password)
245
246             throws JMSException JavaDoc
247         {
248             super(connectionFactory, connection, numSessions, threadName,
249                   clientID, username, password);
250         }
251
252         protected SendSession createSendSession(javax.jms.Connection JavaDoc connection)
253             throws JMSException JavaDoc
254         {
255             TopicSession JavaDoc session = createTopicSession((TopicConnection JavaDoc)connection,
256                                             JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
257             TopicPublisher JavaDoc publisher = session.createPublisher(null);
258             return new TopicSendSession(session, publisher);
259         }
260
261         private final class TopicSendSession extends SendSession
262         {
263             TopicSendSession(TopicSession JavaDoc session,
264                              TopicPublisher JavaDoc publisher)
265                 throws JMSException JavaDoc
266             {
267                 super(session, publisher);
268             }
269
270
271             protected MessageConsumer JavaDoc createConsumer(Destination JavaDoc destination)
272                 throws JMSException JavaDoc
273             {
274                 return createSubscriber((TopicSession JavaDoc)m_session, (Topic JavaDoc)destination,
275                                         null, JMSConstants.DEFAULT_NO_LOCAL);
276             }
277
278             protected void deleteTemporaryDestination(Destination JavaDoc destination)
279                 throws JMSException JavaDoc
280             {
281                 ((TemporaryTopic JavaDoc)destination).delete();
282             }
283
284
285             protected Destination JavaDoc createTemporaryDestination()
286                 throws JMSException JavaDoc
287             {
288                 return ((TopicSession JavaDoc)m_session).createTemporaryTopic();
289             }
290
291             protected void send(Destination JavaDoc destination, Message JavaDoc message,
292                                 int deliveryMode, int priority, long timeToLive)
293                 throws JMSException JavaDoc
294             {
295                 ((TopicPublisher JavaDoc)m_producer).publish((Topic JavaDoc)destination, message,
296                                                 deliveryMode, priority, timeToLive);
297             }
298
299         }
300     }
301
302
303
304     private class TopicEndpoint
305         extends JMSEndpoint
306     {
307         String JavaDoc m_topicName;
308
309         TopicEndpoint(String JavaDoc topicName)
310         {
311             super(TopicConnector.this);
312             m_topicName = topicName;
313         }
314
315         Destination JavaDoc getDestination(Session JavaDoc session)
316             throws Exception JavaDoc
317         {
318             return createTopic((TopicSession JavaDoc)session, m_topicName);
319         }
320
321         protected Subscription createSubscription(MessageListener JavaDoc listener,
322                                                   HashMap JavaDoc properties)
323         {
324             return new TopicSubscription(listener, this, properties);
325         }
326
327         public String JavaDoc toString()
328         {
329             StringBuffer JavaDoc buffer = new StringBuffer JavaDoc("TopicEndpoint:");
330             buffer.append(m_topicName);
331             return buffer.toString();
332         }
333
334         public boolean equals(Object JavaDoc object)
335         {
336             if(!super.equals(object))
337                 return false;
338
339             if(!(object instanceof TopicEndpoint))
340                 return false;
341
342             return m_topicName.equals(((TopicEndpoint)object).m_topicName);
343         }
344     }
345
346     private final class TopicSubscription extends Subscription
347     {
348         String JavaDoc m_subscriptionName;
349         boolean m_unsubscribe;
350         boolean m_noLocal;
351
352         TopicSubscription(MessageListener JavaDoc listener,
353                           JMSEndpoint endpoint,
354                           HashMap JavaDoc properties)
355         {
356             super(listener, endpoint, properties);
357             m_subscriptionName = MapUtils.removeStringProperty(properties,
358                                                 JMSConstants.SUBSCRIPTION_NAME,
359                                                 null);
360             m_unsubscribe = MapUtils.removeBooleanProperty(properties,
361                                                 JMSConstants.UNSUBSCRIBE,
362                                                 JMSConstants.DEFAULT_UNSUBSCRIBE);
363             m_noLocal = MapUtils.removeBooleanProperty(properties,
364                                                 JMSConstants.NO_LOCAL,
365                                                 JMSConstants.DEFAULT_NO_LOCAL);
366         }
367
368         boolean isDurable()
369         {
370             return m_subscriptionName != null;
371         }
372
373         public boolean equals(Object JavaDoc obj)
374         {
375             if(!super.equals(obj))
376                 return false;
377             if(!(obj instanceof TopicSubscription))
378                 return false;
379
380             TopicSubscription other = (TopicSubscription)obj;
381             if(other.m_unsubscribe != m_unsubscribe || other.m_noLocal != m_noLocal)
382                 return false;
383
384             if(isDurable())
385             {
386                 return other.isDurable() && other.m_subscriptionName.equals(m_subscriptionName);
387             }
388             else if(other.isDurable())
389                 return false;
390             else
391                 return true;
392         }
393
394         public String JavaDoc toString()
395         {
396             StringBuffer JavaDoc buffer = new StringBuffer JavaDoc(super.toString());
397             buffer.append(":").append(m_noLocal).append(":").append(m_unsubscribe);
398             if(isDurable())
399             {
400                 buffer.append(":");
401                 buffer.append(m_subscriptionName);
402             }
403             return buffer.toString();
404         }
405
406     }
407
408     private final class TopicDestinationEndpoint
409         extends TopicEndpoint
410     {
411         Topic JavaDoc m_topic;
412
413         TopicDestinationEndpoint(Topic JavaDoc topic)
414             throws JMSException JavaDoc
415         {
416             super(topic.getTopicName());
417             m_topic = topic;
418         }
419
420         Destination JavaDoc getDestination(Session JavaDoc session)
421         {
422             return m_topic;
423         }
424
425     }
426
427
428 }
Popular Tags