KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > winstone > cluster > SimpleCluster


1 /*
2  * Copyright 2003-2006 Rick Knowles <winstone-devel at lists sourceforge net>
3  * Distributed under the terms of either:
4  * - the common development and distribution license (CDDL), v1.0; or
5  * - the GNU Lesser General Public License, v2.1 or later
6  */

7 package winstone.cluster;
8
9 import java.io.IOException JavaDoc;
10 import java.io.InputStream JavaDoc;
11 import java.io.ObjectInputStream JavaDoc;
12 import java.io.ObjectOutputStream JavaDoc;
13 import java.io.OutputStream JavaDoc;
14 import java.net.ConnectException JavaDoc;
15 import java.net.Socket JavaDoc;
16 import java.util.ArrayList JavaDoc;
17 import java.util.Collection JavaDoc;
18 import java.util.Date JavaDoc;
19 import java.util.HashSet JavaDoc;
20 import java.util.Hashtable JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.List JavaDoc;
23 import java.util.Map JavaDoc;
24 import java.util.Set JavaDoc;
25 import java.util.StringTokenizer JavaDoc;
26
27 import winstone.Cluster;
28 import winstone.HostConfiguration;
29 import winstone.HostGroup;
30 import winstone.Logger;
31 import winstone.WebAppConfiguration;
32 import winstone.WinstoneResourceBundle;
33 import winstone.WinstoneSession;
34
35 /**
36  * Represents a cluster of winstone containers.
37  *
38  * @author <a HREF="mailto:rick_knowles@hotmail.com">Rick Knowles</a>
39  * @version $Id: SimpleCluster.java,v 1.8 2006/08/10 06:38:31 rickknowles Exp $
40  */

41 public class SimpleCluster implements Runnable JavaDoc, Cluster {
42     final int SESSION_CHECK_TIMEOUT = 100;
43     final int HEARTBEAT_PERIOD = 5000;
44     final int MAX_NO_OF_MISSING_HEARTBEATS = 3;
45     final byte NODELIST_DOWNLOAD_TYPE = (byte) '2';
46     final byte NODE_HEARTBEAT_TYPE = (byte) '3';
47
48     public static final WinstoneResourceBundle CLUSTER_RESOURCES = new WinstoneResourceBundle("winstone.cluster.LocalStrings");
49     private int controlPort;
50     private String JavaDoc initialClusterNodes;
51     private Map JavaDoc clusterAddresses;
52     private boolean interrupted;
53
54     /**
55      * Builds a cluster instance
56      */

57     public SimpleCluster(Map JavaDoc args, Integer JavaDoc controlPort) {
58         this.interrupted = false;
59         this.clusterAddresses = new Hashtable JavaDoc();
60         if (controlPort != null)
61             this.controlPort = controlPort.intValue();
62
63         // Start cluster init thread
64
this.initialClusterNodes = (String JavaDoc) args.get("clusterNodes");
65         Thread JavaDoc thread = new Thread JavaDoc(this, CLUSTER_RESOURCES
66                 .getString("SimpleCluster.ThreadName"));
67         thread.setDaemon(true);
68         thread.setPriority(Thread.MIN_PRIORITY);
69         thread.start();
70     }
71
72     public void destroy() {
73         this.interrupted = true;
74     }
75
76     /**
77      * Send a heartbeat every now and then, and remove any nodes that haven't
78      * responded in 3 heartbeats.
79      */

80     public void run() {
81         // Ask each of the known addresses for their cluster lists, and build a
82
// set
83
if (this.initialClusterNodes != null) {
84             StringTokenizer JavaDoc st = new StringTokenizer JavaDoc(this.initialClusterNodes,
85                     ",");
86             while (st.hasMoreTokens() && !interrupted)
87                 askClusterNodeForNodeList(st.nextToken());
88         }
89
90         Logger.log(Logger.DEBUG, CLUSTER_RESOURCES, "SimpleCluster.InitNodes", ""
91                 + this.clusterAddresses.size());
92
93         while (!interrupted) {
94             try {
95                 Set JavaDoc addresses = new HashSet JavaDoc(this.clusterAddresses.keySet());
96                 Date JavaDoc noHeartbeatDate = new Date JavaDoc(System.currentTimeMillis()
97                         - (MAX_NO_OF_MISSING_HEARTBEATS * HEARTBEAT_PERIOD));
98                 for (Iterator JavaDoc i = addresses.iterator(); i.hasNext();) {
99                     String JavaDoc ipPort = (String JavaDoc) i.next();
100
101                     Date JavaDoc lastHeartBeat = (Date JavaDoc) this.clusterAddresses
102                             .get(ipPort);
103                     if (lastHeartBeat.before(noHeartbeatDate)) {
104                         this.clusterAddresses.remove(ipPort);
105                         Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES,
106                                 "SimpleCluster.RemovingNode", ipPort);
107                     }
108
109                     // Send heartbeat
110
else
111                         sendHeartbeat(ipPort);
112
113                 }
114                 Thread.sleep(HEARTBEAT_PERIOD);
115             } catch (Throwable JavaDoc err) {
116                 Logger.log(Logger.ERROR, CLUSTER_RESOURCES,
117                         "SimpleCluster.ErrorMonitorThread", err);
118             }
119         }
120         Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES,
121                 "SimpleCluster.FinishedMonitorThread");
122     }
123
124     /**
125      * Check if the other nodes in this cluster have a session for this
126      * sessionId.
127      *
128      * @param sessionId The id of the session to check for
129      * @return A valid session instance
130      */

