KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > nmr > flow > jms > JMSFlow


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.nmr.flow.jms;
18
19 import java.util.Iterator JavaDoc;
20 import java.util.Map JavaDoc;
21 import java.util.Set JavaDoc;
22
23 import javax.jbi.JBIException;
24 import javax.jbi.messaging.MessageExchange;
25 import javax.jbi.messaging.MessagingException;
26 import javax.jbi.messaging.MessageExchange.Role;
27 import javax.jbi.servicedesc.ServiceEndpoint;
28 import javax.jms.DeliveryMode JavaDoc;
29 import javax.jms.JMSException JavaDoc;
30 import javax.jms.Message JavaDoc;
31 import javax.jms.MessageConsumer JavaDoc;
32 import javax.jms.MessageListener JavaDoc;
33 import javax.jms.MessageProducer JavaDoc;
34 import javax.jms.ObjectMessage JavaDoc;
35 import javax.jms.Queue JavaDoc;
36 import javax.jms.Session JavaDoc;
37 import javax.jms.Topic JavaDoc;
38 import javax.resource.spi.work.Work JavaDoc;
39 import javax.resource.spi.work.WorkException JavaDoc;
40
41 import org.apache.activemq.ActiveMQConnection;
42 import org.apache.activemq.ActiveMQConnectionFactory;
43 import org.apache.activemq.advisory.AdvisorySupport;
44 import org.apache.activemq.command.ActiveMQDestination;
45 import org.apache.activemq.command.ActiveMQMessage;
46 import org.apache.activemq.command.ConsumerId;
47 import org.apache.activemq.command.ConsumerInfo;
48 import org.apache.activemq.command.RemoveInfo;
49 import org.apache.servicemix.JbiConstants;
50 import org.apache.servicemix.jbi.event.ComponentAdapter;
51 import org.apache.servicemix.jbi.event.ComponentEvent;
52 import org.apache.servicemix.jbi.event.ComponentListener;
53 import org.apache.servicemix.jbi.event.EndpointAdapter;
54 import org.apache.servicemix.jbi.event.EndpointEvent;
55 import org.apache.servicemix.jbi.event.EndpointListener;
56 import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
57 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
58 import org.apache.servicemix.jbi.nmr.Broker;
59 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
60 import org.apache.servicemix.jbi.servicedesc.EndpointSupport;
61 import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
62
63 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
64 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
65 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
66
67 /**
68  * Use for message routing among a network of containers. All routing/registration happens automatically.
69  *
70  * @version $Revision: 438849 $
71  * @org.apache.xbean.XBean element="jmsFlow"
72  */

73 public class JMSFlow extends AbstractFlow implements MessageListener JavaDoc {
74
75     private static final String JavaDoc INBOUND_PREFIX = "org.apache.servicemix.jms.";
76
77     private String JavaDoc jmsURL = "peer://org.apache.servicemix?persistent=false";
78
79     private String JavaDoc userName;
80
81     private String JavaDoc password;
82
83     private ActiveMQConnectionFactory connectionFactory;
84
85     private ActiveMQConnection connection;
86
87     private String JavaDoc broadcastDestinationName = "org.apache.servicemix.JMSFlow";
88
89     private MessageProducer JavaDoc queueProducer;
90
91     private MessageProducer JavaDoc topicProducer;
92
93     private Topic JavaDoc broadcastTopic;
94
95     private Session JavaDoc broadcastSession;
96
97     private MessageConsumer JavaDoc broadcastConsumer;
98
99     private Session JavaDoc inboundSession;
100
101     private MessageConsumer JavaDoc advisoryConsumer;
102
103     private Set JavaDoc subscriberSet = new CopyOnWriteArraySet();
104
105     private Map JavaDoc consumerMap = new ConcurrentHashMap();
106
107     private AtomicBoolean started = new AtomicBoolean(false);
108
109     private EndpointListener endpointListener;
110     
111     private ComponentListener componentListener;
112
113     /**
114      * The type of Flow
115      *
116      * @return the type
117      */

118     public String JavaDoc getDescription() {
119         return "jms";
120     }
121
122     /**
123      * @return Returns the jmsURL.
124      */

125     public String JavaDoc getJmsURL() {
126         return jmsURL;
127     }
128
129     /**
130      * @param jmsURL The jmsURL to set.
131      */

132     public void setJmsURL(String JavaDoc jmsURL) {
133         this.jmsURL = jmsURL;
134     }
135
136     /**
137      * @return Returns the password.
138      */

139     public String JavaDoc getPassword() {
140         return password;
141     }
142
143     /**
144      * @param password The password to set.
145      */

146     public void setPassword(String JavaDoc password) {
147         this.password = password;
148     }
149
150     /**
151      * @return Returns the userName.
152      */

153     public String JavaDoc getUserName() {
154         return userName;
155     }
156
157     /**
158      * @param userName The userName to set.
159      */

160     public void setUserName(String JavaDoc userName) {
161         this.userName = userName;
162     }
163
164     /**
165      * @return Returns the connectionFactory.
166      */

167     public ActiveMQConnectionFactory getConnectionFactory() {
168         return connectionFactory;
169     }
170
171     /**
172      * @param connectionFactory The connectionFactory to set.
173      */

174     public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
175         this.connectionFactory = connectionFactory;
176     }
177
178     /**
179      * @return Returns the broadcastDestinationName.
180      */

