KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ra > ActiveMQResourceAdapter


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.activemq.ra;
19
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAResource;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
45
46 /**
47  * Knows how to connect to one ActiveMQ server. It can then activate endpoints
48  * and deliver messages to those end points using the connection configure in the
49  * resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
50  *
51  * @org.apache.xbean.XBean element="resourceAdapter" rootElement="true"
52  * description="The JCA Resource Adaptor for ActiveMQ"
53  *
54  * @version $Revision: 515576 $
55  */

56 public class ActiveMQResourceAdapter implements MessageResourceAdapter, Serializable JavaDoc {
57
58     private static final long serialVersionUID = -5417363537865649130L;
59     private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class);
60     
61     private final HashMap JavaDoc endpointWorkers = new HashMap JavaDoc();
62     private final ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
63
64     private BootstrapContext JavaDoc bootstrapContext;
65     private String JavaDoc brokerXmlConfig;
66     private BrokerService broker;
67     private ActiveMQConnectionFactory connectionFactory;
68
69     /**
70      *
71      */

72     public ActiveMQResourceAdapter() {
73         super();
74     }
75
76     /**
77      * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
78      */

79     public void start(BootstrapContext JavaDoc bootstrapContext) throws ResourceAdapterInternalException JavaDoc {
80         this.bootstrapContext = bootstrapContext;
81         if (brokerXmlConfig!=null && brokerXmlConfig.trim().length()>0 ) {
82             try {
83                 broker = BrokerFactory.createBroker(new URI JavaDoc(brokerXmlConfig));
84                 broker.start();
85             } catch (Throwable JavaDoc e) {
86                 throw new ResourceAdapterInternalException JavaDoc("Failed to startup an embedded broker: "+brokerXmlConfig+", due to: "+e, e);
87             }
88         }
89     }
90
91     /**
92      * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection()
93      */

94     public ActiveMQConnection makeConnection() throws JMSException JavaDoc {
95         if (connectionFactory != null) {
96             return makeConnection(info, connectionFactory);
97         }
98         return makeConnection(info);
99     }
100
101     /**
102      */

103     public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info) throws JMSException JavaDoc {
104
105         ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info);
106         return makeConnection(info, connectionFactory);
107     }
108
109     /**
110      * @see org.apache.activemq.ra.MessageResourceAdapter#makeConnection(org.apache.activemq.ra.ActiveMQConnectionRequestInfo, org.apache.activemq.ActiveMQConnectionFactory)
111      */

112     public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo info, ActiveMQConnectionFactory connectionFactory) throws JMSException JavaDoc {
113         String JavaDoc userName = info.getUserName();
114         String JavaDoc password = info.getPassword();
115         ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
116
117         String JavaDoc clientId = info.getClientid();
118         if (clientId != null && clientId.length() > 0) {
119             physicalConnection.setClientID(clientId);
120         }
121         return physicalConnection;
122     }
123
124     /**
125      * @param activationSpec
126      */

127     public ActiveMQConnection makeConnection(MessageActivationSpec activationSpec) throws JMSException JavaDoc {
128         ActiveMQConnectionFactory connectionFactory = createConnectionFactory(info);
129         String JavaDoc userName = defaultValue(activationSpec.getUserName(), info.getUserName());
130         String JavaDoc password = defaultValue(activationSpec.getPassword(), info.getPassword());
131         String JavaDoc clientId = activationSpec.getClientId();
132         if (clientId != null) {
133             connectionFactory.setClientID(clientId);
134         }
135         else {
136             if (activationSpec.isDurableSubscription()) {
137                 log.warn("No clientID specified for durable subscription: " + activationSpec);
138             }
139         }
140         ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
141
142         // have we configured a redelivery policy
143
RedeliveryPolicy redeliveryPolicy = activationSpec.redeliveryPolicy();
144         if (redeliveryPolicy != null) {
145             physicalConnection.setRedeliveryPolicy(redeliveryPolicy);
146         }
147         return physicalConnection;
148     }
149
150     /**
151      * @param info
152      * @throws JMSException
153      * @throws URISyntaxException
154      */

