KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > nmr > flow > jca > JCAFlow


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.nmr.flow.jca;
18
19 import java.io.Serializable JavaDoc;
20 import java.util.Map JavaDoc;
21 import java.util.Set JavaDoc;
22
23 import javax.jbi.JBIException;
24 import javax.jbi.messaging.MessageExchange;
25 import javax.jbi.messaging.MessagingException;
26 import javax.jbi.messaging.MessageExchange.Role;
27 import javax.jbi.servicedesc.ServiceEndpoint;
28 import javax.jms.Connection JavaDoc;
29 import javax.jms.ConnectionFactory JavaDoc;
30 import javax.jms.DeliveryMode JavaDoc;
31 import javax.jms.Destination JavaDoc;
32 import javax.jms.JMSException JavaDoc;
33 import javax.jms.Message JavaDoc;
34 import javax.jms.MessageConsumer JavaDoc;
35 import javax.jms.MessageListener JavaDoc;
36 import javax.jms.MessageProducer JavaDoc;
37 import javax.jms.ObjectMessage JavaDoc;
38 import javax.jms.Session JavaDoc;
39 import javax.jms.Topic JavaDoc;
40 import javax.resource.spi.BootstrapContext JavaDoc;
41 import javax.resource.spi.ConnectionManager JavaDoc;
42 import javax.resource.spi.ResourceAdapter JavaDoc;
43 import javax.resource.spi.ResourceAdapterInternalException JavaDoc;
44 import javax.transaction.Status JavaDoc;
45 import javax.transaction.SystemException JavaDoc;
46 import javax.transaction.TransactionManager JavaDoc;
47
48 import org.apache.activemq.advisory.AdvisorySupport;
49 import org.apache.activemq.command.ActiveMQDestination;
50 import org.apache.activemq.command.ActiveMQMessage;
51 import org.apache.activemq.command.ActiveMQQueue;
52 import org.apache.activemq.command.ActiveMQTopic;
53 import org.apache.activemq.command.ConsumerId;
54 import org.apache.activemq.command.ConsumerInfo;
55 import org.apache.activemq.command.RemoveInfo;
56 import org.apache.activemq.ra.ActiveMQActivationSpec;
57 import org.apache.activemq.ra.ActiveMQManagedConnectionFactory;
58 import org.apache.activemq.ra.ActiveMQResourceAdapter;
59 import org.apache.geronimo.connector.BootstrapContextImpl;
60 import org.apache.geronimo.connector.outbound.connectionmanagerconfig.SinglePool;
61 import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions;
62 import org.apache.geronimo.connector.work.GeronimoWorkManager;
63 import org.apache.geronimo.transaction.context.TransactionContextManager;
64 import org.apache.servicemix.JbiConstants;
65 import org.apache.servicemix.jbi.container.SpringJBIContainer;
66 import org.apache.servicemix.jbi.event.ComponentAdapter;
67 import org.apache.servicemix.jbi.event.ComponentEvent;
68 import org.apache.servicemix.jbi.event.ComponentListener;
69 import org.apache.servicemix.jbi.event.EndpointAdapter;
70 import org.apache.servicemix.jbi.event.EndpointEvent;
71 import org.apache.servicemix.jbi.event.EndpointListener;
72 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
73 import org.apache.servicemix.jbi.nmr.Broker;
74 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
75 import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
76 import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
77 import org.jencks.JCAConnector;
78 import org.jencks.SingletonEndpointFactory;
79 import org.jencks.factory.ConnectionManagerFactoryBean;
80 import org.springframework.context.ApplicationContext;
81
82 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
83 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
84 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
85
86 /**
87  * Use for message routing among a network of containers. All routing/registration happens automatically.
88  *
89  * @version $Revision: 438849 $
90  * @org.apache.xbean.XBean element="jcaFlow"
91  */

