1 5 package com.opensymphony.oscache.plugins.clustersupport; 6 7 import com.opensymphony.oscache.base.Cache; 8 import com.opensymphony.oscache.base.Config; 9 import com.opensymphony.oscache.base.FinalizationException; 10 import com.opensymphony.oscache.base.InitializationException; 11 12 import org.apache.commons.logging.Log; 13 import org.apache.commons.logging.LogFactory; 14 15 import org.jgroups.Address; 16 import org.jgroups.Channel; 17 18 import org.jgroups.blocks.NotificationBus; 19 20 import java.io.Serializable ; 21 22 38 public class JavaGroupsBroadcastingListener extends AbstractBroadcastingListener implements NotificationBus.Consumer { 39 private final static Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class); 40 private static final String BUS_NAME = "OSCacheBus"; 41 private static final String CHANNEL_PROPERTIES = "cache.cluster.properties"; 42 private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip"; 43 44 61 private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr="; 62 63 80 private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):" + "FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)"; 81 private static final String DEFAULT_MULTICAST_IP = "231.12.21.132"; 82 private NotificationBus bus; 83 84 92 public synchronized void initialize(Cache cache, Config config) throws InitializationException { 93 super.initialize(cache, config); 94 95 String properties = config.getProperty(CHANNEL_PROPERTIES); 96 String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY); 97 98 if ((properties == null) && (multicastIP == null)) { 99 multicastIP = DEFAULT_MULTICAST_IP; 100 } 101 102 if (properties == null) { 103 properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST; 104 } else { 105 properties = properties.trim(); 106 } 107 108 if (log.isInfoEnabled()) { 109 log.info("Starting a new JavaGroups broadcasting listener with properties=" + properties); 110 } 111 112 try { 113 bus = new NotificationBus(BUS_NAME, properties); 114 bus.start(); 115 bus.getChannel().setOpt(Channel.LOCAL, new Boolean (false)); 116 bus.setConsumer(this); 117 log.info("JavaGroups clustering support started successfully"); 118 } catch (Exception e) { 119 throw new InitializationException("Initialization failed: " + e); 120 } 121 } 122 123 130 public synchronized void finialize() throws FinalizationException { 131 if (log.isInfoEnabled()) { 132 log.info("JavaGroups shutting down..."); 133 } 134 135 if (bus != null) { 137 bus.stop(); 138 bus = null; 139 } else { 140 log.warn("Notification bus wasn't initialized or finialize was invoked before!"); 141 } 142 143 if (log.isInfoEnabled()) { 144 log.info("JavaGroups shutdown complete."); 145 } 146 } 147 148 153 protected void sendNotification(ClusterNotification message) { 154 bus.sendNotification(message); 155 } 156 157 163 public void handleNotification(Serializable serializable) { 164 if (!(serializable instanceof ClusterNotification)) { 165 log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored."); 166 167 return; 168 } 169 170 handleClusterNotification((ClusterNotification) serializable); 171 } 172 173 177 public Serializable getCache() { 178 return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress(); 179 } 180 181 187 public void memberJoined(Address address) { 188 if (log.isInfoEnabled()) { 189 log.info("A new member at address '" + address + "' has joined the cluster"); 190 } 191 } 192 193 199 public void memberLeft(Address address) { 200 if (log.isInfoEnabled()) { 201 log.info("Member at address '" + address + "' left the cluster"); 202 } 203 } 204 } 205 | Popular Tags |