181     public String JavaDoc getBroadcastDestinationName() {
182         return broadcastDestinationName;
183     }
184
185     /**
186      * @param broadcastDestinationName The broadcastDestinationName to set.
187      */

188     public void setBroadcastDestinationName(String JavaDoc broadcastDestinationName) {
189         this.broadcastDestinationName = broadcastDestinationName;
190     }
191
192     /**
193      * Check if the flow can support the requested QoS for this exchange
194      * @param me the exchange to check
195      * @return true if this flow can handle the given exchange
196      */

197     public boolean canHandle(MessageExchange me) {
198         if (isTransacted(me)) {
199             return false;
200         }
201         return true;
202     }
203
204     /**
205      * Initialize the Region
206      *
207      * @param broker
208      * @throws JBIException
209      */

210     public void init(Broker broker) throws JBIException {
211         log.debug(broker.getContainer().getName() + ": Initializing jms flow");
212         super.init(broker);
213         // Create and register endpoint listener
214
endpointListener = new EndpointAdapter() {
215             public void internalEndpointRegistered(EndpointEvent event) {
216                 onInternalEndpointRegistered(event, true);
217             }
218
219             public void internalEndpointUnregistered(EndpointEvent event) {
220                 onInternalEndpointUnregistered(event, true);
221             }
222         };
223         broker.getContainer().addListener(endpointListener);
224         // Create and register component listener
225
componentListener = new ComponentAdapter() {
226             public void componentStarted(ComponentEvent event) {
227                 onComponentStarted(event);
228             }
229             public void componentStopped(ComponentEvent event) {
230                 onComponentStopped(event);
231             }
232         };
233         broker.getContainer().addListener(componentListener);
234         try {
235             if (connectionFactory == null) {
236                 if (jmsURL != null) {
237                     connectionFactory = new ActiveMQConnectionFactory(jmsURL);
238                 } else {
239                     connectionFactory = new ActiveMQConnectionFactory();
240                 }
241             }
242             if (userName != null) {
243                 connection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
244             } else {
245                 connection = (ActiveMQConnection) connectionFactory.createConnection();
246             }
247             connection.setClientID(broker.getContainer().getName());
248             connection.start();
249             inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
250             Queue JavaDoc queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainer().getName());
251             MessageConsumer JavaDoc inboundQueue = inboundSession.createConsumer(queue);
252             inboundQueue.setMessageListener(this);
253             queueProducer = inboundSession.createProducer(null);
254             broadcastSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
255             broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
256             topicProducer = broadcastSession.createProducer(broadcastTopic);
257             topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
258         } catch (JMSException JavaDoc e) {
259             log.error("Failed to initialize JMSFlow", e);
260             throw new JBIException(e);
261         }
262     }
263
264     /**
265      * start the flow
266      *
267      * @throws JBIException
268      */

269     public void start() throws JBIException {
270         if (started.compareAndSet(false, true)) {
271             log.debug(broker.getContainer().getName() + ": Starting jms flow");
272             super.start();
273             try {
274                 broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
275                 broadcastConsumer.setMessageListener(new MessageListener JavaDoc() {
276                     public void onMessage(Message JavaDoc message) {
277                         try {
278                             Object JavaDoc obj = ((ObjectMessage JavaDoc) message).getObject();
279                             if (obj instanceof EndpointEvent) {
280                                 EndpointEvent event = (EndpointEvent) obj;
281                                 String JavaDoc container = ((InternalEndpoint) event.getEndpoint()).getComponentNameSpace().getContainerName();
282                                 if (!getBroker().getContainer().getName().equals(container)) {
283                                     if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
284                                         onRemoteEndpointRegistered(event);
285                                     } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
286                                         onRemoteEndpointUnregistered(event);
287                                     }
288                                 }
289                             }
290                         } catch (Exception JavaDoc e) {
291                             log.error("Error processing incoming broadcast message", e);
292                         }
293                     }
294                 });
295                 Topic JavaDoc advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
296                 advisoryConsumer = broadcastSession.createConsumer(advisoryTopic);
297                 advisoryConsumer.setMessageListener(new MessageListener JavaDoc() {
298                     public void onMessage(Message JavaDoc message) {
299                         if (started.get()) {
300                             onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure());
301                         }
302                     }
303                 });
304
305                 // Start queue consumers for all components
306
for (Iterator JavaDoc it = broker.getContainer().getRegistry().getComponents().iterator(); it.hasNext();) {
307                     ComponentMBeanImpl cmp = (ComponentMBeanImpl) it.next();
308                     if (cmp.isStarted()) {
309                         onComponentStarted(new ComponentEvent(cmp, ComponentEvent.COMPONENT_STARTED));
310                     }
311                 }
312                 // Start queue consumers for all endpoints
313
ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
314                 for (int i = 0; i < endpoints.length; i++) {
315                     if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
316                         onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
317                                 EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), false);
318                     }
319                 }
320             } catch (JMSException JavaDoc e) {
321                 JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
322                 throw jbiEx;
323             }
324         }
325     }
326
327     /**
328      * stop the flow
329      *
330      * @throws JBIException
331      */