131     public WinstoneSession askClusterForSession(String JavaDoc sessionId,
132             WebAppConfiguration webAppConfig) {
133         // Iterate through the cluster members
134
Collection JavaDoc addresses = new ArrayList JavaDoc(clusterAddresses.keySet());
135         Collection JavaDoc searchThreads = new ArrayList JavaDoc();
136         for (Iterator JavaDoc i = addresses.iterator(); i.hasNext();) {
137             String JavaDoc ipPort = (String JavaDoc) i.next();
138             ClusterSessionSearch search = new ClusterSessionSearch(
139                     webAppConfig.getContextPath(), webAppConfig.getOwnerHostname(),
140                     sessionId, ipPort, this.controlPort);
141             searchThreads.add(search);
142         }
143
144         // Wait until we get an answer
145
WinstoneSession answer = null;
146         String JavaDoc senderThread = null;
147         boolean finished = false;
148         while (!finished) {
149             // Loop through all search threads. If finished, exit, otherwise
150
// sleep
151
List JavaDoc finishedThreads = new ArrayList JavaDoc();
152             for (Iterator JavaDoc i = searchThreads.iterator(); i.hasNext();) {
153                 ClusterSessionSearch searchThread = (ClusterSessionSearch) i
154                         .next();
155                 if (!searchThread.isFinished())
156                     continue;
157                 else if (searchThread.getResult() == null)
158                     finishedThreads.add(searchThread);
159                 else {
160                     answer = searchThread.getResult();
161                     senderThread = searchThread.getAddressPort();
162                 }
163             }
164
165             // Remove finished threads
166
for (Iterator JavaDoc i = finishedThreads.iterator(); i.hasNext();)
167                 searchThreads.remove(i.next());
168
169             if (searchThreads.isEmpty() || (answer != null))
170                 finished = true;
171             else
172                 try {
173                     Thread.sleep(100);
174                 } catch (InterruptedException JavaDoc err) {
175                 }
176         }
177
178         // Once we have an answer, terminate all search threads
179
for (Iterator JavaDoc i = searchThreads.iterator(); i.hasNext();) {
180             ClusterSessionSearch searchThread = (ClusterSessionSearch) i.next();
181             searchThread.destroy();
182         }
183         if (answer != null) {
184             answer.activate(webAppConfig);
185             Logger.log(Logger.DEBUG, CLUSTER_RESOURCES,
186                     "SimpleCluster.SessionTransferredFrom", senderThread);
187         }
188         return answer;
189     }
190
191     /**
192      * Given an address, retrieve the list of cluster nodes and initialise dates
193      *
194      * @param address The address to request a node list from
195      */

