KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > network > jms > JmsConnector


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.network.jms;
19
20 import java.util.Iterator JavaDoc;
21 import java.util.List JavaDoc;
22 import java.util.Map JavaDoc;
23
24 import javax.jms.Connection JavaDoc;
25 import javax.jms.Destination JavaDoc;
26 import javax.jms.JMSException JavaDoc;
27 import javax.naming.NamingException JavaDoc;
28
29 import org.apache.activemq.ActiveMQConnectionFactory;
30 import org.apache.activemq.Service;
31 import org.apache.activemq.broker.BrokerService;
32 import org.apache.activemq.util.LRUCache;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.springframework.jndi.JndiTemplate;
36
37 import java.util.concurrent.CopyOnWriteArrayList JavaDoc;
38 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
39
40 /**
41  * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
42  * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be
43  * JMS 1.0.2 compliant.
44  *
45  * @version $Revision: 1.1.1.1 $
46  */

47 public abstract class JmsConnector implements Service {
48
49     private static final Log log = LogFactory.getLog(JmsConnector.class);
50     protected JndiTemplate jndiLocalTemplate;
51     protected JndiTemplate jndiOutboundTemplate;
52     protected JmsMesageConvertor inboundMessageConvertor;
53     protected JmsMesageConvertor outboundMessageConvertor;
54     private List JavaDoc inboundBridges = new CopyOnWriteArrayList JavaDoc();
55     private List JavaDoc outboundBridges = new CopyOnWriteArrayList JavaDoc();
56     protected AtomicBoolean JavaDoc initialized = new AtomicBoolean JavaDoc(false);
57     protected AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
58     protected ActiveMQConnectionFactory embeddedConnectionFactory;
59     protected int replyToDestinationCacheSize = 10000;
60     protected String JavaDoc outboundUsername;
61     protected String JavaDoc outboundPassword;
62     protected String JavaDoc localUsername;
63     protected String JavaDoc localPassword;
64     private String JavaDoc name;
65
66     protected LRUCache replyToBridges = createLRUCache();
67         
68     static private LRUCache createLRUCache() {
69         return new LRUCache() {
70             private static final long serialVersionUID = -7446792754185879286L;
71     
72             protected boolean removeEldestEntry(Map.Entry JavaDoc enty) {
73                 if (size() > maxCacheSize) {
74                     Iterator JavaDoc iter = entrySet().iterator();
75                     Map.Entry JavaDoc lru = (Map.Entry JavaDoc) iter.next();
76                     remove(lru.getKey());
77                     DestinationBridge bridge = (DestinationBridge) lru.getValue();
78                     try {
79                         bridge.stop();
80                         log.info("Expired bridge: " + bridge);
81                     }
82                     catch (Exception JavaDoc e) {
83                         log.warn("stopping expired bridge" + bridge + " caused an exception", e);
84                     }
85                 }
86                 return false;
87             }
88         };
89     }
90
91     /**
92      */

93     public boolean init() {
94         boolean result = initialized.compareAndSet(false, true);
95         if (result) {
96             if (jndiLocalTemplate == null) {
97                 jndiLocalTemplate = new JndiTemplate();
98             }
99             if (jndiOutboundTemplate == null) {
100                 jndiOutboundTemplate = new JndiTemplate();
101             }
102             if (inboundMessageConvertor == null) {
103                 inboundMessageConvertor = new SimpleJmsMessageConvertor();
104             }
105             if (outboundMessageConvertor == null) {
106                 outboundMessageConvertor = new SimpleJmsMessageConvertor();
107             }
108             replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
109         }
110         return result;
111     }
112
113     public void start() throws Exception JavaDoc {
114         init();
115         if (started.compareAndSet(false, true)) {
116             for (int i = 0; i < inboundBridges.size(); i++) {
117                 DestinationBridge bridge = (DestinationBridge) inboundBridges.get(i);
118                 bridge.start();
119             }
120             for (int i = 0; i < outboundBridges.size(); i++) {
121                 DestinationBridge bridge = (DestinationBridge) outboundBridges.get(i);
122                 bridge.start();
123             }
124             log.info("JMS Connector " + getName() + " Started");
125         }
126     }
127
128     public void stop() throws Exception JavaDoc {
129         if (started.compareAndSet(true, false)) {
130             for (int i = 0; i < inboundBridges.size(); i++) {
131                 DestinationBridge bridge = (DestinationBridge) inboundBridges.get(i);
132                 bridge.stop();
133             }
134             for (int i = 0; i < outboundBridges.size(); i++) {
135                 DestinationBridge bridge = (DestinationBridge) outboundBridges.get(i);
136                 bridge.stop();
137             }
138             log.info("JMS Connector " + getName() + " Stopped");
139         }
140     }
141
142     protected abstract Destination JavaDoc createReplyToBridge(Destination JavaDoc destination, Connection JavaDoc consumerConnection, Connection JavaDoc producerConnection);
143
144     /**
145      * One way to configure the local connection - this is called by The
146      * BrokerService when the Connector is embedded
147      *
148      * @param service
149      */

150     public void setBrokerService(BrokerService service) {
151         embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
152     }
153
154     /**
155      * @return Returns the jndiTemplate.
156      */

157     public JndiTemplate getJndiLocalTemplate() {
158         return jndiLocalTemplate;
159     }
160
161     /**
162      * @param jndiTemplate
163      * The jndiTemplate to set.
164      */

165     public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
166         this.jndiLocalTemplate = jndiTemplate;
167     }
168
169     /**
170      * @return Returns the jndiOutboundTemplate.
171      */

