KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > network > jms > JmsTopicConnector


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.network.jms;
19
20 import javax.jms.Connection JavaDoc;
21 import javax.jms.Destination JavaDoc;
22 import javax.jms.JMSException JavaDoc;
23 import javax.jms.Session JavaDoc;
24 import javax.jms.Topic JavaDoc;
25 import javax.jms.TopicConnection JavaDoc;
26 import javax.jms.TopicConnectionFactory JavaDoc;
27 import javax.jms.TopicSession JavaDoc;
28 import javax.naming.NamingException JavaDoc;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32
33 /**
34  * A Bridge to other JMS Topic providers
35  *
36  * @org.apache.xbean.XBean
37  *
38  * @version $Revision: 1.1.1.1 $
39  */

40 public class JmsTopicConnector extends JmsConnector{
41     private static final Log log=LogFactory.getLog(JmsTopicConnector.class);
42     private String JavaDoc outboundTopicConnectionFactoryName;
43     private String JavaDoc localConnectionFactoryName;
44     private TopicConnectionFactory JavaDoc outboundTopicConnectionFactory;
45     private TopicConnectionFactory JavaDoc localTopicConnectionFactory;
46     private TopicConnection JavaDoc outboundTopicConnection;
47     private TopicConnection JavaDoc localTopicConnection;
48     private InboundTopicBridge[] inboundTopicBridges;
49     private OutboundTopicBridge[] outboundTopicBridges;
50     
51     public boolean init(){
52         boolean result=super.init();
53         if(result){
54             try{
55                 initializeForeignTopicConnection();
56                 initializeLocalTopicConnection();
57                 initializeInboundJmsMessageConvertor();
58                 initializeOutboundJmsMessageConvertor();
59                 initializeInboundTopicBridges();
60                 initializeOutboundTopicBridges();
61             }catch(Exception JavaDoc e){
62                 log.error("Failed to initialize the JMSConnector",e);
63             }
64         }
65         return result;
66     }
67     
68
69     
70     /**
71      * @return Returns the inboundTopicBridges.
72      */

73     public InboundTopicBridge[] getInboundTopicBridges(){
74         return inboundTopicBridges;
75     }
76
77     /**
78      * @param inboundTopicBridges
79      * The inboundTopicBridges to set.
80      */

81     public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges){
82         this.inboundTopicBridges=inboundTopicBridges;
83     }
84
85     /**
86      * @return Returns the outboundTopicBridges.
87      */

88     public OutboundTopicBridge[] getOutboundTopicBridges(){
89         return outboundTopicBridges;
90     }
91
92     /**
93      * @param outboundTopicBridges
94      * The outboundTopicBridges to set.
95      */

96     public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges){
97         this.outboundTopicBridges=outboundTopicBridges;
98     }
99
100     /**
101      * @return Returns the localTopicConnectionFactory.
102      */

103     public TopicConnectionFactory JavaDoc getLocalTopicConnectionFactory(){
104         return localTopicConnectionFactory;
105     }
106
107     /**
108      * @param localTopicConnectionFactory
109      * The localTopicConnectionFactory to set.
110      */

111     public void setLocalTopicConnectionFactory(TopicConnectionFactory JavaDoc localConnectionFactory){
112         this.localTopicConnectionFactory=localConnectionFactory;
113     }
114
115     /**
116      * @return Returns the outboundTopicConnectionFactory.
117      */

118     public TopicConnectionFactory JavaDoc getOutboundTopicConnectionFactory(){
119         return outboundTopicConnectionFactory;
120     }
121
122     /**
123      * @return Returns the outboundTopicConnectionFactoryName.
124      */

125     public String JavaDoc getOutboundTopicConnectionFactoryName(){
126         return outboundTopicConnectionFactoryName;
127     }
128
129     /**
130      * @param outboundTopicConnectionFactoryName
131      * The outboundTopicConnectionFactoryName to set.
132      */

