KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > opensymphony > oscache > plugins > clustersupport > JavaGroupsBroadcastingListener


1 /*
2  * Copyright (c) 2002-2003 by OpenSymphony
3  * All rights reserved.
4  */

5 package com.opensymphony.oscache.plugins.clustersupport;
6
7 import com.opensymphony.oscache.base.Cache;
8 import com.opensymphony.oscache.base.Config;
9 import com.opensymphony.oscache.base.FinalizationException;
10 import com.opensymphony.oscache.base.InitializationException;
11
12 import org.apache.commons.logging.Log;
13 import org.apache.commons.logging.LogFactory;
14
15 import org.jgroups.Address;
16 import org.jgroups.Channel;
17
18 import org.jgroups.blocks.NotificationBus;
19
20 import java.io.Serializable JavaDoc;
21
22 /**
23  * <p>A concrete implementation of the {@link AbstractBroadcastingListener} based on
24  * the JavaGroups library. This Class uses JavaGroups to broadcast cache flush
25  * messages across a cluster.</p>
26  *
27  * <p>One of the following properties should be configured in <code>oscache.properties</code> for
28  * this listener:
29  * <ul>
30  * <li><b>cache.cluster.multicast.ip</b> - The multicast IP that JavaGroups should use for broadcasting</li>
31  * <li><b>cache.cluster.properties</b> - The JavaGroups channel properties to use. Allows for precise
32  * control over the behaviour of JavaGroups</li>
33  * </ul>
34  * Please refer to the clustering documentation for further details on the configuration of this listener.</p>
35  *
36  * @author <a HREF="&#109;a&#105;&#108;&#116;&#111;:chris&#64;swebtec.&#99;&#111;&#109;">Chris Miller</a>
37  */

38 public class JavaGroupsBroadcastingListener extends AbstractBroadcastingListener implements NotificationBus.Consumer {
39     private final static Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class);
40     private static final String JavaDoc BUS_NAME = "OSCacheBus";
41     private static final String JavaDoc CHANNEL_PROPERTIES = "cache.cluster.properties";
42     private static final String JavaDoc MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
43
44     /**
45     * The first half of the default channel properties. They default channel properties are:
46     * <pre>
47     * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
48     * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
49     * PING(timeout=2000;num_initial_members=3):\
50     * MERGE2(min_interval=5000;max_interval=10000):\
51     * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
52     * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
53     * UNICAST(timeout=300,600,1200,2400):\
54     * pbcast.STABLE(desired_avg_gossip=20000):\
55     * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
56     * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
57     * </pre>
58     *
59     * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
60     */

61     private static final String JavaDoc DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr=";
62
63     /**
64     * The second half of the default channel properties. They default channel properties are:
65     * <pre>
66     * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
67     * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
68     * PING(timeout=2000;num_initial_members=3):\
69     * MERGE2(min_interval=5000;max_interval=10000):\
70     * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
71     * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
72     * UNICAST(timeout=300,600,1200,2400):\
73     * pbcast.STABLE(desired_avg_gossip=20000):\
74     * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
75     * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
76     * </pre>
77     *
78     * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
79     */

80     private static final String JavaDoc DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):" + "FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";
81     private static final String JavaDoc DEFAULT_MULTICAST_IP = "231.12.21.132";
82     private NotificationBus bus;
83
84     /**
85     * Initializes the broadcasting listener by starting up a JavaGroups notification
86     * bus instance to handle incoming and outgoing messages.
87     *
88     * @param config An OSCache configuration object.
89     * @throws com.opensymphony.oscache.base.InitializationException If this listener has
90     * already been initialized.
91     */

92     public synchronized void initialize(Cache cache, Config config) throws InitializationException {
93         super.initialize(cache, config);
94
95         String JavaDoc properties = config.getProperty(CHANNEL_PROPERTIES);
96         String JavaDoc multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
97
98         if ((properties == null) && (multicastIP == null)) {
99             multicastIP = DEFAULT_MULTICAST_IP;
100         }
101
102         if (properties == null) {
103             properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST;
104         } else {
105             properties = properties.trim();
106         }
107
108         if (log.isInfoEnabled()) {
109             log.info("Starting a new JavaGroups broadcasting listener with properties=" + properties);
110         }
111
112         try {
113             bus = new NotificationBus(BUS_NAME, properties);
114             bus.start();
115             bus.getChannel().setOpt(Channel.LOCAL, new Boolean JavaDoc(false));
116             bus.setConsumer(this);
117             log.info("JavaGroups clustering support started successfully");
118         } catch (Exception JavaDoc e) {
119             throw new InitializationException("Initialization failed: " + e);
120         }
121     }
122
123     /**
124     * Shuts down the JavaGroups being managed by this listener. This
125     * occurs once the cache is shut down and this listener is no longer
126     * in use.
127     *
128     * @throws com.opensymphony.oscache.base.FinalizationException
129     */

130     public synchronized void finialize() throws FinalizationException {
131         if (log.isInfoEnabled()) {
132             log.info("JavaGroups shutting down...");
133         }
134
135         // It's possible that the notification bus is null (CACHE-154)
136
if (bus != null) {
137             bus.stop();
138             bus = null;
139         } else {
140             log.warn("Notification bus wasn't initialized or finialize was invoked before!");
141         }
142
143         if (log.isInfoEnabled()) {
144             log.info("JavaGroups shutdown complete.");
145         }
146     }
147
148     /**
149     * Uses JavaGroups to broadcast the supplied notification message across the cluster.
150     *
151     * @param message The cluster nofication message to broadcast.
152     */

153     protected void sendNotification(ClusterNotification message) {
154         bus.sendNotification(message);
155     }
156
157     /**
158     * Handles incoming notification messages from JavaGroups. This method should
159     * never be called directly.
160     *
161     * @param serializable The incoming message object. This must be a {@link ClusterNotification}.
162     */

163     public void handleNotification(Serializable JavaDoc serializable) {
164         if (!(serializable instanceof ClusterNotification)) {
165             log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored.");
166
167             return;
168         }
169
170         handleClusterNotification((ClusterNotification) serializable);
171     }
172
173     /**
174     * We are not using the caching, so we just return something that identifies
175     * us. This method should never be called directly.
176     */

177     public Serializable JavaDoc getCache() {
178         return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
179     }
180
181     /**
182     * A callback that is fired when a new member joins the cluster. This
183     * method should never be called directly.
184     *
185     * @param address The address of the member who just joined.
186     */

187     public void memberJoined(Address address) {
188         if (log.isInfoEnabled()) {
189             log.info("A new member at address '" + address + "' has joined the cluster");
190         }
191     }
192
193     /**
194     * A callback that is fired when an existing member leaves the cluster.
195     * This method should never be called directly.
196     *
197     * @param address The address of the member who left.
198     */

199     public void memberLeft(Address address) {
200         if (log.isInfoEnabled()) {
201             log.info("Member at address '" + address + "' left the cluster");
202         }
203     }
204 }
205
Popular Tags