1 10 package org.mmbase.clustering; 11 12 13 import java.io.*; 14 import java.util.*; 15 16 import org.mmbase.core.event.*; 17 import org.mmbase.core.util.DaemonThread; 18 import org.mmbase.module.core.*; 19 import org.mmbase.util.Queue; 20 import org.mmbase.util.logging.Logger; 21 import org.mmbase.util.logging.Logging; 22 23 34 public abstract class ClusterManager implements AllEventListener, Runnable { 35 36 private static final Logger log = Logging.getLoggerInstance(ClusterManager.class); 37 38 39 protected final Statistics receive = new Statistics(); 40 protected final Statistics send = new Statistics(); 41 42 43 44 protected Queue nodesToSend = new Queue(64); 45 46 protected Queue nodesToSpawn = new Queue(64); 47 48 49 protected Thread kicker = null; 50 51 protected boolean spawnThreads = true; 52 53 protected boolean compatible17 = false; 54 55 56 public final void shutdown(){ 57 log.info("Shutting down clustering"); 58 stopCommunicationThreads(); 59 kicker.setPriority(Thread.MIN_PRIORITY); 60 kicker = null; 61 } 62 63 protected void readConfiguration(Map configuration) { 64 String tmp = (String ) configuration.get("spawnthreads"); 65 if (tmp != null && !tmp.equals("")) { 66 spawnThreads = !"false".equalsIgnoreCase(tmp); 67 } 68 } 69 70 73 protected abstract void startCommunicationThreads(); 74 75 78 protected abstract void stopCommunicationThreads(); 79 80 public void notify(Event event){ 81 if(event.getMachine().equals(MMBase.getMMBase().getMachineName())){ 83 byte[] message = createMessage(event); 84 log.debug("Sending an event to the cluster"); 85 nodesToSend.append(message); 86 } else { 87 log.trace("Ignoring remote event from " + event.getMachine() + " it will not be propagated"); 88 } 89 } 90 91 94 protected void start() { 95 96 if (kicker == null) { 97 kicker = new DaemonThread(this, "ClusterManager"); 98 kicker.start(); 99 try { 100 kicker.setPriority(Thread.NORM_PRIORITY + 1); 101 } catch (NullPointerException npe) { 102 log.warn("Could not set thread priority of Cluster Manager"); 106 } 107 startCommunicationThreads(); 108 } 109 } 110 111 protected byte[] createMessage(Event event) { 112 if (log.isDebugEnabled()) { 113 log.debug("Serializing " + event); 114 } 115 try { 116 long startTime = System.currentTimeMillis(); 117 ByteArrayOutputStream bytes = new ByteArrayOutputStream(); 118 if (compatible17) { 119 if (event instanceof NodeEvent || event instanceof RelationEvent) { 120 NodeEvent ne; 121 if (event instanceof RelationEvent) { 122 RelationEvent re = (RelationEvent) event; 123 ne = re.getNodeEvent(); 124 ByteArrayOutputStream b1 = new ByteArrayOutputStream(); 125 byte[] rel1 = createMessage(re.getMachine(), re.getRelationSourceNumber(), re.getRelationSourceType(), "r").getBytes(); 126 b1.write(rel1, 0, rel1.length); 127 b1.write(','); 128 b1.write(0); 129 nodesToSend.append(b1.toByteArray()); 130 ByteArrayOutputStream b2 = new ByteArrayOutputStream(); 131 byte[] rel2 = createMessage(re.getMachine(), re.getRelationDestinationNumber(), re.getRelationDestinationType(), "r").getBytes(); 132 b2.write(rel2, 0, rel2.length); 133 b2.write(','); 134 b2.write(0); 135 nodesToSend.append(b2.toByteArray()); 136 } else { 137 ne = (NodeEvent) event; 138 } 139 byte[] oldStyleEvent = createMessage(ne.getMachine(), ne.getNodeNumber(), ne.getBuilderName(), NodeEvent.newTypeToOldType(ne.getType())).getBytes(); 140 bytes.write(oldStyleEvent, 0, oldStyleEvent.length); 141 } 142 } 143 bytes.write(','); 144 bytes.write(0); 145 ObjectOutputStream out = new ObjectOutputStream(bytes); 146 out.writeObject(event); 147 long cost = System.currentTimeMillis() - startTime; 148 send.parseCost += cost; 149 send.cost += cost; 150 return bytes.toByteArray(); 151 } catch (IOException ioe) { 152 log.error(ioe.getMessage(), ioe); 153 return null; 154 } 155 156 } 157 158 159 protected int follownr = 1; 160 161 170 protected String createMessage(String machine, int nodenr, String tableName, String type) { 171 return machine + ',' + (follownr++) + ',' + nodenr + ',' + tableName + ',' + type; 172 } 173 174 protected Event parseMessage(byte[] message) { 175 try { 176 ByteArrayInputStream stream = new ByteArrayInputStream(message); 177 int c = 1; 178 while (c > 0) { 179 c = stream.read(); 181 } 182 ObjectInputStream in = new ObjectInputStream(stream); 183 Event event = (Event) in.readObject(); 184 if (log.isDebugEnabled()) { 185 log.debug("Unserialized " + event); 186 } 187 return event; 188 } catch (StreamCorruptedException scc) { 189 log.debug(scc.getMessage() + ". Supposing old style message."); 191 String mes = new String (message); 193 NodeEvent event = parseMessageBackwardCompatible(mes); 194 if (log.isDebugEnabled()) { 195 log.debug("Old style message " + event); 196 } 197 return event; 198 } catch (EOFException eofe) { 199 String mes = new String (message); 201 NodeEvent event = parseMessageBackwardCompatible(mes); 202 if (log.isDebugEnabled()) { 203 log.debug("Old style message " + event); 204 } 205 return event; 206 } catch (IOException ioe) { 207 log.error(ioe); 208 return null; 209 } catch (ClassNotFoundException cnfe) { 210 log.error(cnfe); 211 return null; 212 } 213 } 214 215 protected NodeEvent parseMessageBackwardCompatible(String message) { 216 if (log.isDebugEnabled()) { 217 log.debug("RECEIVE=>" + message); 218 } 219 StringTokenizer tok = new StringTokenizer(message,","); 220 if (tok.hasMoreTokens()) { 221 String machine = tok.nextToken(); 222 if (tok.hasMoreTokens()) { 223 String vnr = tok.nextToken(); 224 if (tok.hasMoreTokens()) { 225 String id = tok.nextToken(); 226 if (tok.hasMoreTokens()) { 227 String tb = tok.nextToken(); 228 if (tok.hasMoreTokens()) { 229 String ctype = tok.nextToken(); 230 if (!ctype.equals("s")) { 231 MMBase mmbase = MMBase.getMMBase(); 232 MMObjectBuilder builder = mmbase.getBuilder(tb); 233 if (builder == null) builder = mmbase.getBuilder("object"); 234 MMObjectNode node = builder.getNode(id); 235 if (node != null) { 236 return new NodeEvent(machine, tb, node.getNumber(), node.getOldValues(), node.getValues(), NodeEvent.oldTypeToNewType(ctype)); 237 } else { 238 try { 239 return new NodeEvent(machine, tb, Integer.valueOf(id).intValue(), null, null, NodeEvent.oldTypeToNewType(ctype)); 240 } catch (NumberFormatException nfe) { 241 log.error(message + ": colud not parse " + id + " to a node number."); 242 } 243 } 244 } else { 245 log.error("XML messages not suppported any more"); 247 } 248 } else log.error(message + ": 'ctype' could not be extracted from this string!"); 249 } else log.error(message + ": 'tb' could not be extracted from this string!"); 250 } else log.error(message + ": 'id' could not be extracted from this string!"); 251 } else log.error(message + ": 'vnr' could not be extracted from this string!"); 252 } else log.error(message + ": 'machine' could not be extracted from this string!"); 253 return null; 254 } 255 256 259 public void run() { 260 while(kicker != null) { 261 try { 262 byte[] message = (byte[]) nodesToSpawn.get(); 263 if (message == null) continue; 264 long startTime = System.currentTimeMillis(); 265 if (log.isDebugEnabled()) { 266 log.trace("RECEIVED =>" + message.length + " bytes"); 267 } 268 receive.count++; 269 receive.bytes += message.length; 270 Event event = parseMessage(message); 271 receive.parseCost += (System.currentTimeMillis() - startTime); 272 if (event != null) { 273 handleEvent(event); 274 } else { 275 log.warn("Could not handle event, it is null"); 276 } 277 receive.cost += (System.currentTimeMillis() - startTime); 278 } catch (InterruptedException e) { 279 log.debug(Thread.currentThread().getName() +" was interruped."); 280 break; 281 } catch(Throwable t) { 282 log.error(t.getMessage(), t); 283 } 284 } 285 286 } 287 288 292 protected void handleEvent(final Event event) { 293 MMBase mmbase = MMBase.getMMBase(); 295 if (mmbase == null || !mmbase.getState()) { 296 if (log.isDebugEnabled()) { 297 log.debug("Ignoring event " + event + ", mmbase is not up " + mmbase); 298 } 299 return; 300 } 301 if (mmbase.getMachineName().equals(event.getMachine())) { 302 if (log.isDebugEnabled()) { 304 log.debug("Ignoring event " + event + " it is from this (" + event.getMachine() + ") mmbase"); 305 } 306 return; 307 } 308 if (event instanceof NodeEvent) { 309 MMObjectBuilder builder = mmbase.getBuilder(((NodeEvent) event).getBuilderName()); 310 if (builder != null && (! builder.broadcastChanges())) { 311 log.info("Ignoring node-event for node type " + builder + " because broad cast changes is false"); 312 return; 313 } 314 } 315 316 if (log.isDebugEnabled()) { 317 log.debug("Handling event " + event + " for " + event.getMachine()); 318 } 319 320 if (spawnThreads) { 321 Runnable job = new Runnable () { 322 public void run() { 323 long startTime = System.currentTimeMillis(); 324 EventManager.getInstance().propagateEvent(event); 325 receive.cost += (System.currentTimeMillis() - startTime); 326 } 327 }; 328 org.mmbase.util.ThreadPools.jobsExecutor.execute(job); 329 } else { 330 try { 331 EventManager.getInstance().propagateEvent(event); 332 } catch (Throwable t) { 333 log.error("Exception during propegation of event: " + event + ": " + t.getMessage(), t); 334 } 335 } 336 } 337 338 } 339 | Popular Tags |