92 public class JCAFlow extends AbstractFlow implements MessageListener JavaDoc {
93     
94     private static final String JavaDoc INBOUND_PREFIX = "org.apache.servicemix.jca.";
95     private String JavaDoc jmsURL = "tcp://localhost:61616";
96     private String JavaDoc userName;
97     private String JavaDoc password;
98     private ConnectionFactory JavaDoc connectionFactory;
99     private Connection JavaDoc connection;
100     private String JavaDoc broadcastDestinationName = "org.apache.servicemix.JCAFlow";
101     private Topic JavaDoc broadcastTopic;
102     private Map JavaDoc connectorMap = new ConcurrentHashMap();
103     private AtomicBoolean started = new AtomicBoolean(false);
104     private Set JavaDoc subscriberSet=new CopyOnWriteArraySet();
105     private TransactionContextManager transactionContextManager;
106     private ConnectionManager JavaDoc connectionManager;
107     private BootstrapContext JavaDoc bootstrapContext;
108     private ResourceAdapter JavaDoc resourceAdapter;
109     private JCAConnector containerConnector;
110     private JCAConnector broadcastConnector;
111     private Session JavaDoc broadcastSession;
112     private Topic JavaDoc advisoryTopic;
113     private MessageConsumer JavaDoc advisoryConsumer;
114
115     private EndpointListener endpointListener;
116
117     private ComponentListener componentListener;
118
119     /**
120      * The type of Flow
121      *
122      * @return the type
123      */

124     public String JavaDoc getDescription() {
125         return "jca";
126     }
127
128     /**
129      * Returns the JMS URL for this flow
130      *
131      * @return Returns the jmsURL.
132      */

133     public String JavaDoc getJmsURL() {
134         return jmsURL;
135     }
136
137     /**
138      * Sets the JMS URL for this flow
139      *
140      * @param jmsURL The jmsURL to set.
141      */

142     public void setJmsURL(String JavaDoc jmsURL) {
143         this.jmsURL = jmsURL;
144     }
145
146     /**
147      * Returns the password for this flow
148      *
149      * @return Returns the password.
150      */

151     public String JavaDoc getPassword() {
152         return password;
153     }
154
155     /**
156      * Sets the password for this flow
157      *
158      * @param password The password to set.
159      */

160     public void setPassword(String JavaDoc password) {
161         this.password = password;
162     }
163
164     /**
165      * Sets the User Name for this flow
166      *
167      * @return Returns the userName.
168      */

169     public String JavaDoc getUserName() {
170         return userName;
171     }
172
173     /**
174      * Returns the User Name for this flow
175      *
176      * @param userName The userName to set.
177      */

178     public void setUserName(String JavaDoc userName) {
179         this.userName = userName;
180     }
181
182     /**
183      * Returns the ConnectionFactory for this flow
184      *
185      * @return Returns the connectionFactory.
186      */

187     public ConnectionFactory JavaDoc getConnectionFactory() {
188         return connectionFactory;
189     }
190
191     /**
192      * Sets the ConnectionFactory for this flow
193      *
194      * @param connectionFactory The connectionFactory to set.
195      */

196     public void setConnectionFactory(ConnectionFactory JavaDoc connectoFactory) {
197         this.connectionFactory = connectoFactory;
198     }
199
200     /**
201      * Returns the Broadcast Destination Name for this flow
202      *
203      * @return Returns the broadcastDestinationName.
204      */

205     public String JavaDoc getBroadcastDestinationName() {
206         return broadcastDestinationName;
207     }
208
209     /**
210      * Sets the Broadcast Destination Name for this flow
211      *
212      * @param broadcastDestinationName The broadcastDestinationName to set.
213      */

214     public void setBroadcastDestinationName(String JavaDoc broadcastDestinationName) {
215         this.broadcastDestinationName = broadcastDestinationName;
216     }
217
218     protected ResourceAdapter JavaDoc createResourceAdapter() throws ResourceAdapterInternalException JavaDoc {
219         ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
220         ra.setServerUrl(jmsURL);
221         ra.start(getBootstrapContext());
222         return ra;
223     }
224     
225     public TransactionManager JavaDoc getTransactionManager() {
226         return (TransactionManager JavaDoc) broker.getContainer().getTransactionManager();
227     }
228     
229     /**
230      * Initialize the Region
231      *
232      * @param broker
233      * @throws JBIException
234      */

235     public void init(Broker broker) throws JBIException {
236         log.debug(broker.getContainer().getName() + ": Initializing jca flow");
237         super.init(broker);
238         // Create and register endpoint listener
239
endpointListener = new EndpointAdapter() {
240             public void internalEndpointRegistered(EndpointEvent event) {
241                 onInternalEndpointRegistered(event, true);
242             }
243
244             public void internalEndpointUnregistered(EndpointEvent event) {
245                 onInternalEndpointUnregistered(event, true);
246             }
247         };
248         broker.getContainer().addListener(endpointListener);
249         // Create and register component listener
250
componentListener = new ComponentAdapter() {
251             public void componentStarted(ComponentEvent event) {
252                 onComponentStarted(event);
253             }
254             public void componentStopped(ComponentEvent event) {
255                 onComponentStopped(event);
256             }
257         };
258         broker.getContainer().addListener(componentListener);
259         try {
260             resourceAdapter = createResourceAdapter();
261             
262             // Inbound connector
263
ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
264             ac.setDestinationType("javax.jms.Queue");
265             ac.setDestination(INBOUND_PREFIX + broker.getContainer().getName());
266             containerConnector = new JCAConnector();
267             containerConnector.setBootstrapContext(getBootstrapContext());
268             containerConnector.setActivationSpec(ac);
269             containerConnector.setResourceAdapter(resourceAdapter);
270             containerConnector.setEndpointFactory(new SingletonEndpointFactory(this, getTransactionManager()));
271             containerConnector.start();
272             
273             // Outbound connector
274
ActiveMQManagedConnectionFactory mcf = new ActiveMQManagedConnectionFactory();
275             mcf.setResourceAdapter(resourceAdapter);
276             connectionFactory = (ConnectionFactory JavaDoc) mcf.createConnectionFactory(getConnectionManager());
277             
278             // Outbound broadcast
279
connection = ((ActiveMQResourceAdapter) resourceAdapter).makeConnection();
280             connection.start();
281             broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
282             
283             broadcastSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
284             broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
285             advisoryTopic=AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
286         }
287         catch (Exception JavaDoc e) {
288             log.error("Failed to initialize JCAFlow", e);
289             throw new JBIException(e);
290         }
291     }
292
293     /**
294      * start the flow
295      *
296      * @throws JBIException
297      */

298     public void start() throws JBIException {
299         if (started.compareAndSet(false, true)) {
300             super.start();
301             try {
302                 // Inbound broadcast
303
ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
304                 ac.setDestinationType("javax.jms.Topic");
305                 ac.setDestination(broadcastDestinationName);
306                 broadcastConnector = new JCAConnector();
307                 broadcastConnector.setBootstrapContext(getBootstrapContext());
308                 broadcastConnector.setActivationSpec(ac);
309                 broadcastConnector.setResourceAdapter(resourceAdapter);
310                 broadcastConnector.setEndpointFactory(new SingletonEndpointFactory(new MessageListener JavaDoc() {
311                     public void onMessage(Message JavaDoc message) {
312                         try {
313                             Object JavaDoc obj = ((ObjectMessage JavaDoc) message).getObject();
314                             if (obj instanceof EndpointEvent) {
315                                 EndpointEvent event = (EndpointEvent) obj;
316                                 String JavaDoc container = ((InternalEndpoint) event.getEndpoint()).getComponentNameSpace().getContainerName();
317                                 if (!getBroker().getContainer().getName().equals(container)) {
318                                     if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
319                                         onRemoteEndpointRegistered(event);
320                                     } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
321                                         onRemoteEndpointUnregistered(event);
322                                     }
323                                 }
324                             }
325                         } catch (Exception JavaDoc e) {
326                             log.error("Error processing incoming broadcast message", e);
327                         }
328                     }
329                 }));
330                 broadcastConnector.start();
331                 
332                 advisoryConsumer = broadcastSession.createConsumer(advisoryTopic);
333                 advisoryConsumer.setMessageListener(new MessageListener JavaDoc() {
334                     public void onMessage(Message JavaDoc message) {
335                         if (started.get()) {
336                             onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure());
337                         }
338                     }
339                 });
340             }
341             catch (Exception JavaDoc e) {
342                 throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
343             }
344         }
345     }
346
347     /**
348      * stop the flow
349      *
350      * @throws JBIException
351      */