155     synchronized private ActiveMQConnectionFactory createConnectionFactory(ActiveMQConnectionRequestInfo info) throws JMSException JavaDoc {
156         ActiveMQConnectionFactory factory = connectionFactory;
157         if (factory != null && info.isConnectionFactoryConfigured()) {
158             factory = factory.copy();
159         }
160         else if (factory == null) {
161             factory = new ActiveMQConnectionFactory();
162         }
163         info.configure(factory);
164         return factory;
165     }
166
167     private String JavaDoc defaultValue(String JavaDoc value, String JavaDoc defaultValue) {
168         if (value != null)
169             return value;
170         return defaultValue;
171     }
172
173     /**
174      * @see javax.resource.spi.ResourceAdapter#stop()
175      */

176     public void stop() {
177         while (endpointWorkers.size() > 0) {
178             ActiveMQEndpointActivationKey key = (ActiveMQEndpointActivationKey) endpointWorkers.keySet().iterator().next();
179             endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
180         }
181         if (broker != null) {
182             ServiceSupport.dispose(broker);
183             broker = null;
184         }
185         this.bootstrapContext = null;
186     }
187
188     /**
189      * @see org.apache.activemq.ra.MessageResourceAdapter#getBootstrapContext()
190      */

191     public BootstrapContext JavaDoc getBootstrapContext() {
192         return bootstrapContext;
193     }
194
195     /**
196      * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
197      * javax.resource.spi.ActivationSpec)
198      */

199     public void endpointActivation(MessageEndpointFactory JavaDoc endpointFactory, ActivationSpec JavaDoc activationSpec)
200             throws ResourceException JavaDoc {
201
202         // spec section 5.3.3
203
if (!equals(activationSpec.getResourceAdapter())) {
204             throw new ResourceException JavaDoc("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")");
205         }
206
207         if (!(activationSpec instanceof MessageActivationSpec)) {
208             throw new NotSupportedException JavaDoc("That type of ActicationSpec not supported: " + activationSpec.getClass());
209         }
210
211         ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory,
212                 (MessageActivationSpec) activationSpec);
213         // This is weird.. the same endpoint activated twice.. must be a
214
// container error.
215
if (endpointWorkers.containsKey(key)) {
216             throw new IllegalStateException JavaDoc("Endpoint previously activated");
217         }
218
219         ActiveMQEndpointWorker worker = new ActiveMQEndpointWorker(this, key);
220
221         endpointWorkers.put(key, worker);
222         worker.start();
223     }
224
225     /**
226      * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
227      * javax.resource.spi.ActivationSpec)
228      */

229     public void endpointDeactivation(MessageEndpointFactory JavaDoc endpointFactory, ActivationSpec JavaDoc activationSpec) {
230
231         if (activationSpec instanceof MessageActivationSpec) {
232             ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (MessageActivationSpec) activationSpec);
233             ActiveMQEndpointWorker worker = (ActiveMQEndpointWorker) endpointWorkers.remove(key);
234             if (worker == null) {
235                 // This is weird.. that endpoint was not activated.. oh well..
236
// this method
237
// does not throw exceptions so just return.
238
return;
239             }
240             try {
241                 worker.stop();
242             } catch (InterruptedException JavaDoc e) {
243                 // We interrupted.. we won't throw an exception but will stop
244
// waiting for the worker
245
// to stop.. we tried our best. Keep trying to interrupt the
246
// thread.
247
Thread.currentThread().interrupt();
248             }
249
250         }
251
252     }
253
254     /**
255      * We only connect to one resource manager per ResourceAdapter instance, so
256      * any ActivationSpec will return the same XAResource.
257      *
258      * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
259      */

260     public XAResource JavaDoc[] getXAResources(ActivationSpec JavaDoc[] activationSpecs) throws ResourceException JavaDoc {
261         Connection JavaDoc connection = null;
262         try {
263             connection = makeConnection();
264             if (connection instanceof XAConnection JavaDoc) {
265                 XASession JavaDoc session = ((XAConnection JavaDoc) connection).createXASession();
266                 XAResource JavaDoc xaResource = session.getXAResource();
267                 return new XAResource JavaDoc[] { xaResource };
268             }
269             return new XAResource JavaDoc[] {};
270         } catch (JMSException JavaDoc e) {
271             throw new ResourceException JavaDoc(e);
272         } finally {
273             try {
274                 connection.close();
275             } catch (Throwable JavaDoc ignore) {
276                 //
277
}
278         }
279     }
280
281     // ///////////////////////////////////////////////////////////////////////
282
//
283
// Java Bean getters and setters for this ResourceAdapter class.
284
//
285
// ///////////////////////////////////////////////////////////////////////
286

