KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > wsn > AbstractNotificationBroker


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.wsn;
18
19 import java.util.Map JavaDoc;
20 import java.util.concurrent.ConcurrentHashMap JavaDoc;
21
22 import javax.jws.Oneway;
23 import javax.jws.WebMethod;
24 import javax.jws.WebParam;
25 import javax.jws.WebResult;
26 import javax.jws.WebService;
27
28 import org.apache.activemq.util.IdGenerator;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
32 import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
33 import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
34 import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
35 import org.apache.servicemix.wsn.jaxws.MultipleTopicsSpecifiedFault;
36 import org.apache.servicemix.wsn.jaxws.NoCurrentMessageOnTopicFault;
37 import org.apache.servicemix.wsn.jaxws.NotificationBroker;
38 import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
39 import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
40 import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
41 import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
42 import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
43 import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
44 import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
45 import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
46 import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
47 import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
48 import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
49 import org.oasis_open.docs.wsn.b_2.NoCurrentMessageOnTopicFaultType;
50 import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
51 import org.oasis_open.docs.wsn.b_2.Notify;
52 import org.oasis_open.docs.wsn.b_2.Subscribe;
53 import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
54 import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
55 import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
56 import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
57 import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse;
58 import org.w3._2005._08.addressing.EndpointReferenceType;
59
60 @WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.NotificationBroker")
61 public abstract class AbstractNotificationBroker extends AbstractEndpoint implements NotificationBroker {
62
63     private static Log log = LogFactory.getLog(AbstractNotificationBroker.class);
64     
65     private IdGenerator idGenerator;
66     private AbstractPublisher anonymousPublisher;
67     private Map JavaDoc<String JavaDoc,AbstractPublisher> publishers;
68     private Map JavaDoc<String JavaDoc,AbstractSubscription> subscriptions;
69
70     public AbstractNotificationBroker(String JavaDoc name) {
71         super(name);
72         idGenerator = new IdGenerator();
73         subscriptions = new ConcurrentHashMap JavaDoc<String JavaDoc,AbstractSubscription>();
74         publishers = new ConcurrentHashMap JavaDoc<String JavaDoc, AbstractPublisher>();
75     }
76
77     public void init() throws Exception JavaDoc {
78         register();
79         anonymousPublisher = createPublisher("Anonymous");
80         anonymousPublisher.register();
81     }
82     
83     protected String JavaDoc createAddress() {
84         return "http://servicemix.org/wsnotification/NotificationBroker/" + getName();
85     }
86     
87     /**
88      *
89      * @param notify
90      */

91     @WebMethod(operationName = "Notify")
92     @Oneway
93     public void notify(
94         @WebParam(name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "Notify")
95         Notify notify) {
96         
97         log.debug("Notify");
98         handleNotify(notify);
99     }
100     
101     protected void handleNotify(Notify notify) {
102         for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
103             EndpointReferenceType producerReference = messageHolder.getProducerReference();
104             AbstractPublisher publisher = getPublisher(producerReference);
105             if (publisher != null) {
106                 publisher.notify(messageHolder);
107             }
108         }
109     }
110
111     protected AbstractPublisher getPublisher(EndpointReferenceType producerReference) {
112         AbstractPublisher publisher = null;
113         if (producerReference != null &&
114             producerReference.getAddress() != null &&
115             producerReference.getAddress().getValue() != null) {
116             String JavaDoc address = producerReference.getAddress().getValue();
117             publisher = publishers.get(address);
118         }
119         if (publisher == null) {
120             publisher = anonymousPublisher;
121         }
122         return publisher;
123     }
124     
125     /**
126      *
127      * @param subscribeRequest
128      * @return
129      * returns org.oasis_open.docs.wsn.b_1.SubscribeResponse
130      * @throws SubscribeCreationFailedFault
131      * @throws InvalidTopicExpressionFault
132      * @throws TopicNotSupportedFault
133      * @throws InvalidFilterFault
134      * @throws InvalidProducerPropertiesExpressionFault
135      * @throws ResourceUnknownFault
136      * @throws InvalidUseRawValueFault
137      * @throws InvalidMessageContentExpressionFault
138      * @throws TopicExpressionDialectUnknownFault
139      * @throws UnacceptableInitialTerminationTimeFault
140      */

