KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > opensymphony > oscache > plugins > clustersupport > JMSBroadcastingListener


1 /*
2  * Copyright (c) 2002-2003 by OpenSymphony
3  * All rights reserved.
4  */

5 package com.opensymphony.oscache.plugins.clustersupport;
6
7 import com.opensymphony.oscache.base.Cache;
8 import com.opensymphony.oscache.base.Config;
9 import com.opensymphony.oscache.base.FinalizationException;
10 import com.opensymphony.oscache.base.InitializationException;
11
12 import org.apache.commons.logging.Log;
13 import org.apache.commons.logging.LogFactory;
14
15 import javax.jms.*;
16
17 import javax.naming.InitialContext JavaDoc;
18
19 /**
20  * A JMS based clustering implementation. This implementation is independent of the
21  * JMS provider and uses non-persistent messages on a publish subscribe protocol.
22  *
23  * @author <a HREF="mailto:motoras@linuxmail.org">Romulus Pasca</a>
24  */

25 public class JMSBroadcastingListener extends AbstractBroadcastingListener {
26     private final static Log log = LogFactory.getLog(JMSBroadcastingListener.class);
27
28     /**
29      *The JMS connection used
30      */

31     private Connection connection;
32
33     /**
34      * Th object used to publish new messages
35      */

36     private MessageProducer messagePublisher;
37
38     /**
39      * The current JMS session
40      */

41     private Session publisherSession;
42
43     /**
44      * The name of this cluster. Used to identify the sender of a message.
45      */

46     private String JavaDoc clusterNode;
47
48     /**
49      * <p>Called by the cache administrator class when a cache is instantiated.</p>
50      * <p>The JMS broadcasting implementation requires the following configuration
51      * properties to be specified in <code>oscache.properties</code>:
52      * <ul>
53      * <li><b>cache.cluster.jms.topic.factory</b> - The JMS connection factory to use</li>
54      * <li><b>cache.cluster.jms.topic.name</b> - The JMS topic name</li>
55      * <li><b>cache.cluster.jms.node.name</b> - The name of this node in the cluster. This
56      * should be unique for each node.</li>
57      * Please refer to the clustering documentation for further details on configuring
58      * the JMS clustered caching.</p>
59      *
60      * @param cache the cache instance that this listener is attached to.
61      *
62      * @throws com.opensymphony.oscache.base.InitializationException thrown when there was a
63      * problem initializing the listener. The cache administrator will log this error and
64      * disable the listener.
65      */

66     public void initialize(Cache cache, Config config) throws InitializationException {
67         super.initialize(cache, config);
68
69         // Get the name of this node
70
clusterNode = config.getProperty("cache.cluster.jms.node.name");
71
72         String JavaDoc topic = config.getProperty("cache.cluster.jms.topic.name");
73         String JavaDoc topicFactory = config.getProperty("cache.cluster.jms.topic.factory");
74
75         if (log.isInfoEnabled()) {
76             log.info("Starting JMS clustering (node name=" + clusterNode + ", topic=" + topic + ", topic factory=" + topicFactory + ")");
77         }
78
79         try {
80             // Make sure you have specified the necessary JNDI properties (usually in
81
// a jndi.properties resource file, or as system properties)
82
InitialContext JavaDoc jndi = new InitialContext JavaDoc();
83
84             // Look up a JMS connection factory
85
ConnectionFactory connectionFactory = (ConnectionFactory) jndi.lookup(topicFactory);
86
87             // Create a JMS connection
88
connection = connectionFactory.createConnection();
89
90             // Create session objects
91
publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
92
93             Session subSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
94
95             // Look up the JMS topic
96
Topic chatTopic = (Topic) jndi.lookup(topic);
97
98             // Create the publisher and subscriber
99
messagePublisher = publisherSession.createProducer(chatTopic);
100
101             MessageConsumer messageConsumer = subSession.createConsumer(chatTopic);
102
103             // Set the message listener
104
messageConsumer.setMessageListener(new MessageListener() {
105                     public void onMessage(Message message) {
106                         try {
107                             //check the message type
108
ObjectMessage objectMessage = null;
109
110                             if (!(message instanceof ObjectMessage)) {
111                                 log.error("Cannot handle message of type (class=" + message.getClass().getName() + "). Notification ignored.");
112                                 return;
113                             }
114
115                             objectMessage = (ObjectMessage) message;
116
117                             //check the message content
118
if (!(objectMessage.getObject() instanceof ClusterNotification)) {
119                                 log.error("An unknown cluster notification message received (class=" + objectMessage.getObject().getClass().getName() + "). Notification ignored.");
120                                 return;
121                             }
122
123                             if (log.isDebugEnabled()) {
124                                 log.debug(objectMessage.getObject());
125                             }
126
127                             // This prevents the notification sent by this node from being handled by itself
128
if (!objectMessage.getStringProperty("nodeName").equals(clusterNode)) {
129                                 //now handle the message
130
ClusterNotification notification = (ClusterNotification) objectMessage.getObject();
131                                 handleClusterNotification(notification);
132                             }
133                         } catch (JMSException jmsEx) {
134                             log.error("Cannot handle cluster Notification", jmsEx);
135                         }
136                     }
137                 });
138
139             // Start the JMS connection; allows messages to be delivered
140
connection.start();
141         } catch (Exception JavaDoc e) {
142             throw new InitializationException("Initialization of the JMSBroadcastingListener failed: " + e);
143         }
144     }
145
146     /**
147      * Called by the cache administrator class when a cache is destroyed.
148      *
149      * @throws com.opensymphony.oscache.base.FinalizationException thrown when there was a problem finalizing the
150      * listener. The cache administrator will catch and log this error.
151      */

152     public void finialize() throws FinalizationException {
153         try {
154             if (log.isInfoEnabled()) {
155                 log.info("Shutting down JMS clustering...");
156             }
157
158             connection.close();
159
160             if (log.isInfoEnabled()) {
161                 log.info("JMS clustering shutdown complete.");
162             }
163         } catch (JMSException e) {
164             log.warn("A problem was encountered when closing the JMS connection", e);
165         }
166     }
167
168     protected void sendNotification(ClusterNotification message) {
169         try {
170             ObjectMessage objectMessage = publisherSession.createObjectMessage();
171             objectMessage.setObject(message);
172
173             //sign the message, with the name of this node
174
objectMessage.setStringProperty("nodeName", clusterNode);
175             messagePublisher.send(objectMessage);
176         } catch (JMSException e) {
177             log.error("Cannot send notification " + message, e);
178         }
179     }
180 }
181
Popular Tags