package org.apache.servicemix.jbi.nmr;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.components.util.ComponentSupport;
import org.apache.servicemix.jbi.framework.Registry;
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;

import javax.jbi.JBIException;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;

import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * Component that forwards copies of message exchanges to the subscription
 * endpoints the {@link Registry} reports as matching.
 *
 * Every exchange created here carries the {@link #FROM_SUBSCRIPTION_MANAGER}
 * marker property so that it is not dispatched to subscribers a second time.
 */
public class SubscriptionManager extends ComponentSupport implements MessageExchangeListener {

    private Registry registry;
    private String flowName;
    private static final Log log = LogFactory.getLog(SubscriptionManager.class);

    /**
     * Name of the exchange property marking an exchange as created by this
     * manager. Exchanges carrying it with value TRUE are never re-dispatched.
     */
    private static final String FROM_SUBSCRIPTION_MANAGER = "org.apache.servicemix.jbi.nmr.from_subman";

    /**
     * Initialises the manager: remembers the registry and activates this
     * component in the broker's container under the name
     * "#SubscriptionManager#".
     *
     * @param broker   the broker whose container hosts this component
     * @param registry the registry used to look up matching subscription endpoints
     * @throws JBIException if the component cannot be activated
     */
    public void init(Broker broker, Registry registry) throws JBIException {
        this.registry = registry;
        broker.getContainer().activateComponent(this, "#SubscriptionManager#");
    }

    /**
     * Dispatches a copy of the exchange to every subscription endpoint the
     * registry reports as matching it.
     *
     * Exchanges already marked as coming from this manager are ignored.
     *
     * @param exchange the exchange to offer to subscribers
     * @return true if the registry returned at least one matching endpoint;
     *         false if there were none or the exchange carries the marker
     * @throws JBIException if dispatching to a subscriber fails
     */
    protected boolean dispatchToSubscribers(MessageExchangeImpl exchange) throws JBIException {
        Boolean source = (Boolean) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
        if (source != null && source.booleanValue()) {
            // The exchange was produced by this manager: do not dispatch it again.
            return false;
        }
        List list = registry.getMatchingSubscriptionEndpoints(exchange);
        if (list == null) {
            return false;
        }
        for (int i = 0; i < list.size(); i++) {
            InternalEndpoint endpoint = (InternalEndpoint) list.get(i);
            dispatchToSubscriber(exchange, endpoint);
        }
        return !list.isEmpty();
    }

    /**
     * Sends a copy of the exchange to a single subscription endpoint.
     *
     * A new InOnly exchange is created and marked as coming from this manager.
     * It receives a transformed copy of the original "in" message and the
     * original exchange's properties. It is sent synchronously when the
     * original has the {@link JbiConstants#SEND_SYNC} property set to TRUE,
     * asynchronously otherwise. Exchanges already carrying the marker are
     * ignored.
     *
     * @param exchange the exchange to copy
     * @param endpoint the subscription endpoint to send the copy to
     * @throws JBIException if the copy cannot be created, transformed or sent
     */
    protected void dispatchToSubscriber(MessageExchangeImpl exchange, InternalEndpoint endpoint) throws JBIException {
        if (log.isDebugEnabled() && endpoint != null) {
            log.debug("Subscription Endpoint: "+endpoint.getEndpointName());
        }
        // Checked here as well as in dispatchToSubscribers, since this method is
        // protected and can be invoked on its own.
        Boolean source = (Boolean) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
        if (source == null || !source.booleanValue()) {
            DeliveryChannel channel = getDeliveryChannel();
            InOnly me = channel.createExchangeFactory().createInOnlyExchange();
            // Mark the copy so it is never dispatched to subscribers again.
            me.setProperty(FROM_SUBSCRIPTION_MANAGER, Boolean.TRUE);
            NormalizedMessage in = me.createMessage();
            getMessageTransformer().transform(me, exchange.getInMessage(), in);
            me.setInMessage(in);
            me.setEndpoint(endpoint);
            Set names = exchange.getPropertyNames();
            for (Iterator iter = names.iterator(); iter.hasNext();) {
                String name = (String) iter.next();
                // Never copy the marker itself: a source exchange carrying it with
                // value FALSE would otherwise overwrite the TRUE set above and
                // remove the re-dispatch protection from the copy.
                if (FROM_SUBSCRIPTION_MANAGER.equals(name)) {
                    continue;
                }
                me.setProperty(name, exchange.getProperty(name));
            }
            if (Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC))) {
                channel.sendSync(me);
            } else {
                channel.send(me);
            }
        }
    }

    /**
     * @return the configured flow name, or null if none has been set
     */
    public String getFlowName() {
        return flowName;
    }

    /**
     * @param flowName the flow name to record for this manager
     */
    public void setFlowName(String flowName) {
        this.flowName = flowName;
    }

    /**
     * Intentionally does nothing with exchanges delivered to this component.
     *
     * NOTE(review): presumably only the DONE/ERROR acknowledgements of the
     * InOnly copies sent by this manager arrive here - confirm.
     *
     * @param exchange the exchange delivered to this component
     * @throws MessagingException never thrown by this implementation
     */
    public void onMessageExchange(MessageExchange exchange) throws MessagingException {
    }

}