KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > wsn > jms > JmsPullPoint


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.wsn.jms;
18
19 import java.io.StringReader JavaDoc;
20 import java.io.StringWriter JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.List JavaDoc;
23
24 import javax.jms.Connection JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Message JavaDoc;
27 import javax.jms.MessageConsumer JavaDoc;
28 import javax.jms.MessageProducer JavaDoc;
29 import javax.jms.Queue JavaDoc;
30 import javax.jms.Session JavaDoc;
31 import javax.jms.TextMessage JavaDoc;
32 import javax.xml.bind.JAXBContext;
33 import javax.xml.bind.JAXBException;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.servicemix.wsn.AbstractPullPoint;
38 import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
39 import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
40 import org.oasis_open.docs.wsn.b_2.Notify;
41 import org.oasis_open.docs.wsrf.r_2.ResourceUnknownFaultType;
42
43 public class JmsPullPoint extends AbstractPullPoint {
44
45     private static Log log = LogFactory.getLog(JmsPullPoint.class);
46     
47     private JAXBContext jaxbContext;
48     private Connection JavaDoc connection;
49     private Session JavaDoc session;
50     private Queue JavaDoc queue;
51     private MessageProducer JavaDoc producer;
52     private MessageConsumer JavaDoc consumer;
53
54     public JmsPullPoint(String JavaDoc name) {
55         super(name);
56         try {
57             jaxbContext = JAXBContext.newInstance(Notify.class);
58         } catch (JAXBException e) {
59             throw new RuntimeException JavaDoc("Could not create PullEndpoint", e);
60         }
61     }
62     
63     protected void initSession() throws JMSException JavaDoc {
64         if (session == null) {
65             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
66             queue = session.createQueue(getName());
67             producer = session.createProducer(queue);
68             consumer = session.createConsumer(queue);
69         }
70     }
71
72     @Override JavaDoc
73     protected synchronized void store(NotificationMessageHolderType messageHolder) {
74         try {
75             initSession();
76             Notify notify = new Notify();
77             notify.getNotificationMessage().add(messageHolder);
78             StringWriter JavaDoc writer = new StringWriter JavaDoc();
79             jaxbContext.createMarshaller().marshal(notify, writer);
80             Message JavaDoc message = session.createTextMessage(writer.toString());
81             producer.send(message);
82         } catch (JMSException JavaDoc e) {
83             log.warn("Error storing message", e);
84             if (session != null) {
85                 try {
86                     session.close();
87                 } catch (JMSException JavaDoc inner) {
88                     log.debug("Error closing session", inner);
89                 } finally {
90                     session = null;
91                 }
92             }
93         } catch (JAXBException e) {
94             log.warn("Error storing message", e);
95         }
96     }
97
98     @Override JavaDoc
99     protected synchronized List JavaDoc<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault {
100         Session JavaDoc session = null;
101         try {
102             if (max == 0) {
103                 max = 256;
104             }
105             initSession();
106             List JavaDoc<NotificationMessageHolderType> messages = new ArrayList JavaDoc<NotificationMessageHolderType>();
107             for (int i = 0; i < max; i++) {
108                 Message JavaDoc msg = consumer.receiveNoWait();
109                 if (msg == null) {
110                     break;
111                 }
112                 TextMessage JavaDoc txtMsg = (TextMessage JavaDoc) msg;
113                 StringReader JavaDoc reader = new StringReader JavaDoc(txtMsg.getText());
114                 Notify notify = (Notify) jaxbContext.createUnmarshaller().unmarshal(reader);
115                 messages.addAll(notify.getNotificationMessage());
116             }
117             return messages;
118         } catch (JMSException JavaDoc e) {
119             log.info("Error retrieving messages", e);
120             if (session != null) {
121                 try {
122                     session.close();
123                 } catch (JMSException JavaDoc inner) {
124                     log.debug("Error closing session", inner);
125                 } finally {
126                     session = null;
127                 }
128             }
129             ResourceUnknownFaultType fault = new ResourceUnknownFaultType();
130             throw new ResourceUnknownFault("Unable to retrieve messages", fault, e);
131         } catch (JAXBException e) {
132             log.info("Error retrieving messages", e);
133             ResourceUnknownFaultType fault = new ResourceUnknownFaultType();
134             throw new ResourceUnknownFault("Unable to retrieve messages", fault, e);
135         }
136     }
137     
138     public Connection JavaDoc getConnection() {
139         return connection;
140     }
141
142     public void setConnection(Connection JavaDoc connection) {
143         this.connection = connection;
144     }
145
146 }
147
Popular Tags