141     @WebMethod(operationName = "Subscribe")
142     @WebResult(name = "SubscribeResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "SubscribeResponse")
143     public SubscribeResponse subscribe(
144         @WebParam(name = "Subscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "SubscribeRequest")
145         Subscribe subscribeRequest)
146         throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, ResourceUnknownFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
147         
148         log.debug("Subscribe");
149         return handleSubscribe(subscribeRequest, null);
150     }
151     
152     public SubscribeResponse handleSubscribe(Subscribe subscribeRequest,
153                                              EndpointManager manager) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
154         AbstractSubscription subscription = null;
155         boolean success = false;
156         try {
157             subscription = createSubcription(idGenerator.generateSanitizedId());
158             subscription.setBroker(this);
159             subscriptions.put(subscription.getAddress(), subscription);
160             subscription.create(subscribeRequest);
161             if (manager != null) {
162                 subscription.setManager(manager);
163             }
164             subscription.register();
165             SubscribeResponse response = new SubscribeResponse();
166             response.setSubscriptionReference(createEndpointReference(subscription.getAddress()));
167             success = true;
168             return response;
169         } catch (EndpointRegistrationException e) {
170             SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
171             throw new SubscribeCreationFailedFault("Unable to register endpoint", fault, e);
172         } finally {
173             if (!success && subscription != null) {
174                 subscriptions.remove(subscription);
175                 try {
176                     subscription.unsubscribe();
177                 } catch (UnableToDestroySubscriptionFault e) {
178                     log.info("Error destroying subscription", e);
179                 }
180             }
181         }
182     }
183     
184     public void unsubscribe(String JavaDoc address) throws UnableToDestroySubscriptionFault {
185         AbstractSubscription subscription = (AbstractSubscription) subscriptions.remove(address);
186         if (subscription != null) {
187             subscription.unsubscribe();
188         }
189     }
190     
191     /**
192      *
193      * @param getCurrentMessageRequest
194      * @return
195      * returns org.oasis_open.docs.wsn.b_1.GetCurrentMessageResponse
196      * @throws MultipleTopicsSpecifiedFault
197      * @throws TopicNotSupportedFault
198      * @throws InvalidTopicExpressionFault
199      * @throws ResourceUnknownFault
200      * @throws TopicExpressionDialectUnknownFault
201      * @throws NoCurrentMessageOnTopicFault
202      */

203     @WebMethod(operationName = "GetCurrentMessage")
204     @WebResult(name = "GetCurrentMessageResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetCurrentMessageResponse")
205     public GetCurrentMessageResponse getCurrentMessage(
206         @WebParam(name = "GetCurrentMessage", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetCurrentMessageRequest")
207         GetCurrentMessage getCurrentMessageRequest)
208         throws InvalidTopicExpressionFault, MultipleTopicsSpecifiedFault, NoCurrentMessageOnTopicFault, ResourceUnknownFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault {
209         
210         log.debug("GetCurrentMessage");
211         NoCurrentMessageOnTopicFaultType fault = new NoCurrentMessageOnTopicFaultType();
212         throw new NoCurrentMessageOnTopicFault("There is no current message on this topic.", fault);
213     }
214
215     /**
216      *
217      * @param registerPublisherRequest
218      * @return
219      * returns org.oasis_open.docs.wsn.br_1.RegisterPublisherResponse
220      * @throws PublisherRegistrationRejectedFault
221      * @throws InvalidTopicExpressionFault
222      * @throws TopicNotSupportedFault
223      * @throws ResourceUnknownFault
224      * @throws PublisherRegistrationFailedFault
225      */

226     @WebMethod(operationName = "RegisterPublisher")
227     @WebResult(name = "RegisterPublisherResponse", targetNamespace = "http://docs.oasis-open.org/wsn/br-1", partName = "RegisterPublisherResponse")
228     public RegisterPublisherResponse registerPublisher(
229         @WebParam(name = "RegisterPublisher", targetNamespace = "http://docs.oasis-open.org/wsn/br-1", partName = "RegisterPublisherRequest")
230         RegisterPublisher registerPublisherRequest)
231         throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
232         
233         log.debug("RegisterPublisher");
234         return handleRegisterPublisher(registerPublisherRequest, null);
235     }
236     
237     public RegisterPublisherResponse handleRegisterPublisher(
238                         RegisterPublisher registerPublisherRequest,
239                         EndpointManager manager) throws InvalidTopicExpressionFault,
240                                                         PublisherRegistrationFailedFault,
241                                                         PublisherRegistrationRejectedFault,
242                                                         ResourceUnknownFault,
243                                                         TopicNotSupportedFault {
244         AbstractPublisher publisher = null;
245         boolean success = false;
246         try {
247             publisher = createPublisher(idGenerator.generateSanitizedId());
248             publishers.put(publisher.getAddress(), publisher);
249             if (manager != null) {
250                 publisher.setManager(manager);
251             }
252             publisher.register();
253             publisher.create(registerPublisherRequest);
254             RegisterPublisherResponse response = new RegisterPublisherResponse();
255             response.setPublisherRegistrationReference(createEndpointReference(publisher.getAddress()));
256             success = true;
257             return response;
258         } catch (EndpointRegistrationException e) {
259             PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
260             throw new PublisherRegistrationFailedFault("Unable to register new endpoint", fault, e);
261         } finally {
262             if (!success && publisher != null) {
263                 publishers.remove(publisher.getAddress());
264                 try {
265                     publisher.destroy();
266                 } catch (ResourceNotDestroyedFault e) {
267                     log.info("Error destroying publisher", e);
268                 }
269             }
270         }
271     }
272
273     protected abstract AbstractPublisher createPublisher(String JavaDoc name);
274     
275     protected abstract AbstractSubscription createSubcription(String JavaDoc name);
276
277 }
278
Popular Tags