package org.apache.servicemix.wsn;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jws.Oneway;
import javax.jws.WebMethod;
import javax.jws.WebParam;
import javax.jws.WebResult;
import javax.jws.WebService;

import org.apache.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
import org.apache.servicemix.wsn.jaxws.MultipleTopicsSpecifiedFault;
import org.apache.servicemix.wsn.jaxws.NoCurrentMessageOnTopicFault;
import org.apache.servicemix.wsn.jaxws.NotificationBroker;
import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
import org.oasis_open.docs.wsn.b_2.NoCurrentMessageOnTopicFaultType;
import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
import org.oasis_open.docs.wsn.b_2.Notify;
import org.oasis_open.docs.wsn.b_2.Subscribe;
import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse;
import org.w3._2005._08.addressing.EndpointReferenceType;

/**
 * Base class for the {@link NotificationBroker} web service endpoint.
 *
 * <p>The broker keeps two address-keyed registries, one for publishers and one
 * for subscriptions, both backed by {@link ConcurrentHashMap}. Incoming
 * notifications are routed to the publisher whose address matches the
 * message's producer reference, falling back to an "Anonymous" publisher that
 * is created in {@link #init()}.
 *
 * <p>Subclasses provide the concrete publisher and subscription types through
 * {@link #createPublisher(String)} and {@link #createSubcription(String)}.
 */
@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.NotificationBroker")
public abstract class AbstractNotificationBroker extends AbstractEndpoint implements NotificationBroker {

    private static final Log log = LogFactory.getLog(AbstractNotificationBroker.class);

    /** Source of the names handed to newly created publishers and subscriptions. */
    private final IdGenerator idGenerator;

    /** Fallback publisher; only assigned once {@link #init()} has run. */
    private AbstractPublisher anonymousPublisher;

    /** Registered publishers, keyed by {@code AbstractPublisher.getAddress()}. */
    private final Map<String, AbstractPublisher> publishers;

    /** Active subscriptions, keyed by {@code AbstractSubscription.getAddress()}. */
    private final Map<String, AbstractSubscription> subscriptions;

    /**
     * Creates a broker with empty publisher and subscription registries.
     *
     * @param name the endpoint name, passed to the superclass constructor and
     *             used as the last segment of {@link #createAddress()}
     */
    public AbstractNotificationBroker(String name) {
        super(name);
        idGenerator = new IdGenerator();
        subscriptions = new ConcurrentHashMap<String, AbstractSubscription>();
        publishers = new ConcurrentHashMap<String, AbstractPublisher>();
    }

    /**
     * Registers this broker, then creates and registers the "Anonymous"
     * publisher used for notifications that match no registered publisher.
     *
     * @throws Exception if either registration fails
     */
    public void init() throws Exception {
        register();
        anonymousPublisher = createPublisher("Anonymous");
        anonymousPublisher.register();
    }

    /**
     * Builds the address of this broker from a fixed prefix and the endpoint name.
     *
     * @return the broker address
     */
    protected String createAddress() {
        return "http://servicemix.org/wsnotification/NotificationBroker/" + getName();
    }

    /**
     * One-way web method receiving a batch of notification messages.
     * Logs the call at debug level and delegates to {@link #handleNotify(Notify)}.
     *
     * @param notify the incoming notification request
     */
    @WebMethod(operationName = "Notify")
    @Oneway
    public void notify(
            @WebParam(name = "Notify",
                      targetNamespace = "http://docs.oasis-open.org/wsn/b-1",
                      partName = "Notify")
            Notify notify) {

        log.debug("Notify");
        handleNotify(notify);
    }

