1 10 package org.mmbase.clustering.unicast; 11 12 import org.mmbase.clustering.Statistics; 13 import java.io.DataOutputStream ; 14 import java.io.IOException ; 15 import java.net.*; 16 import java.util.*; 17 18 import org.mmbase.core.util.DaemonThread; 19 import org.mmbase.module.builders.MMServers; 20 import org.mmbase.module.core.*; 21 22 import org.mmbase.util.Queue; 23 import org.mmbase.util.logging.Logger; 24 import org.mmbase.util.logging.Logging; 25 26 27 34 public class ChangesSender implements Runnable { 35 36 private static final Logger log = Logging.getLoggerInstance(ChangesSender.class); 37 38 39 private final Statistics send; 40 41 42 private Thread kicker = null; 43 44 45 private final Queue nodesToSend; 46 47 48 private final Map configuration; 49 private final int defaultUnicastPort; 50 51 52 private final int unicastTimeout; 53 54 55 private long lastServerChecked = -1; 56 private List activeServers = new ArrayList(); 57 58 59 private long serverInterval; 60 61 68 ChangesSender(Map configuration, int unicastPort, int unicastTimeout, Queue nodesToSend, Statistics send) { 69 this.nodesToSend = nodesToSend; 70 this.configuration = configuration; 71 this.defaultUnicastPort = unicastPort; 72 this.unicastTimeout = unicastTimeout; 73 this.send = send; 74 this.start(); 75 } 76 77 private void start() { 78 if (kicker == null) { 79 kicker = new DaemonThread(this, "UnicastSender"); 80 kicker.start(); 81 log.debug("UnicastSender started"); 82 } 83 } 84 void stop() { 85 if (kicker != null) { 86 kicker.interrupt(); 87 kicker.setPriority(Thread.MIN_PRIORITY); 88 kicker = null; 89 } else { 90 log.service("Cannot stop thread, because it is null"); 91 } 92 } 93 94 95 public void run() { 97 while(kicker != null) { 98 try { 99 byte[] data = (byte[]) nodesToSend.get(); 100 long startTime = System.currentTimeMillis(); 101 List servers = getActiveServers(); 102 for (int i = 0; i < servers.size(); i++) { 103 MMObjectNode node = (MMObjectNode) servers.get(i); 104 if (node != null) { 105 String hostname = node.getStringValue("host"); 106 String machinename = node.getStringValue("name"); 107 108 int unicastPort = defaultUnicastPort; 109 String specificPort = (String ) configuration.get(machinename + ".unicastport"); 110 if (specificPort != null) { 111 unicastPort = Integer.parseInt(specificPort); 112 } 113 Socket socket = null; 114 DataOutputStream os = null; 115 try { 116 socket = new Socket(); 117 socket.connect(new InetSocketAddress(hostname, unicastPort), unicastTimeout); 118 os = new DataOutputStream (socket.getOutputStream()); 119 os.write(data, 0, data.length); 120 os.flush(); 121 if (log.isDebugEnabled()) { 122 log.debug("SEND=>" + hostname + ":" + unicastPort); 123 } 124 } catch(SocketTimeoutException ste) { 125 servers.remove(i); 126 log.warn("Server timeout: " + hostname + ":" + unicastPort + " " + ste + ". Removed " + node + " from active server list."); 127 } catch (ConnectException ce) { 128 log.warn("Connect exception: " + hostname + ":" + unicastPort + " " + ce + "."); 129 } catch (IOException e) { 130 log.error("can't send message to " + hostname + ":" + unicastPort + " " + e.getMessage() , e); 131 } finally { 132 if (os != null) { 133 try { 134 os.close(); 135 } catch (IOException e1) { 136 } 137 } 138 if (socket != null) { 139 try { 140 socket.close(); 141 } catch (IOException e1) { 142 } 143 } 144 } 145 } 146 } 147 send.count++; 148 send.bytes += data.length; 149 send.cost += (System.currentTimeMillis() - startTime); 150 151 } catch (InterruptedException e) { 152 log.debug(Thread.currentThread().getName() +" was interruped."); 153 break; 154 } catch (Exception e) { 155 log.error(e.getMessage(), e); 156 } 157 } 158 } 159 160 164 private List getActiveServers() { 165 List prevActiveServers = activeServers; 166 if (serverInterval < 0) { 167 MMBase mmbase = MMBase.getMMBase(); 168 MMServers mmservers = (MMServers) mmbase.getBuilder("mmservers"); 169 serverInterval = mmservers.getIntervalTime(); 170 activeServers = mmservers.getActiveServers(); 171 lastServerChecked = System.currentTimeMillis(); 172 log.info("Active servers: " + activeServers ); 173 } else { 174 if (lastServerChecked + serverInterval < System.currentTimeMillis()) { 175 MMBase mmbase = MMBase.getMMBase(); 176 MMServers mmservers = (MMServers) mmbase.getBuilder("mmservers"); 177 activeServers = mmservers.getActiveServers(); 178 lastServerChecked = System.currentTimeMillis(); 179 if (! activeServers.equals(prevActiveServers)) { 180 log.info("Active servers: " + activeServers + " " + prevActiveServers.size() + " -> " + activeServers.size()); 181 } else { 182 log.debug("Active servers: " + activeServers); 183 } 184 } 185 } 186 return activeServers; 187 } 188 189 190 } 191
| Popular Tags
|