package org.mmbase.module.database;

import java.sql.*;
import java.util.*;
import org.mmbase.util.DijkstraSemaphore;
import org.mmbase.module.core.MMBase;
import org.mmbase.util.logging.Logger;
import org.mmbase.util.logging.Logging;

/**
 * A pool of {@link MultiConnection}s for a single JDBC url.
 *
 * Connections are kept in two lists: {@link #pool} holds the free connections and
 * {@link #busyPool} the ones handed out by {@link #getFree()}. Every access to those lists is
 * guarded by <code>synchronized (semaphore)</code>; the same semaphore is acquired when a
 * connection is handed out and released when one comes back.
 *
 * NOTE(review): {@link #getFree()} calls <code>semaphore.acquire()</code> while holding the
 * semaphore's monitor. That only works if <code>DijkstraSemaphore.acquire()</code> waits on its own
 * monitor (and thereby releases it); confirm against DijkstraSemaphore before changing the locking.
 */
public class MultiPool {

    private static final Logger log = Logging.getLoggerInstance(MultiPool.class);

    /** Free connections. Created by {@link #createPool()}. */
    private List pool = null;
    /** Connections currently handed out by {@link #getFree()}. */
    private List busyPool = new ArrayList();
    /** Number of connections this pool is supposed to hold (free + busy). */
    private int conMax = 4;
    /** Lock object and permit counter; stays <code>null</code> if the pool could not be created. */
    private DijkstraSemaphore semaphore = null;
    /** Incremented on every {@link #getFree()} call that gets past the null-semaphore check. */
    private int totalConnections = 0;
    /** Usage count above which a returned connection is replaced by a fresh one. */
    private int maxQueries = 500;
    private String name;
    private String password;
    private String url;
    private DatabaseSupport databaseSupport;

    /** <code>false</code> when the pool was constructed with <code>maxQueries &lt;= 0</code>. */
    private boolean doReconnect = true;

    /** Milliseconds after which {@link #checkTime()} replaces a busy connection. */
    private long maxLifeTime = 120000;
    /** Milliseconds during which {@link #checkTime()} leaves a busy connection alone. */
    private long maxZeroTime = maxLifeTime / 4;

    /**
     * Creates a pool which refreshes its connections after 500 queries.
     *
     * @param databaseSupport used to initialize every new connection
     * @param url             JDBC url
     * @param name            database user, or "url" (together with password "url") to connect with the url only
     * @param password        database password
     * @param conMax          number of connections to open
     * @throws SQLException declared for callers; the visible code does not throw it itself
     */
    MultiPool(DatabaseSupport databaseSupport, String url, String name, String password, int conMax) throws SQLException {
        this(databaseSupport, url, name, password, conMax, 500);
    }

    /**
     * Creates a pool and immediately tries to fill it (see {@link #createPool()}).
     *
     * @param databaseSupport used to initialize every new connection
     * @param url             JDBC url
     * @param name            database user, or "url" (together with password "url") to connect with the url only
     * @param password        database password
     * @param conMax          number of connections to open
     * @param maxQueries      usage count after which a connection is replaced; a value &lt;= 0 disables replacing
     * @throws SQLException declared for callers; the visible code does not throw it itself
     */
    MultiPool(DatabaseSupport databaseSupport, String url, String name, String password, int conMax, int maxQueries) throws SQLException {
        this.conMax = conMax;
        this.name = name;
        this.url = url;
        this.password = password;
        this.maxQueries = maxQueries;
        if (this.maxQueries <= 0) {
            doReconnect = false;
        }
        this.databaseSupport = databaseSupport;
        log.service("Creating a multipool for database " + name + "(" + url + ") containing : " + conMax + " connections, " + (doReconnect ? "which will be refreshed after " + this.maxQueries + " queries" : "which will not be refreshed"));
        createPool();
    }

    /**
     * Sets the maximum time a connection may stay busy; the "zero time" is set to a quarter of it.
     *
     * @param maxLifeTime maximum busy time in milliseconds
     */
    void setMaxLifeTime(long maxLifeTime) {
        this.maxLifeTime = maxLifeTime;
        maxZeroTime = maxLifeTime / 4;
    }

