KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > network > jms > DestinationBridge


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.network.jms;
19
20 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
21
22 import org.apache.activemq.Service;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25
26 import javax.jms.Connection JavaDoc;
27 import javax.jms.Destination JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29 import javax.jms.Message JavaDoc;
30 import javax.jms.MessageConsumer JavaDoc;
31 import javax.jms.MessageListener JavaDoc;
32 import javax.jms.MessageProducer JavaDoc;
33 import javax.naming.NamingException JavaDoc;
34
35 /**
36  * A Destination bridge is used to bridge between to different JMS systems
37  *
38  * @version $Revision: 1.1.1.1 $
39  */

40 public abstract class DestinationBridge implements Service, MessageListener JavaDoc {
41     private static final Log log = LogFactory.getLog(DestinationBridge.class);
42     protected MessageConsumer JavaDoc consumer;
43     protected AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
44     protected JmsMesageConvertor jmsMessageConvertor;
45     protected boolean doHandleReplyTo = true;
46     protected JmsConnector jmsConnector;
47     private int maximumRetries = 10;
48
49     /**
50      * @return Returns the consumer.
51      */

52     public MessageConsumer JavaDoc getConsumer() {
53         return consumer;
54     }
55
56     /**
57      * @param consumer
58      * The consumer to set.
59      */

60     public void setConsumer(MessageConsumer JavaDoc consumer) {
61         this.consumer = consumer;
62     }
63
64     /**
65      * @param connector
66      */

67     public void setJmsConnector(JmsConnector connector) {
68         this.jmsConnector = connector;
69     }
70
71     /**
72      * @return Returns the inboundMessageConvertor.
73      */

74     public JmsMesageConvertor getJmsMessageConvertor() {
75         return jmsMessageConvertor;
76     }
77
78     /**
79      * @param jmsMessageConvertor
80      */

81     public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
82         this.jmsMessageConvertor = jmsMessageConvertor;
83     }
84
85     public int getMaximumRetries() {
86         return maximumRetries;
87     }
88
89     /**
90      * Sets the maximum number of retries if a send fails before closing the
91      * bridge
92      */

93     public void setMaximumRetries(int maximumRetries) {
94         this.maximumRetries = maximumRetries;
95     }
96
97     protected Destination JavaDoc processReplyToDestination(Destination JavaDoc destination) {
98         return jmsConnector.createReplyToBridge(destination, getConnnectionForConsumer(), getConnectionForProducer());
99     }
100
101     public void start() throws Exception JavaDoc {
102         if (started.compareAndSet(false, true)) {
103             MessageConsumer JavaDoc consumer = createConsumer();
104             consumer.setMessageListener(this);
105             createProducer();
106         }
107     }
108
109     public void stop() throws Exception JavaDoc {
110         started.set(false);
111     }
112
113     public void onMessage(Message JavaDoc message) {
114         if (started.get() && message != null) {
115             int attempt = 0;
116             try {
117                 if (attempt > 0) {
118                     restartProducer();
119                 }
120                 Message JavaDoc converted;
121                 if (doHandleReplyTo) {
122                     Destination JavaDoc replyTo = message.getJMSReplyTo();
123                     if (replyTo != null) {
124                         converted = jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
125                     }
126                     else {
127                         converted = jmsMessageConvertor.convert(message);
128                     }
129                 }
130                 else {
131                     message.setJMSReplyTo(null);
132                     converted = jmsMessageConvertor.convert(message);
133                 }
134                 sendMessage(converted);
135                 message.acknowledge();
136             }
137             catch (Exception JavaDoc e) {
138                 log.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e);
139                 if (maximumRetries > 0 && attempt >= maximumRetries) {
140                     try {
141                         stop();
142                     }
143                     catch (Exception JavaDoc e1) {
144                         log.warn("Failed to stop cleanly", e1);
145                     }
146                 }
147             }
148         }
149     }
150
151     /**
152      * @return Returns the doHandleReplyTo.
153      */

154     protected boolean isDoHandleReplyTo() {
155         return doHandleReplyTo;
156     }
157
158     /**
159      * @param doHandleReplyTo
160      * The doHandleReplyTo to set.
161      */

162     protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
163         this.doHandleReplyTo = doHandleReplyTo;
164     }
165
166     protected abstract MessageConsumer JavaDoc createConsumer() throws JMSException JavaDoc;
167
168     protected abstract MessageProducer JavaDoc createProducer() throws JMSException JavaDoc;
169
170     protected abstract void sendMessage(Message JavaDoc message) throws JMSException JavaDoc;
171
172     protected abstract Connection JavaDoc getConnnectionForConsumer();
173
174     protected abstract Connection JavaDoc getConnectionForProducer();
175
176     protected void restartProducer() throws JMSException JavaDoc, NamingException JavaDoc {
177         try {
178             getConnectionForProducer().close();
179         }
180         catch (Exception JavaDoc e) {
181             log.debug("Ignoring failure to close producer connection: " + e, e);
182         }
183         jmsConnector.restartProducerConnection();
184         createProducer();
185     }
186 }
187
Popular Tags