352     public void stop() throws JBIException {
353         if (started.compareAndSet(true, false)) {
354             super.stop();
355             try {
356                 advisoryConsumer.close();
357             }
358             catch (JMSException JavaDoc e) {
359                 log.debug("JMSException caught in stop" ,e);
360             }
361         }
362     }
363
364     public void shutDown() throws JBIException {
365         super.shutDown();
366         stop();
367         // Remove endpoint listener
368
broker.getContainer().removeListener(endpointListener);
369         // Remove component listener
370
broker.getContainer().removeListener(componentListener);
371         // Destroy connectors
372
while (!connectorMap.isEmpty()) {
373             JCAConnector connector = (JCAConnector) connectorMap.remove(connectorMap.keySet().iterator().next());
374             try {
375                 connector.destroy();
376             } catch (Exception JavaDoc e) {
377                 log.debug("Error closing jca connector", e);
378             }
379         }
380         try {
381             containerConnector.destroy();
382         } catch (Exception JavaDoc e) {
383             log.debug("Error closing jca connector", e);
384         }
385         try {
386             broadcastConnector.destroy();
387         } catch (Exception JavaDoc e) {
388             log.debug("Error closing jca connector", e);
389         }
390         // Destroy the resource adapter
391
resourceAdapter.stop();
392         if (this.connection != null) {
393             try {
394                 this.connection.close();
395             }
396             catch (JMSException JavaDoc e) {
397                 log.debug("Error closing JMS Connection", e);
398             }
399         }
400     }
401
402     /**
403      * useful for testing
404      *
405      * @return number of containers in the network
406      */