172     public JndiTemplate getJndiOutboundTemplate() {
173         return jndiOutboundTemplate;
174     }
175
176     /**
177      * @param jndiOutboundTemplate
178      * The jndiOutboundTemplate to set.
179      */

180     public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
181         this.jndiOutboundTemplate = jndiOutboundTemplate;
182     }
183
184     /**
185      * @return Returns the inboundMessageConvertor.
186      */

187     public JmsMesageConvertor getInboundMessageConvertor() {
188         return inboundMessageConvertor;
189     }
190
191     /**
192      * @param inboundMessageConvertor
193      * The inboundMessageConvertor to set.
194      */

195     public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
196         this.inboundMessageConvertor = jmsMessageConvertor;
197     }
198
199     /**
200      * @return Returns the outboundMessageConvertor.
201      */

202     public JmsMesageConvertor getOutboundMessageConvertor() {
203         return outboundMessageConvertor;
204     }
205
206     /**
207      * @param outboundMessageConvertor
208      * The outboundMessageConvertor to set.
209      */

210     public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
211         this.outboundMessageConvertor = outboundMessageConvertor;
212     }
213
214     /**
215      * @return Returns the replyToDestinationCacheSize.
216      */

217     public int getReplyToDestinationCacheSize() {
218         return replyToDestinationCacheSize;
219     }
220
221     /**
222      * @param replyToDestinationCacheSize
223      * The replyToDestinationCacheSize to set.
224      */

225     public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
226         this.replyToDestinationCacheSize = replyToDestinationCacheSize;
227     }
228
229     /**
230      * @return Returns the localPassword.
231      */

232     public String JavaDoc getLocalPassword() {
233         return localPassword;
234     }
235
236     /**
237      * @param localPassword
238      * The localPassword to set.
239      */

240     public void setLocalPassword(String JavaDoc localPassword) {
241         this.localPassword = localPassword;
242     }
243
244     /**
245      * @return Returns the localUsername.
246      */

247     public String JavaDoc getLocalUsername() {
248         return localUsername;
249     }
250
251     /**
252      * @param localUsername
253      * The localUsername to set.
254      */

255     public void setLocalUsername(String JavaDoc localUsername) {
256         this.localUsername = localUsername;
257     }
258
259     /**
260      * @return Returns the outboundPassword.
261      */

262     public String JavaDoc getOutboundPassword() {
263         return outboundPassword;
264     }
265
266     /**
267      * @param outboundPassword
268      * The outboundPassword to set.
269      */

270     public void setOutboundPassword(String JavaDoc outboundPassword) {
271         this.outboundPassword = outboundPassword;
272     }
273
274     /**
275      * @return Returns the outboundUsername.
276      */

277     public String JavaDoc getOutboundUsername() {
278         return outboundUsername;
279     }
280
281     /**
282      * @param outboundUsername
283      * The outboundUsername to set.
284      */

285     public void setOutboundUsername(String JavaDoc outboundUsername) {
286         this.outboundUsername = outboundUsername;
287     }
288
289     protected void addInboundBridge(DestinationBridge bridge) {
290         inboundBridges.add(bridge);
291     }
292
293     protected void addOutboundBridge(DestinationBridge bridge) {
294         outboundBridges.add(bridge);
295     }
296
297     protected void removeInboundBridge(DestinationBridge bridge) {
298         inboundBridges.add(bridge);
299     }
300
301     protected void removeOutboundBridge(DestinationBridge bridge) {
302         outboundBridges.add(bridge);
303     }
304
305     public String JavaDoc getName() {
306         if (name == null) {
307             name = "Connector:" + getNextId();
308         }
309         return name;
310     }
311
312     static int nextId;
313
314     static private synchronized int getNextId() {
315         return nextId++;
316     }
317
318     public void setName(String JavaDoc name) {
319         this.name = name;
320     }
321
322     public abstract void restartProducerConnection() throws NamingException JavaDoc, JMSException JavaDoc;
323 }
324
Popular Tags