1 17 package org.apache.servicemix.wsn.jms; 18 19 import java.io.StringReader ; 20 import java.io.StringWriter ; 21 import java.util.ArrayList ; 22 import java.util.List ; 23 24 import javax.jms.Connection ; 25 import javax.jms.JMSException ; 26 import javax.jms.Message ; 27 import javax.jms.MessageConsumer ; 28 import javax.jms.MessageProducer ; 29 import javax.jms.Queue ; 30 import javax.jms.Session ; 31 import javax.jms.TextMessage ; 32 import javax.xml.bind.JAXBContext; 33 import javax.xml.bind.JAXBException; 34 35 import org.apache.commons.logging.Log; 36 import org.apache.commons.logging.LogFactory; 37 import org.apache.servicemix.wsn.AbstractPullPoint; 38 import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault; 39 import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType; 40 import org.oasis_open.docs.wsn.b_2.Notify; 41 import org.oasis_open.docs.wsrf.r_2.ResourceUnknownFaultType; 42 43 public class JmsPullPoint extends AbstractPullPoint { 44 45 private static Log log = LogFactory.getLog(JmsPullPoint.class); 46 47 private JAXBContext jaxbContext; 48 private Connection connection; 49 private Session session; 50 private Queue queue; 51 private MessageProducer producer; 52 private MessageConsumer consumer; 53 54 public JmsPullPoint(String name) { 55 super(name); 56 try { 57 jaxbContext = JAXBContext.newInstance(Notify.class); 58 } catch (JAXBException e) { 59 throw new RuntimeException ("Could not create PullEndpoint", e); 60 } 61 } 62 63 protected void initSession() throws JMSException { 64 if (session == null) { 65 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 66 queue = session.createQueue(getName()); 67 producer = session.createProducer(queue); 68 consumer = session.createConsumer(queue); 69 } 70 } 71 72 @Override 73 protected synchronized void store(NotificationMessageHolderType messageHolder) { 74 try { 75 initSession(); 76 Notify notify = new Notify(); 77 notify.getNotificationMessage().add(messageHolder); 78 StringWriter writer = new StringWriter (); 79 jaxbContext.createMarshaller().marshal(notify, writer); 80 Message message = session.createTextMessage(writer.toString()); 81 producer.send(message); 82 } catch (JMSException e) { 83 log.warn("Error storing message", e); 84 if (session != null) { 85 try { 86 session.close(); 87 } catch (JMSException inner) { 88 log.debug("Error closing session", inner); 89 } finally { 90 session = null; 91 } 92 } 93 } catch (JAXBException e) { 94 log.warn("Error storing message", e); 95 } 96 } 97 98 @Override 99 protected synchronized List <NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault { 100 Session session = null; 101 try { 102 if (max == 0) { 103 max = 256; 104 } 105 initSession(); 106 List <NotificationMessageHolderType> messages = new ArrayList <NotificationMessageHolderType>(); 107 for (int i = 0; i < max; i++) { 108 Message msg = consumer.receiveNoWait(); 109 if (msg == null) { 110 break; 111 } 112 TextMessage txtMsg = (TextMessage ) msg; 113 StringReader reader = new StringReader (txtMsg.getText()); 114 Notify notify = (Notify) jaxbContext.createUnmarshaller().unmarshal(reader); 115 messages.addAll(notify.getNotificationMessage()); 116 } 117 return messages; 118 } catch (JMSException e) { 119 log.info("Error retrieving messages", e); 120 if (session != null) { 121 try { 122 session.close(); 123 } catch (JMSException inner) { 124 log.debug("Error closing session", inner); 125 } finally { 126 session = null; 127 } 128 } 129 ResourceUnknownFaultType fault = new ResourceUnknownFaultType(); 130 throw new ResourceUnknownFault("Unable to retrieve messages", fault, e); 131 } catch (JAXBException e) { 132 log.info("Error retrieving messages", e); 133 ResourceUnknownFaultType fault = new ResourceUnknownFaultType(); 134 throw new ResourceUnknownFault("Unable to retrieve messages", fault, e); 135 } 136 } 137 138 public Connection getConnection() { 139 return connection; 140 } 141 142 public void setConnection(Connection connection) { 143 this.connection = connection; 144 } 145 146 } 147 | Popular Tags |