KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > cocoon > components > jms > AbstractMessageListener


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package org.apache.cocoon.components.jms;
17
18 import javax.jms.JMSException JavaDoc;
19 import javax.jms.MessageListener JavaDoc;
20 import javax.jms.Session JavaDoc;
21 import javax.jms.Topic JavaDoc;
22 import javax.jms.TopicConnection JavaDoc;
23 import javax.jms.TopicSession JavaDoc;
24 import javax.jms.TopicSubscriber JavaDoc;
25 import org.apache.avalon.framework.activity.Disposable;
26 import org.apache.avalon.framework.activity.Initializable;
27 import org.apache.avalon.framework.logger.AbstractLogEnabled;
28 import org.apache.avalon.framework.parameters.ParameterException;
29 import org.apache.avalon.framework.parameters.Parameterizable;
30 import org.apache.avalon.framework.parameters.Parameters;
31 import org.apache.avalon.framework.service.ServiceException;
32 import org.apache.avalon.framework.service.ServiceManager;
33 import org.apache.avalon.framework.service.Serviceable;
34
35 /**
36  * Abstract {@link javax.jms.MessageListener} implementation.
37  * Use this as a basis for concrete MessageListener implementations.
38  * When used in conjunction with the default {@link org.apache.cocoon.components.jms.JMSConnectionManager}
39  * implementation this class supports automatic reconnection when the connection gets severed.
40  *
41  * <p>Parameters:</p>
42  * <table border="1">
43  * <tbody>
44  * <tr>
45  * <th align="left">parameter</th>
46  * <th align="left">required/default</th>
47  * <th align="left">description</th>
48  * </tr>
49  * <tr>
50  * <td valign="top">connection</td>
51  * <td valign="top">required</td>
52  * <td valign="top">
53  * Name of the connection registered with
54  * {@link org.apache.cocoon.components.jms.JMSConnectionManager}.
55  * This must be a topic connection.
56  * </td>
57  * </tr>
58  * <tr>
59  * <td>topic</td>
60  * <td>required</td>
61  * <td>The name of the topic to subscribe to.</td>
62  * </tr>
63  * <tr>
64  * <td>subscription-id</td>
65  * <td>(<code>null</code>)</td>
66  * <td>An optional durable subscription id.</td>
67  * </tr>
68  * <tr>
69  * <td>message-selector</td>
70  * <td>(<code>null</code>)</td>
71  * <td>An optional message selector.</td>
72  * </tr>
73  * </tbody>
74  * </table>
75  *
76  * @version CVS $Id: AbstractMessageListener.java 30941 2004-07-29 19:56:58Z vgritsenko $
77  */

78 public abstract class AbstractMessageListener extends AbstractLogEnabled
79 implements MessageListener JavaDoc, Serviceable, Parameterizable, Initializable, Disposable,
80            JMSConnectionEventListener {
81
82     // ---------------------------------------------------- Constants
83

84     private static final String JavaDoc CONNECTION_PARAM = "connection";
85     private static final String JavaDoc TOPIC_PARAM = "topic";
86     private static final String JavaDoc SUBSCRIPTION_ID_PARAM = "subscription-id";
87     private static final String JavaDoc MESSAGE_SELECTOR_PARAM = "message-selector";
88
89     // ---------------------------------------------------- Instance variables
90

91     protected ServiceManager m_manager;
92
93     /* configuration */
94     protected String JavaDoc m_connectionName;
95     protected String JavaDoc m_topicName;
96     protected String JavaDoc m_subscriptionId;
97     protected String JavaDoc m_selector;
98     protected int m_acknowledgeMode;
99
100     /* connection manager component */
101     private JMSConnectionManager m_connectionManager;
102
103     /* our session */
104     private TopicSession JavaDoc m_session;
105
106     /* our subscriber */
107     private TopicSubscriber JavaDoc m_subscriber;
108
109     // ---------------------------------------------------- Lifecycle
110

111     public AbstractMessageListener () {
112     }
113
114     public void service(ServiceManager manager) throws ServiceException {
115         m_manager = manager;
116         m_connectionManager = (JMSConnectionManager) m_manager.lookup(JMSConnectionManager.ROLE);
117     }
118
119     public void parameterize(Parameters parameters) throws ParameterException {
120
121         m_connectionName = parameters.getParameter(CONNECTION_PARAM);
122         m_topicName = parameters.getParameter(TOPIC_PARAM);
123
124         m_subscriptionId = parameters.getParameter(SUBSCRIPTION_ID_PARAM, null);
125         m_selector = parameters.getParameter(MESSAGE_SELECTOR_PARAM, null);
126
127     }
128
129     /**
130      * Registers this MessageListener as a TopicSubscriber to the configured Topic.
131      * @throws Exception
132      */

133     public void initialize() throws Exception JavaDoc {
134         if (m_connectionManager instanceof JMSConnectionEventNotifier) {
135             ((JMSConnectionEventNotifier) m_connectionManager).addConnectionListener(m_connectionName, this);
136         }
137         createSessionAndSubscriber();
138     }
139
140     public void dispose() {
141         closeSubscriberAndSession();
142         m_manager.release(m_connectionManager);
143     }
144
145     public void onConnection(String JavaDoc name) {
146         if (getLogger().isInfoEnabled()) {
147             getLogger().info("Creating subscriber because of reconnection");
148         }
149         try {
150             createSessionAndSubscriber();
151         }
152         catch (JMSException JavaDoc e) {
153             if (getLogger().isWarnEnabled()) {
154                 getLogger().warn("Reinitialization after reconnection failed", e);
155             }
156         }
157     }
158
159     public void onDisconnection(String JavaDoc name) {
160         if (getLogger().isInfoEnabled()) {
161             getLogger().info("Closing subscriber because of disconnection");
162         }
163         closeSubscriberAndSession();
164     }
165
166     private void createSessionAndSubscriber() throws JMSException JavaDoc {
167         // set the default acknowledge mode to dups
168
// concrete implementations may want to override this
169
m_acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
170
171         // register this MessageListener with a TopicSubscriber
172
final TopicConnection JavaDoc connection = (TopicConnection JavaDoc) m_connectionManager.getConnection(m_connectionName);
173         if (connection != null) {
174             m_session = connection.createTopicSession(false, m_acknowledgeMode);
175             final Topic JavaDoc topic = m_session.createTopic(m_topicName);
176             if (m_subscriptionId != null) {
177                 m_subscriber = m_session.createDurableSubscriber(topic, m_subscriptionId, m_selector, false);
178             }
179             else {
180                 m_subscriber = m_session.createSubscriber(topic, m_selector, false);
181             }
182             m_subscriber.setMessageListener(this);
183             // recover in case of reconnection
184
m_session.recover();
185         }
186         else {
187             if (getLogger().isWarnEnabled()) {
188                 getLogger().warn("Could not obtain JMS connection '" + m_connectionName + "'");
189             }
190         }
191     }
192
193     private void closeSubscriberAndSession() {
194         if (m_subscriber != null) {
195             try {
196                 m_subscriber.close();
197             } catch (JMSException JavaDoc e) {
198                 getLogger().error("Error closing subscriber", e);
199             }
200             finally {
201                 m_subscriber = null;
202             }
203         }
204         if (m_session != null) {
205             try {
206                 m_session.close();
207             }
208             catch (JMSException JavaDoc e) {
209                 getLogger().error("Error closing session", e);
210             }
211             finally {
212                 m_session = null;
213             }
214         }
215     }
216 }
217
Popular Tags