287     /**
288      * @return client id
289      */

290     public String JavaDoc getClientid() {
291         return emptyToNull(info.getClientid());
292     }
293
294     /**
295      * @return password
296      */

297     public String JavaDoc getPassword() {
298         return emptyToNull(info.getPassword());
299     }
300
301     /**
302      * @return server URL
303      */

304     public String JavaDoc getServerUrl() {
305         return info.getServerUrl();
306     }
307
308     /**
309      * @return user name
310      */

311     public String JavaDoc getUserName() {
312         return emptyToNull(info.getUserName());
313     }
314
315     /**
316      * @param clientid
317      */

318     public void setClientid(String JavaDoc clientid) {
319         info.setClientid(clientid);
320     }
321
322     /**
323      * @param password
324      */

325     public void setPassword(String JavaDoc password) {
326         info.setPassword(password);
327     }
328
329     /**
330      * @param url
331      */

332     public void setServerUrl(String JavaDoc url) {
333         info.setServerUrl(url);
334     }
335
336     /**
337      * @param userid
338      */

339     public void setUserName(String JavaDoc userid) {
340         info.setUserName(userid);
341     }
342
343     /**
344      * @see org.apache.activemq.ra.MessageResourceAdapter#getBrokerXmlConfig()
345      */

346     public String JavaDoc getBrokerXmlConfig() {
347         return brokerXmlConfig;
348     }
349
350     /**
351      * Sets the <a HREF="http://activemq.org/Xml+Configuration">XML
352      * configuration file </a> used to configure the ActiveMQ broker via Spring
353      * if using embedded mode.
354      *
355      * @param brokerXmlConfig
356      * is the filename which is assumed to be on the classpath unless
357      * a URL is specified. So a value of <code>foo/bar.xml</code>
358      * would be assumed to be on the classpath whereas
359      * <code>file:dir/file.xml</code> would use the file system.
360      * Any valid URL string is supported.
361      */

362     public void setBrokerXmlConfig(String JavaDoc brokerXmlConfig) {
363         this.brokerXmlConfig=brokerXmlConfig;
364     }
365
366     /**
367      * @return durable topic prefetch
368      */

369     public Integer JavaDoc getDurableTopicPrefetch() {
370         return info.getDurableTopicPrefetch();
371     }
372
373     /**
374      * @return initial redelivery delay
375      */

376     public Long JavaDoc getInitialRedeliveryDelay() {
377         return info.getInitialRedeliveryDelay();
378     }
379
380     /**
381      * @return input stream prefetch
382      */

383     public Integer JavaDoc getInputStreamPrefetch() {
384         return info.getInputStreamPrefetch();
385     }
386
387     /**
388      * @return maximum redeliveries
389      */

390     public Integer JavaDoc getMaximumRedeliveries() {
391         return info.getMaximumRedeliveries();
392     }
393
394     /**
395      * @return queue browser prefetch
396      */

397     public Integer JavaDoc getQueueBrowserPrefetch() {
398         return info.getQueueBrowserPrefetch();
399     }
400
401     /**
402      * @return queue prefetch
403      */

404     public Integer JavaDoc getQueuePrefetch() {
405         return info.getQueuePrefetch();
406     }
407
408     /**
409      * @return redelivery backoff multiplier
410      */

411     public Short JavaDoc getRedeliveryBackOffMultiplier() {
412         return info.getRedeliveryBackOffMultiplier();
413     }
414
415     /**
416      * @return redelivery use exponential backoff
417      */

418     public Boolean JavaDoc getRedeliveryUseExponentialBackOff() {
419         return info.getRedeliveryUseExponentialBackOff();
420     }
421
422     /**
423      * @return topic prefetch
424      */

425     public Integer JavaDoc getTopicPrefetch() {
426         return info.getTopicPrefetch();
427     }
428
429     /**
430      * @return use inbound session enabled
431      */

432     public boolean isUseInboundSessionEnabled() {
433         return info.isUseInboundSessionEnabled();
434     }
435
436     /**
437      * @param i
438      */

439     public void setAllPrefetchValues(Integer JavaDoc i) {
440         info.setAllPrefetchValues(i);
441     }
442
443     /**
444      * @param durableTopicPrefetch
445      */

