1 10 package org.mmbase.clustering.unicast; 11 12 import java.util.Map ; 13 14 import org.mmbase.core.event.NodeEvent; 15 import org.mmbase.clustering.ClusterManager; 16 import org.mmbase.util.logging.Logger; 17 import org.mmbase.util.logging.Logging; 18 import org.mmbase.util.xml.UtilReader; 19 20 21 27 public class Unicast extends ClusterManager { 28 29 private static final Logger log = Logging.getLoggerInstance(Unicast.class); 30 31 public static final String CONFIG_FILE = "unicast.xml"; 32 33 34 private int unicastPort = 4243; 35 36 37 private int unicastTimeout = 10 * 1000; 38 39 40 41 private ChangesSender ucs; 42 43 private ChangesReceiver ucr; 44 45 48 private final UtilReader reader = new UtilReader(CONFIG_FILE, 49 new Runnable () { 50 public void run() { 51 synchronized(Unicast.this) { 52 stopCommunicationThreads(); 53 readConfiguration(reader.getProperties()); 54 startCommunicationThreads(); 55 } 56 } 57 }); 58 59 60 61 public Unicast(){ 62 readConfiguration(reader.getProperties()); 63 start(); 64 } 65 66 protected synchronized void readConfiguration(Map configuration) { 67 super.readConfiguration(configuration); 68 69 String tmp = (String ) configuration.get("unicastport"); 70 if (tmp != null && !tmp.equals("")) { 71 try { 72 unicastPort = Integer.parseInt(tmp); 73 } catch (Exception e) {} 74 } 75 tmp = (String ) configuration.get(org.mmbase.module.core.MMBase.getMMBase().getMachineName() + ".unicastport"); 76 if (tmp != null && !tmp.equals("")) { 77 try { 78 unicastPort = Integer.parseInt(tmp); 79 } catch (Exception e) {} 80 } 81 82 tmp = (String ) configuration.get("unicasttimeout"); 83 if (tmp != null && !tmp.equals("")) { 84 try { 85 unicastTimeout = Integer.parseInt(tmp); 86 } catch (Exception e) {} 87 } 88 89 log.info("unicast port: " + unicastPort); 90 log.info("unicast timeout: " + unicastTimeout); 91 92 } 93 94 97 protected synchronized void startCommunicationThreads() { 98 ucs = new ChangesSender(reader.getProperties(), unicastPort, unicastTimeout, nodesToSend, send); 99 try { 100 ucr = new ChangesReceiver(unicastPort, nodesToSpawn); 101 } catch (java.io.IOException ioe) { 102 log.error(ioe); 103 } 104 } 105 106 109 protected synchronized void stopCommunicationThreads() { 110 if (ucs != null) { 111 ucs.stop(); 112 log.service("Stopped communication sender " + ucs); 113 ucs = null; 114 } 115 if (ucr != null) { 116 ucr.stop(); 117 log.service("Stopped communication receiver " + ucr); 118 ucr = null; 119 } 120 } 121 122 public void changedNode(NodeEvent event) { 124 byte[] message = createMessage(event); 125 nodesToSend.append(message); 126 nodesToSpawn.append(message); 128 if (log.isDebugEnabled()) { 129 log.debug("message: " + event); 130 } 131 return; 132 } 133 134 } 135
| Popular Tags
|