133     public void setOutboundTopicConnectionFactoryName(String JavaDoc foreignTopicConnectionFactoryName){
134         this.outboundTopicConnectionFactoryName=foreignTopicConnectionFactoryName;
135     }
136
137     /**
138      * @return Returns the localConnectionFactoryName.
139      */

140     public String JavaDoc getLocalConnectionFactoryName(){
141         return localConnectionFactoryName;
142     }
143
144     /**
145      * @param localConnectionFactoryName
146      * The localConnectionFactoryName to set.
147      */

148     public void setLocalConnectionFactoryName(String JavaDoc localConnectionFactoryName){
149         this.localConnectionFactoryName=localConnectionFactoryName;
150     }
151
152     /**
153      * @return Returns the localTopicConnection.
154      */

155     public TopicConnection JavaDoc getLocalTopicConnection(){
156         return localTopicConnection;
157     }
158
159     /**
160      * @param localTopicConnection
161      * The localTopicConnection to set.
162      */

163     public void setLocalTopicConnection(TopicConnection JavaDoc localTopicConnection){
164         this.localTopicConnection=localTopicConnection;
165     }
166
167     /**
168      * @return Returns the outboundTopicConnection.
169      */

170     public TopicConnection JavaDoc getOutboundTopicConnection(){
171         return outboundTopicConnection;
172     }
173
174     /**
175      * @param outboundTopicConnection
176      * The outboundTopicConnection to set.
177      */

178     public void setOutboundTopicConnection(TopicConnection JavaDoc foreignTopicConnection){
179         this.outboundTopicConnection=foreignTopicConnection;
180     }
181
182     /**
183      * @param outboundTopicConnectionFactory
184      * The outboundTopicConnectionFactory to set.
185      */

