KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mmbase > clustering > ClusterManager


1 /*
2
3 This software is OSI Certified Open Source Software.
4 OSI Certified is a certification mark of the Open Source Initiative.
5
6 The license (Mozilla version 1.0) can be read at the MMBase site.
7 See http://www.MMBase.org/license
8
9 */

10 package org.mmbase.clustering;
11
12
13 import java.io.*;
14 import java.util.*;
15
16 import org.mmbase.core.event.*;
17 import org.mmbase.core.util.DaemonThread;
18 import org.mmbase.module.core.*;
19 import org.mmbase.util.Queue;
20 import org.mmbase.util.logging.Logger;
21 import org.mmbase.util.logging.Logging;
22
23 /**
24  * ClusterManager is a thread object that reads the receive queue
25  * and calls the objects (listeners) who need to know.
26  * The ClusterManager starts communication threads to handle the sending
27  * and receiving of messages.
28  *
29  * @author Nico Klasens
30  * @author Michiel Meeuwissen
31  * @author Ernst Bunders
32  * @version $Id: ClusterManager.java,v 1.32 2006/08/09 11:52:33 pierre Exp $
33  */

34 public abstract class ClusterManager implements AllEventListener, Runnable JavaDoc {
35
36     private static final Logger log = Logging.getLoggerInstance(ClusterManager.class);
37
38
39     protected final Statistics receive = new Statistics();
40     protected final Statistics send = new Statistics();
41
42
43     /** Queue with messages to send to other MMBase instances */
44     protected Queue nodesToSend = new Queue(64);
45     /** Queue with received messages from other MMBase instances */
46     protected Queue nodesToSpawn = new Queue(64);
47
48     /** Thread which processes the messages */
49     protected Thread JavaDoc kicker = null;
50
51     protected boolean spawnThreads = true;
52
53     protected boolean compatible17 = false;
54
55
56     public final void shutdown(){
57         log.info("Shutting down clustering");
58         stopCommunicationThreads();
59         kicker.setPriority(Thread.MIN_PRIORITY);
60         kicker = null;
61     }
62
63     protected void readConfiguration(Map configuration) {
64         String JavaDoc tmp = (String JavaDoc) configuration.get("spawnthreads");
65         if (tmp != null && !tmp.equals("")) {
66             spawnThreads = !"false".equalsIgnoreCase(tmp);
67         }
68     }
69
70     /**
71      * Subclasses should start the communication threads in this method
72      */

73     protected abstract void startCommunicationThreads();
74
75     /**
76      * Subclasses should stop the communication threads in this method
77      */

78     protected abstract void stopCommunicationThreads();
79
80     public void notify(Event event){
81         //we only want to propagate the local events into the cluster
82
if(event.getMachine().equals(MMBase.getMMBase().getMachineName())){
83             byte[] message = createMessage(event);
84             log.debug("Sending an event to the cluster");
85             nodesToSend.append(message);
86         } else {
87             log.trace("Ignoring remote event from " + event.getMachine() + " it will not be propagated");
88         }
89     }
90
91     /**
92      * Starts the Changer Thread.
93      */

94     protected void start() {
95         /* Start up the main thread */
96         if (kicker == null) {
97             kicker = new DaemonThread(this, "ClusterManager");
98             kicker.start();
99             try {
100                 kicker.setPriority(Thread.NORM_PRIORITY + 1);
101             } catch (NullPointerException JavaDoc npe) {
102                 // MM:a NPE is thrown here sometimes if ThreadGroup of kicker is null.
103
// which I saw happening on my jvm (ThreadGroep set to null by Thread.start).
104
// I don't understand it, but it's not worth failing ClusterManager completely.
105
log.warn("Could not set thread priority of Cluster Manager");
106             }
107             startCommunicationThreads();
108         }
109     }
110
111     protected byte[] createMessage(Event event) {
112         if (log.isDebugEnabled()) {
113             log.debug("Serializing " + event);
114         }
115         try {
116             long startTime = System.currentTimeMillis();
117             ByteArrayOutputStream bytes = new ByteArrayOutputStream();
118             if (compatible17) {
119                 if (event instanceof NodeEvent || event instanceof RelationEvent) {
120                     NodeEvent ne;
121                     if (event instanceof RelationEvent) {
122                         RelationEvent re = (RelationEvent) event;
123                         ne = re.getNodeEvent();
124                         ByteArrayOutputStream b1 = new ByteArrayOutputStream();
125                         byte[] rel1 = createMessage(re.getMachine(), re.getRelationSourceNumber(), re.getRelationSourceType(), "r").getBytes();
126                         b1.write(rel1, 0, rel1.length);
127                         b1.write(',');
128                         b1.write(0);
129                         nodesToSend.append(b1.toByteArray());
130                         ByteArrayOutputStream b2 = new ByteArrayOutputStream();
131                         byte[] rel2 = createMessage(re.getMachine(), re.getRelationDestinationNumber(), re.getRelationDestinationType(), "r").getBytes();
132                         b2.write(rel2, 0, rel2.length);
133                         b2.write(',');
134                         b2.write(0);
135                         nodesToSend.append(b2.toByteArray());
136                     } else {
137                         ne = (NodeEvent) event;
138                     }
139                     byte[] oldStyleEvent = createMessage(ne.getMachine(), ne.getNodeNumber(), ne.getBuilderName(), NodeEvent.newTypeToOldType(ne.getType())).getBytes();
140                     bytes.write(oldStyleEvent, 0, oldStyleEvent.length);
141                 }
142             }
143             bytes.write(',');
144             bytes.write(0);
145             ObjectOutputStream out = new ObjectOutputStream(bytes);
146             out.writeObject(event);
147             long cost = System.currentTimeMillis() - startTime;
148             send.parseCost += cost;
149             send.cost += cost;
150             return bytes.toByteArray();
151         } catch (IOException ioe) {
152             log.error(ioe.getMessage(), ioe);
153             return null;
154         }
155
156     }
157
158     /** Followup number of message */
159     protected int follownr = 1;
160
161     /**
162      * Creates MMBase 1.7 parseable message. This is simple String, which is prefixed before the actual 1.8 message.
163      *
164      * @param machine MMBase 'machine name'.
165      * @param nodenr node number
166      * @param tableName node type (tablename)
167      * @param type command type
168      * @return message
169      */

170     protected String JavaDoc createMessage(String JavaDoc machine, int nodenr, String JavaDoc tableName, String JavaDoc type) {
171         return machine + ',' + (follownr++) + ',' + nodenr + ',' + tableName + ',' + type;
172     }
173
174     protected Event parseMessage(byte[] message) {
175         try {
176             ByteArrayInputStream stream = new ByteArrayInputStream(message);
177             int c = 1;
178             while (c > 0) {
179                 // ignore backwards compatibility message
180
c = stream.read();
181             }
182             ObjectInputStream in = new ObjectInputStream(stream);
183             Event event = (Event) in.readObject();
184             if (log.isDebugEnabled()) {
185                 log.debug("Unserialized " + event);
186             }
187             return event;
188         } catch (StreamCorruptedException scc) {
189             // not sure that this can happen, now, because of the while(c>0) trick.
190
log.debug(scc.getMessage() + ". Supposing old style message.");
191             // Possibly, it is a message from an 1.7 system
192
String JavaDoc mes = new String JavaDoc(message);
193             NodeEvent event = parseMessageBackwardCompatible(mes);
194             if (log.isDebugEnabled()) {
195                 log.debug("Old style message " + event);
196             }
197             return event;
198         } catch (EOFException eofe) {
199             // suppose that this is a 1.7 message
200
String JavaDoc mes = new String JavaDoc(message);
201             NodeEvent event = parseMessageBackwardCompatible(mes);
202             if (log.isDebugEnabled()) {
203                 log.debug("Old style message " + event);
204             }
205             return event;
206         } catch (IOException ioe) {
207             log.error(ioe);
208             return null;
209         } catch (ClassNotFoundException JavaDoc cnfe) {
210             log.error(cnfe);
211             return null;
212         }
213     }
214
215     protected NodeEvent parseMessageBackwardCompatible(String JavaDoc message) {
216         if (log.isDebugEnabled()) {
217             log.debug("RECEIVE=>" + message);
218         }
219         StringTokenizer tok = new StringTokenizer(message,",");
220         if (tok.hasMoreTokens()) {
221             String JavaDoc machine = tok.nextToken();
222             if (tok.hasMoreTokens()) {
223                 String JavaDoc vnr = tok.nextToken();
224                 if (tok.hasMoreTokens()) {
225                     String JavaDoc id = tok.nextToken();
226                     if (tok.hasMoreTokens()) {
227                         String JavaDoc tb = tok.nextToken();
228                         if (tok.hasMoreTokens()) {
229                             String JavaDoc ctype = tok.nextToken();
230                             if (!ctype.equals("s")) {
231                                 MMBase mmbase = MMBase.getMMBase();
232                                 MMObjectBuilder builder = mmbase.getBuilder(tb);
233                                 if (builder == null) builder = mmbase.getBuilder("object");
234                                 MMObjectNode node = builder.getNode(id);
235                                 if (node != null) {
236                                     return new NodeEvent(machine, tb, node.getNumber(), node.getOldValues(), node.getValues(), NodeEvent.oldTypeToNewType(ctype));
237                                 } else {
238                                     try {
239                                         return new NodeEvent(machine, tb, Integer.valueOf(id).intValue(), null, null, NodeEvent.oldTypeToNewType(ctype));
240                                     } catch (NumberFormatException JavaDoc nfe) {
241                                         log.error(message + ": colud not parse " + id + " to a node number.");
242                                     }
243                                 }
244                             } else {
245                                 /// XXXX should we?
246
log.error("XML messages not suppported any more");
247                             }
248                         } else log.error(message + ": 'ctype' could not be extracted from this string!");
249                     } else log.error(message + ": 'tb' could not be extracted from this string!");
250                 } else log.error(message + ": 'id' could not be extracted from this string!");
251             } else log.error(message + ": 'vnr' could not be extracted from this string!");
252         } else log.error(message + ": 'machine' could not be extracted from this string!");
253         return null;
254     }
255
256     /**
257      * @see java.lang.Runnable#run()
258      */

259     public void run() {
260         while(kicker != null) {
261             try {
262                 byte[] message = (byte[]) nodesToSpawn.get();
263                 if (message == null) continue;
264                 long startTime = System.currentTimeMillis();
265                 if (log.isDebugEnabled()) {
266                     log.trace("RECEIVED =>" + message.length + " bytes");
267                 }
268                 receive.count++;
269                 receive.bytes += message.length;
270                 Event event = parseMessage(message);
271                 receive.parseCost += (System.currentTimeMillis() - startTime);
272                 if (event != null) {
273                     handleEvent(event);
274                 } else {
275                     log.warn("Could not handle event, it is null");
276                 }
277                 receive.cost += (System.currentTimeMillis() - startTime);
278             } catch (InterruptedException JavaDoc e) {
279                 log.debug(Thread.currentThread().getName() +" was interruped.");
280                 break;
281             } catch(Throwable JavaDoc t) {
282                 log.error(t.getMessage(), t);
283             }
284         }
285
286     }
287
288     /**
289
290      * @param event
291      */

292     protected void handleEvent(final Event event) {
293         // check if MMBase is 100% up and running, if not eat event
294
MMBase mmbase = MMBase.getMMBase();
295         if (mmbase == null || !mmbase.getState()) {
296             if (log.isDebugEnabled()) {
297                 log.debug("Ignoring event " + event + ", mmbase is not up " + mmbase);
298             }
299             return;
300         }
301         if (mmbase.getMachineName().equals(event.getMachine())) {
302             // ignore changes of ourselves
303
if (log.isDebugEnabled()) {
304                 log.debug("Ignoring event " + event + " it is from this (" + event.getMachine() + ") mmbase");
305             }
306             return;
307         }
308         if (event instanceof NodeEvent) {
309             MMObjectBuilder builder = mmbase.getBuilder(((NodeEvent) event).getBuilderName());
310             if (builder != null && (! builder.broadcastChanges())) {
311                 log.info("Ignoring node-event for node type " + builder + " because broad cast changes is false");
312                 return;
313             }
314         }
315
316         if (log.isDebugEnabled()) {
317             log.debug("Handling event " + event + " for " + event.getMachine());
318         }
319
320         if (spawnThreads) {
321             Runnable JavaDoc job = new Runnable JavaDoc () {
322                     public void run() {
323                         long startTime = System.currentTimeMillis();
324                         EventManager.getInstance().propagateEvent(event);
325                         receive.cost += (System.currentTimeMillis() - startTime);
326                     }
327                 };
328             org.mmbase.util.ThreadPools.jobsExecutor.execute(job);
329         } else {
330             try {
331                 EventManager.getInstance().propagateEvent(event);
332             } catch (Throwable JavaDoc t) {
333                 log.error("Exception during propegation of event: " + event + ": " + t.getMessage(), t);
334             }
335         }
336     }
337
338 }
339
Popular Tags