196     private void askClusterNodeForNodeList(String JavaDoc address) {
197         try {
198             int colonPos = address.indexOf(':');
199             String JavaDoc ipAddress = address.substring(0, colonPos);
200             String JavaDoc port = address.substring(colonPos + 1);
201             Socket JavaDoc clusterListSocket = new Socket JavaDoc(ipAddress,
202                     Integer.parseInt(port));
203             this.clusterAddresses.put(clusterListSocket.getInetAddress()
204                     .getHostAddress() + ":" + port, new Date JavaDoc());
205             InputStream JavaDoc in = clusterListSocket.getInputStream();
206             OutputStream JavaDoc out = clusterListSocket.getOutputStream();
207             out.write(NODELIST_DOWNLOAD_TYPE);
208             out.flush();
209
210             // Write out the control port
211
ObjectOutputStream JavaDoc outControl = new ObjectOutputStream JavaDoc(out);
212             outControl.writeInt(this.controlPort);
213             outControl.flush();
214
215             // For each node, add an entry to cluster nodes
216
ObjectInputStream JavaDoc inData = new ObjectInputStream JavaDoc(in);
217             int nodeCount = inData.readInt();
218             for (int n = 0; n < nodeCount; n++)
219                 this.clusterAddresses.put(inData.readUTF(), new Date JavaDoc());
220
221             inData.close();
222             outControl.close();
223             out.close();
224             in.close();
225             clusterListSocket.close();
226         } catch (ConnectException JavaDoc err) {
227             Logger.log(Logger.DEBUG, CLUSTER_RESOURCES,
228                     "SimpleCluster.NoNodeListResponse", address);
229         } catch (Throwable JavaDoc err) {
230             Logger.log(Logger.ERROR, CLUSTER_RESOURCES,
231                     "SimpleCluster.ErrorGetNodeList", address, err);
232         }
233     }
234
235     /**
236      * Given an address, send a heartbeat
237      *
238      * @param address The address to request a node list from
239      */

240     private void sendHeartbeat(String JavaDoc address) {
241         try {
242             int colonPos = address.indexOf(':');
243             String JavaDoc ipAddress = address.substring(0, colonPos);
244             String JavaDoc port = address.substring(colonPos + 1);
245             Socket JavaDoc heartbeatSocket = new Socket JavaDoc(ipAddress,
246                     Integer.parseInt(port));
247             OutputStream JavaDoc out = heartbeatSocket.getOutputStream();
248             out.write(NODE_HEARTBEAT_TYPE);
249             out.flush();
250             ObjectOutputStream JavaDoc outData = new ObjectOutputStream JavaDoc(out);
251             outData.writeInt(this.controlPort);
252             outData.close();
253             heartbeatSocket.close();
254             Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES,
255                     "SimpleCluster.HeartbeatSent", address);
256         } catch (ConnectException JavaDoc err) {/* ignore - 3 fails, and we remove */
257         } catch (Throwable JavaDoc err) {
258             Logger.log(Logger.ERROR, CLUSTER_RESOURCES,
259                     "SimpleCluster.HeartbeatError", address, err);
260         }
261     }
262
263     /**
264      * Accept a control socket request related to the cluster functions and
265      * process the request.
266      *
267      * @param requestType A byte indicating the request type
268      * @param in Socket input stream
269      * @param outSocket output stream
270      * @param webAppConfig Instance of the web app
271      * @throws IOException
272      */

273     public void clusterRequest(byte requestType, InputStream JavaDoc in,
274             OutputStream JavaDoc out, Socket JavaDoc socket, HostGroup hostGroup)
275             throws IOException JavaDoc {
276         if (requestType == ClusterSessionSearch.SESSION_CHECK_TYPE)
277             handleClusterSessionRequest(socket, in, out, hostGroup);
278         else if (requestType == NODELIST_DOWNLOAD_TYPE)
279             handleNodeListDownloadRequest(socket, in, out);
280         else if (requestType == NODE_HEARTBEAT_TYPE)
281             handleNodeHeartBeatRequest(socket, in);
282         else
283             Logger.log(Logger.ERROR, CLUSTER_RESOURCES,
284                     "SimpleCluster.UnknownRequest", "" + (char) requestType);
285     }
286
287     /**
288      * Handles incoming socket requests for session search
289      */

