1 17 package org.apache.servicemix.wsn.jbi; 18 19 import javax.jbi.JBIException; 20 import javax.jbi.component.ComponentContext; 21 import javax.jbi.messaging.DeliveryChannel; 22 import javax.jbi.messaging.InOnly; 23 import javax.jbi.messaging.MessageExchange; 24 import javax.jbi.messaging.MessageExchangeFactory; 25 import javax.jbi.messaging.NormalizedMessage; 26 import javax.jbi.servicedesc.ServiceEndpoint; 27 import javax.xml.bind.JAXBContext; 28 import javax.xml.namespace.QName ; 29 import javax.xml.parsers.DocumentBuilder ; 30 import javax.xml.parsers.DocumentBuilderFactory ; 31 import javax.xml.transform.dom.DOMSource ; 32 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.servicemix.common.ExchangeProcessor; 36 import org.apache.servicemix.wsn.component.WSNLifeCycle; 37 import org.apache.servicemix.wsn.jaxws.InvalidFilterFault; 38 import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault; 39 import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault; 40 import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault; 41 import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault; 42 import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault; 43 import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault; 44 import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault; 45 import org.apache.servicemix.wsn.jms.JmsSubscription; 46 import org.oasis_open.docs.wsn.b_2.Subscribe; 47 import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType; 48 import org.w3c.dom.Document ; 49 import org.w3c.dom.DocumentFragment ; 50 import org.w3c.dom.Element ; 51 import org.w3c.dom.NodeList ; 52 53 public class JbiSubscription extends JmsSubscription { 54 55 private static Log log = LogFactory.getLog(JbiSubscription.class); 56 57 private WSNLifeCycle lifeCycle; 58 private ServiceEndpoint endpoint; 59 private ExchangeProcessor processor; 60 61 public JbiSubscription(String name) { 62 super(name); 63 processor = new NoOpProcessor(); 64 } 65 66 @Override 67 protected void start() throws SubscribeCreationFailedFault { 68 super.start(); 69 } 70 71 @Override 72 protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault { 73 super.validateSubscription(subscribeRequest); 74 try { 75 endpoint = resolveConsumer(subscribeRequest); 76 } catch (Exception e) { 77 SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType(); 78 throw new SubscribeCreationFailedFault("Unable to resolve consumer reference endpoint", fault, e); 79 } 80 if (endpoint == null) { 81 SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType(); 82 throw new SubscribeCreationFailedFault("Unable to resolve consumer reference endpoint", fault); 83 } 84 } 85 86 protected ServiceEndpoint resolveConsumer(Subscribe subscribeRequest) throws Exception { 87 JAXBContext ctx = JAXBContext.newInstance(Subscribe.class); 89 DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); 90 dbf.setNamespaceAware(true); 91 DocumentBuilder db = dbf.newDocumentBuilder(); 92 Document doc = db.newDocument(); 93 ctx.createMarshaller().marshal(subscribeRequest, doc); 94 NodeList nl = doc.getDocumentElement().getElementsByTagNameNS("http://docs.oasis-open.org/wsn/b-2", "ConsumerReference"); 95 if (nl.getLength() != 1) { 96 throw new Exception ("Subscribe request must have exactly one ConsumerReference node"); 97 } 98 Element el = (Element ) nl.item(0); 99 DocumentFragment epr = doc.createDocumentFragment(); 100 epr.appendChild(el); 101 ServiceEndpoint endpoint = getContext().resolveEndpointReference(epr); 102 if (endpoint == null) { 103 String [] parts = split(subscribeRequest.getConsumerReference().getAddress().getValue().trim()); 104 endpoint = getContext().getEndpoint(new QName (parts[0], parts[1]), parts[2]); 105 } 106 return endpoint; 107 } 108 109 protected String [] split(String uri) { 110 char sep; 111 if (uri.indexOf('/') > 0) { 112 sep = '/'; 113 } else { 114 sep = ':'; 115 } 116 int idx1 = uri.lastIndexOf(sep); 117 int idx2 = uri.lastIndexOf(sep, idx1 - 1); 118 String epName = uri.substring(idx1 + 1); 119 String svcName = uri.substring(idx2 + 1, idx1); 120 String nsUri = uri.substring(0, idx2); 121 return new String [] { nsUri, svcName, epName }; 122 } 123 124 @Override 125 protected void doNotify(final Element content) { 126 try { 127 DeliveryChannel channel = getContext().getDeliveryChannel(); 128 MessageExchangeFactory factory = channel.createExchangeFactory(endpoint); 129 InOnly inonly = factory.createInOnlyExchange(); 130 NormalizedMessage msg = inonly.createMessage(); 131 inonly.setInMessage(msg); 132 msg.setContent(new DOMSource (content)); 133 getLifeCycle().sendConsumerExchange(inonly, processor); 134 } catch (JBIException e) { 135 log.warn("Could not deliver notification", e); 136 } 137 } 138 139 public ComponentContext getContext() { 140 return lifeCycle.getContext(); 141 } 142 143 public WSNLifeCycle getLifeCycle() { 144 return lifeCycle; 145 } 146 147 public void setLifeCycle(WSNLifeCycle lifeCycle) { 148 this.lifeCycle = lifeCycle; 149 } 150 151 protected class NoOpProcessor implements ExchangeProcessor { 152 153 public void process(MessageExchange exchange) throws Exception { 154 } 155 156 public void start() throws Exception { 157 } 158 159 public void stop() throws Exception { 160 } 161 } 162 163 } 164 | Popular Tags |