332     public void stop() throws JBIException {
333         if (started.compareAndSet(true, false)) {
334             log.debug(broker.getContainer().getName() + ": Stopping jms flow");
335             super.stop();
336             for (Iterator JavaDoc it = subscriberSet.iterator(); it.hasNext();) {
337                 String JavaDoc id = (String JavaDoc) it.next();
338                 removeAllPackets(id);
339             }
340             subscriberSet.clear();
341             try {
342                 advisoryConsumer.close();
343                 broadcastConsumer.close();
344             } catch (JMSException JavaDoc e) {
345                 log.debug("JMSException caught in stop", e);
346             }
347         }
348     }
349
350     public void shutDown() throws JBIException {
351         super.shutDown();
352         stop();
353         // Remove endpoint listener
354
broker.getContainer().removeListener(endpointListener);
355         // Remove component listener
356
broker.getContainer().removeListener(componentListener);
357         if (this.connection != null) {
358             try {
359                 this.connection.close();
360             } catch (JMSException JavaDoc e) {
361                 log.warn("Error closing JMS Connection", e);
362             }
363         }
364     }
365
366     /**
367      * useful for testing
368      *
369      * @return number of containers in the network
370      */

371     public int numberInNetwork() {
372         return subscriberSet.size();
373     }
374
375     public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
376         if (!started.get()) {
377             return;
378         }
379         try {
380             String JavaDoc key = EndpointSupport.getKey(event.getEndpoint());
381             if (!consumerMap.containsKey(key)) {
382                 Queue JavaDoc queue = inboundSession.createQueue(INBOUND_PREFIX + key);
383                 MessageConsumer JavaDoc consumer = inboundSession.createConsumer(queue);
384                 consumer.setMessageListener(this);
385                 consumerMap.put(key, consumer);
386             }
387             if (broadcast) {
388                 log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
389                 ObjectMessage JavaDoc msg = broadcastSession.createObjectMessage(event);
390                 topicProducer.send(msg);
391             }
392         } catch (Exception JavaDoc e) {
393             log.error("Cannot create consumer for " + event.getEndpoint(), e);
394         }
395     }
396
397     public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
398         try {
399             String JavaDoc key = EndpointSupport.getKey(event.getEndpoint());
400             MessageConsumer JavaDoc consumer = (MessageConsumer JavaDoc) consumerMap.remove(key);
401             if (consumer != null) {
402                 consumer.close();
403             }
404             if (broadcast) {
405                 ObjectMessage JavaDoc msg = broadcastSession.createObjectMessage(event);
406                 log.debug(broker.getContainer().getName() + ": broadcasting info for " + event);
407                 topicProducer.send(msg);
408             }
409         } catch (Exception JavaDoc e) {
410             log.error("Cannot destroy consumer for " + event, e);
411         }
412     }
413     
414     public void onComponentStarted(ComponentEvent event) {
415         if (!started.get()) {
416             return;
417         }
418         try {
419             String JavaDoc key = event.getComponent().getName();
420             if (!consumerMap.containsKey(key)) {
421                 Queue JavaDoc queue = inboundSession.createQueue(INBOUND_PREFIX + key);
422                 MessageConsumer JavaDoc consumer = inboundSession.createConsumer(queue);
423                 consumer.setMessageListener(this);
424                 consumerMap.put(key, consumer);
425             }
426         } catch (Exception JavaDoc e) {
427             log.error("Cannot create consumer for component " + event.getComponent().getName(), e);
428         }
429     }
430     
431     public void onComponentStopped(ComponentEvent event) {
432         try {
433             String JavaDoc key = event.getComponent().getName();
434             MessageConsumer JavaDoc consumer = (MessageConsumer JavaDoc) consumerMap.remove(key);
435             if (consumer != null) {
436                 consumer.close();
437             }
438         } catch (Exception JavaDoc e) {
439             log.error("Cannot destroy consumer for component " + event.getComponent().getName(), e);
440         }
441     }
442
443     public void onRemoteEndpointRegistered(EndpointEvent event) {
444         log.debug(broker.getContainer().getName() + ": adding remote endpoint: " + event.getEndpoint());
445         broker.getContainer().getRegistry().registerRemoteEndpoint(event.getEndpoint());
446     }
447
448     public void onRemoteEndpointUnregistered(EndpointEvent event) {
449         log.debug(broker.getContainer().getName() + ": removing remote endpoint: " + event.getEndpoint());
450         broker.getContainer().getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
451     }
452
453     /**
454      * Distribute an ExchangePacket
455      *
456      * @param me
457      * @throws MessagingException
458      */

