KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > wsn > spring > PublisherComponent


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.wsn.spring;
18
19 import java.io.StringWriter JavaDoc;
20
21 import javax.jbi.JBIException;
22 import javax.jbi.messaging.ExchangeStatus;
23 import javax.jbi.messaging.MessageExchange;
24 import javax.jbi.messaging.MessagingException;
25 import javax.jbi.messaging.NormalizedMessage;
26 import javax.xml.bind.JAXBContext;
27 import javax.xml.transform.Source JavaDoc;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.servicemix.MessageExchangeListener;
32 import org.apache.servicemix.components.util.ComponentSupport;
33 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
34 import org.apache.servicemix.jbi.jaxp.StringSource;
35 import org.apache.servicemix.wsn.client.AbstractWSAClient;
36 import org.apache.servicemix.wsn.client.NotificationBroker;
37 import org.oasis_open.docs.wsn.b_2.Subscribe;
38 import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
39 import org.oasis_open.docs.wsn.b_2.Unsubscribe;
40 import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
41 import org.w3c.dom.Element JavaDoc;
42
43 /**
44  *
45  * @author gnodet
46  * @version $Revision: 376451 $
47  * @org.apache.xbean.XBean element="publisher"
48  */

49 public class PublisherComponent extends ComponentSupport implements MessageExchangeListener {
50
51     private static final Log log = LogFactory.getLog(PublisherComponent.class);
52     
53     private NotificationBroker wsnBroker;
54     private String JavaDoc topic;
55     private boolean demand;
56     private String JavaDoc subscriptionEndpoint = "subscription";
57     private Subscribe subscription;
58     
59     /**
60      * @return Returns the demand.
61      */

62     public boolean getDemand() {
63         return demand;
64     }
65
66     /**
67      * @param demand The demand to set.
68      */

69     public void setDemand(boolean demand) {
70         this.demand = demand;
71     }
72
73     /**
74      * @return Returns the topic.
75      */

76     public String JavaDoc getTopic() {
77         return topic;
78     }
79
80     /**
81      * @param topic The topic to set.
82      */

83     public void setTopic(String JavaDoc topic) {
84         this.topic = topic;
85     }
86
87     /**
88      * @return Returns the subscription.
89      */

90     public Subscribe getSubscription() {
91         return subscription;
92     }
93
94     /* (non-Javadoc)
95      * @see org.apache.servicemix.jbi.management.BaseLifeCycle#init()
96      */

97     public void init() throws JBIException {
98         super.init();
99         getContext().activateEndpoint(getService(), subscriptionEndpoint);
100     }
101     
102     /* (non-Javadoc)
103      * @see javax.jbi.management.LifeCycleMBean#start()
104      */

105     public void start() throws JBIException {
106         new Thread JavaDoc() {
107             public void run() {
108                 try {
109                     wsnBroker = new NotificationBroker(getContext());
110                     String JavaDoc wsaAddress = getService().getNamespaceURI() + "/" + getService().getLocalPart() + "/" + subscriptionEndpoint;
111                     wsnBroker.registerPublisher(AbstractWSAClient.createWSA(wsaAddress),
112                                                 topic,
113                                                 demand);
114                 } catch (Exception JavaDoc e) {
115                     log.error("Could not create wsn client", e);
116                 }
117             }
118         }.start();
119     }
120     
121     /* (non-Javadoc)
122      * @see javax.jbi.management.LifeCycleMBean#shutDown()
123      */

124     public void shutDown() throws JBIException {
125         super.shutDown();
126     }
127     
128     /* (non-Javadoc)
129      * @see org.apache.servicemix.MessageExchangeListener#onMessageExchange(javax.jbi.messaging.MessageExchange)
130      */

131     public void onMessageExchange(MessageExchange exchange) throws MessagingException {
132         if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
133             return;
134         }
135         // This is a notification from the WSN broker
136
if (exchange.getEndpoint().getEndpointName().equals(subscriptionEndpoint)) {
137             try {
138                 JAXBContext jaxbContext = JAXBContext.newInstance(Subscribe.class);
139                 Source JavaDoc src = exchange.getMessage("in").getContent();
140                 Object JavaDoc input = jaxbContext.createUnmarshaller().unmarshal(src);
141                 if (input instanceof Subscribe) {
142                     subscription = (Subscribe) input;
143                     SubscribeResponse response = new SubscribeResponse();
144                     String JavaDoc wsaAddress = getService().getNamespaceURI() + "/" + getService().getLocalPart() + "/" + subscriptionEndpoint;
145                     response.setSubscriptionReference(AbstractWSAClient.createWSA(wsaAddress));
146                     StringWriter JavaDoc writer = new StringWriter JavaDoc();
147                     jaxbContext.createMarshaller().marshal(response, writer);
148                     NormalizedMessage out = exchange.createMessage();
149                     out.setContent(new StringSource(writer.toString()));
150                     exchange.setMessage(out, "out");
151                     send(exchange);
152                 } else if (input instanceof Unsubscribe) {
153                     subscription = null;
154                     UnsubscribeResponse response = new UnsubscribeResponse();
155                     StringWriter JavaDoc writer = new StringWriter JavaDoc();
156                     jaxbContext.createMarshaller().marshal(response, writer);
157                     NormalizedMessage out = exchange.createMessage();
158                     out.setContent(new StringSource(writer.toString()));
159                     exchange.setMessage(out, "out");
160                     send(exchange);
161                 } else {
162                     throw new Exception JavaDoc("Unkown request");
163                 }
164             } catch (Exception JavaDoc e) {
165                 fail(exchange, e);
166             }
167         // This is a notification to publish
168
} else {
169             try {
170                 if (!demand || subscription != null) {
171                     Element JavaDoc elem = new SourceTransformer().toDOMElement(exchange.getMessage("in"));
172                     wsnBroker.notify(topic, elem);
173                     done(exchange);
174                 } else {
175                     log.info("Ingore notification as the publisher is no subscribers");
176                 }
177             } catch (Exception JavaDoc e) {
178                 fail(exchange, e);
179             }
180         }
181     }
182
183 }
184
Popular Tags