1 17 package org.apache.servicemix.wsn.spring; 18 19 import java.io.StringWriter ; 20 21 import javax.jbi.JBIException; 22 import javax.jbi.messaging.ExchangeStatus; 23 import javax.jbi.messaging.MessageExchange; 24 import javax.jbi.messaging.MessagingException; 25 import javax.jbi.messaging.NormalizedMessage; 26 import javax.xml.bind.JAXBContext; 27 import javax.xml.transform.Source ; 28 29 import org.apache.commons.logging.Log; 30 import org.apache.commons.logging.LogFactory; 31 import org.apache.servicemix.MessageExchangeListener; 32 import org.apache.servicemix.components.util.ComponentSupport; 33 import org.apache.servicemix.jbi.jaxp.SourceTransformer; 34 import org.apache.servicemix.jbi.jaxp.StringSource; 35 import org.apache.servicemix.wsn.client.AbstractWSAClient; 36 import org.apache.servicemix.wsn.client.NotificationBroker; 37 import org.oasis_open.docs.wsn.b_2.Subscribe; 38 import org.oasis_open.docs.wsn.b_2.SubscribeResponse; 39 import org.oasis_open.docs.wsn.b_2.Unsubscribe; 40 import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse; 41 import org.w3c.dom.Element ; 42 43 49 public class PublisherComponent extends ComponentSupport implements MessageExchangeListener { 50 51 private static final Log log = LogFactory.getLog(PublisherComponent.class); 52 53 private NotificationBroker wsnBroker; 54 private String topic; 55 private boolean demand; 56 private String subscriptionEndpoint = "subscription"; 57 private Subscribe subscription; 58 59 62 public boolean getDemand() { 63 return demand; 64 } 65 66 69 public void setDemand(boolean demand) { 70 this.demand = demand; 71 } 72 73 76 public String getTopic() { 77 return topic; 78 } 79 80 83 public void setTopic(String topic) { 84 this.topic = topic; 85 } 86 87 90 public Subscribe getSubscription() { 91 return subscription; 92 } 93 94 97 public void init() throws JBIException { 98 super.init(); 99 getContext().activateEndpoint(getService(), subscriptionEndpoint); 100 } 101 102 105 public void start() throws JBIException { 106 new Thread () { 107 public void run() { 108 try { 109 wsnBroker = new NotificationBroker(getContext()); 110 String wsaAddress = getService().getNamespaceURI() + "/" + getService().getLocalPart() + "/" + subscriptionEndpoint; 111 wsnBroker.registerPublisher(AbstractWSAClient.createWSA(wsaAddress), 112 topic, 113 demand); 114 } catch (Exception e) { 115 log.error("Could not create wsn client", e); 116 } 117 } 118 }.start(); 119 } 120 121 124 public void shutDown() throws JBIException { 125 super.shutDown(); 126 } 127 128 131 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 132 if (exchange.getStatus() != ExchangeStatus.ACTIVE) { 133 return; 134 } 135 if (exchange.getEndpoint().getEndpointName().equals(subscriptionEndpoint)) { 137 try { 138 JAXBContext jaxbContext = JAXBContext.newInstance(Subscribe.class); 139 Source src = exchange.getMessage("in").getContent(); 140 Object input = jaxbContext.createUnmarshaller().unmarshal(src); 141 if (input instanceof Subscribe) { 142 subscription = (Subscribe) input; 143 SubscribeResponse response = new SubscribeResponse(); 144 String wsaAddress = getService().getNamespaceURI() + "/" + getService().getLocalPart() + "/" + subscriptionEndpoint; 145 response.setSubscriptionReference(AbstractWSAClient.createWSA(wsaAddress)); 146 StringWriter writer = new StringWriter (); 147 jaxbContext.createMarshaller().marshal(response, writer); 148 NormalizedMessage out = exchange.createMessage(); 149 out.setContent(new StringSource(writer.toString())); 150 exchange.setMessage(out, "out"); 151 send(exchange); 152 } else if (input instanceof Unsubscribe) { 153 subscription = null; 154 UnsubscribeResponse response = new UnsubscribeResponse(); 155 StringWriter writer = new StringWriter (); 156 jaxbContext.createMarshaller().marshal(response, writer); 157 NormalizedMessage out = exchange.createMessage(); 158 out.setContent(new StringSource(writer.toString())); 159 exchange.setMessage(out, "out"); 160 send(exchange); 161 } else { 162 throw new Exception ("Unkown request"); 163 } 164 } catch (Exception e) { 165 fail(exchange, e); 166 } 167 } else { 169 try { 170 if (!demand || subscription != null) { 171 Element elem = new SourceTransformer().toDOMElement(exchange.getMessage("in")); 172 wsnBroker.notify(topic, elem); 173 done(exchange); 174 } else { 175 log.info("Ingore notification as the publisher is no subscribers"); 176 } 177 } catch (Exception e) { 178 fail(exchange, e); 179 } 180 } 181 } 182 183 } 184 | Popular Tags |