1 16 package org.apache.cocoon.components.jms; 17 18 import javax.jms.DeliveryMode ; 19 import javax.jms.JMSException ; 20 import javax.jms.Message ; 21 import javax.jms.Session ; 22 import javax.jms.Topic ; 23 import javax.jms.TopicConnection ; 24 import javax.jms.TopicPublisher ; 25 import javax.jms.TopicSession ; 26 27 import org.apache.avalon.framework.activity.Disposable; 28 import org.apache.avalon.framework.activity.Initializable; 29 import org.apache.avalon.framework.logger.AbstractLogEnabled; 30 import org.apache.avalon.framework.parameters.ParameterException; 31 import org.apache.avalon.framework.parameters.Parameterizable; 32 import org.apache.avalon.framework.parameters.Parameters; 33 import org.apache.avalon.framework.service.ServiceException; 34 import org.apache.avalon.framework.service.ServiceManager; 35 import org.apache.avalon.framework.service.Serviceable; 36 37 91 public abstract class AbstractMessagePublisher extends AbstractLogEnabled 92 implements Serviceable, Parameterizable, Initializable, Disposable, JMSConnectionEventListener { 93 94 96 private static final String CONNECTION_PARAM = "connection"; 97 private static final String TOPIC_PARAM = "topic"; 98 private static final String PRIORITY_PARAM = "priority"; 99 private static final String TIME_TO_LIVE_PARAM = "time-to-live"; 100 private static final String PERSISTENT_DELIVERY_PARAM = "persistent-delivery"; 101 102 private static final int DEFAULT_PRIORITY = 4; 103 private static final int DEFAULT_TIME_TO_LIVE = 10000; 104 105 107 private ServiceManager m_manager; 108 private JMSConnectionManager m_connectionManager; 109 110 protected TopicSession m_session; 111 protected TopicPublisher m_publisher; 112 113 protected int m_mode; 114 protected int m_priority; 115 protected int m_timeToLive; 116 protected String m_topicName; 117 protected int m_acknowledgeMode; 118 protected String m_connectionName; 119 120 122 public AbstractMessagePublisher() { 123 } 124 125 public void service(ServiceManager manager) throws ServiceException { 126 m_manager = manager; 127 m_connectionManager = (JMSConnectionManager) m_manager.lookup(JMSConnectionManager.ROLE); 128 } 129 130 public void parameterize(Parameters parameters) throws ParameterException { 131 m_connectionName = parameters.getParameter(CONNECTION_PARAM); 132 m_topicName = parameters.getParameter(TOPIC_PARAM); 133 m_priority = parameters.getParameterAsInteger(PRIORITY_PARAM, DEFAULT_PRIORITY); 134 boolean persistent = parameters.getParameterAsBoolean(PERSISTENT_DELIVERY_PARAM, false); 135 m_mode = (persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); 136 m_timeToLive = parameters.getParameterAsInteger(TIME_TO_LIVE_PARAM, DEFAULT_TIME_TO_LIVE); 137 } 138 139 public void initialize() throws Exception { 140 if (m_connectionManager instanceof JMSConnectionEventNotifier) { 141 ((JMSConnectionEventNotifier) m_connectionManager).addConnectionListener(m_connectionName, this); 142 } 143 createSessionAndPublisher(); 144 } 145 146 public void dispose() { 147 closePublisherAndSession(); 148 if (m_manager != null) { 149 if (m_connectionManager != null) { 150 m_manager.release(m_connectionManager); 151 } 152 } 153 } 154 155 157 public void onConnection(String name) { 158 if (getLogger().isInfoEnabled()) { 159 getLogger().info("Creating publisher because of reconnection"); 160 } 161 try { 162 createSessionAndPublisher(); 163 } 164 catch (JMSException e) { 165 if (getLogger().isWarnEnabled()) { 166 getLogger().warn("Reinitialization after reconnection failed", e); 167 } 168 } 169 } 170 171 public void onDisconnection(String name) { 172 if (getLogger().isInfoEnabled()) { 173 getLogger().info("Closing subscriber because of disconnection"); 174 } 175 closePublisherAndSession(); 176 } 177 178 180 183 protected synchronized void publishMessage(Message message) throws JMSException { 184 if (getLogger().isDebugEnabled()) { 186 getLogger().debug("Publishing message '" + message + "'"); 187 } 188 m_publisher.publish(message, m_mode, m_priority, m_timeToLive); 189 } 190 191 private void createSessionAndPublisher() throws JMSException { 192 m_acknowledgeMode = Session.DUPS_OK_ACKNOWLEDGE; 195 196 final TopicConnection connection = (TopicConnection ) m_connectionManager.getConnection(m_connectionName); 198 if (connection != null) { 199 m_session = connection.createTopicSession(false, m_acknowledgeMode); 200 final Topic topic = m_session.createTopic(m_topicName); 201 m_publisher = m_session.createPublisher(topic); 202 } 203 else { 204 if (getLogger().isWarnEnabled()) { 205 getLogger().warn("Could not obtain JMS connection '" + m_connectionName + "'"); 206 } 207 } 208 } 209 210 private void closePublisherAndSession() { 211 if (m_publisher != null) { 212 try { 213 m_publisher.close(); 214 } catch (JMSException e) { 215 getLogger().error("Error closing publisher.", e); 216 } 217 } 218 if (m_session != null) { 219 try { 220 m_session.close(); 221 } 222 catch (JMSException e) { 223 getLogger().warn("Error closing session.", e); 224 } 225 } 226 } 227 228 } 229 | Popular Tags |