KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > network > jms > JmsQueueConnector


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.network.jms;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22
23 import javax.jms.Connection JavaDoc;
24 import javax.jms.Destination JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Queue JavaDoc;
27 import javax.jms.QueueConnection JavaDoc;
28 import javax.jms.QueueConnectionFactory JavaDoc;
29 import javax.jms.QueueSession JavaDoc;
30 import javax.jms.Session JavaDoc;
31 import javax.naming.NamingException JavaDoc;
32 /**
33  * A Bridge to other JMS Queue providers
34  *
35  * @org.apache.xbean.XBean
36  *
37  * @version $Revision: 1.1.1.1 $
38  */

39 public class JmsQueueConnector extends JmsConnector{
40     private static final Log log=LogFactory.getLog(JmsQueueConnector.class);
41     private String JavaDoc outboundQueueConnectionFactoryName;
42     private String JavaDoc localConnectionFactoryName;
43     private QueueConnectionFactory JavaDoc outboundQueueConnectionFactory;
44     private QueueConnectionFactory JavaDoc localQueueConnectionFactory;
45     private QueueConnection JavaDoc outboundQueueConnection;
46     private QueueConnection JavaDoc localQueueConnection;
47     private InboundQueueBridge[] inboundQueueBridges;
48     private OutboundQueueBridge[] outboundQueueBridges;
49
50     public boolean init(){
51         boolean result=super.init();
52         if(result){
53             try{
54                 initializeForeignQueueConnection();
55                 initializeLocalQueueConnection();
56                 initializeInboundJmsMessageConvertor();
57                 initializeOutboundJmsMessageConvertor();
58                 initializeInboundQueueBridges();
59                 initializeOutboundQueueBridges();
60             }catch(Exception JavaDoc e){
61                 log.error("Failed to initialize the JMSConnector",e);
62             }
63         }
64         return result;
65     }
66     
67
68     
69     /**
70      * @return Returns the inboundQueueBridges.
71      */

72     public InboundQueueBridge[] getInboundQueueBridges(){
73         return inboundQueueBridges;
74     }
75
76     /**
77      * @param inboundQueueBridges
78      * The inboundQueueBridges to set.
79      */

80     public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges){
81         this.inboundQueueBridges=inboundQueueBridges;
82     }
83
84     /**
85      * @return Returns the outboundQueueBridges.
86      */

87     public OutboundQueueBridge[] getOutboundQueueBridges(){
88         return outboundQueueBridges;
89     }
90
91     /**
92      * @param outboundQueueBridges
93      * The outboundQueueBridges to set.
94      */

95     public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges){
96         this.outboundQueueBridges=outboundQueueBridges;
97     }
98
99     /**
100      * @return Returns the localQueueConnectionFactory.
101      */

102     public QueueConnectionFactory JavaDoc getLocalQueueConnectionFactory(){
103         return localQueueConnectionFactory;
104     }
105
106     /**
107      * @param localQueueConnectionFactory
108      * The localQueueConnectionFactory to set.
109      */

110     public void setLocalQueueConnectionFactory(QueueConnectionFactory JavaDoc localConnectionFactory){
111         this.localQueueConnectionFactory=localConnectionFactory;
112     }
113
114     /**
115      * @return Returns the outboundQueueConnectionFactory.
116      */

117     public QueueConnectionFactory JavaDoc getOutboundQueueConnectionFactory(){
118         return outboundQueueConnectionFactory;
119     }
120
121     /**
122      * @return Returns the outboundQueueConnectionFactoryName.
123      */

124     public String JavaDoc getOutboundQueueConnectionFactoryName(){
125         return outboundQueueConnectionFactoryName;
126     }
127
128     /**
129      * @param outboundQueueConnectionFactoryName
130      * The outboundQueueConnectionFactoryName to set.
131      */

132     public void setOutboundQueueConnectionFactoryName(String JavaDoc foreignQueueConnectionFactoryName){
133         this.outboundQueueConnectionFactoryName=foreignQueueConnectionFactoryName;
134     }
135
136     /**
137      * @return Returns the localConnectionFactoryName.
138      */

139     public String JavaDoc getLocalConnectionFactoryName(){
140         return localConnectionFactoryName;
141     }
142
143     /**
144      * @param localConnectionFactoryName
145      * The localConnectionFactoryName to set.
146      */

147     public void setLocalConnectionFactoryName(String JavaDoc localConnectionFactoryName){
148         this.localConnectionFactoryName=localConnectionFactoryName;
149     }
150
151     /**
152      * @return Returns the localQueueConnection.
153      */

154     public QueueConnection JavaDoc getLocalQueueConnection(){
155         return localQueueConnection;
156     }
157
158     /**
159      * @param localQueueConnection
160      * The localQueueConnection to set.
161      */

162     public void setLocalQueueConnection(QueueConnection JavaDoc localQueueConnection){
163         this.localQueueConnection=localQueueConnection;
164     }
165
166     /**
167      * @return Returns the outboundQueueConnection.
168      */