459     protected void doSend(MessageExchangeImpl me) throws MessagingException {
460         doRouting(me);
461     }
462
463     /**
464      * Distribute an ExchangePacket
465      *
466      * @param me
467      * @throws MessagingException
468      */

469     public void doRouting(MessageExchangeImpl me) throws MessagingException {
470         // let ActiveMQ do the routing ...
471
try {
472             String JavaDoc destination;
473             if (me.getRole() == Role.PROVIDER) {
474                 if (me.getDestinationId() == null) {
475                     destination = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
476                 } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
477                     destination = INBOUND_PREFIX + me.getDestinationId().getName();
478                 } else {
479                     destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
480                 }
481             } else {
482                 if (me.getSourceId() == null) {
483                     throw new IllegalStateException JavaDoc("No sourceId set on the exchange");
484                 } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
485                     // If the consumer is stateless and has specified a sender endpoint,
486
// this exchange will be sent to the given endpoint queue, so that
487
// fail-over and load-balancing can be achieved
488
// This property must have been created using EndpointSupport.getKey
489
if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
490                         destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
491                     } else {
492                         destination = INBOUND_PREFIX + me.getSourceId().getName();
493                     }
494                 } else {
495                     destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
496                 }
497             }
498                 
499             Queue JavaDoc queue = inboundSession.createQueue(destination);
500             ObjectMessage JavaDoc msg = inboundSession.createObjectMessage(me);
501             queueProducer.send(queue, msg);
502         } catch (JMSException JavaDoc e) {
503             log.error("Failed to send exchange: " + me + " internal JMS Network", e);
504             throw new MessagingException(e);
505         }
506     }
507
508     /**
509      * MessageListener implementation
510      *
511      * @param message
512      */

513     public void onMessage(final Message JavaDoc message) {
514         try {
515             if (message != null && started.get()) {
516                 ObjectMessage JavaDoc objMsg = (ObjectMessage JavaDoc) message;
517                 final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
518                 // Dispatch the message in another thread so as to free the jms session
519
// else if a component do a sendSync into the jms flow, the whole
520
// flow is deadlocked
521
broker.getContainer().getWorkManager().scheduleWork(new Work JavaDoc() {
522                     public void release() {
523                     }
524
525                     public void run() {
526                         try {
527                             if (me.getDestinationId() == null) {
528                                 ServiceEndpoint se = me.getEndpoint();
529                                 se = broker.getContainer().getRegistry()
530                                         .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
531                                 me.setEndpoint(se);
532                                 me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
533                             }
534                             JMSFlow.super.doRouting(me);
535                         } catch (Throwable JavaDoc e) {
536                             log.error("Caught an exception routing ExchangePacket: ", e);
537                         }
538                     }
539                 });
540             }
541         } catch (JMSException JavaDoc jmsEx) {
542             log.error("Caught an exception unpacking JMS Message: ", jmsEx);
543         } catch (WorkException JavaDoc e) {
544             log.error("Caught an exception routing ExchangePacket: ", e);
545         }
546     }
547
548     protected void onAdvisoryMessage(Object JavaDoc obj) {
549         if (obj instanceof ConsumerInfo) {
550             ConsumerInfo info = (ConsumerInfo) obj;
551             subscriberSet.add(info.getConsumerId().getConnectionId());
552             ServiceEndpoint[] endpoints = broker.getContainer().getRegistry().getEndpointsForInterface(null);
553             for (int i = 0; i < endpoints.length; i++) {
554                 if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
555                     onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
556                             EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
557                 }
558             }
559         } else if (obj instanceof RemoveInfo) {
560             ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId();
561             subscriberSet.remove(id.getConnectionId());
562             removeAllPackets(id.getConnectionId());
563         }
564     }
565
566     private void removeAllPackets(String JavaDoc containerName) {
567         //TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
568
}
569 }
570
Popular Tags