1 5 package com.opensymphony.oscache.plugins.clustersupport; 6 7 import com.opensymphony.oscache.base.Cache; 8 import com.opensymphony.oscache.base.Config; 9 import com.opensymphony.oscache.base.FinalizationException; 10 import com.opensymphony.oscache.base.InitializationException; 11 12 import org.apache.commons.logging.Log; 13 import org.apache.commons.logging.LogFactory; 14 15 import javax.jms.*; 16 17 import javax.naming.InitialContext ; 18 19 25 public class JMS10BroadcastingListener extends AbstractBroadcastingListener { 26 private final static Log log = LogFactory.getLog(JMS10BroadcastingListener.class); 27 28 31 private String clusterNode; 32 33 36 private TopicConnection connection; 37 38 41 private TopicPublisher publisher; 42 43 46 private TopicSession publisherSession; 47 48 66 public void initialize(Cache cache, Config config) throws InitializationException { 67 super.initialize(cache, config); 68 69 clusterNode = config.getProperty("cache.cluster.jms.node.name"); 71 72 String topic = config.getProperty("cache.cluster.jms.topic.name"); 73 String topicFactory = config.getProperty("cache.cluster.jms.topic.factory"); 74 75 if (log.isInfoEnabled()) { 76 log.info("Starting JMS clustering (node name=" + clusterNode + ", topic=" + topic + ", topic factory=" + topicFactory + ")"); 77 } 78 79 try { 80 InitialContext jndi = new InitialContext (); 83 84 TopicConnectionFactory connectionFactory = (TopicConnectionFactory) jndi.lookup(topicFactory); 86 87 connection = connectionFactory.createTopicConnection(); 89 90 publisherSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 92 93 TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 94 95 Topic chatTopic = (Topic) jndi.lookup(topic); 97 98 publisher = publisherSession.createPublisher(chatTopic); 100 101 TopicSubscriber subscriber = subSession.createSubscriber(chatTopic); 102 103 subscriber.setMessageListener(new MessageListener() { 105 public void onMessage(Message message) { 106 try { 107 ObjectMessage objectMessage = null; 109 110 if (!(message instanceof ObjectMessage)) { 111 log.error("Cannot handle message of type (class=" + message.getClass().getName() + "). Notification ignored."); 112 return; 113 } 114 115 objectMessage = (ObjectMessage) message; 116 117 if (!(objectMessage.getObject() instanceof ClusterNotification)) { 119 log.error("An unknown cluster notification message received (class=" + objectMessage.getObject().getClass().getName() + "). Notification ignored."); 120 return; 121 } 122 123 if (log.isDebugEnabled()) { 124 log.debug(objectMessage.getObject()); 125 } 126 127 if (!objectMessage.getStringProperty("nodeName").equals(clusterNode)) { 129 ClusterNotification notification = (ClusterNotification) objectMessage.getObject(); 131 handleClusterNotification(notification); 132 } 133 } catch (JMSException jmsEx) { 134 log.error("Cannot handle cluster Notification", jmsEx); 135 } 136 } 137 }); 138 139 connection.start(); 141 } catch (Exception e) { 142 throw new InitializationException("Initialization of the JMS10BroadcastingListener failed: " + e); 143 } 144 } 145 146 152 public void finialize() throws FinalizationException { 153 try { 154 if (log.isInfoEnabled()) { 155 log.info("Shutting down JMS clustering..."); 156 } 157 158 connection.close(); 159 160 if (log.isInfoEnabled()) { 161 log.info("JMS clustering shutdown complete."); 162 } 163 } catch (JMSException e) { 164 log.warn("A problem was encountered when closing the JMS connection", e); 165 } 166 } 167 168 protected void sendNotification(ClusterNotification message) { 169 try { 170 ObjectMessage objectMessage = publisherSession.createObjectMessage(); 171 objectMessage.setObject(message); 172 173 objectMessage.setStringProperty("nodeName", clusterNode); 175 publisher.publish(objectMessage); 176 } catch (JMSException e) { 177 log.error("Cannot send notification " + message, e); 178 } 179 } 180 } 181 | Popular Tags |