KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > nmr > flow > seda > SedaFlow


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.nmr.flow.seda;
18
19 import java.util.Iterator JavaDoc;
20 import java.util.Map JavaDoc;
21
22 import javax.jbi.JBIException;
23 import javax.jbi.management.LifeCycleMBean;
24 import javax.jbi.messaging.ExchangeStatus;
25 import javax.jbi.messaging.MessageExchange;
26 import javax.jbi.messaging.MessagingException;
27 import javax.management.JMException JavaDoc;
28 import javax.management.MBeanAttributeInfo JavaDoc;
29 import javax.management.ObjectName JavaDoc;
30 import javax.transaction.Transaction JavaDoc;
31 import javax.transaction.TransactionManager JavaDoc;
32
33 import org.apache.servicemix.jbi.event.ComponentAdapter;
34 import org.apache.servicemix.jbi.event.ComponentEvent;
35 import org.apache.servicemix.jbi.event.ComponentListener;
36 import org.apache.servicemix.jbi.framework.ComponentNameSpace;
37 import org.apache.servicemix.jbi.management.AttributeInfoHelper;
38 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
39 import org.apache.servicemix.jbi.nmr.Broker;
40 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
41 import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
42
43 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
44 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
45
46 /**
47  * The SedaFlow introduces a simple event staging between the internal processes
48  * in the NMR Broker. A Seda flow (the default) is suited for general deployment,
49  * as the additional staging is well suited buffering exchanges between heavily
50  * routed to components (where state may be being used) for example.
51  *
52  * @version $Revision: 440668 $
53  * @org.apache.xbean.XBean element="sedaFlow"
54  */

