1 23 24 package org.continuent.sequoia.controller.core; 25 26 import java.io.IOException ; 27 import java.io.OptionalDataException ; 28 import java.net.Socket ; 29 import java.util.ArrayList ; 30 31 import org.continuent.sequoia.common.i18n.Translate; 32 import org.continuent.sequoia.common.log.Trace; 33 import org.continuent.sequoia.common.protocol.Commands; 34 import org.continuent.sequoia.common.stream.DriverBufferedInputStream; 35 import org.continuent.sequoia.common.stream.DriverBufferedOutputStream; 36 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabase; 37 import org.continuent.sequoia.controller.virtualdatabase.VirtualDatabaseWorkerThread; 38 39 47 public class ControllerWorkerThread extends Thread 48 { 49 private ControllerServerThread serverThread; 50 private boolean isKilled = false; 51 52 53 static Trace logger = Trace 54 .getLogger("org.continuent.sequoia.controller.core.Controller"); 55 56 59 60 66 public ControllerWorkerThread(ControllerServerThread serverThread) 67 { 68 super("ControllerWorkerThread"); 69 this.serverThread = serverThread; 70 } 71 72 75 public void run() 76 { 77 Socket clientSocket; 78 79 if (serverThread == null) 80 { 81 logger.error(Translate.get("controller.workerthread.null.serverthread")); 82 isKilled = true; 83 } 84 else if (serverThread.controllerServerThreadPendingQueue == null) 85 { 86 logger.error(Translate.get("controller.workerthread.null.pendingqueue")); 87 isKilled = true; 88 } 89 90 while (!isKilled) 92 { 93 if (serverThread.isShuttingDown()) 94 break; 95 synchronized (serverThread.controllerServerThreadPendingQueue) 97 { 98 while (serverThread.controllerServerThreadPendingQueue.isEmpty()) 99 { 100 serverThread.idleWorkerThreads++; 102 boolean timeout = false; 103 try 104 { 105 long before = System.currentTimeMillis(); 106 serverThread.controllerServerThreadPendingQueue 107 .wait(ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME); 108 long now = System.currentTimeMillis(); 109 timeout = now - before >= ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME; 111 } 112 catch (InterruptedException ignore) 113 { 114 } 115 serverThread.idleWorkerThreads--; 116 if (serverThread.controllerServerThreadPendingQueue == null) 118 { 119 isKilled = true; 120 break; 121 } 122 if (timeout 123 && serverThread.controllerServerThreadPendingQueue.isEmpty()) 124 { 125 isKilled = true; 127 break; 128 } 129 } 130 131 if (isKilled) 132 break; 133 134 clientSocket = (Socket ) serverThread.controllerServerThreadPendingQueue 136 .remove(0); 137 } 139 if (clientSocket == null) 140 { 141 logger.error(Translate.get("controller.workerthread.null.socket")); 142 continue; 143 } 144 else if (logger.isDebugEnabled()) 145 logger.debug(Translate.get("controller.workerthread.connection.from", 146 new String []{clientSocket.getInetAddress().toString(), 147 String.valueOf(clientSocket.getPort())})); 148 149 try 150 { 151 clientSocket.setTcpNoDelay(true); 154 155 DriverBufferedInputStream in = new DriverBufferedInputStream( 157 clientSocket); 158 DriverBufferedOutputStream out = new DriverBufferedOutputStream( 159 clientSocket); 160 161 int driverVersion = in.readInt(); 163 164 if (driverVersion != Commands.ProtocolVersion) 165 { 166 if (driverVersion != Commands.Ping) 167 { 168 String versionMismatch = Translate 174 .get( 175 "controller.workerthread.protocol.versions", 176 new Object []{ 177 Integer.toString(Commands 178 .getProtocolMajorVersion(driverVersion)) 179 + "." 180 + Commands.getProtocolMinorVersion(driverVersion), 181 Commands 182 .getProtocolMajorVersion(Commands.ProtocolVersion) 183 + "." 184 + Commands 185 .getProtocolMinorVersion(Commands.ProtocolVersion)}); 186 if (Commands.getProtocolMajorVersion(driverVersion) != Commands 187 .getProtocolMajorVersion(Commands.ProtocolVersion) 188 || Commands.getProtocolMinorVersion(driverVersion) > Commands 189 .getProtocolMinorVersion(Commands.ProtocolVersion)) 190 { 191 abortConnectionEstablishement(out, Translate.get( 192 "controller.workerthread.protocol.incompatible", 193 versionMismatch)); 194 continue; 195 } 196 if (logger.isInfoEnabled()) 197 logger.info(Translate.get( 198 "controller.workerthread.protocol.old.driver", 199 versionMismatch)); 200 } 201 else 202 { 203 if (logger.isDebugEnabled()) 204 logger.debug("Controller pinged"); 205 try 206 { 207 clientSocket.close(); 209 } 210 catch (Exception ignore) 211 { 212 } 213 continue; 214 } 215 } 216 String virtualDatabaseName = in.readLongUTF(); 218 219 VirtualDatabase vdb = serverThread.controller 221 .getVirtualDatabase(virtualDatabaseName); 222 if (vdb == null) 223 { 224 abortConnectionEstablishement(out, Translate.get( 227 "virtualdatabase.not.found", virtualDatabaseName)); 228 continue; 229 } 230 if (vdb.isShuttingDown()) 231 { 232 String msg = Translate.get("virtualdatabase.shutting.down", 233 virtualDatabaseName); 234 logger.warn(msg); 235 abortConnectionEstablishement(out, msg); 236 continue; 237 } 238 239 ArrayList vdbActiveThreads = vdb.getActiveThreads(); 243 ArrayList vdbPendingQueue = vdb.getPendingConnections(); 244 245 if (vdbActiveThreads == null) 246 { 247 logger.error(Translate 248 .get("controller.workerthread.null.active.thread")); 249 isKilled = true; 250 } 251 if (vdbPendingQueue == null) 252 { 253 logger 254 .error(Translate.get("controller.workerthread.null.connection")); 255 isKilled = true; 256 } 257 258 boolean tooManyConnections; 260 synchronized (vdbActiveThreads) 261 { 262 while (vdb.getCurrentNbOfThreads() < vdb.getMinNbOfThreads()) 263 { 264 forkVirtualDatabaseWorkerThread(vdb, Translate 265 .get("controller.workerthread.starting.thread.for.minimum")); 266 } 267 268 tooManyConnections = (vdb.getMaxNbOfConnections() > 0) 271 && vdbActiveThreads.size() + vdbPendingQueue.size() > vdb 272 .getMaxNbOfConnections(); 273 } 274 if (tooManyConnections) 275 { 276 abortConnectionEstablishement(out, Translate 277 .get("controller.workerthread.too.many.connections")); 278 continue; 279 } 280 281 289 out.writeBoolean(true); 290 291 synchronized (vdbPendingQueue) 293 { 294 vdbPendingQueue.add(in); 295 vdbPendingQueue.add(out); 296 clientSocket = null; 298 synchronized (vdbActiveThreads) 299 { if (vdb.getIdleThreads() < vdbPendingQueue.size() / 2) 301 { if ((vdb.getCurrentNbOfThreads() <= vdb.getMaxNbOfThreads()) 303 || (vdb.getMaxNbOfThreads() == 0)) 304 { 305 forkVirtualDatabaseWorkerThread(vdb, Translate 306 .get("controller.workerthread.starting.thread")); 307 } 308 else if (logger.isInfoEnabled()) 309 logger.info(Translate.get( 310 "controller.workerthread.maximum.thread", vdb 311 .getMaxNbOfThreads())); 312 } 313 else 314 { 315 if (logger.isDebugEnabled()) 316 logger.debug(Translate 317 .get("controller.workerthread.notify.thread")); 318 vdbPendingQueue.notifyAll(); 324 } 325 } 326 } 327 } 328 catch (OptionalDataException e) 330 { 331 logger 332 .error(Translate.get("controller.workerthread.protocol.error", e)); 333 } 334 catch (IOException e) 335 { 336 logger.error(Translate.get("controller.workerthread.io.error", e)); 337 } 338 finally 339 { 340 try 341 { 342 if (clientSocket != null) 343 { 344 if (logger.isDebugEnabled()) 345 logger.debug(Translate 346 .get("controller.workerthread.connection.closing")); 347 clientSocket.close(); 348 } 349 } 350 catch (IOException ignore) 351 { 352 } 353 } 354 } 355 356 if (logger.isDebugEnabled()) 357 logger.debug(Translate.get("controller.workerthread.terminating")); 358 } 359 360 376 private void abortConnectionEstablishement(DriverBufferedOutputStream out, 377 String reason) 378 { 379 if (logger.isWarnEnabled()) 380 logger.warn(reason); 381 try 382 { 383 out.writeBoolean(false); out.writeLongUTF(reason); 385 out.flush(); 386 } 387 catch (IOException ignored) 389 { 390 } 391 } 392 393 399 private void forkVirtualDatabaseWorkerThread(VirtualDatabase vdb, 400 String debugmesg) 401 { 402 if (logger.isDebugEnabled()) 403 logger.debug(debugmesg); 404 VirtualDatabaseWorkerThread thread; 405 406 thread = new VirtualDatabaseWorkerThread(serverThread.controller, vdb); 407 408 vdb.addVirtualDatabaseWorkerThread(thread); 409 thread.start(); 410 } 411 } | Popular Tags |