1 10 package org.mmbase.clustering.jgroups; 11 12 import java.util.Map ; 13 import java.util.regex.Pattern ; 14 import java.util.regex.Matcher ; 15 16 import org.mmbase.clustering.ClusterManager; 17 import org.mmbase.util.logging.Logger; 18 import org.mmbase.util.logging.Logging; 19 import org.mmbase.util.xml.UtilReader; 20 21 import org.jgroups.*; 22 23 43 public class Multicast extends ClusterManager { 44 45 private static final Logger log = Logging.getLoggerInstance(Multicast.class); 46 47 51 public static final String CONFIG_FILE = "multicastJG.xml"; 52 53 57 private ChangesSender mcs; 58 59 63 private ChangesReceiver mcr; 64 65 69 private JChannel channel; 70 71 75 private String channelProperties; 76 77 83 private String channelName; 84 85 88 private final UtilReader reader = new UtilReader(CONFIG_FILE, 89 new Runnable () { 90 public void run() { 91 synchronized(Multicast.this) { 92 stopCommunicationThreads(); 93 readConfiguration(reader.getProperties()); 94 startCommunicationThreads(); 95 } 96 } 97 }); 98 99 101 public Multicast() { 102 readConfiguration(reader.getProperties()); 103 start(); 104 } 105 106 109 protected synchronized void readConfiguration(Map configuration) { 110 super.readConfiguration(configuration); 111 112 String tmp = (String ) configuration.get("channelproperties"); 113 if (tmp != null && !tmp.equals("")) { 114 channelProperties = tmp; 115 } else { 116 log.error("No channel properties found"); 117 } 118 119 tmp = (String ) configuration.get("channelname"); 120 if (tmp != null && !tmp.equals("")) { 121 channelName = tmp; 122 } 123 124 128 Pattern p = Pattern.compile("\\s"); 129 Matcher m = p.matcher(channelProperties); 130 channelProperties = m.replaceAll(""); 131 132 try { 133 if (channel != null) channel.disconnect(); 134 channel = new JChannel(channelProperties); 135 channel.connect(channelName); 136 } catch (ChannelException createChannelException) { 137 log.error("JChannel: Unable to create or join multicast channel: " + createChannelException.getMessage(), createChannelException); 138 if (channel == null) { 139 return; 140 } 141 } 142 143 if (channel.isConnected()) { 144 log.info("Joining channel: " + channel.toString(true)); 145 } else { 146 log.warn("Could not connect channel: " + channel.toString(true)); 147 } 148 } 149 150 155 protected synchronized void startCommunicationThreads() { 156 mcs = new ChangesSender(channel, nodesToSend, send); 157 log.service("Started communication sender " + mcs); 158 mcr = new ChangesReceiver(channel, nodesToSpawn); 159 log.service("Started communication receiver " + mcr); 160 } 161 162 163 protected synchronized void stopCommunicationThreads() { 164 if (mcs != null) { 165 mcs.stop(); 166 log.service("Stopped communication sender " + mcs); 167 mcs = null; 168 } 169 if (mcr != null) { 170 mcr.stop(); 171 log.service("Stopped communication receiver " + mcr); 172 mcr = null; 173 } 174 if (channel != null) { 175 log.service("Disconnecting jgroup channel " + channel.toString(true)); 176 channel.disconnect(); 177 channel = null; 178 } 179 } 180 } 181
| Popular Tags
|