407     public int numberInNetwork() {
408         return subscriberSet.size();
409     }
410
411     /**
412      * Check if the flow can support the requested QoS for this exchange
413      * @param me the exchange to check
414      * @return true if this flow can handle the given exchange
415      */

416     public boolean canHandle(MessageExchange me) {
417         if (isTransacted(me) && isSynchronous(me)) {
418             return false;
419         }
420         return true;
421     }
422     
423     public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
424         if (!started.get()) {
425             return;
426         }
427         try {
428             String JavaDoc key = EndpointSupport.getKey(event.getEndpoint());
429             if(!connectorMap.containsKey(key)){
430                 ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
431                 ac.setDestinationType("javax.jms.Queue");
432                 ac.setDestination(INBOUND_PREFIX + key);
433                 JCAConnector connector = new JCAConnector();
434                 connector.setBootstrapContext(getBootstrapContext());
435                 connector.setActivationSpec(ac);
436                 connector.setResourceAdapter(resourceAdapter);
437                 connector.setEndpointFactory(new SingletonEndpointFactory(this, getTransactionManager()));
438                 connector.start();
439                 connectorMap.put(key, connector);
440             }
441             // broadcast change to the network
442
if (broadcast) {
443                 log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
444                 sendJmsMessage(broadcastTopic, event, false, false);
445             }
446         } catch (Exception JavaDoc e) {
447             log.error("Cannot create consumer for " + event.getEndpoint(), e);
448         }
449     }
450     
451     public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
452         try{
453             String JavaDoc key = EndpointSupport.getKey(event.getEndpoint());
454             JCAConnector connector=(JCAConnector) connectorMap.remove(key);
455             if(connector!=null){
456                 connector.destroy();
457             }
458             // broadcast change to the network
459
if (broadcast) {
460                 log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
461                 sendJmsMessage(broadcastTopic, event, false, false);
462             }
463         } catch (Exception JavaDoc e) {
464             log.error("Cannot destroy consumer for " + event, e);
465         }
466     }
467     
468     public void onComponentStarted(ComponentEvent event) {
469         if (!started.get()) {
470             return;
471         }
472         try {
473             String JavaDoc key = event.getComponent().getName();
474             if(!connectorMap.containsKey(key)){
475                 ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
476                 ac.setDestinationType("javax.jms.Queue");
477                 ac.setDestination(INBOUND_PREFIX + key);
478                 JCAConnector connector = new JCAConnector();
479                 connector.setBootstrapContext(getBootstrapContext());
480                 connector.setActivationSpec(ac);
481                 connector.setResourceAdapter(resourceAdapter);
482                 connector.setEndpointFactory(new SingletonEndpointFactory(this, getTransactionManager()));
483                 connector.start();
484                 connectorMap.put(key, connector);
485             }
486         } catch (Exception JavaDoc e) {
487             log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
488         }
489     }
490     
491     public void onComponentStopped(ComponentEvent event) {
492         try {
493             String JavaDoc key = event.getComponent().getName();
494             JCAConnector connector = (JCAConnector) connectorMap.remove(key);
495             if (connector != null){
496                 connector.destroy();
497             }
498         } catch (Exception JavaDoc e) {
499             log.error("Cannot destroy consumer for component " + event.getComponent().getName(), e);
500         }
501     }
502
503     public void onRemoteEndpointRegistered(EndpointEvent event) {
504         log.debug(broker.getContainer().getName() + ": adding remote endpoint: " + event.getEndpoint());
505         broker.getContainer().getRegistry().registerRemoteEndpoint(event.getEndpoint());
506     }
507
508     public void onRemoteEndpointUnregistered(EndpointEvent event) {
509         log.debug(broker.getContainer().getName() + ": removing remote endpoint: " + event.getEndpoint());
510         broker.getContainer().getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
511     }
512
513     /**
514      * Distribute an ExchangePacket
515      *
516      * @param me
517      * @throws MessagingException
518      */

