KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > cocoon > components > jms > AbstractMessagePublisher


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package org.apache.cocoon.components.jms;
17
18 import javax.jms.DeliveryMode JavaDoc;
19 import javax.jms.JMSException JavaDoc;
20 import javax.jms.Message JavaDoc;
21 import javax.jms.Session JavaDoc;
22 import javax.jms.Topic JavaDoc;
23 import javax.jms.TopicConnection JavaDoc;
24 import javax.jms.TopicPublisher JavaDoc;
25 import javax.jms.TopicSession JavaDoc;
26
27 import org.apache.avalon.framework.activity.Disposable;
28 import org.apache.avalon.framework.activity.Initializable;
29 import org.apache.avalon.framework.logger.AbstractLogEnabled;
30 import org.apache.avalon.framework.parameters.ParameterException;
31 import org.apache.avalon.framework.parameters.Parameterizable;
32 import org.apache.avalon.framework.parameters.Parameters;
33 import org.apache.avalon.framework.service.ServiceException;
34 import org.apache.avalon.framework.service.ServiceManager;
35 import org.apache.avalon.framework.service.Serviceable;
36
37 /**
38  * Abstract JMS message publisher. Use this as a basis for components
39  * that want to publish JMS messages.
40  * When used in conjunction with the default {@link org.apache.cocoon.components.jms.JMSConnectionManager}
41  * implementation this class supports automatic reconnection when the connection gets severed.
42  *
43  * <p>Parameters:</p>
44  * <table border="1">
45  * <tbody>
46  * <tr>
47  * <th align="left">parameter</th>
48  * <th align="left">required</th>
49  * <th align="left">default</th>
50  * <th align="left">description</th>
51  * </tr>
52  * <tr>
53  * <td valign="top">connection</td>
54  * <td valign="top">yes</td>
55  * <td>&nbsp;</td>
56  * <td valign="top">
57  * Name of the connection registered with
58  * {@link org.apache.cocoon.components.jms.JMSConnectionManager}.
59  * This must be a topic connection.
60  * </td>
61  * </tr>
62  * <tr>
63  * <td valign="top">topic</td>
64  * <td valign="top">yes</td>
65  * <td>&nbsp;</td>
66  * <td valign="top">The name of the topic to publish messages to.</td>
67  * </tr>
68  * <tr>
69  * <td valign="top">priority</td>
70  * <td valign="top">no</td>
71  * <td>4</td>
72  * <td valign="top">the priority of the published messages</td>
73  * </tr>
74  * <tr>
75  * <td valign="top">time-to-live</td>
76  * <td valign="top">no</td>
77  * <td>10000</td>
78  * <td valign="top">the message's lifetime in milliseconds</td>
79  * </tr>
80  * <tr>
81  * <td valign="top">persistent-delivery</td>
82  * <td valign="top">no</td>
83  * <td>false</td>
84  * <td valign="top">whether to use persistent delivery mode when publishing messages</td>
85  * </tr>
86  * </tbody>
87  * </table>
88  *
89  * @version CVS $Id: AbstractMessagePublisher.java 30941 2004-07-29 19:56:58Z vgritsenko $
90  */