186     public void setOutboundTopicConnectionFactory(TopicConnectionFactory JavaDoc foreignTopicConnectionFactory){
187         this.outboundTopicConnectionFactory=foreignTopicConnectionFactory;
188     }
189
190
191     public void restartProducerConnection() throws NamingException JavaDoc, JMSException JavaDoc {
192         outboundTopicConnection = null;
193         initializeForeignTopicConnection();
194     }
195
196     protected void initializeForeignTopicConnection() throws NamingException JavaDoc,JMSException JavaDoc{
197         if(outboundTopicConnection==null){
198             // get the connection factories
199
if(outboundTopicConnectionFactory==null){
200                 // look it up from JNDI
201
if(outboundTopicConnectionFactoryName!=null){
202                     outboundTopicConnectionFactory=(TopicConnectionFactory JavaDoc) jndiOutboundTemplate.lookup(
203                                     outboundTopicConnectionFactoryName,TopicConnectionFactory JavaDoc.class);
204                     if(outboundUsername!=null){
205                         outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(outboundUsername,
206                                         outboundPassword);
207                     }else{
208                         outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection();
209                     }
210                 }else {
211                     throw new JMSException JavaDoc("Cannot create localConnection - no information");
212                 }
213             }else {
214                 if(outboundUsername!=null){
215                     outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(outboundUsername,
216                                     outboundPassword);
217                 }else{
218                     outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection();
219                 }
220             }
221         }
222         outboundTopicConnection.start();
223     }
224
225     protected void initializeLocalTopicConnection() throws NamingException JavaDoc,JMSException JavaDoc{
226         if(localTopicConnection==null){
227             // get the connection factories
228
if(localTopicConnectionFactory==null){
229                 if(embeddedConnectionFactory==null){
230                     // look it up from JNDI
231
if(localConnectionFactoryName!=null){
232                         localTopicConnectionFactory=(TopicConnectionFactory JavaDoc) jndiLocalTemplate.lookup(
233                                         localConnectionFactoryName,TopicConnectionFactory JavaDoc.class);
234                         if(localUsername!=null){
235                             localTopicConnection=localTopicConnectionFactory.createTopicConnection(localUsername,
236                                             localPassword);
237                         }else{
238                             localTopicConnection=localTopicConnectionFactory.createTopicConnection();
239                         }
240                     }else {
241                         throw new JMSException JavaDoc("Cannot create localConnection - no information");
242                     }
243                 }else{
244                     localTopicConnection = embeddedConnectionFactory.createTopicConnection();
245                 }
246             }else {
247                 if(localUsername!=null){
248                     localTopicConnection=localTopicConnectionFactory.createTopicConnection(localUsername,
249                                     localPassword);
250                 }else{
251                     localTopicConnection=localTopicConnectionFactory.createTopicConnection();
252                 }
253             }
254         }
255         localTopicConnection.start();
256     }
257     
258     protected void initializeInboundJmsMessageConvertor(){
259         inboundMessageConvertor.setConnection(localTopicConnection);
260     }
261     
262     protected void initializeOutboundJmsMessageConvertor(){
263         outboundMessageConvertor.setConnection(outboundTopicConnection);
264     }
265
266     protected void initializeInboundTopicBridges() throws JMSException JavaDoc{
267         if(inboundTopicBridges!=null){
268             TopicSession JavaDoc outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
269             TopicSession JavaDoc localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
270             for(int i=0;i<inboundTopicBridges.length;i++){
271                 InboundTopicBridge bridge=inboundTopicBridges[i];
272                 String JavaDoc localTopicName=bridge.getLocalTopicName();
273                 Topic JavaDoc activemqTopic=createActiveMQTopic(localSession,localTopicName);
274                 String JavaDoc topicName=bridge.getInboundTopicName();
275                 Topic JavaDoc foreignTopic=createForeignTopic(outboundSession,topicName);
276                 bridge.setConsumerTopic(foreignTopic);
277                 bridge.setProducerTopic(activemqTopic);
278                 bridge.setProducerConnection(localTopicConnection);
279                 bridge.setConsumerConnection(outboundTopicConnection);
280                 if(bridge.getJmsMessageConvertor()==null){
281                     bridge.setJmsMessageConvertor(getInboundMessageConvertor());
282                 }
283                 bridge.setJmsConnector(this);
284                 addInboundBridge(bridge);
285             }
286             outboundSession.close();
287             localSession.close();
288         }
289     }
290
291     protected void initializeOutboundTopicBridges() throws JMSException JavaDoc{
292         if(outboundTopicBridges!=null){
293             TopicSession JavaDoc outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
294             TopicSession JavaDoc localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
295             for(int i=0;i<outboundTopicBridges.length;i++){
296                 OutboundTopicBridge bridge=outboundTopicBridges[i];
297                 String JavaDoc localTopicName=bridge.getLocalTopicName();
298                 Topic JavaDoc activemqTopic=createActiveMQTopic(localSession,localTopicName);
299                 String JavaDoc topicName=bridge.getOutboundTopicName();
300                 Topic JavaDoc foreignTopic=createForeignTopic(outboundSession,topicName);
301                 bridge.setConsumerTopic(activemqTopic);
302                 bridge.setProducerTopic(foreignTopic);
303                 bridge.setProducerConnection(outboundTopicConnection);
304                 bridge.setConsumerConnection(localTopicConnection);
305                 if(bridge.getJmsMessageConvertor()==null){
306                     bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
307                 }
308                 bridge.setJmsConnector(this);
309                 addOutboundBridge(bridge);
310             }
311             outboundSession.close();
312             localSession.close();
313         }
314     }
315     
316     protected Destination JavaDoc createReplyToBridge(Destination JavaDoc destination, Connection JavaDoc replyToProducerConnection, Connection JavaDoc replyToConsumerConnection){
317         Topic JavaDoc replyToProducerTopic =(Topic JavaDoc)destination;
318         boolean isInbound = replyToProducerConnection.equals(localTopicConnection);
319         
320         if(isInbound){
321             InboundTopicBridge bridge = (InboundTopicBridge) replyToBridges.get(replyToProducerTopic);
322             if (bridge == null){
323                 bridge = new InboundTopicBridge(){
324                     protected Destination JavaDoc processReplyToDestination (Destination JavaDoc destination){
325                         return null;
326                     }
327                 };
328                 try{
329                     TopicSession JavaDoc replyToConsumerSession = ((TopicConnection JavaDoc)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
330                     Topic JavaDoc replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
331                     replyToConsumerSession.close();
332                     bridge.setConsumerTopic(replyToConsumerTopic);
333                     bridge.setProducerTopic(replyToProducerTopic);
334                     bridge.setProducerConnection((TopicConnection JavaDoc)replyToProducerConnection);
335                     bridge.setConsumerConnection((TopicConnection JavaDoc)replyToConsumerConnection);
336                     bridge.setDoHandleReplyTo(false);
337                     if(bridge.getJmsMessageConvertor()==null){
338                         bridge.setJmsMessageConvertor(getInboundMessageConvertor());
339                     }
340                     bridge.setJmsConnector(this);
341                     bridge.start();
342                     log.info("Created replyTo bridge for " + replyToProducerTopic);
343                 }catch(Exception JavaDoc e){
344                     log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
345                     return null;
346                 }
347                 replyToBridges.put(replyToProducerTopic, bridge);
348             }
349             return bridge.getConsumerTopic();
350         }else{
351             OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(replyToProducerTopic);
352             if (bridge == null){
353                 bridge = new OutboundTopicBridge(){
354                     protected Destination JavaDoc processReplyToDestination (Destination JavaDoc destination){
355                         return null;
356                     }
357                 };
358                 try{
359                     TopicSession JavaDoc replyToConsumerSession = ((TopicConnection JavaDoc)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
360                     Topic JavaDoc replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
361                     replyToConsumerSession.close();
362                     bridge.setConsumerTopic(replyToConsumerTopic);
363                     bridge.setProducerTopic(replyToProducerTopic);
364                     bridge.setProducerConnection((TopicConnection JavaDoc)replyToProducerConnection);
365                     bridge.setConsumerConnection((TopicConnection JavaDoc)replyToConsumerConnection);
366                     bridge.setDoHandleReplyTo(false);
367                     if(bridge.getJmsMessageConvertor()==null){
368                         bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
369                     }
370                     bridge.setJmsConnector(this);
371                     bridge.start();
372                     log.info("Created replyTo bridge for " + replyToProducerTopic);
373                 }catch(Exception JavaDoc e){
374                     log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
375                     return null;
376                 }
377                 replyToBridges.put(replyToProducerTopic, bridge);
378             }
379             return bridge.getConsumerTopic();
380         }
381     }
382     
383     protected Topic JavaDoc createActiveMQTopic(TopicSession JavaDoc session,String JavaDoc topicName) throws JMSException JavaDoc{
384         return session.createTopic(topicName);
385     }
386     
387     protected Topic JavaDoc createForeignTopic(TopicSession JavaDoc session,String JavaDoc topicName) throws JMSException JavaDoc{
388         Topic JavaDoc result = null;
389         try{
390             result = session.createTopic(topicName);
391         }catch(JMSException JavaDoc e){
392             //look-up the Topic
393
try{
394                 result = (Topic JavaDoc) jndiOutboundTemplate.lookup(topicName, Topic JavaDoc.class);
395             }catch(NamingException JavaDoc e1){
396                 String JavaDoc errStr = "Failed to look-up Topic for name: " + topicName;
397                 log.error(errStr,e);
398                 JMSException JavaDoc jmsEx = new JMSException JavaDoc(errStr);
399                 jmsEx.setLinkedException(e1);
400                 throw jmsEx;
401             }
402         }
403         return result;
404     }
405
406     
407 }
408
Popular Tags