1 17 package org.apache.servicemix.wsn.jms; 18 19 import java.io.StringReader ; 20 21 import javax.jms.Connection ; 22 import javax.jms.JMSException ; 23 import javax.jms.Message ; 24 import javax.jms.MessageConsumer ; 25 import javax.jms.MessageListener ; 26 import javax.jms.Session ; 27 import javax.jms.TextMessage ; 28 import javax.jms.Topic ; 29 import javax.xml.datatype.XMLGregorianCalendar ; 30 import javax.xml.parsers.DocumentBuilderFactory ; 31 import javax.xml.xpath.XPath ; 32 import javax.xml.xpath.XPathConstants ; 33 import javax.xml.xpath.XPathExpression ; 34 import javax.xml.xpath.XPathExpressionException ; 35 import javax.xml.xpath.XPathFactory ; 36 37 import org.apache.commons.logging.Log; 38 import org.apache.commons.logging.LogFactory; 39 import org.apache.servicemix.wsn.AbstractSubscription; 40 import org.apache.servicemix.wsn.jaxws.InvalidFilterFault; 41 import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault; 42 import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault; 43 import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault; 44 import org.apache.servicemix.wsn.jaxws.PauseFailedFault; 45 import org.apache.servicemix.wsn.jaxws.ResumeFailedFault; 46 import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault; 47 import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault; 48 import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault; 49 import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault; 50 import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault; 51 import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault; 52 import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType; 53 import org.oasis_open.docs.wsn.b_2.PauseFailedFaultType; 54 import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType; 55 import org.oasis_open.docs.wsn.b_2.Subscribe; 56 import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType; 57 import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType; 58 import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType; 59 import org.w3c.dom.Document ; 60 import org.w3c.dom.Element ; 61 import org.xml.sax.InputSource ; 62 63 public abstract class JmsSubscription extends AbstractSubscription implements MessageListener { 64 65 private static Log log = LogFactory.getLog(JmsSubscription.class); 66 67 private Connection connection; 68 private Session session; 69 private JmsTopicExpressionConverter topicConverter; 70 private Topic jmsTopic; 71 72 public JmsSubscription(String name) { 73 super(name); 74 topicConverter = new JmsTopicExpressionConverter(); 75 } 76 77 protected void start() throws SubscribeCreationFailedFault { 78 try { 79 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 80 MessageConsumer consumer = session.createConsumer(jmsTopic); 81 consumer.setMessageListener(this); 82 } catch (JMSException e) { 83 SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType(); 84 throw new SubscribeCreationFailedFault("Error starting subscription", fault, e); 85 } 86 } 87 88 @Override 89 protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault { 90 super.validateSubscription(subscribeRequest); 91 try { 92 jmsTopic = topicConverter.toActiveMQTopic(topic); 93 } catch (InvalidTopicException e) { 94 InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType(); 95 throw new InvalidTopicExpressionFault(e.getMessage(), fault); 96 } 97 } 98 99 @Override 100 protected void pause() throws PauseFailedFault { 101 if (session == null) { 102 PauseFailedFaultType fault = new PauseFailedFaultType(); 103 throw new PauseFailedFault("Subscription is already paused", fault); 104 } else { 105 try { 106 session.close(); 107 } catch (JMSException e) { 108 PauseFailedFaultType fault = new PauseFailedFaultType(); 109 throw new PauseFailedFault("Error pausing subscription", fault, e); 110 } finally { 111 session = null; 112 } 113 } 114 } 115 116 @Override 117 protected void resume() throws ResumeFailedFault { 118 if (session != null) { 119 ResumeFailedFaultType fault = new ResumeFailedFaultType(); 120 throw new ResumeFailedFault("Subscription is already running", fault); 121 } else { 122 try { 123 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 124 MessageConsumer consumer = session.createConsumer(jmsTopic); 125 consumer.setMessageListener(this); 126 } catch (JMSException e) { 127 ResumeFailedFaultType fault = new ResumeFailedFaultType(); 128 throw new ResumeFailedFault("Error resuming subscription", fault, e); 129 } 130 } 131 } 132 133 @Override 134 protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault { 135 UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType(); 136 throw new UnacceptableTerminationTimeFault( 137 "TerminationTime is not supported", 138 fault); 139 } 140 141 @Override 142 protected void unsubscribe() throws UnableToDestroySubscriptionFault { 143 super.unsubscribe(); 144 if (session != null) { 145 try { 146 session.close(); 147 } catch (JMSException e) { 148 UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType(); 149 throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e); 150 } finally { 151 session = null; 152 } 153 } 154 } 155 156 public Connection getConnection() { 157 return connection; 158 } 159 160 public void setConnection(Connection connection) { 161 this.connection = connection; 162 } 163 164 public void onMessage(Message jmsMessage) { 165 try { 166 TextMessage text = (TextMessage ) jmsMessage; 167 DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); 168 factory.setNamespaceAware(true); 169 Document doc = factory.newDocumentBuilder().parse(new InputSource (new StringReader (text.getText()))); 170 Element root = doc.getDocumentElement(); 171 Element holder = (Element ) root.getElementsByTagNameNS(WSN_URI, "NotificationMessage").item(0); 172 Element message = (Element ) holder.getElementsByTagNameNS(WSN_URI, "Message").item(0); 173 Element content = null; 174 for (int i = 0; i < message.getChildNodes().getLength(); i++) { 175 if (message.getChildNodes().item(i) instanceof Element ) { 176 content = (Element ) message.getChildNodes().item(i); 177 break; 178 } 179 } 180 boolean match = doFilter(content); 181 if (match) { 182 if (useRaw) { 183 doNotify(content); 184 } else { 185 doNotify(root); 186 } 187 } 188 } catch (Exception e) { 189 log.warn("Error notifying consumer", e); 190 } 191 } 192 193 protected boolean doFilter(Element content) { 194 if (contentFilter != null) { 195 if (!contentFilter.getDialect().equals(XPATH1_URI)) { 196 throw new IllegalStateException ("Unsupported dialect: " + contentFilter.getDialect()); 197 } 198 try { 199 XPathFactory xpfactory = XPathFactory.newInstance(); 200 XPath xpath = xpfactory.newXPath(); 201 XPathExpression exp = xpath.compile(contentFilter.getContent().get(0).toString()); 202 Boolean ret = (Boolean ) exp.evaluate(content, XPathConstants.BOOLEAN); 203 return ret.booleanValue(); 204 } catch (XPathExpressionException e) { 205 log.warn("Could not filter notification", e); 206 } 207 return false; 208 } 209 return true; 210 } 211 212 protected abstract void doNotify(Element content); 213 214 } 215 | Popular Tags |