91 public abstract class AbstractMessagePublisher extends AbstractLogEnabled
92 implements Serviceable, Parameterizable, Initializable, Disposable, JMSConnectionEventListener {
93
94     // ---------------------------------------------------- Constants
95

96     private static final String JavaDoc CONNECTION_PARAM = "connection";
97     private static final String JavaDoc TOPIC_PARAM = "topic";
98     private static final String JavaDoc PRIORITY_PARAM = "priority";
99     private static final String JavaDoc TIME_TO_LIVE_PARAM = "time-to-live";
100     private static final String JavaDoc PERSISTENT_DELIVERY_PARAM = "persistent-delivery";
101     
102     private static final int DEFAULT_PRIORITY = 4;
103     private static final int DEFAULT_TIME_TO_LIVE = 10000;
104
105     // ---------------------------------------------------- Instance variables
106

107     private ServiceManager m_manager;
108     private JMSConnectionManager m_connectionManager;
109
110     protected TopicSession JavaDoc m_session;
111     protected TopicPublisher JavaDoc m_publisher;
112
113     protected int m_mode;
114     protected int m_priority;
115     protected int m_timeToLive;
116     protected String JavaDoc m_topicName;
117     protected int m_acknowledgeMode;
118     protected String JavaDoc m_connectionName;
119
120     // ---------------------------------------------------- Lifecycle
121

122     public AbstractMessagePublisher() {
123     }
124
125     public void service(ServiceManager manager) throws ServiceException {
126         m_manager = manager;
127         m_connectionManager = (JMSConnectionManager) m_manager.lookup(JMSConnectionManager.ROLE);
128     }
129
130     public void parameterize(Parameters parameters) throws ParameterException {
131         m_connectionName = parameters.getParameter(CONNECTION_PARAM);
132         m_topicName = parameters.getParameter(TOPIC_PARAM);
133         m_priority = parameters.getParameterAsInteger(PRIORITY_PARAM, DEFAULT_PRIORITY);
134         boolean persistent = parameters.getParameterAsBoolean(PERSISTENT_DELIVERY_PARAM, false);
135         m_mode = (persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
136         m_timeToLive = parameters.getParameterAsInteger(TIME_TO_LIVE_PARAM, DEFAULT_TIME_TO_LIVE);
137     }
138
139     public void initialize() throws Exception JavaDoc {
140         if (m_connectionManager instanceof JMSConnectionEventNotifier) {
141             ((JMSConnectionEventNotifier) m_connectionManager).addConnectionListener(m_connectionName, this);
142         }
143         createSessionAndPublisher();
144     }
145
146     public void dispose() {
147         closePublisherAndSession();
148         if (m_manager != null) {
149             if (m_connectionManager != null) {
150                 m_manager.release(m_connectionManager);
151             }
152         }
153     }
154
155     // ---------------------------------------------------- JMSConnectionEventListener
156

157     public void onConnection(String JavaDoc name) {
158         if (getLogger().isInfoEnabled()) {
159             getLogger().info("Creating publisher because of reconnection");
160         }
161         try {
162             createSessionAndPublisher();
163         }
164         catch (JMSException JavaDoc e) {
165             if (getLogger().isWarnEnabled()) {
166                 getLogger().warn("Reinitialization after reconnection failed", e);
167             }
168         }
169     }
170
171     public void onDisconnection(String JavaDoc name) {
172         if (getLogger().isInfoEnabled()) {
173             getLogger().info("Closing subscriber because of disconnection");
174         }
175         closePublisherAndSession();
176     }
177
178     // ---------------------------------------------------- Implementation
179

180     /**
181      * Concrete classes call this method to publish messages.
182      */

183     protected synchronized void publishMessage(Message JavaDoc message) throws JMSException JavaDoc {
184         // TODO: discover disconnected state and queue messages until connected.
185
if (getLogger().isDebugEnabled()) {
186             getLogger().debug("Publishing message '" + message + "'");
187         }
188         m_publisher.publish(message, m_mode, m_priority, m_timeToLive);
189     }
190
191     private void createSessionAndPublisher() throws JMSException JavaDoc {
192         // set the default acknowledge mode
193
// concrete implementations may override this
194
m_acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE;
195
196         // create the message publisher
197
final TopicConnection JavaDoc connection = (TopicConnection JavaDoc) m_connectionManager.getConnection(m_connectionName);
198         if (connection != null) {
199             m_session = connection.createTopicSession(false, m_acknowledgeMode);
200             final Topic JavaDoc topic = m_session.createTopic(m_topicName);
201             m_publisher = m_session.createPublisher(topic);
202         }
203         else {
204             if (getLogger().isWarnEnabled()) {
205                 getLogger().warn("Could not obtain JMS connection '" + m_connectionName + "'");
206             }
207         }
208     }
209
210     private void closePublisherAndSession() {
211         if (m_publisher != null) {
212             try {
213                 m_publisher.close();
214             } catch (JMSException JavaDoc e) {
215                 getLogger().error("Error closing publisher.", e);
216             }
217         }
218         if (m_session != null) {
219             try {
220                 m_session.close();
221             }
222             catch (JMSException JavaDoc e) {
223                 getLogger().warn("Error closing session.", e);
224             }
225         }
226     }
227
228 }
229
Popular Tags