1 17 package org.apache.servicemix.jbi.nmr.flow; 18 19 import javax.jbi.JBIException; 20 import javax.jbi.management.LifeCycleMBean; 21 import javax.jbi.messaging.MessageExchange; 22 import javax.jbi.messaging.MessagingException; 23 import javax.jbi.messaging.MessageExchange.Role; 24 import javax.jbi.servicedesc.ServiceEndpoint; 25 import javax.management.JMException ; 26 import javax.management.MBeanAttributeInfo ; 27 import javax.management.ObjectName ; 28 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 import org.apache.servicemix.JbiConstants; 32 import org.apache.servicemix.jbi.framework.ComponentMBeanImpl; 33 import org.apache.servicemix.jbi.framework.ComponentNameSpace; 34 import org.apache.servicemix.jbi.management.AttributeInfoHelper; 35 import org.apache.servicemix.jbi.management.BaseLifeCycle; 36 import org.apache.servicemix.jbi.messaging.ExchangePacket; 37 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl; 38 import org.apache.servicemix.jbi.nmr.Broker; 39 import org.apache.servicemix.jbi.servicedesc.InternalEndpoint; 40 41 import edu.emory.mathcs.backport.java.util.concurrent.locks.ReadWriteLock; 42 import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantReadWriteLock; 43 44 49 public abstract class AbstractFlow extends BaseLifeCycle implements Flow { 50 51 protected final Log log = LogFactory.getLog(getClass()); 52 53 protected Broker broker; 54 private ReadWriteLock lock = new ReentrantReadWriteLock(); 55 private Thread suspendThread = null; 56 private String name; 57 58 64 public void init(Broker broker) throws JBIException { 65 this.broker = broker; 66 ObjectName objectName = broker.getContainer().getManagementContext().createObjectName(this); 68 try { 69 broker.getContainer().getManagementContext().registerMBean(objectName, this, LifeCycleMBean.class); 70 } 71 catch (JMException e) { 72 throw new JBIException("Failed to register MBean with the ManagementContext", e); 73 } 74 } 75 76 80 public void start() throws JBIException{ 81 super.start(); 82 } 83 84 85 89 public void stop() throws JBIException{ 90 if (log.isDebugEnabled()) 91 log.debug("Called Flow stop"); 92 if (suspendThread != null){ 93 suspendThread.interrupt(); 94 } 95 super.stop(); 96 } 97 98 102 public void shutDown() throws JBIException{ 103 if (log.isDebugEnabled()) { 104 log.debug("Called Flow shutdown"); 105 } 106 super.shutDown(); 107 } 108 109 114 public void send(MessageExchange me) throws JBIException{ 115 if (log.isDebugEnabled()) { 116 log.debug("Called Flow send"); 117 } 118 try { 120 lock.readLock().lock(); 121 doSend((MessageExchangeImpl) me); 122 } finally{ 123 lock.readLock().unlock(); 124 } 125 } 126 127 130 public synchronized void suspend(){ 131 if (log.isDebugEnabled()) { 132 log.debug("Called Flow suspend"); 133 } 134 lock.writeLock().lock(); 135 suspendThread = Thread.currentThread(); 136 } 137 138 139 142 public synchronized void resume(){ 143 if (log.isDebugEnabled()) { 144 log.debug("Called Flow resume"); 145 } 146 lock.writeLock().unlock(); 147 suspendThread = null; 148 } 149 150 155 protected abstract void doSend(MessageExchangeImpl me) throws JBIException; 156 157 163 protected void doRouting(MessageExchangeImpl me) throws MessagingException { 164 ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId(); 165 ComponentMBeanImpl lcc = broker.getContainer().getRegistry().getComponent(id.getName()); 167 if (lcc != null) { 168 if (lcc.getDeliveryChannel() != null) { 169 lcc.getDeliveryChannel().processInBound(me); 170 } else { 171 throw new MessagingException("Component " + id.getName() + " is shut down"); 172 } 173 } 174 else { 175 throw new MessagingException("No component named " + id.getName() + " - Couldn't route MessageExchange " + me); 176 } 177 } 178 179 185 public MBeanAttributeInfo [] getAttributeInfos() throws JMException { 186 AttributeInfoHelper helper = new AttributeInfoHelper(); 187 helper.addAttribute(getObjectToManage(), "description", "The type of flow"); 188 return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos()); 189 } 190 191 196 protected boolean isPersistent(MessageExchange me) { 197 ExchangePacket packet = ((MessageExchangeImpl) me).getPacket(); 198 if (packet.getPersistent() != null) { 199 return packet.getPersistent().booleanValue(); 200 } else { 201 return broker.getContainer().isPersistent(); 202 } 203 } 204 205 protected boolean isTransacted(MessageExchange me) { 206 return me.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME) != null; 207 } 208 209 protected boolean isSynchronous(MessageExchange me) { 210 Boolean sync = (Boolean ) me.getProperty(JbiConstants.SEND_SYNC); 211 return sync != null && sync.booleanValue(); 212 } 213 214 protected boolean isClustered(MessageExchange me) { 215 MessageExchangeImpl mei = (MessageExchangeImpl) me; 216 if (mei.getDestinationId() == null) { 217 ServiceEndpoint se = me.getEndpoint(); 218 if (se instanceof InternalEndpoint) { 219 return ((InternalEndpoint) se).isClustered(); 220 } else { 222 return false; 223 } 224 } else { 225 String destination = mei.getDestinationId().getContainerName(); 226 String source = mei.getSourceId().getContainerName(); 227 return !source.equals(destination); 228 } 229 } 230 231 public Broker getBroker() { 232 return broker; 233 } 234 235 239 public String getType() { 240 return "Flow"; 241 } 242 243 247 public String getName() { 248 if (this.name == null) { 249 String name = super.getName(); 250 if (name.endsWith("Flow")) { 251 name = name.substring(0, name.length() - 4); 252 } 253 return name; 254 } else { 255 return this.name; 256 } 257 } 258 259 public void setName(String name) { 260 this.name = name; 261 } 262 263 } | Popular Tags |