1 17 package org.apache.servicemix.wsn.client; 18 19 import java.util.List ; 20 21 import javax.jbi.JBIException; 22 import javax.jbi.component.ComponentContext; 23 import javax.xml.bind.JAXBContext; 24 import javax.xml.bind.JAXBElement; 25 import javax.xml.bind.JAXBException; 26 import javax.xml.namespace.QName ; 27 28 import org.apache.servicemix.client.ServiceMixClient; 29 import org.apache.servicemix.client.ServiceMixClientFacade; 30 import org.apache.servicemix.jbi.container.JBIContainer; 31 import org.apache.servicemix.jbi.resolver.ServiceNameEndpointResolver; 32 import org.apache.servicemix.wsn.AbstractSubscription; 33 import org.oasis_open.docs.wsn.b_2.FilterType; 34 import org.oasis_open.docs.wsn.b_2.GetCurrentMessage; 35 import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse; 36 import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType; 37 import org.oasis_open.docs.wsn.b_2.Notify; 38 import org.oasis_open.docs.wsn.b_2.QueryExpressionType; 39 import org.oasis_open.docs.wsn.b_2.Subscribe; 40 import org.oasis_open.docs.wsn.b_2.SubscribeResponse; 41 import org.oasis_open.docs.wsn.b_2.TopicExpressionType; 42 import org.oasis_open.docs.wsn.b_2.UseRaw; 43 import org.oasis_open.docs.wsn.br_2.RegisterPublisher; 44 import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse; 45 import org.w3._2005._08.addressing.EndpointReferenceType; 46 47 public class NotificationBroker extends AbstractWSAClient { 48 49 public static String WSN_URI = "http://servicemix.org/wsnotification"; 50 public static String WSN_SERVICE = "NotificationBroker"; 51 52 public static QName NOTIFICATION_BROKER = new QName (WSN_URI, WSN_SERVICE); 53 54 public NotificationBroker(ComponentContext context) throws JAXBException { 55 ServiceMixClientFacade client = new ServiceMixClientFacade(context); 56 client.setMarshaler(new JAXBMarshaler(JAXBContext.newInstance(Subscribe.class, RegisterPublisher.class))); 57 setClient(client); 58 setResolver(new ServiceNameEndpointResolver(NOTIFICATION_BROKER)); 59 } 60 61 public NotificationBroker(ComponentContext context, String brokerName) throws JAXBException { 62 setClient(createJaxbClient(context)); 63 setEndpoint(createWSA(WSN_URI + "/" + WSN_SERVICE + "/" + brokerName)); 64 setResolver(resolveWSA(getEndpoint())); 65 } 66 67 public NotificationBroker(JBIContainer container) throws JBIException, JAXBException { 68 setClient(createJaxbClient(container)); 69 setResolver(new ServiceNameEndpointResolver(NOTIFICATION_BROKER)); 70 } 71 72 public NotificationBroker(JBIContainer container, String brokerName) throws JBIException, JAXBException { 73 setClient(createJaxbClient(container)); 74 setEndpoint(createWSA(WSN_URI + "/" + WSN_SERVICE + "/" + brokerName)); 75 setResolver(resolveWSA(getEndpoint())); 76 } 77 78 public NotificationBroker(ServiceMixClient client) { 79 setClient(client); 80 setResolver(new ServiceNameEndpointResolver(NOTIFICATION_BROKER)); 81 } 82 83 public NotificationBroker(ServiceMixClient client, String brokerName) { 84 setClient(client); 85 setEndpoint(createWSA(WSN_URI + "/" + WSN_SERVICE + "/" + brokerName)); 86 setResolver(resolveWSA(getEndpoint())); 87 } 88 89 public void notify(String topic, Object msg) throws JBIException { 90 Notify notify = new Notify(); 91 NotificationMessageHolderType holder = new NotificationMessageHolderType(); 92 if (topic != null) { 93 TopicExpressionType topicExp = new TopicExpressionType(); 94 topicExp.getContent().add(topic); 95 holder.setTopic(topicExp); 96 } 97 holder.setMessage(new NotificationMessageHolderType.Message()); 98 holder.getMessage().setAny(msg); 99 notify.getNotificationMessage().add(holder); 100 send(notify); 101 } 102 103 public Subscription subscribe(EndpointReferenceType consumer, 104 String topic) throws JBIException { 105 return subscribe(consumer, topic, null, false); 106 } 107 108 public Subscription subscribe(EndpointReferenceType consumer, 109 String topic, 110 String xpath) throws JBIException { 111 return subscribe(consumer, topic, xpath, false); 112 } 113 114 public Subscription subscribe(EndpointReferenceType consumer, 115 String topic, 116 String xpath, 117 boolean raw) throws JBIException { 118 119 Subscribe subscribeRequest = new Subscribe(); 120 subscribeRequest.setConsumerReference(consumer); 121 subscribeRequest.setFilter(new FilterType()); 122 if (topic != null) { 123 TopicExpressionType topicExp = new TopicExpressionType(); 124 topicExp.getContent().add(topic); 125 subscribeRequest.getFilter().getAny().add(new JAXBElement<TopicExpressionType>(AbstractSubscription.QNAME_TOPIC_EXPRESSION, TopicExpressionType.class, topicExp)); 126 } 127 if (xpath != null) { 128 QueryExpressionType xpathExp = new QueryExpressionType(); 129 xpathExp.setDialect(AbstractSubscription.XPATH1_URI); 130 xpathExp.getContent().add(xpath); 131 subscribeRequest.getFilter().getAny().add(new JAXBElement<QueryExpressionType>(AbstractSubscription.QNAME_MESSAGE_CONTENT, QueryExpressionType.class, xpathExp)); 132 } 133 if (raw) { 134 subscribeRequest.setSubscriptionPolicy(new Subscribe.SubscriptionPolicy()); 135 subscribeRequest.getSubscriptionPolicy().getAny().add(new UseRaw()); 136 } 137 SubscribeResponse response = (SubscribeResponse) request(subscribeRequest); 138 return new Subscription(response.getSubscriptionReference(), getClient()); 139 } 140 141 public List <Object > getCurrentMessage(String topic) throws JBIException { 142 GetCurrentMessage getCurrentMessageRequest = new GetCurrentMessage(); 143 if (topic != null) { 144 TopicExpressionType topicExp = new TopicExpressionType(); 145 topicExp.getContent().add(topic); 146 getCurrentMessageRequest.setTopic(topicExp); 147 } 148 GetCurrentMessageResponse response = (GetCurrentMessageResponse) request(getCurrentMessageRequest); 149 return response.getAny(); 150 } 151 152 public Publisher registerPublisher(EndpointReferenceType publisherReference, 153 String topic, 154 boolean demand) throws JBIException { 155 156 RegisterPublisher registerPublisherRequest = new RegisterPublisher(); 157 registerPublisherRequest.setPublisherReference(publisherReference); 158 if (topic != null) { 159 TopicExpressionType topicExp = new TopicExpressionType(); 160 topicExp.getContent().add(topic); 161 registerPublisherRequest.getTopic().add(topicExp); 162 } 163 registerPublisherRequest.setDemand(Boolean.valueOf(demand)); 164 RegisterPublisherResponse response = (RegisterPublisherResponse) request(registerPublisherRequest); 165 return new Publisher(response.getPublisherRegistrationReference(), getClient()); 166 } 167 168 } 169 | Popular Tags |