KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > wsn > jms > JmsPublisher


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.wsn.jms;
18
19 import java.io.StringWriter JavaDoc;
20
21 import javax.jms.Connection JavaDoc;
22 import javax.jms.JMSException JavaDoc;
23 import javax.jms.Message JavaDoc;
24 import javax.jms.MessageProducer JavaDoc;
25 import javax.jms.Session JavaDoc;
26 import javax.jms.Topic JavaDoc;
27 import javax.xml.bind.JAXBContext;
28 import javax.xml.bind.JAXBException;
29
30 import org.apache.activemq.advisory.ConsumerEvent;
31 import org.apache.activemq.advisory.ConsumerEventSource;
32 import org.apache.activemq.advisory.ConsumerListener;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.servicemix.wsn.AbstractPublisher;
36 import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
37 import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
38 import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
39 import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
40 import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
41 import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
42 import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
43 import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
44 import org.oasis_open.docs.wsn.b_2.Notify;
45 import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
46 import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
47 import org.oasis_open.docs.wsn.br_2.ResourceNotDestroyedFaultType;
48
49 public abstract class JmsPublisher extends AbstractPublisher implements ConsumerListener {
50
51     private static Log log = LogFactory.getLog(JmsPublisher.class);
52     
53     private Connection JavaDoc connection;
54     private JmsTopicExpressionConverter topicConverter;
55     private JAXBContext jaxbContext;
56     private Topic JavaDoc jmsTopic;
57     private ConsumerEventSource advisory;
58     private Object JavaDoc subscription;
59
60     public JmsPublisher(String JavaDoc name) {
61         super(name);
62         topicConverter = new JmsTopicExpressionConverter();
63         try {
64             jaxbContext = JAXBContext.newInstance(Notify.class);
65         } catch (JAXBException e) {
66             throw new RuntimeException JavaDoc("Unable to create JAXB context", e);
67         }
68     }
69
70     public Connection JavaDoc getConnection() {
71         return connection;
72     }
73
74     public void setConnection(Connection JavaDoc connection) {
75         this.connection = connection;
76     }
77
78     @Override JavaDoc
79     public void notify(NotificationMessageHolderType messageHolder) {
80         Session JavaDoc session = null;
81         try {
82             Topic JavaDoc topic = topicConverter.toActiveMQTopic(messageHolder.getTopic());
83             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
84             MessageProducer JavaDoc producer = session.createProducer(topic);
85             Notify notify = new Notify();
86             notify.getNotificationMessage().add(messageHolder);
87             StringWriter JavaDoc writer = new StringWriter JavaDoc();
88             jaxbContext.createMarshaller().marshal(notify, writer);
89             Message JavaDoc message = session.createTextMessage(writer.toString());
90             producer.send(message);
91         } catch (JMSException JavaDoc e) {
92             log.warn("Error dispatching message", e);
93         } catch (JAXBException e) {
94             log.warn("Error dispatching message", e);
95         } catch (InvalidTopicException e) {
96             log.warn("Error dispatching message", e);
97         } finally {
98             if (session != null) {
99                 try {
100                     session.close();
101                 } catch (JMSException JavaDoc e) {
102                     log.debug("Error closing session", e);
103                 }
104             }
105         }
106     }
107
108     @Override JavaDoc
109     protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
110         super.validatePublisher(registerPublisherRequest);
111         try {
112             jmsTopic = topicConverter.toActiveMQTopic(topic);
113         } catch (InvalidTopicException e) {
114             InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
115             throw new InvalidTopicExpressionFault(e.getMessage(), fault);
116         }
117     }
118     
119     @Override JavaDoc
120     protected void start() throws PublisherRegistrationFailedFault {
121         if (demand) {
122             try {
123                 advisory = new ConsumerEventSource(connection, jmsTopic);
124                 advisory.setConsumerListener(this);
125                 advisory.start();
126             } catch (Exception JavaDoc e) {
127                 PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
128                 throw new PublisherRegistrationFailedFault("Error starting demand-based publisher", fault, e);
129             }
130         }
131     }
132
133     protected void destroy() throws ResourceNotDestroyedFault {
134         try {
135             if (advisory != null) {
136                 advisory.stop();
137             }
138         } catch (Exception JavaDoc e) {
139             ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType();
140             throw new ResourceNotDestroyedFault("Error destroying publisher", fault, e);
141         } finally {
142             super.destroy();
143         }
144     }
145     
146     public void onConsumerEvent(ConsumerEvent event) {
147         if (event.getConsumerCount() > 0) {
148             if (subscription == null) {
149                 // start subscription
150
subscription = startSubscription();
151             }
152         } else {
153             if (subscription != null) {
154                 // destroy subscription
155
Object JavaDoc sub = subscription;
156                 subscription = null;
157                 destroySubscription(sub);
158             }
159         }
160     }
161
162     protected abstract void destroySubscription(Object JavaDoc subscription);
163
164     protected abstract Object JavaDoc startSubscription();
165     
166
167 }
168
Popular Tags