KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > wsn > jms > JmsSubscription


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

17 package org.apache.servicemix.wsn.jms;
18
19 import java.io.StringReader JavaDoc;
20
21 import javax.jms.Connection JavaDoc;
22 import javax.jms.JMSException JavaDoc;
23 import javax.jms.Message JavaDoc;
24 import javax.jms.MessageConsumer JavaDoc;
25 import javax.jms.MessageListener JavaDoc;
26 import javax.jms.Session JavaDoc;
27 import javax.jms.TextMessage JavaDoc;
28 import javax.jms.Topic JavaDoc;
29 import javax.xml.datatype.XMLGregorianCalendar JavaDoc;
30 import javax.xml.parsers.DocumentBuilderFactory JavaDoc;
31 import javax.xml.xpath.XPath JavaDoc;
32 import javax.xml.xpath.XPathConstants JavaDoc;
33 import javax.xml.xpath.XPathExpression JavaDoc;
34 import javax.xml.xpath.XPathExpressionException JavaDoc;
35 import javax.xml.xpath.XPathFactory JavaDoc;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.servicemix.wsn.AbstractSubscription;
40 import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
41 import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
42 import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
43 import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
44 import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
45 import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
46 import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
47 import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
48 import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
49 import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
50 import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
51 import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
52 import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
53 import org.oasis_open.docs.wsn.b_2.PauseFailedFaultType;
54 import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType;
55 import org.oasis_open.docs.wsn.b_2.Subscribe;
56 import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
57 import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
58 import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
59 import org.w3c.dom.Document JavaDoc;
60 import org.w3c.dom.Element JavaDoc;
61 import org.xml.sax.InputSource JavaDoc;
62
63 public abstract class JmsSubscription extends AbstractSubscription implements MessageListener JavaDoc {
64
65     private static Log log = LogFactory.getLog(JmsSubscription.class);
66     
67     private Connection JavaDoc connection;
68     private Session JavaDoc session;
69     private JmsTopicExpressionConverter topicConverter;
70     private Topic JavaDoc jmsTopic;
71     
72     public JmsSubscription(String JavaDoc name) {
73         super(name);
74         topicConverter = new JmsTopicExpressionConverter();
75     }
76
77     protected void start() throws SubscribeCreationFailedFault {
78         try {
79             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
80             MessageConsumer JavaDoc consumer = session.createConsumer(jmsTopic);
81             consumer.setMessageListener(this);
82         } catch (JMSException JavaDoc e) {
83             SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
84             throw new SubscribeCreationFailedFault("Error starting subscription", fault, e);
85         }
86     }
87     
88     @Override JavaDoc
89     protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
90         super.validateSubscription(subscribeRequest);
91         try {
92             jmsTopic = topicConverter.toActiveMQTopic(topic);
93         } catch (InvalidTopicException e) {
94             InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
95             throw new InvalidTopicExpressionFault(e.getMessage(), fault);
96         }
97     }
98     
99     @Override JavaDoc
100     protected void pause() throws PauseFailedFault {
101         if (session == null) {
102             PauseFailedFaultType fault = new PauseFailedFaultType();
103             throw new PauseFailedFault("Subscription is already paused", fault);
104         } else {
105             try {
106                 session.close();
107             } catch (JMSException JavaDoc e) {
108                 PauseFailedFaultType fault = new PauseFailedFaultType();
109                 throw new PauseFailedFault("Error pausing subscription", fault, e);
110             } finally {
111                 session = null;
112             }
113         }
114     }
115
116     @Override JavaDoc
117     protected void resume() throws ResumeFailedFault {
118         if (session != null) {
119             ResumeFailedFaultType fault = new ResumeFailedFaultType();
120             throw new ResumeFailedFault("Subscription is already running", fault);
121         } else {
122             try {
123                 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
124                 MessageConsumer JavaDoc consumer = session.createConsumer(jmsTopic);
125                 consumer.setMessageListener(this);
126             } catch (JMSException JavaDoc e) {
127                 ResumeFailedFaultType fault = new ResumeFailedFaultType();
128                 throw new ResumeFailedFault("Error resuming subscription", fault, e);
129             }
130         }
131     }
132
133     @Override JavaDoc
134     protected void renew(XMLGregorianCalendar JavaDoc terminationTime) throws UnacceptableTerminationTimeFault {
135         UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
136         throw new UnacceptableTerminationTimeFault(
137                 "TerminationTime is not supported",
138                 fault);
139     }
140
141     @Override JavaDoc
142     protected void unsubscribe() throws UnableToDestroySubscriptionFault {
143         super.unsubscribe();
144         if (session != null) {
145             try {
146                 session.close();
147             } catch (JMSException JavaDoc e) {
148                 UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
149                 throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e);
150             } finally {
151                 session = null;
152             }
153         }
154     }
155
156     public Connection JavaDoc getConnection() {
157         return connection;
158     }
159
160     public void setConnection(Connection JavaDoc connection) {
161         this.connection = connection;
162     }
163
164     public void onMessage(Message JavaDoc jmsMessage) {
165         try {
166             TextMessage JavaDoc text = (TextMessage JavaDoc) jmsMessage;
167             DocumentBuilderFactory JavaDoc factory = DocumentBuilderFactory.newInstance();
168             factory.setNamespaceAware(true);
169             Document JavaDoc doc = factory.newDocumentBuilder().parse(new InputSource JavaDoc(new StringReader JavaDoc(text.getText())));
170             Element JavaDoc root = doc.getDocumentElement();
171             Element JavaDoc holder = (Element JavaDoc) root.getElementsByTagNameNS(WSN_URI, "NotificationMessage").item(0);
172             Element JavaDoc message = (Element JavaDoc) holder.getElementsByTagNameNS(WSN_URI, "Message").item(0);
173             Element JavaDoc content = null;
174             for (int i = 0; i < message.getChildNodes().getLength(); i++) {
175                 if (message.getChildNodes().item(i) instanceof Element JavaDoc) {
176                     content = (Element JavaDoc) message.getChildNodes().item(i);
177                     break;
178                 }
179             }
180             boolean match = doFilter(content);
181             if (match) {
182                 if (useRaw) {
183                     doNotify(content);
184                 } else {
185                     doNotify(root);
186                 }
187             }
188         } catch (Exception JavaDoc e) {
189             log.warn("Error notifying consumer", e);
190         }
191     }
192     
193     protected boolean doFilter(Element JavaDoc content) {
194         if (contentFilter != null) {
195             if (!contentFilter.getDialect().equals(XPATH1_URI)) {
196                 throw new IllegalStateException JavaDoc("Unsupported dialect: " + contentFilter.getDialect());
197             }
198             try {
199                 XPathFactory JavaDoc xpfactory = XPathFactory.newInstance();
200                 XPath JavaDoc xpath = xpfactory.newXPath();
201                 XPathExpression JavaDoc exp = xpath.compile(contentFilter.getContent().get(0).toString());
202                 Boolean JavaDoc ret = (Boolean JavaDoc) exp.evaluate(content, XPathConstants.BOOLEAN);
203                 return ret.booleanValue();
204             } catch (XPathExpressionException JavaDoc e) {
205                 log.warn("Could not filter notification", e);
206             }
207             return false;
208         }
209         return true;
210     }
211     
212     protected abstract void doNotify(Element JavaDoc content);
213
214 }
215
Popular Tags