446     public void setDurableTopicPrefetch(Integer JavaDoc durableTopicPrefetch) {
447         info.setDurableTopicPrefetch(durableTopicPrefetch);
448     }
449
450     /**
451      * @param value
452      */

453     public void setInitialRedeliveryDelay(Long JavaDoc value) {
454         info.setInitialRedeliveryDelay(value);
455     }
456
457     /**
458      * @param inputStreamPrefetch
459      */

460     public void setInputStreamPrefetch(Integer JavaDoc inputStreamPrefetch) {
461         info.setInputStreamPrefetch(inputStreamPrefetch);
462     }
463
464     /**
465      * @param value
466      */

467     public void setMaximumRedeliveries(Integer JavaDoc value) {
468         info.setMaximumRedeliveries(value);
469     }
470
471     /**
472      * @param queueBrowserPrefetch
473      */

474     public void setQueueBrowserPrefetch(Integer JavaDoc queueBrowserPrefetch) {
475         info.setQueueBrowserPrefetch(queueBrowserPrefetch);
476     }
477
478     /**
479      * @param queuePrefetch
480      */

481     public void setQueuePrefetch(Integer JavaDoc queuePrefetch) {
482         info.setQueuePrefetch(queuePrefetch);
483     }
484
485     /**
486      * @param value
487      */

488     public void setRedeliveryBackOffMultiplier(Short JavaDoc value) {
489         info.setRedeliveryBackOffMultiplier(value);
490     }
491
492     /**
493      * @param value
494      */

495     public void setRedeliveryUseExponentialBackOff(Boolean JavaDoc value) {
496         info.setRedeliveryUseExponentialBackOff(value);
497     }
498
499     /**
500      * @param topicPrefetch
501      */

502     public void setTopicPrefetch(Integer JavaDoc topicPrefetch) {
503         info.setTopicPrefetch(topicPrefetch);
504     }
505
506     /**
507      * @return Returns the info.
508      */

509     public ActiveMQConnectionRequestInfo getInfo() {
510         return info;
511     }
512
513     /**
514      * @see java.lang.Object#equals(java.lang.Object)
515      */

516     @Override JavaDoc
517     public boolean equals(Object JavaDoc o) {
518         if (this == o) {
519             return true;
520         }
521         if (!(o instanceof MessageResourceAdapter)) {
522             return false;
523         }
524
525         final MessageResourceAdapter activeMQResourceAdapter = (MessageResourceAdapter) o;
526
527         if (!info.equals(activeMQResourceAdapter.getInfo())) {
528             return false;
529         }
530         if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.getBrokerXmlConfig()) ) {
531             return false;
532         }
533
534         return true;
535     }
536
537     private boolean notEqual(Object JavaDoc o1, Object JavaDoc o2) {
538         return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2));
539     }
540
541
542     /**
543      * @see java.lang.Object#hashCode()
544      */

545     @Override JavaDoc
546     public int hashCode() {
547         int result;
548         result = info.hashCode();
549         if( brokerXmlConfig !=null ) {
550             result ^= brokerXmlConfig.hashCode();
551         }
552         return result;
553     }
554
555     private String JavaDoc emptyToNull(String JavaDoc value) {
556         if (value == null || value.length() == 0) {
557             return null;
558         }
559         return value;
560     }
561
562     /**
563      * @return use inbound session
564      */

565     public Boolean JavaDoc getUseInboundSession() {
566         return info.getUseInboundSession();
567     }
568
569     /**
570      * @param useInboundSession
571      */

572     public void setUseInboundSession(Boolean JavaDoc useInboundSession) {
573         info.setUseInboundSession(useInboundSession);
574     }
575
576     /**
577      * @see org.apache.activemq.ra.MessageResourceAdapter#getConnectionFactory()
578      */

579     public ActiveMQConnectionFactory getConnectionFactory() {
580         return connectionFactory;
581     }
582
583     /**
584      * This allows a connection factory to be configured and shared between a ResourceAdaptor and outbound messaging.
585      * Note that setting the connectionFactory will overload many of the properties on this POJO such as the redelivery
586      * and prefetch policies; the properties on the connectionFactory will be used instead.
587      */

588     public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
589         this.connectionFactory = connectionFactory;
590     }
591
592
593 }
594
Popular Tags