55 public class SedaFlow extends AbstractFlow {
56
57     protected Map JavaDoc queueMap = new ConcurrentHashMap();
58     protected int capacity = 100;
59     protected AtomicBoolean started = new AtomicBoolean(false);
60     protected ComponentListener listener;
61
62     /**
63      * The type of Flow
64      *
65      * @return the type
66      */

67     public String JavaDoc getDescription() {
68         return "seda";
69     }
70
71     /**
72      * Initialize the Region
73      *
74      * @param broker
75      * @throws JBIException
76      */

77     public void init(Broker broker) throws JBIException {
78         super.init(broker);
79         listener = new ComponentAdapter() {
80             public void componentShutDown(ComponentEvent event) {
81                 onComponentShutdown(event.getComponent().getComponentNameSpace());
82             }
83         };
84         broker.getContainer().addListener(listener);
85     }
86
87     /**
88      * Check if the flow can support the requested QoS for this exchange
89      * @param me the exchange to check
90      * @return true if this flow can handle the given exchange
91      */

92     public boolean canHandle(MessageExchange me) {
93         if (isPersistent(me)) {
94             return false;
95         }
96         if (isClustered(me)) {
97             return false;
98         }
99         if (isTransacted(me)) {
100             if (!isSynchronous(me)) {
101                 // we have the mirror, so the role is the one for the target component
102
if (me.getStatus() == ExchangeStatus.ACTIVE) {
103                     return false;
104                 }
105             }
106         }
107         return true;
108     }
109     
110     /**
111      * start the flow
112      *
113      * @throws JBIException
114      */

115     public void start() throws JBIException {
116         if (started.compareAndSet(false, true)) {
117             for (Iterator JavaDoc i = queueMap.values().iterator();i.hasNext();) {
118                 SedaQueue queue = (SedaQueue) i.next();
119                 queue.start();
120             }
121         }
122         super.start();
123     }
124
125     /**
126      * stop the flow
127      *
128      * @throws JBIException
129      */

130     public void stop() throws JBIException {
131         if (started.compareAndSet(true, false)) {
132             for (Iterator JavaDoc i = queueMap.values().iterator();i.hasNext();) {
133                 SedaQueue queue = (SedaQueue) i.next();
134                 queue.stop();
135             }
136         }
137         super.stop();
138     }
139
140     /**
141      * shutDown the flow
142      *
143      * @throws JBIException
144      */

145     public void shutDown() throws JBIException {
146         broker.getContainer().removeListener(listener);
147         for (Iterator JavaDoc i = queueMap.values().iterator(); i.hasNext();) {
148             SedaQueue queue = (SedaQueue) i.next();
149             queue.shutDown();
150             unregisterQueue(queue);
151         }
152         super.shutDown();
153     }
154
155     /**
156      * Distribute an ExchangePacket
157      *
158      * @param packet
159      * @throws JBIException
160      */

161     protected void doSend(MessageExchangeImpl me) throws JBIException {
162         if (me.getDestinationId() == null) {
163             me.setDestinationId(((AbstractServiceEndpoint) me.getEndpoint()).getComponentNameSpace());
164         }
165         if (isTransacted(me)) {
166             me.setTxState(MessageExchangeImpl.TX_STATE_CONVEYED);
167         }
168         // If the message has been sent synchronously, do not use seda
169
// as it would consume threads from the work manager in a useless
170
// way. This could lead to deadlocks.
171
suspendTx(me);
172         enqueuePacket(me);
173     }
174     
175     protected void doRouting(MessageExchangeImpl me) throws MessagingException {
176         resumeTx(me);
177         super.doRouting(me);
178     }
179
180     /**
181      * Put the packet in the queue for later processing.
182      * @param packet
183      * @throws JBIException
184      */

185     protected void enqueuePacket(MessageExchangeImpl me) throws JBIException {
186         ComponentNameSpace cns = me.getDestinationId();
187         SedaQueue queue = (SedaQueue) queueMap.get(cns);
188         if (queue == null) {
189             queue = new SedaQueue(cns);
190             queueMap.put(cns, queue);
191             queue.init(this, capacity);
192             registerQueue(cns, queue);
193             if (started.get()) {
194                 queue.start();
195             }
196         }
197         try {
198             queue.enqueue(me);
199         }
200         catch (InterruptedException JavaDoc e) {
201             throw new MessagingException(queue + " Failed to enqueue exchange: " + me, e);
202         }
203     }
204     
205     /**
206      * Process state changes in Components
207      *
208      * @param event
209      */

210     public synchronized void onComponentShutdown(ComponentNameSpace cns) {
211         SedaQueue queue = (SedaQueue) queueMap.remove(cns);
212         if (queue != null) {
213             try {
214                 queue.shutDown();
215                 unregisterQueue(queue);
216             }
217             catch (JBIException e) {
218                 log.error("Failed to stop SedaQueue: " + queue + ": " + e);
219                 if (log.isDebugEnabled()) {
220                     log.debug("Failed to stop SedaQueue: " + queue, e);
221                 }
222             }
223         }
224     }
225
226     /**
227      * release a queue
228      *
229      * @param queue
230      */

231     public synchronized void release(SedaQueue queue) {
232         if (queue != null) {
233             queueMap.remove(queue.getComponentNameSpace());
234             unregisterQueue(queue);
235         }
236     }
237
238     /**
239      * @return Returns the capacity.
240      */

241     public int getCapacity() {
242         return capacity;
243     }
244
245     /**
246      * @param capacity The capacity to set.
247      */

248     public void setCapacity(int capacity) {
249         this.capacity = capacity;
250     }
251
252     /**
253      * Get Queue number
254      *
255      * @return number of running Queues
256      */

257     public int getQueueNumber() {
258         return queueMap.size();
259     }
260
261     protected void registerQueue(ComponentNameSpace cns, SedaQueue queue) {
262         try {
263             ObjectName JavaDoc objectName = broker.getContainer().getManagementContext().createObjectName(queue);
264             if (getSubType() != null) {
265                 objectName = new ObjectName JavaDoc(objectName + ",subtype=" + getSubType());
266             }
267             queue.setObjectName(objectName);
268             broker.getContainer().getManagementContext().registerMBean(objectName, queue, LifeCycleMBean.class);
269         }
270         catch (JMException JavaDoc e) {
271             log.error("Failed to register SedaQueue: " + queue + " with the ManagementContext: " + e);
272             if (log.isDebugEnabled()) {
273                 log.debug("Failed to register SedaQueue: " + queue + " with the ManagementContext", e);
274             }
275         }
276     }
277
278     protected void unregisterQueue(SedaQueue queue) {
279         try {
280             broker.getContainer().getManagementContext().unregisterMBean(queue.getObjectName());
281         }
282         catch (JBIException e) {
283             log.error("Failed to unregister SedaQueue: " + queue + " from the ManagementContext");
284             if (log.isDebugEnabled()) {
285                 log.debug("Failed to unregister SedaQueue: " + queue + " with the ManagementContext", e);
286             }
287         }
288     }
289
290     /**
291      * Get an array of MBeanAttributeInfo
292      *
293      * @return array of AttributeInfos
294      * @throws JMException
295      */

296     public MBeanAttributeInfo JavaDoc[] getAttributeInfos() throws JMException JavaDoc {
297         AttributeInfoHelper helper = new AttributeInfoHelper();
298         helper.addAttribute(getObjectToManage(), "capacity", "default capacity of a SedaQueue");
299         helper.addAttribute(getObjectToManage(), "queueNumber", "number of running SedaQueues");
300         return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos());
301     }
302
303     protected void suspendTx(MessageExchangeImpl me) throws MessagingException {
304         try {
305             Transaction JavaDoc oldTx = me.getTransactionContext();
306             if (oldTx != null) {
307                 TransactionManager JavaDoc tm = (TransactionManager JavaDoc) getBroker().getContainer().getTransactionManager();
308                 if (tm != null) {
309                     if (log.isDebugEnabled()) {
310                         log.debug("Suspending transaction for " + me.getExchangeId() + " in " + this);
311                     }
312                     Transaction JavaDoc tx = tm.suspend();
313                     if (tx != oldTx) {
314                         throw new IllegalStateException JavaDoc("the transaction context set in the messageExchange is not bound to the current thread");
315                     }
316                 }
317             }
318         } catch (Exception JavaDoc e) {
319             throw new MessagingException(e);
320         }
321     }
322
323     protected void resumeTx(MessageExchangeImpl me) throws MessagingException {
324         try {
325             Transaction JavaDoc oldTx = me.getTransactionContext();
326             if (oldTx != null) {
327                 TransactionManager JavaDoc tm = (TransactionManager JavaDoc) getBroker().getContainer().getTransactionManager();
328                 if (tm != null) {
329                     if (log.isDebugEnabled()) {
330                         log.debug("Resuming transaction for " + me.getExchangeId() + " in " + this);
331                     }
332                     tm.resume(oldTx);
333                 }
334             }
335         } catch (Exception JavaDoc e) {
336             throw new MessagingException(e);
337         }
338     }
339
340 }
341
Popular Tags