1 17 package org.apache.servicemix.wsn.jms; 18 19 import java.io.StringWriter ; 20 21 import javax.jms.Connection ; 22 import javax.jms.JMSException ; 23 import javax.jms.Message ; 24 import javax.jms.MessageProducer ; 25 import javax.jms.Session ; 26 import javax.jms.Topic ; 27 import javax.xml.bind.JAXBContext; 28 import javax.xml.bind.JAXBException; 29 30 import org.apache.activemq.advisory.ConsumerEvent; 31 import org.apache.activemq.advisory.ConsumerEventSource; 32 import org.apache.activemq.advisory.ConsumerListener; 33 import org.apache.commons.logging.Log; 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.servicemix.wsn.AbstractPublisher; 36 import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault; 37 import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault; 38 import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault; 39 import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault; 40 import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault; 41 import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault; 42 import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType; 43 import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType; 44 import org.oasis_open.docs.wsn.b_2.Notify; 45 import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType; 46 import org.oasis_open.docs.wsn.br_2.RegisterPublisher; 47 import org.oasis_open.docs.wsn.br_2.ResourceNotDestroyedFaultType; 48 49 public abstract class JmsPublisher extends AbstractPublisher implements ConsumerListener { 50 51 private static Log log = LogFactory.getLog(JmsPublisher.class); 52 53 private Connection connection; 54 private JmsTopicExpressionConverter topicConverter; 55 private JAXBContext jaxbContext; 56 private Topic jmsTopic; 57 private ConsumerEventSource advisory; 58 private Object subscription; 59 60 public JmsPublisher(String name) { 61 super(name); 62 topicConverter = new JmsTopicExpressionConverter(); 63 try { 64 jaxbContext = JAXBContext.newInstance(Notify.class); 65 } catch (JAXBException e) { 66 throw new RuntimeException ("Unable to create JAXB context", e); 67 } 68 } 69 70 public Connection getConnection() { 71 return connection; 72 } 73 74 public void setConnection(Connection connection) { 75 this.connection = connection; 76 } 77 78 @Override 79 public void notify(NotificationMessageHolderType messageHolder) { 80 Session session = null; 81 try { 82 Topic topic = topicConverter.toActiveMQTopic(messageHolder.getTopic()); 83 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 84 MessageProducer producer = session.createProducer(topic); 85 Notify notify = new Notify(); 86 notify.getNotificationMessage().add(messageHolder); 87 StringWriter writer = new StringWriter (); 88 jaxbContext.createMarshaller().marshal(notify, writer); 89 Message message = session.createTextMessage(writer.toString()); 90 producer.send(message); 91 } catch (JMSException e) { 92 log.warn("Error dispatching message", e); 93 } catch (JAXBException e) { 94 log.warn("Error dispatching message", e); 95 } catch (InvalidTopicException e) { 96 log.warn("Error dispatching message", e); 97 } finally { 98 if (session != null) { 99 try { 100 session.close(); 101 } catch (JMSException e) { 102 log.debug("Error closing session", e); 103 } 104 } 105 } 106 } 107 108 @Override 109 protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault { 110 super.validatePublisher(registerPublisherRequest); 111 try { 112 jmsTopic = topicConverter.toActiveMQTopic(topic); 113 } catch (InvalidTopicException e) { 114 InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType(); 115 throw new InvalidTopicExpressionFault(e.getMessage(), fault); 116 } 117 } 118 119 @Override 120 protected void start() throws PublisherRegistrationFailedFault { 121 if (demand) { 122 try { 123 advisory = new ConsumerEventSource(connection, jmsTopic); 124 advisory.setConsumerListener(this); 125 advisory.start(); 126 } catch (Exception e) { 127 PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType(); 128 throw new PublisherRegistrationFailedFault("Error starting demand-based publisher", fault, e); 129 } 130 } 131 } 132 133 protected void destroy() throws ResourceNotDestroyedFault { 134 try { 135 if (advisory != null) { 136 advisory.stop(); 137 } 138 } catch (Exception e) { 139 ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType(); 140 throw new ResourceNotDestroyedFault("Error destroying publisher", fault, e); 141 } finally { 142 super.destroy(); 143 } 144 } 145 146 public void onConsumerEvent(ConsumerEvent event) { 147 if (event.getConsumerCount() > 0) { 148 if (subscription == null) { 149 subscription = startSubscription(); 151 } 152 } else { 153 if (subscription != null) { 154 Object sub = subscription; 156 subscription = null; 157 destroySubscription(sub); 158 } 159 } 160 } 161 162 protected abstract void destroySubscription(Object subscription); 163 164 protected abstract Object startSubscription(); 165 166 167 } 168 | Popular Tags |