KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > continuent > sequoia > controller > core > ControllerWorkerThread


1 /**
2  * Sequoia: Database clustering technology.
3  * Copyright (C) 2002-2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Copyright (C) 2005 AmicoSoft, Inc. dba Emic Networks
6  * Contact: sequoia@continuent.org
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  * Initial developer(s): Emmanuel Cecchet.
21  * Contributor(s): ______________________________________.
22  */

23
24 package org.continuent.sequoia.controller.core;
25
26 import java.io.IOException JavaDoc;
27 import java.io.OptionalDataException JavaDoc;
28 import java.net.Socket JavaDoc;
29 import java.util.ArrayList JavaDoc;
30
31 import org.continuent.sequoia.common.i18n.Translate;
32 import org.continuent.sequoia.common.log.Trace;
33 import org.continuent.sequoia.common.protocol.Commands;
34 import org.continuent.sequoia.common.stream.DriverBufferedInputStream;
35 import org.continuent.sequoia.common.stream.DriverBufferedOutputStream;
36 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase;
37 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabaseWorkerThread;
38
39 /**
40  * The <code>ControllerWorkerThread</code> handles a connection with a Sequoia
41  * driver. It reads a String containing the virtual database name from the
42  * driver and sends back the corresponding <code>ConnectionPoint</code>.
43  *
44  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
45  * @version 1.0
46  */

