1 16 package org.apache.cocoon.components.jms; 17 18 import javax.jms.JMSException ; 19 import javax.jms.MessageListener ; 20 import javax.jms.Session ; 21 import javax.jms.Topic ; 22 import javax.jms.TopicConnection ; 23 import javax.jms.TopicSession ; 24 import javax.jms.TopicSubscriber ; 25 import org.apache.avalon.framework.activity.Disposable; 26 import org.apache.avalon.framework.activity.Initializable; 27 import org.apache.avalon.framework.logger.AbstractLogEnabled; 28 import org.apache.avalon.framework.parameters.ParameterException; 29 import org.apache.avalon.framework.parameters.Parameterizable; 30 import org.apache.avalon.framework.parameters.Parameters; 31 import org.apache.avalon.framework.service.ServiceException; 32 import org.apache.avalon.framework.service.ServiceManager; 33 import org.apache.avalon.framework.service.Serviceable; 34 35 78 public abstract class AbstractMessageListener extends AbstractLogEnabled 79 implements MessageListener , Serviceable, Parameterizable, Initializable, Disposable, 80 JMSConnectionEventListener { 81 82 84 private static final String CONNECTION_PARAM = "connection"; 85 private static final String TOPIC_PARAM = "topic"; 86 private static final String SUBSCRIPTION_ID_PARAM = "subscription-id"; 87 private static final String MESSAGE_SELECTOR_PARAM = "message-selector"; 88 89 91 protected ServiceManager m_manager; 92 93 94 protected String m_connectionName; 95 protected String m_topicName; 96 protected String m_subscriptionId; 97 protected String m_selector; 98 protected int m_acknowledgeMode; 99 100 101 private JMSConnectionManager m_connectionManager; 102 103 104 private TopicSession m_session; 105 106 107 private TopicSubscriber m_subscriber; 108 109 111 public AbstractMessageListener () { 112 } 113 114 public void service(ServiceManager manager) throws ServiceException { 115 m_manager = manager; 116 m_connectionManager = (JMSConnectionManager) m_manager.lookup(JMSConnectionManager.ROLE); 117 } 118 119 public void parameterize(Parameters parameters) throws ParameterException { 120 121 m_connectionName = parameters.getParameter(CONNECTION_PARAM); 122 m_topicName = parameters.getParameter(TOPIC_PARAM); 123 124 m_subscriptionId = parameters.getParameter(SUBSCRIPTION_ID_PARAM, null); 125 m_selector = parameters.getParameter(MESSAGE_SELECTOR_PARAM, null); 126 127 } 128 129 133 public void initialize() throws Exception { 134 if (m_connectionManager instanceof JMSConnectionEventNotifier) { 135 ((JMSConnectionEventNotifier) m_connectionManager).addConnectionListener(m_connectionName, this); 136 } 137 createSessionAndSubscriber(); 138 } 139 140 public void dispose() { 141 closeSubscriberAndSession(); 142 m_manager.release(m_connectionManager); 143 } 144 145 public void onConnection(String name) { 146 if (getLogger().isInfoEnabled()) { 147 getLogger().info("Creating subscriber because of reconnection"); 148 } 149 try { 150 createSessionAndSubscriber(); 151 } 152 catch (JMSException e) { 153 if (getLogger().isWarnEnabled()) { 154 getLogger().warn("Reinitialization after reconnection failed", e); 155 } 156 } 157 } 158 159 public void onDisconnection(String name) { 160 if (getLogger().isInfoEnabled()) { 161 getLogger().info("Closing subscriber because of disconnection"); 162 } 163 closeSubscriberAndSession(); 164 } 165 166 private void createSessionAndSubscriber() throws JMSException { 167 m_acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE; 170 171 final TopicConnection connection = (TopicConnection ) m_connectionManager.getConnection(m_connectionName); 173 if (connection != null) { 174 m_session = connection.createTopicSession(false, m_acknowledgeMode); 175 final Topic topic = m_session.createTopic(m_topicName); 176 if (m_subscriptionId != null) { 177 m_subscriber = m_session.createDurableSubscriber(topic, m_subscriptionId, m_selector, false); 178 } 179 else { 180 m_subscriber = m_session.createSubscriber(topic, m_selector, false); 181 } 182 m_subscriber.setMessageListener(this); 183 m_session.recover(); 185 } 186 else { 187 if (getLogger().isWarnEnabled()) { 188 getLogger().warn("Could not obtain JMS connection '" + m_connectionName + "'"); 189 } 190 } 191 } 192 193 private void closeSubscriberAndSession() { 194 if (m_subscriber != null) { 195 try { 196 m_subscriber.close(); 197 } catch (JMSException e) { 198 getLogger().error("Error closing subscriber", e); 199 } 200 finally { 201 m_subscriber = null; 202 } 203 } 204 if (m_session != null) { 205 try { 206 m_session.close(); 207 } 208 catch (JMSException e) { 209 getLogger().error("Error closing session", e); 210 } 211 finally { 212 m_session = null; 213 } 214 } 215 } 216 } 217 | Popular Tags |