KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > nmr > flow > AbstractFlow


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.nmr.flow;
18
19 import javax.jbi.JBIException;
20 import javax.jbi.management.LifeCycleMBean;
21 import javax.jbi.messaging.MessageExchange;
22 import javax.jbi.messaging.MessagingException;
23 import javax.jbi.messaging.MessageExchange.Role;
24 import javax.jbi.servicedesc.ServiceEndpoint;
25 import javax.management.JMException JavaDoc;
26 import javax.management.MBeanAttributeInfo JavaDoc;
27 import javax.management.ObjectName JavaDoc;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.servicemix.JbiConstants;
32 import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
33 import org.apache.servicemix.jbi.framework.ComponentNameSpace;
34 import org.apache.servicemix.jbi.management.AttributeInfoHelper;
35 import org.apache.servicemix.jbi.management.BaseLifeCycle;
36 import org.apache.servicemix.jbi.messaging.ExchangePacket;
37 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
38 import org.apache.servicemix.jbi.nmr.Broker;
39 import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
40
41 import edu.emory.mathcs.backport.java.util.concurrent.locks.ReadWriteLock;
42 import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantReadWriteLock;
43
44 /**
45  * A simple Straight through flow
46  *
47  * @version $Revision: 426415 $
48  */

49 public abstract class AbstractFlow extends BaseLifeCycle implements Flow {
50     
51     protected final Log log = LogFactory.getLog(getClass());
52     
53     protected Broker broker;
54     private ReadWriteLock lock = new ReentrantReadWriteLock();
55     private Thread JavaDoc suspendThread = null;
56     private String JavaDoc name;
57
58     /**
59      * Initialize the Region
60      *
61      * @param broker
62      * @throws JBIException
63      */

64     public void init(Broker broker) throws JBIException {
65         this.broker = broker;
66         // register self with the management context
67
ObjectName JavaDoc objectName = broker.getContainer().getManagementContext().createObjectName(this);
68         try {
69             broker.getContainer().getManagementContext().registerMBean(objectName, this, LifeCycleMBean.class);
70         }
71         catch (JMException JavaDoc e) {
72             throw new JBIException("Failed to register MBean with the ManagementContext", e);
73         }
74     }
75     
76     /**
77      * start the flow
78      * @throws JBIException
79      */

80     public void start() throws JBIException{
81         super.start();
82     }
83     
84     
85     /**
86      * stop the flow
87      * @throws JBIException
88      */

89     public void stop() throws JBIException{
90         if (log.isDebugEnabled())
91             log.debug("Called Flow stop");
92         if (suspendThread != null){
93             suspendThread.interrupt();
94         }
95         super.stop();
96     }
97     
98     /**
99      * shutDown the flow
100      * @throws JBIException
101      */

102     public void shutDown() throws JBIException{
103         if (log.isDebugEnabled()) {
104             log.debug("Called Flow shutdown");
105         }
106         super.shutDown();
107     }
108     
109     /**
110      * Distribute an ExchangePacket
111      * @param packet
112      * @throws JBIException
113      */

114     public void send(MessageExchange me) throws JBIException{
115         if (log.isDebugEnabled()) {
116             log.debug("Called Flow send");
117         }
118         // do send
119
try {
120             lock.readLock().lock();
121             doSend((MessageExchangeImpl) me);
122         } finally{
123             lock.readLock().unlock();
124         }
125     }
126     
127     /**
128      * suspend the flow to prevent any message exchanges
129      */

130     public synchronized void suspend(){
131         if (log.isDebugEnabled()) {
132             log.debug("Called Flow suspend");
133         }
134         lock.writeLock().lock();
135         suspendThread = Thread.currentThread();
136     }
137     
138     
139     /**
140      * resume message exchange processing
141      */

142     public synchronized void resume(){
143         if (log.isDebugEnabled()) {
144             log.debug("Called Flow resume");
145         }
146         lock.writeLock().unlock();
147         suspendThread = null;
148     }
149     
150     /**
151      * Do the Flow specific routing
152      * @param packet
153      * @throws JBIException
154      */

155     protected abstract void doSend(MessageExchangeImpl me) throws JBIException;
156
157     /**
158      * Distribute an ExchangePacket
159      *
160      * @param packet
161      * @throws MessagingException
162      */

163     protected void doRouting(MessageExchangeImpl me) throws MessagingException {
164         ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
165         //As the MessageExchange could come from another container - ensure we get the local Component
166
ComponentMBeanImpl lcc = broker.getContainer().getRegistry().getComponent(id.getName());
167         if (lcc != null) {
168             if (lcc.getDeliveryChannel() != null) {
169                 lcc.getDeliveryChannel().processInBound(me);
170             } else {
171                 throw new MessagingException("Component " + id.getName() + " is shut down");
172             }
173         }
174         else {
175             throw new MessagingException("No component named " + id.getName() + " - Couldn't route MessageExchange " + me);
176         }
177     }
178     
179     /**
180      * Get an array of MBeanAttributeInfo
181      *
182      * @return array of AttributeInfos
183      * @throws JMException
184      */

185     public MBeanAttributeInfo JavaDoc[] getAttributeInfos() throws JMException JavaDoc {
186         AttributeInfoHelper helper = new AttributeInfoHelper();
187         helper.addAttribute(getObjectToManage(), "description", "The type of flow");
188         return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos());
189     }
190
191     /**
192      * Check if the given packet should be persisted or not.
193      * @param packet
194      * @return
195      */

196     protected boolean isPersistent(MessageExchange me) {
197         ExchangePacket packet = ((MessageExchangeImpl) me).getPacket();
198         if (packet.getPersistent() != null) {
199             return packet.getPersistent().booleanValue();
200         } else {
201             return broker.getContainer().isPersistent();
202         }
203     }
204
205     protected boolean isTransacted(MessageExchange me) {
206         return me.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME) != null;
207     }
208     
209     protected boolean isSynchronous(MessageExchange me) {
210         Boolean JavaDoc sync = (Boolean JavaDoc) me.getProperty(JbiConstants.SEND_SYNC);
211         return sync != null && sync.booleanValue();
212     }
213
214     protected boolean isClustered(MessageExchange me) {
215         MessageExchangeImpl mei = (MessageExchangeImpl) me;
216         if (mei.getDestinationId() == null) {
217             ServiceEndpoint se = me.getEndpoint();
218             if (se instanceof InternalEndpoint) {
219                 return ((InternalEndpoint) se).isClustered();
220             // Unknown: assume this is not clustered
221
} else {
222                 return false;
223             }
224         } else {
225             String JavaDoc destination = mei.getDestinationId().getContainerName();
226             String JavaDoc source = mei.getSourceId().getContainerName();
227             return !source.equals(destination);
228         }
229     }
230     
231     public Broker getBroker() {
232         return broker;
233     }
234
235     /**
236      * Get the type of the item
237      * @return the type
238      */

239     public String JavaDoc getType() {
240         return "Flow";
241     }
242     
243     /**
244      * Get the name of the item
245      * @return the name
246      */

247     public String JavaDoc getName() {
248         if (this.name == null) {
249             String JavaDoc name = super.getName();
250             if (name.endsWith("Flow")) {
251                 name = name.substring(0, name.length() - 4);
252             }
253             return name;
254         } else {
255             return this.name;
256         }
257     }
258     
259     public void setName(String JavaDoc name) {
260         this.name = name;
261     }
262     
263 }
Popular Tags