47 public class ControllerWorkerThread extends Thread JavaDoc
48 {
49   private ControllerServerThread serverThread;
50   private boolean isKilled = false;
51
52   /** Logger instance. */
53   static Trace logger = Trace
54                                               .getLogger("org.continuent.sequoia.controller.core.Controller");
55
56   /*
57    * Constructor
58    */

59
60   /**
61    * Creates a new <code>ControllerWorkerThread</code> instance.
62    *
63    * @param serverThread the <code>ControllerServerThread</code> that created
64    * us.
65    */

66   public ControllerWorkerThread(ControllerServerThread serverThread)
67   {
68     super("ControllerWorkerThread");
69     this.serverThread = serverThread;
70   }
71
72   /**
73    * Gets a connection from the connection queue and process it.
74    */

75   public void run()
76   {
77     Socket JavaDoc clientSocket;
78
79     if (serverThread == null)
80     {
81       logger.error(Translate.get("controller.workerthread.null.serverthread"));
82       isKilled = true;
83     }
84     else if (serverThread.controllerServerThreadPendingQueue == null)
85     {
86       logger.error(Translate.get("controller.workerthread.null.pendingqueue"));
87       isKilled = true;
88     }
89
90     // Main loop
91
while (!isKilled)
92     {
93       if (serverThread.isShuttingDown())
94         break;
95       // Get a connection from the pending queue
96
synchronized (serverThread.controllerServerThreadPendingQueue)
97       {
98         while (serverThread.controllerServerThreadPendingQueue.isEmpty())
99         {
100           // Nothing to do, let's sleep ...
101
serverThread.idleWorkerThreads++;
102           boolean timeout = false;
103           try
104           {
105             long before = System.currentTimeMillis();
106             serverThread.controllerServerThreadPendingQueue
107                 .wait(ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME);
108             long now = System.currentTimeMillis();
109             // Check if timeout has expired
110
timeout = now - before >= ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME;
111           }
112           catch (InterruptedException JavaDoc ignore)
113           {
114           }
115           serverThread.idleWorkerThreads--;
116           // We are shutting down
117
if (serverThread.controllerServerThreadPendingQueue == null)
118           {
119             isKilled = true;
120             break;
121           }
122           if (timeout
123               && serverThread.controllerServerThreadPendingQueue.isEmpty())
124           {
125             // Nothing to do, let's die.
126
isKilled = true;
127             break;
128           }
129         }
130
131         if (isKilled)
132           break;
133
134         // Get a connection
135
clientSocket = (Socket JavaDoc) serverThread.controllerServerThreadPendingQueue
136             .remove(0);
137       } // synchronized (serverThread.controllerServerThreadPendingQueue)
138

139       if (clientSocket == null)
140       {
141         logger.error(Translate.get("controller.workerthread.null.socket"));
142         continue;
143       }
144       else if (logger.isDebugEnabled())
145         logger.debug(Translate.get("controller.workerthread.connection.from",
146             new String JavaDoc[]{clientSocket.getInetAddress().toString(),
147                 String.valueOf(clientSocket.getPort())}));
148
149       try
150       {
151         // Disable Nagle algorithm else small messages are not sent
152
// (at least under Linux) even if we flush the output stream.
153
clientSocket.setTcpNoDelay(true);
154
155         // Handle connection
156
DriverBufferedInputStream in = new DriverBufferedInputStream(
157             clientSocket);
158         DriverBufferedOutputStream out = new DriverBufferedOutputStream(
159             clientSocket);
160
161         // Check protocol version for driver compatibility
162
int driverVersion = in.readInt();
163
164         if (driverVersion != Commands.ProtocolVersion)
165         {
166           if (driverVersion != Commands.Ping)
167           {
168             // If only minor versions differ, we can accept 'old versions' of
169
// drivers, they will still be compatible: they just won't know
170
// newer commands.
171
// In that case (Driver.minor < Controller.minor), we just inform
172
// that an old driver is connecting to us
173
String JavaDoc versionMismatch = Translate
174                 .get(
175                     "controller.workerthread.protocol.versions",
176                     new Object JavaDoc[]{
177                         Integer.toString(Commands
178                             .getProtocolMajorVersion(driverVersion))
179                             + "."
180                             + Commands.getProtocolMinorVersion(driverVersion),
181                         Commands
182                             .getProtocolMajorVersion(Commands.ProtocolVersion)
183                             + "."
184                             + Commands
185                                 .getProtocolMinorVersion(Commands.ProtocolVersion)});
186             if (Commands.getProtocolMajorVersion(driverVersion) != Commands
187                 .getProtocolMajorVersion(Commands.ProtocolVersion)
188                 || Commands.getProtocolMinorVersion(driverVersion) > Commands
189                     .getProtocolMinorVersion(Commands.ProtocolVersion))
190             {
191               abortConnectionEstablishement(out, Translate.get(
192                   "controller.workerthread.protocol.incompatible",
193                   versionMismatch));
194               continue;
195             }
196             if (logger.isInfoEnabled())
197               logger.info(Translate.get(
198                   "controller.workerthread.protocol.old.driver",
199                   versionMismatch));
200           }
201           else
202           {
203             if (logger.isDebugEnabled())
204               logger.debug("Controller pinged");
205             try
206             {
207               // Close the socket
208
clientSocket.close();
209             }
210             catch (Exception JavaDoc ignore)
211             {
212             }
213             continue;
214           }
215         }
216         // Driver version OK
217
String JavaDoc virtualDatabaseName = in.readLongUTF();
218
219         // Read the virtual database name
220
VirtualDatabase vdb = serverThread.controller
221             .getVirtualDatabase(virtualDatabaseName);
222         if (vdb == null)
223         {
224           // Tell the driver about the error (otherwise the driver just gets a
225
// closed socket, with no explicit reason)
226
abortConnectionEstablishement(out, Translate.get(
227               "virtualdatabase.not.found", virtualDatabaseName));
228           continue;
229         }
230         if (vdb.isShuttingDown())
231         {
232           String JavaDoc msg = Translate.get("virtualdatabase.shutting.down",
233               virtualDatabaseName);
234           logger.warn(msg);
235           abortConnectionEstablishement(out, msg);
236           continue;
237         }
238
239         // At this point we have the virtual database the driver wants to
240
// connect to and we have to give the job to a
241
// VirtualDatabaseWorkerThread
242
ArrayList JavaDoc vdbActiveThreads = vdb.getActiveThreads();
243         ArrayList JavaDoc vdbPendingQueue = vdb.getPendingConnections();
244
245         if (vdbActiveThreads == null)
246         {
247           logger.error(Translate
248               .get("controller.workerthread.null.active.thread"));
249           isKilled = true;
250         }
251         if (vdbPendingQueue == null)
252         {
253           logger
254               .error(Translate.get("controller.workerthread.null.connection"));
255           isKilled = true;
256         }
257
258         // Start minimum number of worker threads
259
boolean tooManyConnections;
260         synchronized (vdbActiveThreads)
261         {
262           while (vdb.getCurrentNbOfThreads() < vdb.getMinNbOfThreads())
263           {
264             forkVirtualDatabaseWorkerThread(vdb, Translate
265                 .get("controller.workerthread.starting.thread.for.minimum"));
266           }
267
268           // Check if maximum number of concurrent connections has been
269
// reached
270
tooManyConnections = (vdb.getMaxNbOfConnections() > 0)
271               && vdbActiveThreads.size() + vdbPendingQueue.size() > vdb
272                   .getMaxNbOfConnections();
273         }
274         if (tooManyConnections)
275         {
276           abortConnectionEstablishement(out, Translate
277               .get("controller.workerthread.too.many.connections"));
278           continue;
279         }
280
281         /**
282          * We successfully found the virtual database and we are handing over
283          * the connection to a virtual database worker thread (VDWT).
284          * Acknowledge success (for the vdb) to the driver, the VDWT will
285          * perform the user authentication.
286          * <p>
287          * Let the VDWT flush when it sends its boolean for auth. result.
288          */

289         out.writeBoolean(true);
290
291         // Put the connection in the queue
292
synchronized (vdbPendingQueue)
293         {
294           vdbPendingQueue.add(in);
295           vdbPendingQueue.add(out);
296           // Nullify the socket else it is closed in the finally block
297
clientSocket = null;
298           synchronized (vdbActiveThreads)
299           { // Is a thread available?
300
if (vdb.getIdleThreads() < vdbPendingQueue.size() / 2)
301             { // No
302
if ((vdb.getCurrentNbOfThreads() <= vdb.getMaxNbOfThreads())
303                   || (vdb.getMaxNbOfThreads() == 0))
304               {
305                 forkVirtualDatabaseWorkerThread(vdb, Translate
306                     .get("controller.workerthread.starting.thread"));
307               }
308               else if (logger.isInfoEnabled())
309                 logger.info(Translate.get(
310                     "controller.workerthread.maximum.thread", vdb
311                         .getMaxNbOfThreads()));
312             }
313             else
314             {
315               if (logger.isDebugEnabled())
316                 logger.debug(Translate
317                     .get("controller.workerthread.notify.thread"));
318               // Here we notify all threads else if one thread doesn't wake
319
// up after the first notify() we will send a second notify()
320
// and one signal will be lost. So the safe way is to wake up
321
// everybody and that worker threads go back to sleep if there
322
// is no job.
323
vdbPendingQueue.notifyAll();
324             }
325           }
326         }
327       }
328       // }
329
catch (OptionalDataException JavaDoc e)
330       {
331         logger
332             .error(Translate.get("controller.workerthread.protocol.error", e));
333       }
334       catch (IOException JavaDoc e)
335       {
336         logger.error(Translate.get("controller.workerthread.io.error", e));
337       }
338       finally
339       {
340         try
341         {
342           if (clientSocket != null)
343           {
344             if (logger.isDebugEnabled())
345               logger.debug(Translate
346                   .get("controller.workerthread.connection.closing"));
347             clientSocket.close();
348           }
349         }
350         catch (IOException JavaDoc ignore)
351         {
352         }
353       }
354     }
355
356     if (logger.isDebugEnabled())
357       logger.debug(Translate.get("controller.workerthread.terminating"));
358   }
359
360   /**
361    * Early aborts a connection establishement process: sends a negative
362    * acknowledgement to the driver, then the abort reason and logs a warning.
363    * <br>
364    * This function has been introduced in order to get more info <b>at the
365    * driver side</b>, when a connection fails. At this point (before the fork
366    * of the <code>VirtualDatabaseWorkerThread</code>), if we close the
367    * connection, the driver won't get any reason why the connection failed (only
368    * a closed socket). When calling this function, the driver has probably
369    * already send the remaining connection establishement info. These data are
370    * ignored (kept in the input socket buffer)
371    *
372    * @param reason string message indicating why the connection establishement
373    * has been aborted
374    * @param out output socket to the driver
375    */

376   private void abortConnectionEstablishement(DriverBufferedOutputStream out,
377       String JavaDoc reason)
378   {
379     if (logger.isWarnEnabled())
380       logger.warn(reason);
381     try
382     {
383       out.writeBoolean(false); // =connection failed
384
out.writeLongUTF(reason);
385       out.flush();
386     }
387     // ignore any stream error: we are closing the connection anyway !
388
catch (IOException JavaDoc ignored)
389     {
390     }
391   }
392
393   /**
394    * Fork a new worker thread.
395    *
396    * @param vdb VirtualDatabase to be served
397    * @param debugmesg debug message for the controller log
398    */

399   private void forkVirtualDatabaseWorkerThread(VirtualDatabase vdb,
400       String JavaDoc debugmesg)
401   {
402     if (logger.isDebugEnabled())
403       logger.debug(debugmesg);
404     VirtualDatabaseWorkerThread thread;
405
406     thread = new VirtualDatabaseWorkerThread(serverThread.controller, vdb);
407
408     vdb.addVirtualDatabaseWorkerThread(thread);
409     thread.start();
410   }
411 }
Popular Tags