    /**
     * Dispatches every message in the request to the publisher resolved from
     * that message's producer reference.
     *
     * @param notify the notification request whose messages are dispatched
     */
    protected void handleNotify(Notify notify) {
        for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
            EndpointReferenceType producerReference = messageHolder.getProducerReference();
            AbstractPublisher publisher = getPublisher(producerReference);
            // getPublisher() can only yield null when the anonymous fallback has
            // not been created yet, i.e. init() has not run; such messages are skipped.
            if (publisher != null) {
                publisher.notify(messageHolder);
            }
        }
    }

    /**
     * Resolves the publisher registered under the address carried by the given
     * producer reference.
     *
     * @param producerReference the producer reference of a message; may be
     *                          {@code null} or lack an address value
     * @return the matching registered publisher, or the anonymous publisher
     *         when the reference carries no address or no publisher is
     *         registered under it
     */
    protected AbstractPublisher getPublisher(EndpointReferenceType producerReference) {
        AbstractPublisher publisher = null;
        if (producerReference != null
                && producerReference.getAddress() != null
                && producerReference.getAddress().getValue() != null) {
            String address = producerReference.getAddress().getValue();
            publisher = publishers.get(address);
        }
        if (publisher == null) {
            publisher = anonymousPublisher;
        }
        return publisher;
    }

    /**
     * Web method creating a new subscription. Logs the call at debug level and
     * delegates to {@link #handleSubscribe(Subscribe, EndpointManager)} with no
     * endpoint manager.
     *
     * @param subscribeRequest the subscription request
     * @return a response carrying the endpoint reference of the new subscription
     * @throws SubscribeCreationFailedFault if the subscription endpoint cannot be registered
     */
    @WebMethod(operationName = "Subscribe")
    @WebResult(name = "SubscribeResponse",
               targetNamespace = "http://docs.oasis-open.org/wsn/b-1",
               partName = "SubscribeResponse")
    public SubscribeResponse subscribe(
            @WebParam(name = "Subscribe",
                      targetNamespace = "http://docs.oasis-open.org/wsn/b-1",
                      partName = "SubscribeRequest")
            Subscribe subscribeRequest)
            throws InvalidFilterFault,
                   InvalidMessageContentExpressionFault,
                   InvalidProducerPropertiesExpressionFault,
                   InvalidTopicExpressionFault,
                   ResourceUnknownFault,
                   SubscribeCreationFailedFault,
                   TopicExpressionDialectUnknownFault,
                   TopicNotSupportedFault,
                   UnacceptableInitialTerminationTimeFault {

        log.debug("Subscribe");
        return handleSubscribe(subscribeRequest, null);
    }

    /**
     * Creates, records and registers a subscription for the given request.
     *
     * <p>The subscription is stored in the registry before it is populated
     * from the request and registered. If any of those later steps fails, the
     * registry entry is removed again and the subscription is torn down, so a
     * failed request leaves no entry behind.
     *
     * <p>NOTE(review): the request-validation faults in the {@code throws}
     * clause presumably originate from {@code AbstractSubscription.create};
     * that method is not visible here.
     *
     * @param subscribeRequest the subscription request
     * @param manager          endpoint manager to set on the subscription
     *                         before it is registered; skipped when {@code null}
     * @return a response carrying the endpoint reference of the new subscription
     * @throws SubscribeCreationFailedFault if registering the subscription
     *         raises an {@link EndpointRegistrationException}
     */
    public SubscribeResponse handleSubscribe(Subscribe subscribeRequest, EndpointManager manager)
            throws InvalidFilterFault,
                   InvalidMessageContentExpressionFault,
                   InvalidProducerPropertiesExpressionFault,
                   InvalidTopicExpressionFault,
                   SubscribeCreationFailedFault,
                   TopicExpressionDialectUnknownFault,
                   TopicNotSupportedFault,
                   UnacceptableInitialTerminationTimeFault {
        AbstractSubscription subscription = null;
        boolean success = false;
        try {
            subscription = createSubcription(idGenerator.generateSanitizedId());
            subscription.setBroker(this);
            subscriptions.put(subscription.getAddress(), subscription);
            subscription.create(subscribeRequest);
            if (manager != null) {
                subscription.setManager(manager);
            }
            subscription.register();
            SubscribeResponse response = new SubscribeResponse();
            response.setSubscriptionReference(createEndpointReference(subscription.getAddress()));
            success = true;
            return response;
        } catch (EndpointRegistrationException e) {
            SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
            throw new SubscribeCreationFailedFault("Unable to register endpoint", fault, e);
        } finally {
            if (!success && subscription != null) {
                // The registry is keyed by address. Removing by the subscription
                // object itself never matches a key and would leave the failed
                // subscription in the map.
                subscriptions.remove(subscription.getAddress());
                try {
                    subscription.unsubscribe();
                } catch (UnableToDestroySubscriptionFault e) {
                    // Best-effort cleanup: the original failure is what the caller sees.
                    log.info("Error destroying subscription", e);
                }
            }
        }
    }

    /**
     * Removes the subscription stored under the given address and, if one was
     * present, tears it down.
     *
     * @param address the address of the subscription to remove
     * @throws UnableToDestroySubscriptionFault if tearing the subscription down fails
     */
    public void unsubscribe(String address) throws UnableToDestroySubscriptionFault {
        AbstractSubscription subscription = subscriptions.remove(address);
        if (subscription != null) {
            subscription.unsubscribe();
        }
    }

    /**
     * Web method for retrieving the current message on a topic. This
     * implementation keeps no current message: it logs the call at debug level
     * and always throws {@link NoCurrentMessageOnTopicFault}.
     *
     * @param getCurrentMessageRequest the request (not inspected)
     * @return never returns normally
     * @throws NoCurrentMessageOnTopicFault always
     */
    @WebMethod(operationName = "GetCurrentMessage")
    @WebResult(name = "GetCurrentMessageResponse",
               targetNamespace = "http://docs.oasis-open.org/wsn/b-1",
               partName = "GetCurrentMessageResponse")
    public GetCurrentMessageResponse getCurrentMessage(
            @WebParam(name = "GetCurrentMessage",
                      targetNamespace = "http://docs.oasis-open.org/wsn/b-1",
                      partName = "GetCurrentMessageRequest")
            GetCurrentMessage getCurrentMessageRequest)
            throws InvalidTopicExpressionFault,
                   MultipleTopicsSpecifiedFault,
                   NoCurrentMessageOnTopicFault,
                   ResourceUnknownFault,
                   TopicExpressionDialectUnknownFault,
                   TopicNotSupportedFault {

        log.debug("GetCurrentMessage");
        NoCurrentMessageOnTopicFaultType fault = new NoCurrentMessageOnTopicFaultType();
        throw new NoCurrentMessageOnTopicFault("There is no current message on this topic.", fault);
    }

    /**
     * Web method registering a new publisher. Logs the call at debug level and
     * delegates to {@link #handleRegisterPublisher(RegisterPublisher, EndpointManager)}
     * with no endpoint manager.
     *
     * @param registerPublisherRequest the registration request
     * @return a response carrying the endpoint reference of the publisher registration
     * @throws PublisherRegistrationFailedFault if the publisher endpoint cannot be registered
     */
    @WebMethod(operationName = "RegisterPublisher")
    @WebResult(name = "RegisterPublisherResponse",
               targetNamespace = "http://docs.oasis-open.org/wsn/br-1",
               partName = "RegisterPublisherResponse")
    public RegisterPublisherResponse registerPublisher(
            @WebParam(name = "RegisterPublisher",
                      targetNamespace = "http://docs.oasis-open.org/wsn/br-1",
                      partName = "RegisterPublisherRequest")
            RegisterPublisher registerPublisherRequest)
            throws InvalidTopicExpressionFault,
                   PublisherRegistrationFailedFault,
                   PublisherRegistrationRejectedFault,
                   ResourceUnknownFault,
                   TopicNotSupportedFault {

        log.debug("RegisterPublisher");
        return handleRegisterPublisher(registerPublisherRequest, null);
    }

    /**
     * Creates, records and registers a publisher for the given request.
     *
     * <p>The publisher is stored in the registry first, then registered and
     * populated from the request. If any later step fails, the registry entry
     * is removed again and the publisher is destroyed.
     *
     * @param registerPublisherRequest the registration request
     * @param manager                  endpoint manager to set on the publisher
     *                                 before it is registered; skipped when {@code null}
     * @return a response carrying the endpoint reference of the publisher registration
     * @throws PublisherRegistrationFailedFault if registering the publisher
     *         raises an {@link EndpointRegistrationException}
     */
    public RegisterPublisherResponse handleRegisterPublisher(
            RegisterPublisher registerPublisherRequest,
            EndpointManager manager)
            throws InvalidTopicExpressionFault,
                   PublisherRegistrationFailedFault,
                   PublisherRegistrationRejectedFault,
                   ResourceUnknownFault,
                   TopicNotSupportedFault {
        AbstractPublisher publisher = null;
        boolean success = false;
        try {
            publisher = createPublisher(idGenerator.generateSanitizedId());
            publishers.put(publisher.getAddress(), publisher);
            if (manager != null) {
                publisher.setManager(manager);
            }
            publisher.register();
            publisher.create(registerPublisherRequest);
            RegisterPublisherResponse response = new RegisterPublisherResponse();
            response.setPublisherRegistrationReference(createEndpointReference(publisher.getAddress()));
            success = true;
            return response;
        } catch (EndpointRegistrationException e) {
            PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
            throw new PublisherRegistrationFailedFault("Unable to register new endpoint", fault, e);
        } finally {
            if (!success && publisher != null) {
                publishers.remove(publisher.getAddress());
                try {
                    publisher.destroy();
                } catch (ResourceNotDestroyedFault e) {
                    // Best-effort cleanup: the original failure is what the caller sees.
                    log.info("Error destroying publisher", e);
                }
            }
        }
    }

    /**
     * Creates a publisher of the concrete type used by the subclass.
     *
     * @param name the name for the new publisher
     * @return the new, not yet registered publisher
     */
    protected abstract AbstractPublisher createPublisher(String name);

    /**
     * Creates a subscription of the concrete type used by the subclass.
     *
     * @param name the name for the new subscription
     * @return the new, not yet registered subscription
     */
    protected abstract AbstractSubscription createSubcription(String name);

}