1 17 package org.apache.servicemix.wsn; 18 19 import java.util.GregorianCalendar ; 20 21 import javax.jws.WebMethod; 22 import javax.jws.WebParam; 23 import javax.jws.WebResult; 24 import javax.jws.WebService; 25 import javax.xml.bind.JAXBElement; 26 import javax.xml.datatype.DatatypeConfigurationException ; 27 import javax.xml.datatype.DatatypeConstants ; 28 import javax.xml.datatype.DatatypeFactory ; 29 import javax.xml.datatype.Duration ; 30 import javax.xml.datatype.XMLGregorianCalendar ; 31 import javax.xml.namespace.QName ; 32 33 import org.oasis_open.docs.wsn.b_2.InvalidFilterFaultType; 34 import org.oasis_open.docs.wsn.b_2.InvalidMessageContentExpressionFaultType; 35 import org.oasis_open.docs.wsn.b_2.InvalidProducerPropertiesExpressionFaultType; 36 import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType; 37 import org.oasis_open.docs.wsn.b_2.PauseSubscription; 38 import org.oasis_open.docs.wsn.b_2.PauseSubscriptionResponse; 39 import org.oasis_open.docs.wsn.b_2.QueryExpressionType; 40 import org.oasis_open.docs.wsn.b_2.Renew; 41 import org.oasis_open.docs.wsn.b_2.RenewResponse; 42 import org.oasis_open.docs.wsn.b_2.ResumeSubscription; 43 import org.oasis_open.docs.wsn.b_2.ResumeSubscriptionResponse; 44 import org.oasis_open.docs.wsn.b_2.Subscribe; 45 import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType; 46 import org.oasis_open.docs.wsn.b_2.TopicExpressionType; 47 import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType; 48 import org.oasis_open.docs.wsn.b_2.UnacceptableInitialTerminationTimeFaultType; 49 import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType; 50 import org.oasis_open.docs.wsn.b_2.Unsubscribe; 51 import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse; 52 import org.oasis_open.docs.wsn.b_2.UseRaw; 53 import org.apache.servicemix.wsn.jaxws.InvalidFilterFault; 54 import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault; 55 import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault; 56 import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault; 57 import org.apache.servicemix.wsn.jaxws.PausableSubscriptionManager; 58 import org.apache.servicemix.wsn.jaxws.PauseFailedFault; 59 import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault; 60 import org.apache.servicemix.wsn.jaxws.ResumeFailedFault; 61 import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault; 62 import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault; 63 import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault; 64 import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault; 65 import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault; 66 import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault; 67 import org.w3._2005._08.addressing.EndpointReferenceType; 68 69 @WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.PausableSubscriptionManager") 70 public abstract class AbstractSubscription extends AbstractEndpoint 71 implements PausableSubscriptionManager { 72 73 public static final String WSN_URI = "http://docs.oasis-open.org/wsn/b-2"; 74 public static final String XPATH1_URI = "http://www.w3.org/TR/1999/REC-xpath-19991116"; 75 public static final QName QNAME_TOPIC_EXPRESSION = new QName (WSN_URI, "TopicExpression"); 76 public static final QName QNAME_PRODUCER_PROPERTIES = new QName (WSN_URI, "ProducerProperties"); 77 public static final QName QNAME_MESSAGE_CONTENT = new QName (WSN_URI, "MessageContent"); 78 public static final QName QNAME_USE_RAW = new QName (WSN_URI, "UseRaw"); 79 80 protected DatatypeFactory datatypeFactory; 81 protected XMLGregorianCalendar terminationTime; 82 protected boolean useRaw; 83 protected TopicExpressionType topic; 84 protected QueryExpressionType contentFilter; 85 protected EndpointReferenceType consumerReference; 86 protected AbstractNotificationBroker broker; 87 88 public AbstractSubscription(String name) { 89 super(name); 90 try { 91 this.datatypeFactory = DatatypeFactory.newInstance(); 92 } catch (DatatypeConfigurationException e) { 93 throw new RuntimeException ("Unable to initialize subscription", e); 94 } 95 } 96 97 105 @WebMethod(operationName = "Renew") 106 @WebResult(name = "RenewResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "RenewResponse") 107 public RenewResponse renew( 108 @WebParam(name = "Renew", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "RenewRequest") 109 Renew renewRequest) 110 throws ResourceUnknownFault, UnacceptableTerminationTimeFault { 111 112 XMLGregorianCalendar terminationTime = validateTerminationTime(renewRequest.getTerminationTime()); 113 renew(terminationTime); 114 RenewResponse response = new RenewResponse(); 115 response.setTerminationTime(terminationTime); 116 response.setCurrentTime(getCurrentTime()); 117 return response; 118 } 119 120 128 @WebMethod(operationName = "Unsubscribe") 129 @WebResult(name = "UnsubscribeResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "UnsubscribeResponse") 130 public UnsubscribeResponse unsubscribe( 131 @WebParam(name = "Unsubscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "UnsubscribeRequest") 132 Unsubscribe unsubscribeRequest) 133 throws ResourceUnknownFault, UnableToDestroySubscriptionFault { 134 135 broker.unsubscribe(getAddress()); 136 return new UnsubscribeResponse(); 137 } 138 139 147 @WebMethod(operationName = "PauseSubscription") 148 @WebResult(name = "PauseSubscriptionResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "PauseSubscriptionResponse") 149 public PauseSubscriptionResponse pauseSubscription( 150 @WebParam(name = "PauseSubscription", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "PauseSubscriptionRequest") 151 PauseSubscription pauseSubscriptionRequest) 152 throws PauseFailedFault, ResourceUnknownFault { 153 154 pause(); 155 return new PauseSubscriptionResponse(); 156 } 157 158 166 @WebMethod(operationName = "ResumeSubscription") 167 @WebResult(name = "ResumeSubscriptionResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "ResumeSubscriptionResponse") 168 public ResumeSubscriptionResponse resumeSubscription( 169 @WebParam(name = "ResumeSubscription", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "ResumeSubscriptionRequest") 170 ResumeSubscription resumeSubscriptionRequest) 171 throws ResourceUnknownFault, ResumeFailedFault { 172 173 resume(); 174 return new ResumeSubscriptionResponse(); 175 } 176 177 protected XMLGregorianCalendar validateInitialTerminationTime(String value) throws UnacceptableInitialTerminationTimeFault { 178 XMLGregorianCalendar tt = parseTerminationTime(value); 179 if (tt == null) { 180 UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType(); 181 throw new UnacceptableInitialTerminationTimeFault( 182 "Unable to parse initial termination time: '" + value + "'", 183 fault); 184 } 185 XMLGregorianCalendar ct = getCurrentTime(); 186 int c = tt.compare(ct); 187 if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) { 188 UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType(); 189 fault.setMinimumTime(ct); 190 throw new UnacceptableInitialTerminationTimeFault( 191 "Invalid initial termination time", 192 fault); 193 } 194 return tt; 195 } 196 197 protected XMLGregorianCalendar validateTerminationTime(String value) throws UnacceptableTerminationTimeFault { 198 XMLGregorianCalendar tt = parseTerminationTime(value); 199 if (tt == null) { 200 UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType(); 201 throw new UnacceptableTerminationTimeFault( 202 "Unable to parse termination time: '" + value + "'", 203 fault); 204 } 205 XMLGregorianCalendar ct = getCurrentTime(); 206 int c = tt.compare(ct); 207 if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) { 208 UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType(); 209 fault.setMinimumTime(ct); 210 throw new UnacceptableTerminationTimeFault( 211 "Invalid termination time", 212 fault); 213 } 214 return tt; 215 } 216 217 protected XMLGregorianCalendar parseTerminationTime(String value) { 218 try { 219 Duration d = datatypeFactory.newDuration(value); 220 XMLGregorianCalendar c = getCurrentTime(); 221 c.add(d); 222 return c; 223 } catch (Exception e) { } 224 try { 225 Duration d = datatypeFactory.newDurationDayTime(value); 226 XMLGregorianCalendar c = getCurrentTime(); 227 c.add(d); 228 return c; 229 } catch (Exception e) { } 230 try { 231 Duration d = datatypeFactory.newDurationYearMonth(value); 232 XMLGregorianCalendar c = getCurrentTime(); 233 c.add(d); 234 return c; 235 } catch (Exception e) { } 236 try { 237 return datatypeFactory.newXMLGregorianCalendar(value); 238 } catch (Exception e) { } 239 return null; 240 } 241 242 protected XMLGregorianCalendar getCurrentTime() { 243 return datatypeFactory.newXMLGregorianCalendar(new GregorianCalendar ()); 244 } 245 246 public XMLGregorianCalendar getTerminationTime() { 247 return terminationTime; 248 } 249 250 public void setTerminationTime(XMLGregorianCalendar terminationTime) { 251 this.terminationTime = terminationTime; 252 } 253 254 public void create(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault { 255 validateSubscription(subscribeRequest); 256 start(); 257 } 258 259 protected abstract void start() throws SubscribeCreationFailedFault; 260 261 protected abstract void pause() throws PauseFailedFault; 262 263 protected abstract void resume() throws ResumeFailedFault; 264 265 protected abstract void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault; 266 267 protected void unsubscribe() throws UnableToDestroySubscriptionFault { 268 try { 269 unregister(); 270 } catch (EndpointRegistrationException e) { 271 UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType(); 272 throw new UnableToDestroySubscriptionFault("Error unregistering endpoint", fault, e); 273 } 274 } 275 276 protected String createAddress() { 277 return "http://servicemix.org/wsnotification/Subscription/" + getName(); 278 } 279 280 protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault { 281 consumerReference = subscribeRequest.getConsumerReference(); 283 if (subscribeRequest.getInitialTerminationTime() != null && 285 subscribeRequest.getInitialTerminationTime().isNil() == false && 286 subscribeRequest.getInitialTerminationTime().getValue() != null) { 287 String strTerminationTime = subscribeRequest.getInitialTerminationTime().getValue(); 288 terminationTime = validateInitialTerminationTime(strTerminationTime.trim()); 289 } 290 if (subscribeRequest.getFilter() != null) { 292 for (Object f : subscribeRequest.getFilter().getAny()) { 293 JAXBElement e = null; 294 if (f instanceof JAXBElement) { 295 e = (JAXBElement) f; 296 f = e.getValue(); 297 } 298 if (f instanceof TopicExpressionType) { 299 if (!e.getName().equals(QNAME_TOPIC_EXPRESSION)) { 300 InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType(); 301 throw new InvalidTopicExpressionFault("Unrecognized TopicExpression: " + e, fault); 302 } 303 topic = (TopicExpressionType) f; 304 } else if (f instanceof QueryExpressionType) { 305 if (e != null && e.getName().equals(QNAME_PRODUCER_PROPERTIES)) { 306 InvalidProducerPropertiesExpressionFaultType fault = new InvalidProducerPropertiesExpressionFaultType(); 307 throw new InvalidProducerPropertiesExpressionFault("ProducerProperties are not supported", fault); 308 } else if (e != null && e.getName().equals(QNAME_MESSAGE_CONTENT)) { 309 if (contentFilter != null) { 310 InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType(); 311 throw new InvalidMessageContentExpressionFault("Only one MessageContent filter can be specified", fault); 312 } 313 contentFilter = (QueryExpressionType) f; 314 if (contentFilter.getDialect() == null) { 316 contentFilter.setDialect(XPATH1_URI); 317 } 318 } else { 319 InvalidFilterFaultType fault = new InvalidFilterFaultType(); 320 throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault); 321 } 322 } else { 323 InvalidFilterFaultType fault = new InvalidFilterFaultType(); 324 throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault); 325 } 326 } 327 } 328 if (subscribeRequest.getSubscriptionPolicy() != null) { 330 for (Object p : subscribeRequest.getSubscriptionPolicy().getAny()) { 331 JAXBElement e = null; 332 if (p instanceof JAXBElement) { 333 e = (JAXBElement) p; 334 p = e.getValue(); 335 } 336 if (p instanceof UseRaw) { 337 useRaw = true; 338 } else { 339 InvalidFilterFaultType fault = new InvalidFilterFaultType(); 340 throw new InvalidFilterFault("Unrecognized policy: " + p, fault); 341 } 342 } 343 } 344 if (consumerReference == null) { 346 SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType(); 347 throw new SubscribeCreationFailedFault("Invalid ConsumerReference: null", fault); 348 } 349 if (topic == null) { 351 InvalidFilterFaultType fault = new InvalidFilterFaultType(); 352 throw new InvalidFilterFault("Must specify a topic to subscribe on", fault); 353 } 354 if (contentFilter != null && !contentFilter.getDialect().equals(XPATH1_URI)) { 355 InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType(); 356 throw new InvalidMessageContentExpressionFault("Unsupported MessageContent dialect: '" + contentFilter.getDialect() + "'", fault); 357 } 358 if (terminationTime != null) { 359 UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType(); 360 throw new UnacceptableInitialTerminationTimeFault( 361 "InitialTerminationTime is not supported", 362 fault); 363 } 364 } 365 366 public AbstractNotificationBroker getBroker() { 367 return broker; 368 } 369 370 public void setBroker(AbstractNotificationBroker broker) { 371 this.broker = broker; 372 } 373 } 374 | Popular Tags |