1 46 47 package org.mr.core.persistent.db; 48 49 import java.io.ByteArrayInputStream ; 50 import java.io.IOException ; 51 import java.io.InputStream ; 52 import java.nio.ByteBuffer ; 53 import java.sql.Blob ; 54 import java.sql.Connection ; 55 import java.sql.DriverManager ; 56 import java.sql.PreparedStatement ; 57 import java.sql.ResultSet ; 58 import java.sql.SQLException ; 59 import java.sql.Statement ; 60 import java.util.Set ; 61 import java.util.HashSet ; 62 import java.util.ArrayList ; 63 import java.util.Iterator ; 64 65 import org.apache.commons.logging.Log; 66 import org.apache.commons.logging.LogFactory; 67 import org.mr.MantaAgent; 68 import org.mr.kernel.services.topics.VirtualTopicManager; 69 import org.mr.core.configuration.ConfigManager; 70 import org.mr.core.persistent.PersistentConst; 71 import org.mr.core.persistent.PersistentManager; 72 import org.mr.core.util.byteable.ByteBufferFactory; 73 import org.mr.core.util.byteable.Byteable; 74 import org.mr.core.util.byteable.ByteableInputStream; 75 import org.mr.core.util.byteable.ByteableOutputStream; 76 import org.mr.core.util.StringUtils; 77 78 87 public class DBPersistencyProxy { 88 private static DBPersistencyProxy instance = null; 89 90 private Connection connection; 91 private Log log; 92 private String lookupTable; 93 private String dataTable; 94 private String tableSuffix; 95 private ByteBufferFactory pool; 96 private ByteableInputStream bistream; 97 98 private String dbUser; 99 private String dbPasswd; 100 private String dbURL; 101 102 private PreparedStatement stmtGetID; 104 private PreparedStatement stmtKeyCount; 105 private PreparedStatement stmtGetKeys; 106 private PreparedStatement stmtGetValue; 107 private PreparedStatement stmtInsertID; 108 private PreparedStatement stmtDeleteValue; 109 private PreparedStatement stmtSaveValue; 110 private PreparedStatement stmtDeleteAll; 111 112 private PreparedStatement stmtGetAllServices; 113 114 private DBPersistencyProxy() { 115 this.log = LogFactory.getLog("DBPersistencyProxy"); 116 this.pool = PersistentConst.getPersistentByteBufferPool(); 117 this.bistream = new ByteableInputStream(); 118 119 ConfigManager config = MantaAgent.getInstance().getSingletonRepository().getConfigManager(); 120 String dbDriver; 122 123 dbUser = config.getStringProperty("persistency.db.user", "manta"); 127 dbPasswd = config.getStringProperty("persistency.db.password", "manta"); 128 dbDriver = config.getStringProperty("persistency.db.driver", "com.mysql.jdbc.Driver"); 129 dbURL = null; 130 131 if (dbDriver == null) { 132 if (this.log.isErrorEnabled()) { 133 this.log.error("Persistency DB Driver is not set (config " + 134 "option persistency.db.driver)"); 135 this.log.error("DBPersistencyProxy is disabled."); 136 } 137 return; 138 } 139 140 dbURL = config.getStringProperty("persistency.db.url", 142 "jdbc:mysql://localhost/"); 143 if (dbURL == null) { 144 if (this.log.isErrorEnabled()) { 145 this.log.error("Persistency DB URL is not set " + 146 "(config option persistency.db.url)"); 147 150 this.log.error("DBPersistencyProxy is disabled."); 151 } 152 return; 153 } 154 161 162 try { 164 Class.forName(dbDriver); 165 } catch (Exception e) { 166 if (this.log.isErrorEnabled()) { 167 this.log.error("Error loading DB driver: " + e.getMessage()); 168 this.log.error("DBPersistencyProxy is disabled."); 169 } 170 return; 171 } 172 173 try { 174 this.connection = DriverManager.getConnection(dbURL, dbUser, 175 dbPasswd); 176 } catch (SQLException e) { 177 if (this.log.isErrorEnabled()) { 178 this.log.error("Error connecting to DB (" + dbURL + "): " + 179 e.getMessage()); 180 this.log.error("DBPersistencyProxy is disabled."); 181 } 182 return; 183 } 184 185 MantaAgent agent = MantaAgent.getInstance(); 186 String defaultVal = agent.getAgentName()+"_"+agent.getDomainName(); 187 188 defaultVal = defaultVal.replace('.', '_'); 190 191 tableSuffix = 192 config.getStringProperty("persistency.db.table_suffix", 193 defaultVal); 194 tableSuffix = tableSuffix.trim(); 195 if (tableSuffix.equals("")) { 196 tableSuffix = defaultVal; 197 } 198 try { 199 ensureSchema(tableSuffix); 200 } catch (SQLException e) { 201 if (this.log.isErrorEnabled()) { 202 this.log.error("Error creating persistency tables: " + 203 e.getMessage()); 204 } 205 this.log.error("DBPersistencyProxy is disabled."); 206 this.connection = null; 207 } 208 try { 209 preparePreparedStatements(); 210 } catch (SQLException e) { 211 if (this.log.isErrorEnabled()) { 212 this.log.error("Error creating statements: " + 213 e.getMessage()); 214 } 215 this.log.error("DBPersistencyProxy is disabled."); 216 this.connection = null; 217 } 218 } 220 private void ensureSchema(String tableSuffix) throws SQLException { 221 if (this.dbURL.startsWith("jdbc:mysql")) { 222 ensureSchemaMySQL(tableSuffix); 223 } else if (this.dbURL.startsWith("jdbc:oracle")) { 224 ensureSchemaOracle(tableSuffix); 225 } else if (this.dbURL.startsWith("jdbc:sqlserver")) { 226 ensureSchemaSQLServer(tableSuffix); 227 } else { 228 if (this.log.isErrorEnabled()) { 229 this.log.error("Unsupported DB: " + this.dbURL); 230 } 231 } 232 } 233 234 237 private void ensureSchemaOracle(String tableSuffix) throws SQLException { 238 Statement stmt = null; 239 240 this.lookupTable = "prsl_" + tableSuffix; 241 this.dataTable = "prsd_" + tableSuffix; 242 243 try { 245 stmt = this.connection.createStatement(); 246 stmt.executeQuery("SELECT count(*) FROM " + lookupTable); 247 } catch (SQLException e) { 248 if (e.getErrorCode() == 942) { createSchemaOracle(stmt); 250 } 251 } finally { 252 if (stmt != null) { 253 stmt.close(); 254 } 255 } 256 } 257 258 private void createSchemaOracle(Statement stmt) throws SQLException { 259 stmt.executeUpdate("CREATE TABLE " + lookupTable + 260 "(object_id INTEGER NOT NULL " + 261 " UNIQUE," + 262 " object_name VARCHAR2(4000)," + 263 " PRIMARY KEY (object_name))"); 264 stmt.executeUpdate("CREATE SEQUENCE " + lookupTable + "_seq"); 265 stmt.executeUpdate("CREATE TRIGGER " + lookupTable + "_tri " + 266 "BEFORE INSERT ON " + lookupTable + " FOR EACH " + 267 "ROW BEGIN SELECT " + lookupTable + 268 "_seq.nextval INTO :new.object_id FROM DUAL; " + 269 "END;"); 270 stmt.executeUpdate("CREATE TABLE " + dataTable + 271 "(object_id INTEGER NOT NULL," + 272 " keyx INTEGER NOT NULL," + 273 " value BLOB," + 274 " PRIMARY KEY (object_id, keyx)," + 275 " FOREIGN KEY (object_id) REFERENCES " + 276 lookupTable + "(object_id))"); 277 } 278 279 282 private void ensureSchemaSQLServer(String tableSuffix) 283 throws SQLException 284 { 285 Statement stmt = null; 286 287 this.lookupTable = "prsl_" + tableSuffix; 288 this.dataTable = "prsd_" + tableSuffix; 289 290 try { 292 stmt = this.connection.createStatement(); 293 stmt.executeQuery("SELECT count(*) FROM " + lookupTable); 294 } catch (SQLException e) { 295 if (e.getErrorCode() == 208) { createSchemaSQLServer(stmt); 297 } 298 } finally { 299 if (stmt != null) { 300 stmt.close(); 301 } 302 } 303 } 304 305 private void createSchemaSQLServer(Statement stmt) throws SQLException { 306 stmt.executeUpdate("CREATE TABLE " + lookupTable + 307 "(object_id INTEGER IDENTITY NOT NULL " + 308 " UNIQUE," + 309 " object_name VARCHAR(4000)," + 310 " PRIMARY KEY (object_name))"); 311 stmt.executeUpdate("CREATE TABLE " + dataTable + 312 "(object_id INTEGER NOT NULL," + 313 " keyx INTEGER NOT NULL," + 314 " value IMAGE," + 315 " PRIMARY KEY (object_id, keyx)," + 316 " FOREIGN KEY (object_id) REFERENCES " + 317 lookupTable + "(object_id))"); 318 } 319 320 private void ensureSchemaMySQL(String tableSuffix) throws SQLException { 321 Statement stmt = this.connection.createStatement(); 322 323 this.lookupTable = "manta.persist_lookup_" + tableSuffix; 324 this.dataTable = "manta.persist_data_" + tableSuffix; 325 326 stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS manta;"); 327 328 stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + lookupTable + 329 "(object_id INT UNSIGNED NOT NULL AUTO_INCREMENT " + 330 " UNIQUE," + 331 " object_name TEXT," + 332 " KEY (object_name(32)));"); 333 stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + dataTable + 334 "(object_id INT UNSIGNED NOT NULL," + 335 " keyx INT NOT NULL," + 336 " value BLOB," + 337 " KEY (object_id, keyx));"); 338 } 339 340 public static synchronized DBPersistencyProxy getInstance() { 341 if (instance == null) { 342 instance = new DBPersistencyProxy(); 343 } 344 return instance; 345 } 346 347 public synchronized int[] getKeys(String name) { 348 if (this.connection == null) { 349 return null; 350 } 351 352 int[] keys = null; 353 try { 354 int objectID = getObjectID(name, false); 355 int rowCount = 0; 356 357 if (objectID == -1) { 358 return new int[0]; 359 } 360 this.stmtKeyCount.setInt(1, objectID); 361 ResultSet results = this.stmtKeyCount.executeQuery(); 362 if (results.next()) { 363 rowCount = results.getInt(1); 364 } 365 366 keys = new int[rowCount]; 367 this.stmtGetKeys.setInt(1, objectID); 368 results = this.stmtGetKeys.executeQuery(); 369 370 int i = 0; 371 while (results.next() && i < keys.length) { 372 keys[i++] = results.getInt(1); 373 } 374 } catch (SQLException e) { 375 if (this.log.isErrorEnabled()) { 376 this.log.error("Error in getKeys(" + name + "): " + 377 e.getMessage() + " Trying to reconnect."); 378 } 379 380 if (ensureConnection()) { 382 return getKeys(name); 383 } 384 } 385 return keys; 386 } 387 388 public synchronized Object getPersistentObject(String name, int i) { 389 if (this.connection == null) { 390 return null; 391 } 392 393 Byteable result = null; 394 try { 395 ByteBuffer buffer = getPersistentBuffer(name, i); 396 this.bistream.setUnderLine(buffer); 397 result = this.bistream.readByteable(); 398 this.pool.release(buffer); 399 } catch (Exception e) { 400 if(log.isErrorEnabled()) { 401 log.error("Could not recover " + name + "/" + i + ": " + 402 e.getMessage() + " Trying to reconnect."); 403 } 404 if (ensureConnection()) { 406 return getPersistentBuffer(name, i); 407 } 408 } 409 return result; 410 } 411 412 public synchronized ByteBuffer getPersistentBuffer(String name, int i) { 413 if (this.connection == null) { 414 return null; 415 } 416 417 try { 418 ResultSet results; 419 int objectID = getObjectID(name, false); 420 421 this.stmtGetValue.setInt(1, objectID); 422 this.stmtGetValue.setInt(2, i); 423 results = this.stmtGetValue.executeQuery(); 424 425 if (results.next()) { 426 Blob blob = results.getBlob(1); 427 int length = (int) blob.length(); 428 InputStream istream = blob.getBinaryStream(); 429 ByteBuffer buffer = this.pool.getBuffer(length); 430 byte[] array = buffer.array(); 431 int offset = buffer.arrayOffset(); 432 int bytesRead = 0; 433 434 try { 435 while (bytesRead < length) { 436 int chunk = istream.read(array, offset + bytesRead, 437 length - bytesRead); 438 if (chunk < 0) { 439 if (this.log.isErrorEnabled()) { 440 this.log.error("Permature EOF when reading " + 441 name + "/" + i); 442 } 443 this.pool.release(buffer); 444 buffer = null; 445 break; 446 } 447 bytesRead += chunk; 448 } 449 } catch (IOException e) { 450 if (this.log.isErrorEnabled()) { 451 this.log.error("I/O error while reading " + name + 452 "/" + i + ": " + e.getMessage()); 453 } 454 this.pool.release(buffer); 455 buffer = null; 456 return buffer; 457 } 458 459 buffer.limit(length); 460 buffer.position(0); 461 return buffer; 462 } else { 463 if (this.log.isErrorEnabled()) { 464 this.log.error("Cannot find entry " + name + "/" + i); 465 } 466 return null; 467 } 468 } catch (SQLException e) { 469 if (this.log.isErrorEnabled()) { 470 this.log.error("Error in getPersistentBuffer(" + name + 471 "): " + e.getMessage() + 472 " Trying to reconnect"); 473 } 474 if (ensureConnection()) { 476 return getPersistentBuffer(name, i); 477 } else { 478 return null; 479 } 480 } 481 } 482 483 public synchronized void deletePersistentObject(String name, int i) { 484 if (this.connection == null) { 485 return; 486 } 487 488 try { 489 int objectID = getObjectID(name, false); 490 realDeletePersistentObject(objectID, i); 491 } catch (SQLException e) { 492 if (this.log.isErrorEnabled()) { 493 this.log.error("Error in deletePersistentObject(" + name + 494 "): " + e.getMessage() + 495 " Trying to reconnect."); 496 } 497 if (ensureConnection()) { 499 deletePersistentObject(name, i); 500 } 501 } 502 } 503 504 public synchronized void savePersistentBuffer(String name, int i, 505 ByteBuffer buffer) 506 throws IOException 507 { 508 if (this.connection == null) { 509 return; 510 } 511 try { 512 int objectID = getObjectID(name, true); 513 InputStream istream = 514 new ByteArrayInputStream (buffer.array(), buffer.arrayOffset(), 515 buffer.remaining()); 516 517 realDeletePersistentObject(objectID, i); 518 this.stmtSaveValue.setInt(1, objectID); 519 this.stmtSaveValue.setInt(2, i); 520 this.stmtSaveValue.setBinaryStream(3, istream, buffer.remaining()); 521 this.stmtSaveValue.executeUpdate(); 522 } catch (SQLException e) { 523 if (this.log.isErrorEnabled()) { 524 this.log.error("Error in savePersistentBuffer(" + name + 525 "): " + e.getMessage() + 526 " Trying to reconnect."); 527 } 528 if (ensureConnection()) { 530 savePersistentBuffer(name, i, buffer); 531 } else { 532 if (this.log.isErrorEnabled()) { 533 this.log.error("Error in savePersistentBuffer(" + name + 534 "): " + e.getMessage()); 535 } 536 throw new IOException (e.getMessage()); 537 } 538 } 539 } 540 541 public synchronized void savePersistentObject(String name, int key, 542 Byteable object) 543 throws IOException 544 { 545 if (this.connection == null) { 546 return; 547 } 548 549 try { 550 ByteableOutputStream ostream = new ByteableOutputStream(this.pool); 551 ostream.writeByteable(object); 552 savePersistentBuffer(name, key, ostream.getByteBuffer()); 553 ostream.release(); 554 } catch (IOException e) { 555 if (this.log.isErrorEnabled()) { 556 this.log.error("Error in savePersistentBuffer(" + name + 557 "): " + e.getMessage() + 558 " Trying to reconnect."); 559 } 560 if (ensureConnection()) { 562 savePersistentObject(name, key, object); 563 } 564 } 565 } 566 567 public synchronized void clearStorage(String name) throws IOException { 568 if (this.connection == null) { 569 return; 570 } 571 572 try { 573 int objectID = getObjectID(name, false); 574 this.stmtDeleteAll.setInt(1, objectID); 575 this.stmtDeleteAll.executeUpdate(); 576 } catch (SQLException e) { 577 if (this.log.isErrorEnabled()) { 578 this.log.error("Error in clearStorage(): " + 579 e.getMessage() + " Trying to reconnect."); 580 } 581 if (ensureConnection()) { 583 clearStorage(name); 584 } else { 585 throw new IOException (e.getMessage()); 586 } 587 } 588 } 589 590 public synchronized Set getNames() throws SQLException { 591 if (this.connection == null) { 592 return null; 593 } 594 595 Set names = new HashSet (); 596 StringBuffer sql = new StringBuffer (); 597 Statement stmt = null; 598 599 try { 600 sql.append("SELECT object_name FROM ").append(lookupTable); 601 stmt = this.connection.createStatement(); 602 ResultSet results = stmt.executeQuery(sql.toString()); 603 while (results.next()) { 604 String name = results.getString(1); 605 names.add(name); 606 } 607 608 return names; 609 } finally { 610 if (stmt != null) { 611 stmt.close(); 612 } 613 } 614 } 615 616 private int getObjectID(String objectName, boolean create) 617 throws SQLException 618 { 619 ResultSet results; 620 621 this.stmtGetID.setString(1, objectName); 622 results = this.stmtGetID.executeQuery(); 623 if (results.next()) { 624 return results.getInt(1); 625 } else if (create) { 626 newObjectID(objectName); 627 return getObjectID(objectName, false); 628 } else { 629 return -1; 630 } 631 } 632 633 private void newObjectID(String objectName) throws SQLException { 634 this.stmtInsertID.setString(1, objectName); 635 this.stmtInsertID.executeUpdate(); 636 } 637 638 private void realDeletePersistentObject(int objectID, int i) 639 throws SQLException 640 { 641 this.stmtDeleteValue.setInt(1, objectID); 642 this.stmtDeleteValue.setInt(2, i); 643 this.stmtDeleteValue.executeUpdate(); 644 } 645 646 private void preparePreparedStatements() throws SQLException { 647 this.stmtGetID = 648 connection.prepareStatement("SELECT object_id FROM " + 649 this.lookupTable + 650 " WHERE object_name = ?"); 651 this.stmtKeyCount = 652 connection.prepareStatement("SELECT count(*) FROM " + 653 this.dataTable + 654 " WHERE object_id = ?"); 655 this.stmtGetKeys = 656 connection.prepareStatement("SELECT keyx FROM " + this.dataTable + 657 " WHERE object_id = ? ORDER BY keyx"); 658 this.stmtGetValue = 659 connection.prepareStatement("SELECT value FROM " + this.dataTable + 660 " WHERE object_id = ? AND keyx = ?"); 661 this.stmtInsertID = 662 connection.prepareStatement("INSERT INTO " + this.lookupTable + 663 " (object_name) VALUES (?)"); 664 this.stmtDeleteValue = 665 connection.prepareStatement("DELETE FROM " + this.dataTable + 666 " WHERE object_id = ? AND keyx = ?"); 667 this.stmtSaveValue = 668 connection.prepareStatement("INSERT INTO " + this.dataTable + 669 " (object_id, keyx, value) VALUES " + 670 "(?, ? ,?)"); 671 this.stmtDeleteAll = 672 connection.prepareStatement("DELETE FROM " + this.dataTable + 673 " WHERE object_id = ?"); 674 675 String lookupTable = "manta.persist_lookup_" + tableSuffix; 676 this.stmtGetAllServices = 677 connection.prepareStatement("SELECT object_name FROM " + lookupTable); 678 } 679 680 private boolean ensureConnection() { 681 final int MAX_CONNECT = 100; 682 683 try { 685 this.connection.close(); 686 } catch (SQLException e) {} 687 688 int counter; 689 long interval = 500; 690 if (this.log.isInfoEnabled()) { 691 this.log.info("Trying to reconnect to DB..."); 692 } 693 for (counter = 1; counter <= MAX_CONNECT; counter++) { 694 try { 695 this.connection = 696 DriverManager.getConnection(dbURL, dbUser, dbPasswd); 697 if (this.log.isInfoEnabled()) { 698 this.log.info("DB reconnect succeeded (retries: " + 699 counter + ")"); 700 } 701 preparePreparedStatements(); 702 return true; 703 } catch (SQLException e) {} 704 try { 705 Thread.sleep(interval); 706 } catch (InterruptedException e) {} 707 interval = (long) (1.5 * interval); 708 } 709 if (this.log.isErrorEnabled()) { 710 this.log.error("DB reconnect FAILED (retries: " + 711 counter + "). Persistency operation will FAIL."); 712 } 713 714 return false; 715 } 716 717 public String [] getAllServices() { 718 HashSet set = new HashSet (); 719 try { 720 ResultSet results = this.stmtGetAllServices.executeQuery(); 721 while (results.next()) { 722 set.add(results.getString(1)); 723 } 724 725 } catch (SQLException e) { 726 if (this.log.isErrorEnabled()) { 727 log.error("failed to retrieve services",e); 728 } 729 return new String [0]; 730 731 } 732 String delimiter = MantaAgent.getInstance().getSingletonRepository(). 733 getConfigManager().getStringProperty("persistency.hierarchy_delimiter","~"); 734 Iterator itr = set.iterator(); 735 HashSet ready = new HashSet (); 736 while (itr.hasNext()) { 737 String tmp = (String ) itr.next(); 738 if (tmp.startsWith(PersistentManager.SUBSCRIBERS_PERSISTENT_PREFIX)){ 739 ready.add(StringUtils.replace(tmp.substring(PersistentManager.SUBSCRIBERS_PERSISTENT_PREFIX.length()) 740 ,delimiter, VirtualTopicManager.HIERARCHICAL_TOPIC_DELIMITER)); 741 } 742 } 743 return (String []) ready.toArray(new String [0]); 744 } 745 } | Popular Tags |