KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > cjdbc > controller > core > ControllerWorkerThread


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2005 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): ______________________________________.
23  */

24
25 package org.objectweb.cjdbc.controller.core;
26
27 import java.io.IOException JavaDoc;
28 import java.io.OptionalDataException JavaDoc;
29 import java.net.Socket JavaDoc;
30 import java.util.ArrayList JavaDoc;
31
32 import org.objectweb.cjdbc.common.i18n.Translate;
33 import org.objectweb.cjdbc.common.log.Trace;
34 import org.objectweb.cjdbc.common.stream.CJDBCInputStream;
35 import org.objectweb.cjdbc.common.stream.CJDBCOutputStream;
36 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
37 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread;
38 import org.objectweb.cjdbc.driver.protocol.Commands;
39
40 /**
41  * The <code>ControllerWorkerThread</code> handles a connection with a C-JDBC
42  * driver. It reads a String containing the virtual database name from the
43  * driver and sends back the corresponding <code>ConnectionPoint</code>.
44  *
45  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
46  * @version 1.0
47  */

48 public class ControllerWorkerThread extends Thread JavaDoc
49 {
50   private ControllerServerThread serverThread;
51   private boolean isKilled = false;
52
53   /** Logger instance. */
54   static Trace logger = Trace
55                                               .getLogger("org.objectweb.cjdbc.controller.core.Controller");
56
57   /*
58    * Constructor
59    */

60
61   /**
62    * Creates a new <code>ControllerWorkerThread</code> instance.
63    *
64    * @param serverThread the <code>ControllerServerThread</code> that created
65    * us.
66    */

67   public ControllerWorkerThread(ControllerServerThread serverThread)
68   {
69     super("ControllerWorkerThread");
70     this.serverThread = serverThread;
71   }
72
73   /**
74    * Gets a connection from the connection queue and process it.
75    */

76   public void run()
77   {
78     Socket JavaDoc clientSocket;
79
80     if (serverThread == null)
81     {
82       logger.error(Translate.get("controller.workerthread.null.serverthread"));
83       isKilled = true;
84     }
85     else if (serverThread.controllerServerThreadPendingQueue == null)
86     {
87       logger.error(Translate.get("controller.workerthread.null.pendingqueue"));
88       isKilled = true;
89     }
90
91     // Main loop
92
while (!isKilled)
93     {
94       if (serverThread.isShuttingDown())
95         break;
96       // Get a connection from the pending queue
97
synchronized (serverThread.controllerServerThreadPendingQueue)
98       {
99         while (serverThread.controllerServerThreadPendingQueue.isEmpty())
100         {
101           // Nothing to do, let's sleep ...
102
serverThread.idleWorkerThreads++;
103           boolean timeout = false;
104           try
105           {
106             long before = System.currentTimeMillis();
107             serverThread.controllerServerThreadPendingQueue
108                 .wait(ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME);
109             long now = System.currentTimeMillis();
110             // Check if timeout has expired
111
timeout = now - before >= ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME;
112           }
113           catch (InterruptedException JavaDoc ignore)
114           {
115           }
116           serverThread.idleWorkerThreads--;
117           // We are shutting down
118
if (serverThread.controllerServerThreadPendingQueue == null)
119           {
120             isKilled = true;
121             break;
122           }
123           if (timeout
124               && serverThread.controllerServerThreadPendingQueue.isEmpty())
125           {
126             // Nothing to do, let's die.
127
isKilled = true;
128             break;
129           }
130         }
131
132         if (isKilled)
133           break;
134
135         // Get a connection
136
clientSocket = (Socket JavaDoc) serverThread.controllerServerThreadPendingQueue
137             .remove(0);
138       } // synchronized (serverThread.controllerServerThreadPendingQueue)
139

140       if (clientSocket == null)
141       {
142         logger.error(Translate.get("controller.workerthread.null.socket"));
143         continue;
144       }
145       else if (logger.isDebugEnabled())
146         logger.debug(Translate.get("controller.workerthread.connection.from",
147             new String JavaDoc[]{clientSocket.getInetAddress().toString(),
148                 String.valueOf(clientSocket.getPort())}));
149
150       try
151       {
152         // Disable Nagle algorithm else small messages are not sent
153
// (at least under Linux) even if we flush the output stream.
154
clientSocket.setTcpNoDelay(true);
155
156         // Handle connection
157
CJDBCInputStream in = new CJDBCInputStream(clientSocket);
158         CJDBCOutputStream out = new CJDBCOutputStream(clientSocket);
159
160         // Check protocol version for driver compatibility
161
int driverVersion = in.readInt();
162
163         if (driverVersion != Commands.ProtocolVersion)
164         {
165           if (driverVersion != Commands.Ping)
166             logger
167                 .warn(Translate.get(
168                     "controller.workerthread.protocol.incompatible",
169                     driverVersion));
170           else
171           {
172             if (logger.isDebugEnabled())
173               logger.debug("Controller pinged");
174             try
175             {
176               // Close the socket
177
clientSocket.close();
178             }
179             catch (Exception JavaDoc ignore)
180             {
181             }
182           }
183           continue;
184         }
185         // Driver version OK
186
String JavaDoc virtualDatabaseName = in.readUTF();
187
188         // Read the virtual database name
189
VirtualDatabase vdb = serverThread.controller
190             .getVirtualDatabase(virtualDatabaseName);
191         if (vdb == null)
192         {
193           String JavaDoc msg = Translate.get("virtualdatabase.not.found",
194               virtualDatabaseName);
195           logger.warn(msg);
196           continue;
197         }
198         if (vdb.isShuttingDown())
199         {
200           String JavaDoc msg = Translate.get("virtualdatabase.shutting.down",
201               virtualDatabaseName);
202           logger.warn(msg);
203           continue;
204         }
205
206         // At this point we have the virtual database the driver wants to
207
// connect to and we have to give the job to a
208
// VirtualDatabaseWorkerThread
209
ArrayList JavaDoc vdbActiveThreads = vdb.getActiveThreads();
210         ArrayList JavaDoc vdbPendingQueue = vdb.getPendingConnections();
211
212         if (vdbActiveThreads == null)
213         {
214           logger.error(Translate
215               .get("controller.workerthread.null.active.thread"));
216           isKilled = true;
217         }
218         if (vdbPendingQueue == null)
219         {
220           logger
221               .error(Translate.get("controller.workerthread.null.connection"));
222           isKilled = true;
223         }
224
225         // Start minimum number of worker threads
226
boolean tooManyConnections;
227         synchronized (vdbActiveThreads)
228         {
229           while (vdb.getCurrentNbOfThreads() < vdb.getMinNbOfThreads())
230           {
231             forkVirtualDatabaseWorkerThread(vdb,
232                 "controller.workerthread.starting.thread.for.minimum");
233           }
234
235           // Check if maximum number of concurrent connections has been
236
// reached
237
tooManyConnections = (vdb.getMaxNbOfConnections() > 0)
238               && vdbActiveThreads.size() + vdbPendingQueue.size() > vdb
239                   .getMaxNbOfConnections();
240         }
241         if (tooManyConnections)
242         {
243           out.writeBoolean(false);
244           out.writeUTF(Translate
245               .get("controller.workerthread.too.many.connections"));
246           out.close(); // closing is OK ?
247
continue;
248         }
249
250         // Put the connection in the queue
251
synchronized (vdbPendingQueue)
252         {
253           vdbPendingQueue.add(in);
254           vdbPendingQueue.add(out);
255           // Nullify the socket else it is closed in the finally block
256
clientSocket = null;
257           synchronized (vdbActiveThreads)
258           { // Is a thread available?
259
if (vdb.getIdleThreads() == 0)
260             { // No
261
if ((vdb.getCurrentNbOfThreads() <= vdb.getMaxNbOfThreads())
262                   || (vdb.getMaxNbOfThreads() == 0))
263               {
264                 forkVirtualDatabaseWorkerThread(vdb,
265                     "controller.workerthread.starting.thread");
266               }
267               else if (logger.isInfoEnabled())
268                 logger.info(Translate.get(
269                     "controller.workerthread.maximum.thread", vdb
270                         .getMaxNbOfThreads()));
271             }
272             else
273             {
274               if (logger.isDebugEnabled())
275                 logger.debug(Translate
276                     .get("controller.workerthread.notify.thread"));
277               // Here we notify all threads else if one thread doesn't wake
278
// up after the first notify() we will send a second notify()
279
// and one signal will be lost. So the safe way is to wake up
280
// everybody and that worker threads go back to sleep if there
281
// is no job.
282
vdbPendingQueue.notifyAll();
283             }
284           }
285         }
286       }
287       // }
288
catch (OptionalDataException JavaDoc e)
289       {
290         logger
291             .error(Translate.get("controller.workerthread.protocol.error", e));
292       }
293       catch (IOException JavaDoc e)
294       {
295         logger.error(Translate.get("controller.workerthread.io.error", e));
296       }
297       finally
298       {
299         try
300         {
301           if (clientSocket != null)
302           {
303             if (logger.isDebugEnabled())
304               logger.debug(Translate
305                   .get("controller.workerthread.connection.closing"));
306             clientSocket.close();
307           }
308         }
309         catch (IOException JavaDoc ignore)
310         {
311         }
312       }
313     }
314
315     if (logger.isDebugEnabled())
316       logger.debug(Translate.get("controller.workerthread.terminating"));
317   }
318
319   /**
320    * Fork a new worker thread.
321    *
322    * @param vdb VirtualDatabase to be served
323    * @param debugmesg debug message for the controller log
324    */

325   private void forkVirtualDatabaseWorkerThread(VirtualDatabase vdb,
326       String JavaDoc debugmesg)
327   {
328     if (logger.isDebugEnabled())
329       logger.debug(Translate.get(debugmesg));
330     VirtualDatabaseWorkerThread thread;
331
332     thread = new VirtualDatabaseWorkerThread(serverThread.controller, vdb);
333
334     vdb.getActiveThreads().add(thread);
335     vdb.addCurrentNbOfThread();
336     thread.start();
337   }
338 }
Popular Tags