    /**
     * @return the maximum time in milliseconds a connection may stay busy
     */
    long getMaxLifeTime() {
        return maxLifeTime;
    }

    /**
     * Creates the list of free connections and keeps retrying every 10 seconds until at least one
     * connection could be opened. Gives up (leaving {@link #semaphore} <code>null</code>) when MMBase
     * reports shutdown or when the thread is interrupted.
     */
    protected void createPool() {
        pool = new ArrayList();
        MMBase mmb = MMBase.getMMBase();
        boolean logStack = true;
        try {
            while (!fillPool(logStack)) {
                log.error("Cannot run with no connections, retrying after 10 seconds for " + mmb + " " + (mmb.isShutdown() ? "(shutdown)" : ""));
                Thread.sleep(10000);
                // only the first failure is logged with a full stack trace
                logStack = false;
                if (mmb.isShutdown()) {
                    log.info("MMBase has been shutted down.");
                    return;
                }
            }
            semaphore = new DijkstraSemaphore(pool.size());
            log.service("Connection pool has " + conMax + " connections");
        } catch (InterruptedException ie) {
            log.info("Interrupted: " + ie.getMessage());
            // restore the interrupt status so that callers further up can react to it
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Tries to open {@link #conMax} connections and adds the ones that succeed to the free list.
     * If only some could be opened, <code>conMax</code> is lowered to the number actually obtained.
     *
     * @param logStack whether the first error is logged with its stack trace; otherwise only the
     *                 first line of its message is logged
     * @return <code>false</code> if errors occurred and the pool contains no connection at all,
     *         <code>true</code> otherwise
     */
    protected boolean fillPool(boolean logStack) {
        int errors = 0;
        SQLException firstError = null;
        for (int i = 0; i < conMax; i++) {
            try {
                pool.add(getMultiConnection());
            } catch (SQLException se) {
                errors++;
                if (log.isDebugEnabled()) {
                    log.debug("i: " + i + " error " + errors + ": " + se.getMessage());
                }
                if (firstError == null) {
                    firstError = se;
                }
            }
        }
        if (errors > 0) {
            // getMessage() may be null; String.valueOf avoids a NullPointerException below
            String message = String.valueOf(firstError.getMessage());
            if (logStack) {
                message += " " + Logging.stackTrace(firstError);
            } else {
                int nl = message.indexOf('\n');
                if (nl > 0) {
                    message = message.substring(0, nl) + "...";
                }
            }

            log.error("Could not get all connections (" + errors + " failures, multipool size now " + pool.size() + " rather then " + conMax + "). First error: " + message);
            if (pool.size() < 1) {
                return false;
            }
            this.conMax = pool.size();
        }
        return true;
    }

    /**
     * Opens a raw JDBC connection. When both name and password are the literal string "url", the
     * url alone is used; otherwise name and password are passed along.
     *
     * @return a new JDBC connection
     * @throws SQLException if the driver cannot connect
     */
    private Connection openConnection() throws SQLException {
        // constant-first equals: a null name or password no longer causes a NullPointerException
        if ("url".equals(name) && "url".equals(password)) {
            return DriverManager.getConnection(url);
        }
        return DriverManager.getConnection(url, name, password);
    }

    /**
     * Opens a new connection, lets {@link #databaseSupport} initialize it and wraps it.
     *
     * @return a new MultiConnection belonging to this pool
     * @throws SQLException if the connection cannot be opened
     */
    protected MultiConnection getMultiConnection() throws SQLException {
        log.debug("Getting a new connection for url '" + url + "'");
        Connection con = openConnection();
        databaseSupport.initConnection(con);
        return new MultiConnection(this, con);
    }

    /**
     * Gives an existing MultiConnection a freshly opened and initialized underlying connection.
     * The previous underlying connection is not closed here.
     *
     * @param multiCon the wrapper whose <code>con</code> field is replaced
     * @throws SQLException if the new connection cannot be opened
     */
    protected void replaceConnection(MultiConnection multiCon) throws SQLException {
        multiCon.con = openConnection();
        databaseSupport.initConnection(multiCon.con);
    }

    /**
     * Shuts the pool down when this object is garbage collected.
     */
    protected void finalize() {
        shutdown();
    }

    /**
     * Really closes every connection of this pool, busy ones first. Connections that were closed
     * are removed from their list; afterwards {@link #conMax} is set to the number of connections
     * that are still left (which is logged as an error if it is not zero).
     * Does nothing but log when the pool was never created.
     */
    public void shutdown() {
        log.info("Shutting down multipool " + this);
        if (semaphore == null) {
            return;
        }
        synchronized (semaphore) {
            try {
                closeAll(busyPool);
                closeAll(pool);
            } finally {
                conMax = busyPool.size() + pool.size();
                log.info("Having " + conMax + " connections now.");
                if (conMax != 0) {
                    log.error("Still having connections!");
                }
            }
        }
    }

    /**
     * Really closes every connection in the given list. A connection which fails to close is
     * logged and left in the list, so that one failure does not keep the others open.
     *
     * @param connections list of MultiConnection objects; must be accessed under the semaphore lock
     */
    private void closeAll(List connections) {
        for (Iterator i = connections.iterator(); i.hasNext();) {
            MultiConnection con = (MultiConnection) i.next();
            try {
                con.realclose();
                i.remove();
            } catch (Throwable t) {
                log.error(t);
            }
        }
    }

    /**
     * Checks all busy connections: closed ones and ones that have been busy longer than
     * {@link #maxLifeTime} are replaced, and the bookkeeping of the pool is repaired if the number
     * of connections does not add up to {@link #conMax}. Does nothing if the pool was never created
     * or has no pooled connections (<code>conMax == 0</code>).
     */
    void checkTime() {
        if (log.isDebugEnabled()) {
            log.debug("JDBC -> Starting the pool check (" + this + ") : busy=" + busyPool.size() + " free=" + pool.size());
        }
        if (semaphore == null || conMax == 0) {
            return;
        }
        synchronized (semaphore) {
            int releaseCount = recycleBusyConnections(System.currentTimeMillis());
            if ((busyPool.size() + pool.size()) != conMax) {
                repairPoolCount();
            }
            // one permit per connection that was moved from the busy list to the free list
            semaphore.release(releaseCount);
        }
        if (log.isDebugEnabled()) {
            log.debug("finished checkTime()");
        }
    }

    /**
     * Walks over the busy connections and moves those that are closed, idle for too long, or busy
     * for too long back to the free list (replacing them by new connections where possible).
     * Must be called while holding the semaphore lock.
     *
     * @param nowTime current time in milliseconds
     * @return number of connections that were added to the free list
     */
    private int recycleBusyConnections(long nowTime) {
        int releaseCount = 0;
        for (Iterator i = busyPool.iterator(); i.hasNext();) {
            MultiConnection con = (MultiConnection) i.next();

            boolean isClosed = true;
            try {
                isClosed = con.isClosed();
            } catch (SQLException e) {
                log.warn("Could not check isClosed on connection, assuming it closed: " + e.getMessage());
            }

            if (isClosed) {
                MultiConnection newCon = null;
                log.warn("WILL KILL SQL because connection was closed. ID=" + con.hashCode() + " SQL: " + con.lastSql);
                try {
                    newCon = getMultiConnection();
                } catch (SQLException e) {
                    log.error("Can't add connection to pool (after close) " + e.toString());
                }
                if (newCon != null) {
                    new ConnectionCloser(con);
                } else {
                    // no replacement available: keep the old one so the connection count stays
                    // right; getFree() tries again to replace a closed connection
                    newCon = con;
                }
                pool.add(newCon);
                releaseCount++;
                i.remove();
                continue;
            }

            long diff = nowTime - con.getStartTimeMillis();

            if (log.isDebugEnabled()) {
                if (diff > 5000 || diff > maxZeroTime) {
                    log.debug("Checking a busy connection " + con + " time = " + (diff / 1000) + " seconds");
                }
            }

            if (diff < maxZeroTime) {
                // busy for a short time only: leave it alone
            } else if (diff < maxLifeTime) {
                // claimed for a while without any SQL recorded on it: take it back
                if (con.lastSql == null || con.lastSql.length() == 0) {
                    log.warn("null connection putBack " + Logging.stackTrace());
                    pool.add(con);
                    releaseCount++;
                    i.remove();
                }
            } else {
                MultiConnection newCon = null;
                log.warn("WILL KILL SQL. It took already " + (diff / 1000) + " seconds, which is too long. ID=" + con.hashCode() + " SQL: " + con.lastSql);
                try {
                    newCon = getMultiConnection();
                } catch (SQLException e) {
                    log.error("Can't add connection to pool (after kill) " + e.toString());
                }
                if (newCon != null) {
                    pool.add(newCon);
                    releaseCount++;
                    i.remove();
                    new ConnectionCloser(con);
                }
                // without a replacement the connection simply stays in the busy list
            }
        }
        return releaseCount;
    }

    /**
     * Logs that the number of connections is wrong, removes connections from the free list which
     * are also in the busy list, and drops surplus free connections (keeping at least two).
     * Must be called while holding the semaphore lock.
     */
    private void repairPoolCount() {
        log.error("Number of connections is not correct: " + busyPool.size() + " + " + pool.size () + " = " + (busyPool.size() + pool.size()) + " != " + conMax);

        for (Iterator i = busyPool.iterator(); i.hasNext();) {
            MultiConnection bcon = (MultiConnection) i.next();
            int j = pool.indexOf(bcon);
            if (j >= 0) {
                if (log.isDebugEnabled()) {
                    log.debug("duplicate connection found at " + j);
                }
                pool.remove(j);
            }
        }

        // NOTE(review): surplus connections are dropped without being closed and without
        // adjusting the semaphore; confirm whether that is intended.
        while (((busyPool.size() + pool.size()) > conMax) && pool.size() > 2) {
            MultiConnection con = (MultiConnection) pool.remove(0);
            if (log.isDebugEnabled()) {
                log.debug("removing connection " + con);
            }
        }
    }

    /**
     * Hands out a claimed connection, waiting on the semaphore until one is free. A connection
     * which turns out to be closed is replaced by a new one if possible. When the pool has no
     * pooled connections (<code>conMax == 0</code>) a new, unpooled connection is created instead.
     *
     * @return a claimed connection, or <code>null</code> if the pool was never created, the
     *         unpooled connection could not be opened, or the thread was interrupted while waiting
     */
    MultiConnection getFree() {
        MultiConnection con = null;
        try {
            if (semaphore == null) {
                return null;
            }
            synchronized (semaphore) {
                totalConnections++;
                if (conMax == 0) {
                    try {
                        con = getMultiConnection();
                        con.claim();
                        return con;
                    } catch (SQLException sqe) {
                        log.error("Could not create unpooled connection: " + sqe.getClass().getName() + ": " + sqe.getMessage());
                        return null;
                    }
                }

                semaphore.acquire();
                if (log.isDebugEnabled()) {
                    log.debug("Getting free connection from pool " + pool.size());
                }
                con = (MultiConnection) pool.remove(0);
                con.claim();
                try {
                    if (con.isClosed()) {
                        con = getMultiConnection();
                        con.claim();
                        log.service("Got a closed connection from the connection Pool, it was replaced by a new one.");
                    }
                } catch (SQLException sqe) {
                    log.error("Closed connection could not be replaced because: " + sqe.getClass().getName() + ": " + sqe.getMessage());
                }
                busyPool.add(con);
                if (log.isDebugEnabled()) {
                    log.debug("Pool: " + pool.size() + " busypool: " + busyPool.size());
                }
            }
        } catch (InterruptedException e) {
            log.error("GetFree was Interrupted");
            // restore the interrupt status so that callers further up can react to it
            Thread.currentThread().interrupt();
        }
        return con;
    }

    /**
     * Returns a connection to the free list. A connection which is not in the busy list is ignored
     * (with a log message). If reconnecting is enabled and the connection was used more than
     * {@link #maxQueries} times, it is replaced by a new connection and closed in the background;
     * if no new connection can be made the old one is kept and its usage counter is reset.
     *
     * @param con the connection to give back
     */
    void putBack(MultiConnection con) {
        synchronized (semaphore) {
            if (log.isDebugEnabled()) {
                log.debug("Putting back to Pool: " + pool.size() + " from busypool: " + busyPool.size());
            }

            if (! busyPool.contains(con)) {
                MMBase mmb = MMBase.getMMBase();
                if (! mmb.isShutdown()) {
                    try {
                        if (con.isClosed()) {
                            log.debug("Connection " + con + " as closed an not in busypool, so it was removed from busyPool by checkTime");
                        } else {
                            log.warn("Put back connection (" + con.lastSql + ") was not in busyPool!!");
                        }
                    } catch (SQLException sqe) {
                        log.warn("Connection " + con + " not in busypool : " + sqe.getMessage());
                    }
                } else {
                    log.service("Connection " + con.lastSql + " was put back, but MMBase is shut down, so it was ignored.");
                }
                return;
            }

            con.release();
            MultiConnection oldCon = con;

            if (doReconnect && (con.getUsage() > maxQueries)) {
                log.debug("Re-oppening connection");

                boolean gotNew = false;
                try {
                    con = getMultiConnection();
                    gotNew = true;
                } catch (SQLException sqe) {
                    log.error("Can't add connection to pool (during reconnect) " + sqe.toString());
                }

                if (gotNew) {
                    new ConnectionCloser(oldCon);
                } else {
                    // con still refers to the original connection here
                    log.service("Will continue use original connection");
                    con.resetUsage();
                }
            }

            pool.add(con);
            busyPool.remove(oldCon);
            if (log.isDebugEnabled()) {
                log.debug("Pool: " + pool.size() + " busypool: " + busyPool.size());
            }
            semaphore.release();
        }
    }

    /**
     * @return the number of free connections
     */
    public int getSize() {
        return pool.size();
    }

    /**
     * @return the value of the counter which {@link #getFree()} increments on every request
     */
    public int getTotalConnectionsCreated() {
        return totalConnections;
    }

    /**
     * @return an iterator over a copy of the list of free connections
     */
    public Iterator getPool() {
        return snapshot(pool);
    }

    /**
     * @return an iterator over a copy of the list of busy connections
     */
    public Iterator getBusyPool() {
        return snapshot(busyPool);
    }

    /**
     * Copies the given list (under the semaphore lock if there is one) and returns an iterator
     * over the copy, so that callers never iterate over the live list.
     *
     * @param connections one of the connection lists of this pool
     * @return iterator over a copy of the list
     */
    private Iterator snapshot(List connections) {
        if (semaphore == null) {
            // pool creation was abandoned; there is no lock object to synchronize on
            return new ArrayList(connections).iterator();
        }
        synchronized (semaphore) {
            return new ArrayList(connections).iterator();
        }
    }

    /**
     * @return a description consisting of the url, the user name and the pool size
     */
    public String toString() {
        return "dbm=" + url + ",name=" + name + ",conmax=" + conMax;
    }

    /**
     * Closes a connection in a separate daemon thread, so that the caller does not have to wait
     * for the close to finish. The thread is started from the constructor.
     */
    static class ConnectionCloser implements Runnable {
        private static final Logger log = Logging.getLoggerInstance(ConnectionCloser.class);

        private MultiConnection connection;

        /**
         * @param con the connection to close; closing starts immediately
         */
        public ConnectionCloser(MultiConnection con) {
            connection = con;
            start();
        }

        /**
         * Starts the daemon thread which performs the close.
         */
        protected void start() {
            Thread kicker = new Thread(this, "ConnectionCloser");
            kicker.setDaemon(true);
            kicker.start();
        }

        /**
         * Really closes the connection; a failure is logged and not propagated.
         */
        public void run() {
            try {
                connection.realclose();
                // only report success when the close did not throw
                log.debug("Closed connection with ID " + connection.hashCode());
            } catch (Exception re) {
                log.error("Can't close connection with ID " + connection.hashCode() + ", cause: " + re);
            }
        }
    }
}