519     protected void doSend(MessageExchangeImpl me) throws MessagingException {
520         doRouting(me);
521     }
522     
523     /**
524      * Distribute an ExchangePacket
525      *
526      * @param me
527      * @throws MessagingException
528      */

529     public void doRouting(final MessageExchangeImpl me) throws MessagingException {
530         // let ActiveMQ do the routing ...
531
try {
532             String JavaDoc destination;
533             if (me.getRole() == Role.PROVIDER) {
534                 if (me.getDestinationId() == null) {
535                     destination = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
536                 } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
537                     destination = INBOUND_PREFIX + me.getDestinationId().getName();
538                 } else {
539                     destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
540                 }
541             } else {
542                 if (me.getSourceId() == null) {
543                     throw new IllegalStateException JavaDoc("No sourceId set on the exchange");
544                 } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
545                     // If the consumer is stateless and has specified a sender endpoint,
546
// this exchange will be sent to the given endpoint queue, so that
547
// This property must have been created using EndpointSupport.getKey
548
// fail-over and load-balancing can be achieved
549
if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
550                         destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
551                     } else {
552                         destination = INBOUND_PREFIX + me.getSourceId().getName();
553                     }
554                 } else {
555                     destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
556                 }
557             }
558             if (me.isTransacted()) {
559                 me.setTxState(MessageExchangeImpl.TX_STATE_ENLISTED);
560             }
561             sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted());
562         } catch (JMSException JavaDoc e) {
563             log.error("Failed to send exchange: " + me + " internal JMS Network", e);
564             throw new MessagingException(e);
565         } catch (SystemException JavaDoc e) {
566             log.error("Failed to send exchange: " + me + " transaction problem", e);
567             throw new MessagingException(e);
568         }
569     }
570
571     /**
572      * MessageListener implementation
573      *
574      * @param message
575      */