169     public QueueConnection JavaDoc getOutboundQueueConnection(){
170         return outboundQueueConnection;
171     }
172
173     /**
174      * @param outboundQueueConnection
175      * The outboundQueueConnection to set.
176      */

177     public void setOutboundQueueConnection(QueueConnection JavaDoc foreignQueueConnection){
178         this.outboundQueueConnection=foreignQueueConnection;
179     }
180
181     /**
182      * @param outboundQueueConnectionFactory
183      * The outboundQueueConnectionFactory to set.
184      */

185     public void setOutboundQueueConnectionFactory(QueueConnectionFactory JavaDoc foreignQueueConnectionFactory){
186         this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
187     }
188
189     public void restartProducerConnection() throws NamingException JavaDoc, JMSException JavaDoc {
190         outboundQueueConnection = null;
191         initializeForeignQueueConnection();
192     }
193
194     protected void initializeForeignQueueConnection() throws NamingException JavaDoc,JMSException JavaDoc{
195         if(outboundQueueConnection==null){
196             // get the connection factories
197
if(outboundQueueConnectionFactory==null){
198                 // look it up from JNDI
199
if(outboundQueueConnectionFactoryName!=null){
200                     outboundQueueConnectionFactory=(QueueConnectionFactory JavaDoc) jndiOutboundTemplate.lookup(
201                                     outboundQueueConnectionFactoryName,QueueConnectionFactory JavaDoc.class);
202                     if(outboundUsername!=null){
203                         outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(outboundUsername,
204                                         outboundPassword);
205                     }else{
206                         outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection();
207                     }
208                 }else {
209                     throw new JMSException JavaDoc("Cannot create localConnection - no information");
210                 }
211             }else {
212                 if(outboundUsername!=null){
213                     outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(outboundUsername,
214                                     outboundPassword);
215                 }else{
216                     outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection();
217                 }
218             }
219         }
220         outboundQueueConnection.start();
221     }
222
223     protected void initializeLocalQueueConnection() throws NamingException JavaDoc,JMSException JavaDoc{
224         if(localQueueConnection==null){
225             // get the connection factories
226
if(localQueueConnectionFactory==null){
227                 if(embeddedConnectionFactory==null){
228                     // look it up from JNDI
229
if(localConnectionFactoryName!=null){
230                         localQueueConnectionFactory=(QueueConnectionFactory JavaDoc) jndiLocalTemplate.lookup(
231                                         localConnectionFactoryName,QueueConnectionFactory JavaDoc.class);
232                         if(localUsername!=null){
233                             localQueueConnection=localQueueConnectionFactory.createQueueConnection(localUsername,
234                                             localPassword);
235                         }else{
236                             localQueueConnection=localQueueConnectionFactory.createQueueConnection();
237                         }
238                     }else {
239                         throw new JMSException JavaDoc("Cannot create localConnection - no information");
240                     }
241                 }else{
242                     localQueueConnection = embeddedConnectionFactory.createQueueConnection();
243                 }
244             }else {
245                 if(localUsername!=null){
246                     localQueueConnection=localQueueConnectionFactory.createQueueConnection(localUsername,
247                                     localPassword);
248                 }else{
249                     localQueueConnection=localQueueConnectionFactory.createQueueConnection();
250                 }
251             }
252         }
253         localQueueConnection.start();
254     }
255     
256     protected void initializeInboundJmsMessageConvertor(){
257         inboundMessageConvertor.setConnection(localQueueConnection);
258     }
259     
260     protected void initializeOutboundJmsMessageConvertor(){
261         outboundMessageConvertor.setConnection(outboundQueueConnection);
262     }
263
264     protected void initializeInboundQueueBridges() throws JMSException JavaDoc{
265         if(inboundQueueBridges!=null){
266             QueueSession JavaDoc outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
267             QueueSession JavaDoc localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
268             for(int i=0;i<inboundQueueBridges.length;i++){
269                 InboundQueueBridge bridge=inboundQueueBridges[i];
270                 String JavaDoc localQueueName=bridge.getLocalQueueName();
271                 Queue JavaDoc activemqQueue=createActiveMQQueue(localSession,localQueueName);
272                 String JavaDoc queueName = bridge.getInboundQueueName();
273                 Queue JavaDoc foreignQueue=createForeignQueue(outboundSession,queueName);
274                 bridge.setConsumerQueue(foreignQueue);
275                 bridge.setProducerQueue(activemqQueue);
276                 bridge.setProducerConnection(localQueueConnection);
277                 bridge.setConsumerConnection(outboundQueueConnection);
278                 if(bridge.getJmsMessageConvertor()==null){
279                     bridge.setJmsMessageConvertor(getInboundMessageConvertor());
280                 }
281                 bridge.setJmsConnector(this);
282                 addInboundBridge(bridge);
283             }
284             outboundSession.close();
285             localSession.close();
286         }
287     }
288
289     protected void initializeOutboundQueueBridges() throws JMSException JavaDoc{
290         if(outboundQueueBridges!=null){
291             QueueSession JavaDoc outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
292             QueueSession JavaDoc localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
293             for(int i=0;i<outboundQueueBridges.length;i++){
294                 OutboundQueueBridge bridge=outboundQueueBridges[i];
295                 String JavaDoc localQueueName=bridge.getLocalQueueName();
296                 Queue JavaDoc activemqQueue=createActiveMQQueue(localSession,localQueueName);
297                 String JavaDoc queueName=bridge.getOutboundQueueName();
298                 Queue JavaDoc foreignQueue=createForeignQueue(outboundSession,queueName);
299                 bridge.setConsumerQueue(activemqQueue);
300                 bridge.setProducerQueue(foreignQueue);
301                 bridge.setProducerConnection(outboundQueueConnection);
302                 bridge.setConsumerConnection(localQueueConnection);
303                 if(bridge.getJmsMessageConvertor()==null){
304                     bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
305                 }
306                 bridge.setJmsConnector(this);
307                 addOutboundBridge(bridge);
308             }
309             outboundSession.close();
310             localSession.close();
311         }
312     }
313     
314     protected Destination JavaDoc createReplyToBridge(Destination JavaDoc destination, Connection JavaDoc replyToProducerConnection, Connection JavaDoc replyToConsumerConnection){
315         Queue JavaDoc replyToProducerQueue =(Queue JavaDoc)destination;
316         boolean isInbound = replyToProducerConnection.equals(localQueueConnection);
317         
318         if(isInbound){
319             InboundQueueBridge bridge = (InboundQueueBridge) replyToBridges.get(replyToProducerQueue);
320             if (bridge == null){
321                 bridge = new InboundQueueBridge(){
322                     protected Destination JavaDoc processReplyToDestination (Destination JavaDoc destination){
323                         return null;
324                     }
325                 };
326                 try{
327                     QueueSession JavaDoc replyToConsumerSession = ((QueueConnection JavaDoc)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
328                     Queue JavaDoc replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
329                     replyToConsumerSession.close();
330                     bridge.setConsumerQueue(replyToConsumerQueue);
331                     bridge.setProducerQueue(replyToProducerQueue);
332                     bridge.setProducerConnection((QueueConnection JavaDoc)replyToProducerConnection);
333                     bridge.setConsumerConnection((QueueConnection JavaDoc)replyToConsumerConnection);
334                     bridge.setDoHandleReplyTo(false);
335                     if(bridge.getJmsMessageConvertor()==null){
336                         bridge.setJmsMessageConvertor(getInboundMessageConvertor());
337                     }
338                     bridge.setJmsConnector(this);
339                     bridge.start();
340                     log.info("Created replyTo bridge for " + replyToProducerQueue);
341                 }catch(Exception JavaDoc e){
342                     log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
343                     return null;
344                 }
345                 replyToBridges.put(replyToProducerQueue, bridge);
346             }
347             return bridge.getConsumerQueue();
348         }else{
349             OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(replyToProducerQueue);
350             if (bridge == null){
351                 bridge = new OutboundQueueBridge(){
352                     protected Destination JavaDoc processReplyToDestination (Destination JavaDoc destination){
353                         return null;
354                     }
355                 };
356                 try{
357                     QueueSession JavaDoc replyToConsumerSession = ((QueueConnection JavaDoc)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
358                     Queue JavaDoc replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
359                     replyToConsumerSession.close();
360                     bridge.setConsumerQueue(replyToConsumerQueue);
361                     bridge.setProducerQueue(replyToProducerQueue);
362                     bridge.setProducerConnection((QueueConnection JavaDoc)replyToProducerConnection);
363                     bridge.setConsumerConnection((QueueConnection JavaDoc)replyToConsumerConnection);
364                     bridge.setDoHandleReplyTo(false);
365                     if(bridge.getJmsMessageConvertor()==null){
366                         bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
367                     }
368                     bridge.setJmsConnector(this);
369                     bridge.start();
370                     log.info("Created replyTo bridge for " + replyToProducerQueue);
371                 }catch(Exception JavaDoc e){
372                     log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
373                     return null;
374                 }
375                 replyToBridges.put(replyToProducerQueue, bridge);
376             }
377             return bridge.getConsumerQueue();
378         }
379     }
380     
381     protected Queue JavaDoc createActiveMQQueue(QueueSession JavaDoc session,String JavaDoc queueName) throws JMSException JavaDoc{
382         return session.createQueue(queueName);
383     }
384     
385     protected Queue JavaDoc createForeignQueue(QueueSession JavaDoc session,String JavaDoc queueName) throws JMSException JavaDoc{
386         Queue JavaDoc result = null;
387         try{
388             result = session.createQueue(queueName);
389         }catch(JMSException JavaDoc e){
390             //look-up the Queue
391
try{
392                 result = (Queue JavaDoc) jndiOutboundTemplate.lookup(queueName, Queue JavaDoc.class);
393             }catch(NamingException JavaDoc e1){
394                 String JavaDoc errStr = "Failed to look-up Queue for name: " + queueName;
395                 log.error(errStr,e);
396                 JMSException JavaDoc jmsEx = new JMSException JavaDoc(errStr);
397                 jmsEx.setLinkedException(e1);
398                 throw jmsEx;
399             }
400         }
401         return result;
402     }
403
404     
405 }
406
Popular Tags