1 17 package org.apache.servicemix.jbi.nmr.flow.seda; 18 19 import java.util.Iterator ; 20 import java.util.Map ; 21 22 import javax.jbi.JBIException; 23 import javax.jbi.management.LifeCycleMBean; 24 import javax.jbi.messaging.ExchangeStatus; 25 import javax.jbi.messaging.MessageExchange; 26 import javax.jbi.messaging.MessagingException; 27 import javax.management.JMException ; 28 import javax.management.MBeanAttributeInfo ; 29 import javax.management.ObjectName ; 30 import javax.transaction.Transaction ; 31 import javax.transaction.TransactionManager ; 32 33 import org.apache.servicemix.jbi.event.ComponentAdapter; 34 import org.apache.servicemix.jbi.event.ComponentEvent; 35 import org.apache.servicemix.jbi.event.ComponentListener; 36 import org.apache.servicemix.jbi.framework.ComponentNameSpace; 37 import org.apache.servicemix.jbi.management.AttributeInfoHelper; 38 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl; 39 import org.apache.servicemix.jbi.nmr.Broker; 40 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow; 41 import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint; 42 43 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 44 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; 45 46 55 public class SedaFlow extends AbstractFlow { 56 57 protected Map queueMap = new ConcurrentHashMap(); 58 protected int capacity = 100; 59 protected AtomicBoolean started = new AtomicBoolean(false); 60 protected ComponentListener listener; 61 62 67 public String getDescription() { 68 return "seda"; 69 } 70 71 77 public void init(Broker broker) throws JBIException { 78 super.init(broker); 79 listener = new ComponentAdapter() { 80 public void componentShutDown(ComponentEvent event) { 81 onComponentShutdown(event.getComponent().getComponentNameSpace()); 82 } 83 }; 84 broker.getContainer().addListener(listener); 85 } 86 87 92 public boolean canHandle(MessageExchange me) { 93 if (isPersistent(me)) { 94 return false; 95 } 96 if (isClustered(me)) { 97 return false; 98 } 99 if (isTransacted(me)) { 100 if (!isSynchronous(me)) { 101 if (me.getStatus() == ExchangeStatus.ACTIVE) { 103 return false; 104 } 105 } 106 } 107 return true; 108 } 109 110 115 public void start() throws JBIException { 116 if (started.compareAndSet(false, true)) { 117 for (Iterator i = queueMap.values().iterator();i.hasNext();) { 118 SedaQueue queue = (SedaQueue) i.next(); 119 queue.start(); 120 } 121 } 122 super.start(); 123 } 124 125 130 public void stop() throws JBIException { 131 if (started.compareAndSet(true, false)) { 132 for (Iterator i = queueMap.values().iterator();i.hasNext();) { 133 SedaQueue queue = (SedaQueue) i.next(); 134 queue.stop(); 135 } 136 } 137 super.stop(); 138 } 139 140 145 public void shutDown() throws JBIException { 146 broker.getContainer().removeListener(listener); 147 for (Iterator i = queueMap.values().iterator(); i.hasNext();) { 148 SedaQueue queue = (SedaQueue) i.next(); 149 queue.shutDown(); 150 unregisterQueue(queue); 151 } 152 super.shutDown(); 153 } 154 155 161 protected void doSend(MessageExchangeImpl me) throws JBIException { 162 if (me.getDestinationId() == null) { 163 me.setDestinationId(((AbstractServiceEndpoint) me.getEndpoint()).getComponentNameSpace()); 164 } 165 if (isTransacted(me)) { 166 me.setTxState(MessageExchangeImpl.TX_STATE_CONVEYED); 167 } 168 suspendTx(me); 172 enqueuePacket(me); 173 } 174 175 protected void doRouting(MessageExchangeImpl me) throws MessagingException { 176 resumeTx(me); 177 super.doRouting(me); 178 } 179 180 185 protected void enqueuePacket(MessageExchangeImpl me) throws JBIException { 186 ComponentNameSpace cns = me.getDestinationId(); 187 SedaQueue queue = (SedaQueue) queueMap.get(cns); 188 if (queue == null) { 189 queue = new SedaQueue(cns); 190 queueMap.put(cns, queue); 191 queue.init(this, capacity); 192 registerQueue(cns, queue); 193 if (started.get()) { 194 queue.start(); 195 } 196 } 197 try { 198 queue.enqueue(me); 199 } 200 catch (InterruptedException e) { 201 throw new MessagingException(queue + " Failed to enqueue exchange: " + me, e); 202 } 203 } 204 205 210 public synchronized void onComponentShutdown(ComponentNameSpace cns) { 211 SedaQueue queue = (SedaQueue) queueMap.remove(cns); 212 if (queue != null) { 213 try { 214 queue.shutDown(); 215 unregisterQueue(queue); 216 } 217 catch (JBIException e) { 218 log.error("Failed to stop SedaQueue: " + queue + ": " + e); 219 if (log.isDebugEnabled()) { 220 log.debug("Failed to stop SedaQueue: " + queue, e); 221 } 222 } 223 } 224 } 225 226 231 public synchronized void release(SedaQueue queue) { 232 if (queue != null) { 233 queueMap.remove(queue.getComponentNameSpace()); 234 unregisterQueue(queue); 235 } 236 } 237 238 241 public int getCapacity() { 242 return capacity; 243 } 244 245 248 public void setCapacity(int capacity) { 249 this.capacity = capacity; 250 } 251 252 257 public int getQueueNumber() { 258 return queueMap.size(); 259 } 260 261 protected void registerQueue(ComponentNameSpace cns, SedaQueue queue) { 262 try { 263 ObjectName objectName = broker.getContainer().getManagementContext().createObjectName(queue); 264 if (getSubType() != null) { 265 objectName = new ObjectName (objectName + ",subtype=" + getSubType()); 266 } 267 queue.setObjectName(objectName); 268 broker.getContainer().getManagementContext().registerMBean(objectName, queue, LifeCycleMBean.class); 269 } 270 catch (JMException e) { 271 log.error("Failed to register SedaQueue: " + queue + " with the ManagementContext: " + e); 272 if (log.isDebugEnabled()) { 273 log.debug("Failed to register SedaQueue: " + queue + " with the ManagementContext", e); 274 } 275 } 276 } 277 278 protected void unregisterQueue(SedaQueue queue) { 279 try { 280 broker.getContainer().getManagementContext().unregisterMBean(queue.getObjectName()); 281 } 282 catch (JBIException e) { 283 log.error("Failed to unregister SedaQueue: " + queue + " from the ManagementContext"); 284 if (log.isDebugEnabled()) { 285 log.debug("Failed to unregister SedaQueue: " + queue + " with the ManagementContext", e); 286 } 287 } 288 } 289 290 296 public MBeanAttributeInfo [] getAttributeInfos() throws JMException { 297 AttributeInfoHelper helper = new AttributeInfoHelper(); 298 helper.addAttribute(getObjectToManage(), "capacity", "default capacity of a SedaQueue"); 299 helper.addAttribute(getObjectToManage(), "queueNumber", "number of running SedaQueues"); 300 return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos()); 301 } 302 303 protected void suspendTx(MessageExchangeImpl me) throws MessagingException { 304 try { 305 Transaction oldTx = me.getTransactionContext(); 306 if (oldTx != null) { 307 TransactionManager tm = (TransactionManager ) getBroker().getContainer().getTransactionManager(); 308 if (tm != null) { 309 if (log.isDebugEnabled()) { 310 log.debug("Suspending transaction for " + me.getExchangeId() + " in " + this); 311 } 312 Transaction tx = tm.suspend(); 313 if (tx != oldTx) { 314 throw new IllegalStateException ("the transaction context set in the messageExchange is not bound to the current thread"); 315 } 316 } 317 } 318 } catch (Exception e) { 319 throw new MessagingException(e); 320 } 321 } 322 323 protected void resumeTx(MessageExchangeImpl me) throws MessagingException { 324 try { 325 Transaction oldTx = me.getTransactionContext(); 326 if (oldTx != null) { 327 TransactionManager tm = (TransactionManager ) getBroker().getContainer().getTransactionManager(); 328 if (tm != null) { 329 if (log.isDebugEnabled()) { 330 log.debug("Resuming transaction for " + me.getExchangeId() + " in " + this); 331 } 332 tm.resume(oldTx); 333 } 334 } 335 } catch (Exception e) { 336 throw new MessagingException(e); 337 } 338 } 339 340 } 341 | Popular Tags |