576     public void onMessage(Message JavaDoc message) {
577         try {
578             if (message != null && started.get()) {
579                 ObjectMessage JavaDoc objMsg = (ObjectMessage JavaDoc) message;
580                 final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
581                 // Hack for redelivery: AMQ is too optimized and the object is the same upon redelivery
582
// so that there are side effect (the exchange state may have been modified)
583
// See http://jira.activemq.org/jira/browse/AMQ-519
584
//me = (MessageExchangeImpl) ((ActiveMQObjectMessage) ((ActiveMQObjectMessage) message).copy()).getObject();
585
TransactionManager JavaDoc tm = (TransactionManager JavaDoc) getTransactionManager();
586                 if (tm != null) {
587                     me.setTransactionContext(tm.getTransaction());
588                 }
589                 if (me.getDestinationId() == null) {
590                     ServiceEndpoint se = me.getEndpoint();
591                     se = broker.getContainer().getRegistry()
592                             .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
593                     me.setEndpoint(se);
594                     me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
595                 }
596                 super.doRouting(me);
597             }
598         }
599         catch (JMSException JavaDoc jmsEx) {
600             log.error("Caught an exception unpacking JMS Message: ", jmsEx);
601         }
602         catch (MessagingException e) {
603             log.error("Caught an exception routing ExchangePacket: ", e);
604         }
605         catch (SystemException JavaDoc e) {
606             log.error("Caught an exception acessing transaction context: ", e);
607         }
608     }
609
610     protected void onAdvisoryMessage(Object JavaDoc obj) {
611         if (obj instanceof ConsumerInfo) {
612             ConsumerInfo info = (ConsumerInfo) obj;
613             subscriberSet.add(info.getConsumerId().getConnectionId());
614             ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
615             for (int i = 0; i < endpoints.length; i++) {
616                 if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
617                     onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
618                             EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
619                 }
620             }
621         } else if (obj instanceof RemoveInfo) {
622             ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId();
623             subscriberSet.remove(id.getConnectionId());
624             removeAllPackets(id.getConnectionId());
625         }
626     }
627
628     private void removeAllPackets(String JavaDoc containerName) {
629         //TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
630
}
631
632     public ConnectionManager JavaDoc getConnectionManager() throws Exception JavaDoc {
633         if (connectionManager == null) {
634             ConnectionManagerFactoryBean cmfb = new ConnectionManagerFactoryBean();
635             cmfb.setTransactionContextManager(getTransactionContextManager());
636             cmfb.setPoolingSupport(new SinglePool(
637                     16, // max size
638
0, // min size
639
100, // blockingTimeoutMilliseconds
640
1, // idleTimeoutMinutes
641
true, // matchOne
642
true, // matchAll
643
true)); // selectOneAssumeMatch
644
cmfb.setTransactionSupport(new XATransactions(
645                     true, // useTransactionCaching
646
false)); // useThreadCaching
647
cmfb.afterPropertiesSet();
648             connectionManager = (ConnectionManager JavaDoc) cmfb.getObject();
649         }
650         return connectionManager;
651     }
652
653     public void setConnectionManager(ConnectionManager JavaDoc connectionManager) {
654         this.connectionManager = connectionManager;
655     }
656
657     public TransactionContextManager getTransactionContextManager() {
658         if (transactionContextManager == null) {
659             if (broker != null && broker.getContainer() instanceof SpringJBIContainer) {
660                 ApplicationContext applicationContext = ((SpringJBIContainer) broker.getContainer()).getApplicationContext();
661                 if (applicationContext != null) {
662                     Map JavaDoc map = applicationContext.getBeansOfType(TransactionContextManager.class);
663                     if( map.size() == 1) {
664                         transactionContextManager = (TransactionContextManager) map.values().iterator().next();
665                     }
666                 }
667             }
668         }
669         return transactionContextManager;
670     }
671
672     public void setTransactionContextManager(
673             TransactionContextManager transactionContextManager) {
674         this.transactionContextManager = transactionContextManager;
675     }
676
677     public BootstrapContext JavaDoc getBootstrapContext() {
678         if (bootstrapContext == null) {
679             GeronimoWorkManager wm = (GeronimoWorkManager) broker.getContainer().getWorkManager();
680             bootstrapContext = new BootstrapContextImpl(wm);
681         }
682         return bootstrapContext;
683     }
684
685     public void setBootstrapContext(BootstrapContext JavaDoc bootstrapContext) {
686         this.bootstrapContext = bootstrapContext;
687     }
688     
689     public String JavaDoc toString(){
690         return broker.getContainer().getName() + " JCAFlow";
691     }
692     
693     private void sendJmsMessage(Destination JavaDoc dest, Serializable JavaDoc object, boolean persistent, boolean transacted) throws JMSException JavaDoc, SystemException JavaDoc {
694         if (transacted) {
695             TransactionManager JavaDoc tm = (TransactionManager JavaDoc) getBroker().getContainer().getTransactionManager();
696             if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
697                 return;
698             }
699         }
700         Connection JavaDoc connection = connectionFactory.createConnection();
701         try {
702             Session JavaDoc session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
703             ObjectMessage JavaDoc msg = session.createObjectMessage(object);
704             MessageProducer JavaDoc producer = session.createProducer(dest);
705             producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
706             producer.send(msg);
707         } finally {
708             connection.close();
709         }
710     }
711
712 }
713
Popular Tags