1 7 package winstone.cluster; 8 9 import java.io.IOException ; 10 import java.io.InputStream ; 11 import java.io.ObjectInputStream ; 12 import java.io.ObjectOutputStream ; 13 import java.io.OutputStream ; 14 import java.net.ConnectException ; 15 import java.net.Socket ; 16 import java.util.ArrayList ; 17 import java.util.Collection ; 18 import java.util.Date ; 19 import java.util.HashSet ; 20 import java.util.Hashtable ; 21 import java.util.Iterator ; 22 import java.util.List ; 23 import java.util.Map ; 24 import java.util.Set ; 25 import java.util.StringTokenizer ; 26 27 import winstone.Cluster; 28 import winstone.HostConfiguration; 29 import winstone.HostGroup; 30 import winstone.Logger; 31 import winstone.WebAppConfiguration; 32 import winstone.WinstoneResourceBundle; 33 import winstone.WinstoneSession; 34 35 41 public class SimpleCluster implements Runnable , Cluster { 42 final int SESSION_CHECK_TIMEOUT = 100; 43 final int HEARTBEAT_PERIOD = 5000; 44 final int MAX_NO_OF_MISSING_HEARTBEATS = 3; 45 final byte NODELIST_DOWNLOAD_TYPE = (byte) '2'; 46 final byte NODE_HEARTBEAT_TYPE = (byte) '3'; 47 48 public static final WinstoneResourceBundle CLUSTER_RESOURCES = new WinstoneResourceBundle("winstone.cluster.LocalStrings"); 49 private int controlPort; 50 private String initialClusterNodes; 51 private Map clusterAddresses; 52 private boolean interrupted; 53 54 57 public SimpleCluster(Map args, Integer controlPort) { 58 this.interrupted = false; 59 this.clusterAddresses = new Hashtable (); 60 if (controlPort != null) 61 this.controlPort = controlPort.intValue(); 62 63 this.initialClusterNodes = (String ) args.get("clusterNodes"); 65 Thread thread = new Thread (this, CLUSTER_RESOURCES 66 .getString("SimpleCluster.ThreadName")); 67 thread.setDaemon(true); 68 thread.setPriority(Thread.MIN_PRIORITY); 69 thread.start(); 70 } 71 72 public void destroy() { 73 this.interrupted = true; 74 } 75 76 80 public void run() { 81 if (this.initialClusterNodes != null) { 84 StringTokenizer st = new StringTokenizer (this.initialClusterNodes, 85 ","); 86 while (st.hasMoreTokens() && !interrupted) 87 askClusterNodeForNodeList(st.nextToken()); 88 } 89 90 Logger.log(Logger.DEBUG, CLUSTER_RESOURCES, "SimpleCluster.InitNodes", "" 91 + this.clusterAddresses.size()); 92 93 while (!interrupted) { 94 try { 95 Set addresses = new HashSet (this.clusterAddresses.keySet()); 96 Date noHeartbeatDate = new Date (System.currentTimeMillis() 97 - (MAX_NO_OF_MISSING_HEARTBEATS * HEARTBEAT_PERIOD)); 98 for (Iterator i = addresses.iterator(); i.hasNext();) { 99 String ipPort = (String ) i.next(); 100 101 Date lastHeartBeat = (Date ) this.clusterAddresses 102 .get(ipPort); 103 if (lastHeartBeat.before(noHeartbeatDate)) { 104 this.clusterAddresses.remove(ipPort); 105 Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES, 106 "SimpleCluster.RemovingNode", ipPort); 107 } 108 109 else 111 sendHeartbeat(ipPort); 112 113 } 114 Thread.sleep(HEARTBEAT_PERIOD); 115 } catch (Throwable err) { 116 Logger.log(Logger.ERROR, CLUSTER_RESOURCES, 117 "SimpleCluster.ErrorMonitorThread", err); 118 } 119 } 120 Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES, 121 "SimpleCluster.FinishedMonitorThread"); 122 } 123 124 131 public WinstoneSession askClusterForSession(String sessionId, 132 WebAppConfiguration webAppConfig) { 133 Collection addresses = new ArrayList (clusterAddresses.keySet()); 135 Collection searchThreads = new ArrayList (); 136 for (Iterator i = addresses.iterator(); i.hasNext();) { 137 String ipPort = (String ) i.next(); 138 ClusterSessionSearch search = new ClusterSessionSearch( 139 webAppConfig.getContextPath(), webAppConfig.getOwnerHostname(), 140 sessionId, ipPort, this.controlPort); 141 searchThreads.add(search); 142 } 143 144 WinstoneSession answer = null; 146 String senderThread = null; 147 boolean finished = false; 148 while (!finished) { 149 List finishedThreads = new ArrayList (); 152 for (Iterator i = searchThreads.iterator(); i.hasNext();) { 153 ClusterSessionSearch searchThread = (ClusterSessionSearch) i 154 .next(); 155 if (!searchThread.isFinished()) 156 continue; 157 else if (searchThread.getResult() == null) 158 finishedThreads.add(searchThread); 159 else { 160 answer = searchThread.getResult(); 161 senderThread = searchThread.getAddressPort(); 162 } 163 } 164 165 for (Iterator i = finishedThreads.iterator(); i.hasNext();) 167 searchThreads.remove(i.next()); 168 169 if (searchThreads.isEmpty() || (answer != null)) 170 finished = true; 171 else 172 try { 173 Thread.sleep(100); 174 } catch (InterruptedException err) { 175 } 176 } 177 178 for (Iterator i = searchThreads.iterator(); i.hasNext();) { 180 ClusterSessionSearch searchThread = (ClusterSessionSearch) i.next(); 181 searchThread.destroy(); 182 } 183 if (answer != null) { 184 answer.activate(webAppConfig); 185 Logger.log(Logger.DEBUG, CLUSTER_RESOURCES, 186 "SimpleCluster.SessionTransferredFrom", senderThread); 187 } 188 return answer; 189 } 190 191 196 private void askClusterNodeForNodeList(String address) { 197 try { 198 int colonPos = address.indexOf(':'); 199 String ipAddress = address.substring(0, colonPos); 200 String port = address.substring(colonPos + 1); 201 Socket clusterListSocket = new Socket (ipAddress, 202 Integer.parseInt(port)); 203 this.clusterAddresses.put(clusterListSocket.getInetAddress() 204 .getHostAddress() + ":" + port, new Date ()); 205 InputStream in = clusterListSocket.getInputStream(); 206 OutputStream out = clusterListSocket.getOutputStream(); 207 out.write(NODELIST_DOWNLOAD_TYPE); 208 out.flush(); 209 210 ObjectOutputStream outControl = new ObjectOutputStream (out); 212 outControl.writeInt(this.controlPort); 213 outControl.flush(); 214 215 ObjectInputStream inData = new ObjectInputStream (in); 217 int nodeCount = inData.readInt(); 218 for (int n = 0; n < nodeCount; n++) 219 this.clusterAddresses.put(inData.readUTF(), new Date ()); 220 221 inData.close(); 222 outControl.close(); 223 out.close(); 224 in.close(); 225 clusterListSocket.close(); 226 } catch (ConnectException err) { 227 Logger.log(Logger.DEBUG, CLUSTER_RESOURCES, 228 "SimpleCluster.NoNodeListResponse", address); 229 } catch (Throwable err) { 230 Logger.log(Logger.ERROR, CLUSTER_RESOURCES, 231 "SimpleCluster.ErrorGetNodeList", address, err); 232 } 233 } 234 235 240 private void sendHeartbeat(String address) { 241 try { 242 int colonPos = address.indexOf(':'); 243 String ipAddress = address.substring(0, colonPos); 244 String port = address.substring(colonPos + 1); 245 Socket heartbeatSocket = new Socket (ipAddress, 246 Integer.parseInt(port)); 247 OutputStream out = heartbeatSocket.getOutputStream(); 248 out.write(NODE_HEARTBEAT_TYPE); 249 out.flush(); 250 ObjectOutputStream outData = new ObjectOutputStream (out); 251 outData.writeInt(this.controlPort); 252 outData.close(); 253 heartbeatSocket.close(); 254 Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES, 255 "SimpleCluster.HeartbeatSent", address); 256 } catch (ConnectException err) { 257 } catch (Throwable err) { 258 Logger.log(Logger.ERROR, CLUSTER_RESOURCES, 259 "SimpleCluster.HeartbeatError", address, err); 260 } 261 } 262 263 273 public void clusterRequest(byte requestType, InputStream in, 274 OutputStream out, Socket socket, HostGroup hostGroup) 275 throws IOException { 276 if (requestType == ClusterSessionSearch.SESSION_CHECK_TYPE) 277 handleClusterSessionRequest(socket, in, out, hostGroup); 278 else if (requestType == NODELIST_DOWNLOAD_TYPE) 279 handleNodeListDownloadRequest(socket, in, out); 280 else if (requestType == NODE_HEARTBEAT_TYPE) 281 handleNodeHeartBeatRequest(socket, in); 282 else 283 Logger.log(Logger.ERROR, CLUSTER_RESOURCES, 284 "SimpleCluster.UnknownRequest", "" + (char) requestType); 285 } 286 287 290 public void handleClusterSessionRequest(Socket socket, InputStream in, 291 OutputStream out, HostGroup hostGroup) 292 throws IOException { 293 ObjectInputStream inControl = new ObjectInputStream (in); 295 int port = inControl.readInt(); 296 String ipPortSender = socket.getInetAddress().getHostAddress() + ":" + port; 297 String sessionId = inControl.readUTF(); 298 String hostname = inControl.readUTF(); 299 HostConfiguration hostConfig = hostGroup.getHostByName(hostname); 300 String webAppPrefix = inControl.readUTF(); 301 WebAppConfiguration webAppConfig = hostConfig.getWebAppByURI(webAppPrefix); 302 ObjectOutputStream outData = new ObjectOutputStream (out); 303 if (webAppConfig == null) { 304 outData.writeUTF(ClusterSessionSearch.SESSION_NOT_FOUND); 305 } else { 306 WinstoneSession session = webAppConfig.getSessionById(sessionId, true); 307 if (session != null) { 308 outData.writeUTF(ClusterSessionSearch.SESSION_FOUND); 309 outData.writeObject(session); 310 outData.flush(); 311 if (inControl.readUTF().equals( 312 ClusterSessionSearch.SESSION_RECEIVED)) 313 session.passivate(); 314 Logger.log(Logger.DEBUG, CLUSTER_RESOURCES, 315 "SimpleCluster.SessionTransferredTo", ipPortSender); 316 } else { 317 outData.writeUTF(ClusterSessionSearch.SESSION_NOT_FOUND); 318 } 319 } 320 outData.close(); 321 inControl.close(); 322 } 323 324 327 public void handleNodeListDownloadRequest(Socket socket, InputStream in, 328 OutputStream out) throws IOException { 329 ObjectInputStream inControl = new ObjectInputStream (in); 332 int port = inControl.readInt(); 333 String ipPortSender = socket.getInetAddress().getHostAddress() + ":" 334 + port; 335 List allClusterNodes = new ArrayList (this.clusterAddresses.keySet()); 336 List relevantClusterNodes = new ArrayList (); 337 for (Iterator i = allClusterNodes.iterator(); i.hasNext();) { 338 String node = (String ) i.next(); 339 if (!node.equals(ipPortSender)) 340 relevantClusterNodes.add(node); 341 } 342 343 ObjectOutputStream outData = new ObjectOutputStream (out); 344 outData.writeInt(relevantClusterNodes.size()); 345 outData.flush(); 346 for (Iterator i = relevantClusterNodes.iterator(); i.hasNext();) { 347 String ipPort = (String ) i.next(); 348 if (!ipPort.equals(ipPortSender)) 349 outData.writeUTF(ipPort); 350 outData.flush(); 351 } 352 outData.close(); 353 inControl.close(); 354 } 355 356 359 public void handleNodeHeartBeatRequest(Socket socket, InputStream in) 360 throws IOException { 361 ObjectInputStream inData = new ObjectInputStream (in); 362 int remoteControlPort = inData.readInt(); 363 inData.close(); 364 String ipPort = socket.getInetAddress().getHostAddress() + ":" 365 + remoteControlPort; 366 this.clusterAddresses.put(ipPort, new Date ()); 367 Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES, 368 "SimpleCluster.HeartbeatReceived", ipPort); 369 } 370 } 371 | Popular Tags |