1 24 25 package org.objectweb.cjdbc.controller.core; 26 27 import java.io.IOException ; 28 import java.io.OptionalDataException ; 29 import java.net.Socket ; 30 import java.util.ArrayList ; 31 32 import org.objectweb.cjdbc.common.i18n.Translate; 33 import org.objectweb.cjdbc.common.log.Trace; 34 import org.objectweb.cjdbc.common.stream.CJDBCInputStream; 35 import org.objectweb.cjdbc.common.stream.CJDBCOutputStream; 36 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 37 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread; 38 import org.objectweb.cjdbc.driver.protocol.Commands; 39 40 48 public class ControllerWorkerThread extends Thread 49 { 50 private ControllerServerThread serverThread; 51 private boolean isKilled = false; 52 53 54 static Trace logger = Trace 55 .getLogger("org.objectweb.cjdbc.controller.core.Controller"); 56 57 60 61 67 public ControllerWorkerThread(ControllerServerThread serverThread) 68 { 69 super("ControllerWorkerThread"); 70 this.serverThread = serverThread; 71 } 72 73 76 public void run() 77 { 78 Socket clientSocket; 79 80 if (serverThread == null) 81 { 82 logger.error(Translate.get("controller.workerthread.null.serverthread")); 83 isKilled = true; 84 } 85 else if (serverThread.controllerServerThreadPendingQueue == null) 86 { 87 logger.error(Translate.get("controller.workerthread.null.pendingqueue")); 88 isKilled = true; 89 } 90 91 while (!isKilled) 93 { 94 if (serverThread.isShuttingDown()) 95 break; 96 synchronized (serverThread.controllerServerThreadPendingQueue) 98 { 99 while (serverThread.controllerServerThreadPendingQueue.isEmpty()) 100 { 101 serverThread.idleWorkerThreads++; 103 boolean timeout = false; 104 try 105 { 106 long before = System.currentTimeMillis(); 107 serverThread.controllerServerThreadPendingQueue 108 .wait(ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME); 109 long now = System.currentTimeMillis(); 110 timeout = now - before >= ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME; 112 } 113 catch (InterruptedException ignore) 114 { 115 } 116 serverThread.idleWorkerThreads--; 117 if (serverThread.controllerServerThreadPendingQueue == null) 119 { 120 isKilled = true; 121 break; 122 } 123 if (timeout 124 && serverThread.controllerServerThreadPendingQueue.isEmpty()) 125 { 126 isKilled = true; 128 break; 129 } 130 } 131 132 if (isKilled) 133 break; 134 135 clientSocket = (Socket ) serverThread.controllerServerThreadPendingQueue 137 .remove(0); 138 } 140 if (clientSocket == null) 141 { 142 logger.error(Translate.get("controller.workerthread.null.socket")); 143 continue; 144 } 145 else if (logger.isDebugEnabled()) 146 logger.debug(Translate.get("controller.workerthread.connection.from", 147 new String []{clientSocket.getInetAddress().toString(), 148 String.valueOf(clientSocket.getPort())})); 149 150 try 151 { 152 clientSocket.setTcpNoDelay(true); 155 156 CJDBCInputStream in = new CJDBCInputStream(clientSocket); 158 CJDBCOutputStream out = new CJDBCOutputStream(clientSocket); 159 160 int driverVersion = in.readInt(); 162 163 if (driverVersion != Commands.ProtocolVersion) 164 { 165 if (driverVersion != Commands.Ping) 166 logger 167 .warn(Translate.get( 168 "controller.workerthread.protocol.incompatible", 169 driverVersion)); 170 else 171 { 172 if (logger.isDebugEnabled()) 173 logger.debug("Controller pinged"); 174 try 175 { 176 clientSocket.close(); 178 } 179 catch (Exception ignore) 180 { 181 } 182 } 183 continue; 184 } 185 String virtualDatabaseName = in.readUTF(); 187 188 VirtualDatabase vdb = serverThread.controller 190 .getVirtualDatabase(virtualDatabaseName); 191 if (vdb == null) 192 { 193 String msg = Translate.get("virtualdatabase.not.found", 194 virtualDatabaseName); 195 logger.warn(msg); 196 continue; 197 } 198 if (vdb.isShuttingDown()) 199 { 200 String msg = Translate.get("virtualdatabase.shutting.down", 201 virtualDatabaseName); 202 logger.warn(msg); 203 continue; 204 } 205 206 ArrayList vdbActiveThreads = vdb.getActiveThreads(); 210 ArrayList vdbPendingQueue = vdb.getPendingConnections(); 211 212 if (vdbActiveThreads == null) 213 { 214 logger.error(Translate 215 .get("controller.workerthread.null.active.thread")); 216 isKilled = true; 217 } 218 if (vdbPendingQueue == null) 219 { 220 logger 221 .error(Translate.get("controller.workerthread.null.connection")); 222 isKilled = true; 223 } 224 225 boolean tooManyConnections; 227 synchronized (vdbActiveThreads) 228 { 229 while (vdb.getCurrentNbOfThreads() < vdb.getMinNbOfThreads()) 230 { 231 forkVirtualDatabaseWorkerThread(vdb, 232 "controller.workerthread.starting.thread.for.minimum"); 233 } 234 235 tooManyConnections = (vdb.getMaxNbOfConnections() > 0) 238 && vdbActiveThreads.size() + vdbPendingQueue.size() > vdb 239 .getMaxNbOfConnections(); 240 } 241 if (tooManyConnections) 242 { 243 out.writeBoolean(false); 244 out.writeUTF(Translate 245 .get("controller.workerthread.too.many.connections")); 246 out.close(); continue; 248 } 249 250 synchronized (vdbPendingQueue) 252 { 253 vdbPendingQueue.add(in); 254 vdbPendingQueue.add(out); 255 clientSocket = null; 257 synchronized (vdbActiveThreads) 258 { if (vdb.getIdleThreads() == 0) 260 { if ((vdb.getCurrentNbOfThreads() <= vdb.getMaxNbOfThreads()) 262 || (vdb.getMaxNbOfThreads() == 0)) 263 { 264 forkVirtualDatabaseWorkerThread(vdb, 265 "controller.workerthread.starting.thread"); 266 } 267 else if (logger.isInfoEnabled()) 268 logger.info(Translate.get( 269 "controller.workerthread.maximum.thread", vdb 270 .getMaxNbOfThreads())); 271 } 272 else 273 { 274 if (logger.isDebugEnabled()) 275 logger.debug(Translate 276 .get("controller.workerthread.notify.thread")); 277 vdbPendingQueue.notifyAll(); 283 } 284 } 285 } 286 } 287 catch (OptionalDataException e) 289 { 290 logger 291 .error(Translate.get("controller.workerthread.protocol.error", e)); 292 } 293 catch (IOException e) 294 { 295 logger.error(Translate.get("controller.workerthread.io.error", e)); 296 } 297 finally 298 { 299 try 300 { 301 if (clientSocket != null) 302 { 303 if (logger.isDebugEnabled()) 304 logger.debug(Translate 305 .get("controller.workerthread.connection.closing")); 306 clientSocket.close(); 307 } 308 } 309 catch (IOException ignore) 310 { 311 } 312 } 313 } 314 315 if (logger.isDebugEnabled()) 316 logger.debug(Translate.get("controller.workerthread.terminating")); 317 } 318 319 325 private void forkVirtualDatabaseWorkerThread(VirtualDatabase vdb, 326 String debugmesg) 327 { 328 if (logger.isDebugEnabled()) 329 logger.debug(Translate.get(debugmesg)); 330 VirtualDatabaseWorkerThread thread; 331 332 thread = new VirtualDatabaseWorkerThread(serverThread.controller, vdb); 333 334 vdb.getActiveThreads().add(thread); 335 vdb.addCurrentNbOfThread(); 336 thread.start(); 337 } 338 } | Popular Tags |