290     public void handleClusterSessionRequest(Socket JavaDoc socket, InputStream JavaDoc in,
291             OutputStream JavaDoc out, HostGroup hostGroup)
292             throws IOException JavaDoc {
293         // Read in a string for the sessionId
294
ObjectInputStream JavaDoc inControl = new ObjectInputStream JavaDoc(in);
295         int port = inControl.readInt();
296         String JavaDoc ipPortSender = socket.getInetAddress().getHostAddress() + ":" + port;
297         String JavaDoc sessionId = inControl.readUTF();
298         String JavaDoc hostname = inControl.readUTF();
299         HostConfiguration hostConfig = hostGroup.getHostByName(hostname);
300         String JavaDoc webAppPrefix = inControl.readUTF();
301         WebAppConfiguration webAppConfig = hostConfig.getWebAppByURI(webAppPrefix);
302         ObjectOutputStream JavaDoc outData = new ObjectOutputStream JavaDoc(out);
303         if (webAppConfig == null) {
304             outData.writeUTF(ClusterSessionSearch.SESSION_NOT_FOUND);
305         } else {
306             WinstoneSession session = webAppConfig.getSessionById(sessionId, true);
307             if (session != null) {
308                 outData.writeUTF(ClusterSessionSearch.SESSION_FOUND);
309                 outData.writeObject(session);
310                 outData.flush();
311                 if (inControl.readUTF().equals(
312                         ClusterSessionSearch.SESSION_RECEIVED))
313                     session.passivate();
314                 Logger.log(Logger.DEBUG, CLUSTER_RESOURCES,
315                         "SimpleCluster.SessionTransferredTo", ipPortSender);
316             } else {
317                 outData.writeUTF(ClusterSessionSearch.SESSION_NOT_FOUND);
318             }
319         }
320         outData.close();
321         inControl.close();
322     }
323
324     /**
325      * Handles incoming socket requests for cluster node lists.
326      */

327     public void handleNodeListDownloadRequest(Socket JavaDoc socket, InputStream JavaDoc in,
328             OutputStream JavaDoc out) throws IOException JavaDoc {
329         // Get the ip and port of the requester, and make sure we don't send
330
// that
331
ObjectInputStream JavaDoc inControl = new ObjectInputStream JavaDoc(in);
332         int port = inControl.readInt();
333         String JavaDoc ipPortSender = socket.getInetAddress().getHostAddress() + ":"
334                 + port;
335         List JavaDoc allClusterNodes = new ArrayList JavaDoc(this.clusterAddresses.keySet());
336         List JavaDoc relevantClusterNodes = new ArrayList JavaDoc();
337         for (Iterator JavaDoc i = allClusterNodes.iterator(); i.hasNext();) {
338             String JavaDoc node = (String JavaDoc) i.next();
339             if (!node.equals(ipPortSender))
340                 relevantClusterNodes.add(node);
341         }
342
343         ObjectOutputStream JavaDoc outData = new ObjectOutputStream JavaDoc(out);
344         outData.writeInt(relevantClusterNodes.size());
345         outData.flush();
346         for (Iterator JavaDoc i = relevantClusterNodes.iterator(); i.hasNext();) {
347             String JavaDoc ipPort = (String JavaDoc) i.next();
348             if (!ipPort.equals(ipPortSender))
349                 outData.writeUTF(ipPort);
350             outData.flush();
351         }
352         outData.close();
353         inControl.close();
354     }
355
356     /**
357      * Handles heartbeats. Just updates the date of this node's last heartbeat
358      */

359     public void handleNodeHeartBeatRequest(Socket JavaDoc socket, InputStream JavaDoc in)
360             throws IOException JavaDoc {
361         ObjectInputStream JavaDoc inData = new ObjectInputStream JavaDoc(in);
362         int remoteControlPort = inData.readInt();
363         inData.close();
364         String JavaDoc ipPort = socket.getInetAddress().getHostAddress() + ":"
365                 + remoteControlPort;
366         this.clusterAddresses.put(ipPort, new Date JavaDoc());
367         Logger.log(Logger.FULL_DEBUG, CLUSTER_RESOURCES,
368                 "SimpleCluster.HeartbeatReceived", ipPort);
369     }
370 }
371
Popular Tags