1 29 30 package com.caucho.server.cluster; 31 32 import com.caucho.config.ConfigException; 33 import com.caucho.db.jdbc.DataSourceImpl; 34 import com.caucho.util.Alarm; 35 import com.caucho.util.FreeList; 36 import com.caucho.util.L10N; 37 import com.caucho.util.Log; 38 import com.caucho.vfs.Path; 39 import com.caucho.vfs.ReadStream; 40 import com.caucho.vfs.WriteStream; 41 42 import javax.sql.DataSource ; 43 import java.io.IOException ; 44 import java.io.InputStream ; 45 import java.sql.Connection ; 46 import java.sql.PreparedStatement ; 47 import java.sql.ResultSet ; 48 import java.sql.SQLException ; 49 import java.sql.Statement ; 50 import java.util.logging.Level ; 51 import java.util.logging.Logger ; 52 53 54 57 public class FileBacking { 58 private static final L10N L = new L10N(FileBacking.class); 59 private static final Logger log = Log.open(FileBacking.class); 60 61 private FreeList<ClusterConnection> _freeConn 62 = new FreeList<ClusterConnection>(32); 63 64 private String _name; 65 66 private Path _path; 67 68 private DataSource _dataSource; 69 70 private String _tableName; 71 private String _loadQuery; 72 private String _updateQuery; 73 private String _accessQuery; 74 private String _setExpiresQuery; 75 private String _insertQuery; 76 private String _invalidateQuery; 77 private String _timeoutQuery; 78 private String _dumpQuery; 79 private String _countQuery; 80 81 84 public Path getPath() 85 { 86 return _path; 87 } 88 89 92 public void setPath(Path path) 93 { 94 _path = path; 95 } 96 97 100 public void setTableName(String table) 101 { 102 _tableName = table; 103 } 104 105 public boolean init(int clusterLength) 106 throws Exception 107 { 108 if (_path == null) 109 throw new ConfigException(L.l("file-backing needs path.")); 110 111 if (_tableName == null) 112 throw new ConfigException(L.l("file-backing needs tableName.")); 113 114 int length = clusterLength; 115 116 if (length <= 0) 117 length = 1; 118 119 _loadQuery = "SELECT access_time,data FROM " + _tableName + " WHERE id=?"; 120 _insertQuery = ("INSERT into " + _tableName + " (id,data,mod_time,access_time,expire_interval,server1,server2,server3) " + 121 "VALUES(?,?,?,?,?,?,?,?)"); 122 _updateQuery = "UPDATE " + _tableName + " SET data=?, mod_time=?, access_time=? WHERE id=?"; 123 _accessQuery = "UPDATE " + _tableName + " SET access_time=? WHERE id=?"; 124 _setExpiresQuery = "UPDATE " + _tableName + " SET expire_interval=? WHERE id=?"; 125 _invalidateQuery = "DELETE FROM " + _tableName + " WHERE id=?"; 126 127 _timeoutQuery = "DELETE FROM " + _tableName + " WHERE access_time + 5 * expire_interval / 4 < ?"; 129 130 _dumpQuery = ("SELECT id, expire_interval, data FROM " + _tableName + 131 " WHERE ? <= mod_time AND " + 132 " (?=server1 OR ?=server2 OR ?=server3)"); 133 134 _countQuery = "SELECT count(*) FROM " + _tableName; 135 136 try { 137 _path.mkdirs(); 138 } catch (IOException e) { 139 } 140 141 DataSourceImpl dataSource = new DataSourceImpl(); 142 dataSource.setPath(_path); 143 dataSource.setRemoveOnError(true); 144 dataSource.init(); 145 146 _dataSource = dataSource; 147 148 initDatabase(); 149 150 return true; 151 } 152 153 156 public DataSource getDataSource() 157 { 158 return _dataSource; 159 } 160 161 164 private void initDatabase() 165 throws Exception 166 { 167 Connection conn = _dataSource.getConnection(); 168 169 try { 170 Statement stmt = conn.createStatement(); 171 172 boolean hasDatabase = false; 173 174 try { 175 String sql = "SELECT expire_interval FROM " + _tableName + " WHERE 1=0"; 176 177 ResultSet rs = stmt.executeQuery(sql); 178 rs.next(); 179 rs.close(); 180 181 return; 182 } catch (Throwable e) { 183 log.finer(e.toString()); 184 } 185 186 try { 187 stmt.executeQuery("DROP TABLE " + _tableName); 188 } catch (Throwable e) { 189 log.log(Level.FINEST, e.toString(), e); 190 } 191 192 String sql = ("CREATE TABLE " + _tableName + " (\n" + 193 " id VARCHAR(128) PRIMARY KEY,\n" + 194 " data BLOB,\n" + 195 " expire_interval INTEGER,\n" + 196 " access_time INTEGER,\n" + 197 " mod_time INTEGER,\n" + 198 " mod_count BIGINT,\n" + 199 " server1 INTEGER,\n" + 200 " server2 INTEGER,\n" + 201 " server3 INTEGER)"); 202 203 log.fine(sql); 204 205 stmt.executeUpdate(sql); 206 } finally { 207 conn.close(); 208 } 209 } 210 211 public long start() 212 throws Exception 213 { 214 long delta = - Alarm.getCurrentTime(); 215 216 Connection conn = null; 217 try { 218 conn = _dataSource.getConnection(); 219 220 Statement stmt = conn.createStatement(); 221 222 String sql = "SELECT MAX(access_time) FROM " + _tableName; 223 224 ResultSet rs = stmt.executeQuery(sql); 225 226 if (rs.next()) 227 delta = rs.getInt(1) * 60000L - Alarm.getCurrentTime(); 228 } finally { 229 if (conn != null) 230 conn.close(); 231 } 232 233 return delta; 234 } 235 236 239 public void clearOldObjects(long maxIdleTime) 240 throws SQLException 241 { 242 Connection conn = null; 243 244 try { 245 if (maxIdleTime > 0) { 246 conn = _dataSource.getConnection(); 247 248 PreparedStatement pstmt = conn.prepareStatement(_timeoutQuery); 249 250 long now = Alarm.getCurrentTime(); 251 int nowMinute = (int) (now / 60000L); 252 253 pstmt.setInt(1, nowMinute); 254 255 int count = pstmt.executeUpdate(); 256 257 259 if (count > 0) 260 log.fine(this + " purged " + count + " old sessions"); 261 262 pstmt.close(); 263 } 264 } finally { 265 if (conn != null) 266 conn.close(); 267 } 268 } 269 270 277 public boolean loadSelf(ClusterObject clusterObj, Object obj) 278 throws Exception 279 { 280 String uniqueId = clusterObj.getUniqueId(); 281 282 ClusterConnection conn = getConnection(); 283 try { 284 PreparedStatement stmt = conn.prepareLoad(); 285 stmt.setString(1, uniqueId); 286 287 ResultSet rs = stmt.executeQuery(); 288 boolean validLoad = false; 289 290 if (rs.next()) { 291 long accessTime = rs.getInt(1) * 60000L; 293 294 InputStream is = rs.getBinaryStream(2); 295 296 if (log.isLoggable(Level.FINE)) 297 log.fine("load local object: " + uniqueId); 298 299 validLoad = clusterObj.load(is, obj); 300 301 if (validLoad) 302 clusterObj.setAccessTime(accessTime); 303 304 is.close(); 305 } 306 else if (log.isLoggable(Level.FINE)) 307 log.fine("no local object loaded for " + uniqueId); 308 else { 309 } 311 312 rs.close(); 313 314 return validLoad; 315 } finally { 316 conn.close(); 317 } 318 } 319 320 325 public void updateAccess(String uniqueId) 326 throws Exception 327 { 328 ClusterConnection conn = getConnection(); 329 330 try { 331 PreparedStatement stmt = conn.prepareAccess(); 332 333 long now = Alarm.getCurrentTime(); 334 int nowMinutes = (int) (now / 60000L); 335 stmt.setInt(1, nowMinutes); 336 stmt.setString(2, uniqueId); 337 338 int count = stmt.executeUpdate(); 339 340 if (count > 0) { 341 if (log.isLoggable(Level.FINE)) 342 log.fine("access cluster: " + uniqueId); 343 return; 344 } 345 } finally { 346 conn.close(); 347 } 348 } 349 350 355 public void setExpireInterval(String uniqueId, long expireInterval) 356 throws Exception 357 { 358 ClusterConnection conn = getConnection(); 359 360 try { 361 PreparedStatement stmt = conn.prepareSetExpireInterval(); 362 363 int expireMinutes = (int) (expireInterval / 60000L); 364 stmt.setInt(1, expireMinutes); 365 stmt.setString(2, uniqueId); 366 367 int count = stmt.executeUpdate(); 368 369 if (count > 0) { 370 if (log.isLoggable(Level.FINE)) 371 log.fine("set expire interval: " + uniqueId + " " + expireInterval); 372 return; 373 } 374 } finally { 375 conn.close(); 376 } 377 } 378 379 382 public void remove(String uniqueId) 383 throws Exception 384 { 385 ClusterConnection conn = getConnection(); 386 387 try { 388 PreparedStatement pstmt = conn.prepareInvalidate(); 389 pstmt.setString(1, uniqueId); 390 391 int count = pstmt.executeUpdate(); 392 393 if (log.isLoggable(Level.FINE)) 394 log.fine("invalidate: " + uniqueId); 395 } finally { 396 conn.close(); 397 } 398 } 399 400 403 public long read(String uniqueId, WriteStream os) 404 throws IOException 405 { 406 Connection conn = null; 407 try { 408 conn = _dataSource.getConnection(); 409 410 PreparedStatement pstmt = conn.prepareStatement(_loadQuery); 411 pstmt.setString(1, uniqueId); 412 413 ResultSet rs = pstmt.executeQuery(); 414 if (rs.next()) { 415 long accessTime = rs.getInt(1) * 60000L; 416 417 InputStream is = rs.getBinaryStream(2); 418 419 os.writeStream(is); 420 421 is.close(); 422 423 return accessTime; 424 } 425 } catch (SQLException e) { 426 log.log(Level.FINE, e.toString(), e); 427 } finally { 428 try { 429 if (conn != null) 430 conn.close(); 431 } catch (SQLException e) { 432 } 433 } 434 435 return -1; 436 } 437 438 446 public void storeSelf(String uniqueId, 447 ReadStream is, int length, 448 long expireInterval, 449 int primary, int secondary, int tertiary) 450 { 451 ClusterConnection conn = null; 452 453 try { 454 conn = getConnection(); 455 459 if (storeSelfUpdate(conn, uniqueId, is, length)) { 461 } 463 else if (storeSelfInsert(conn, uniqueId, is, length, expireInterval, 464 primary, secondary, tertiary)) { 465 } 467 else if (storeSelfUpdate(conn, uniqueId, is, length)) { 468 } 471 else { 472 log.warning(L.l("Can't store session {0}", uniqueId)); 473 } 474 } catch (SQLException e) { 475 e.printStackTrace(); 476 log.log(Level.FINE, e.toString(), e); 477 } finally { 478 if (conn != null) 479 conn.close(); 480 } 481 } 482 483 491 private boolean storeSelfUpdate(ClusterConnection conn, String uniqueId, 492 ReadStream is, int length) 493 { 494 try { 495 PreparedStatement stmt = conn.prepareUpdate(); 496 stmt.setBinaryStream(1, is, length); 497 498 long now = Alarm.getCurrentTime(); 499 int nowMinutes = (int) (now / 60000L); 500 stmt.setInt(2, nowMinutes); 501 stmt.setInt(3, nowMinutes); 502 stmt.setString(4, uniqueId); 503 504 int count = stmt.executeUpdate(); 505 506 if (count > 0) { 507 if (log.isLoggable(Level.FINE)) 508 log.fine("update cluster: " + uniqueId + " length:" + length); 509 510 return true; 511 } 512 } catch (SQLException e) { 513 log.log(Level.WARNING, e.toString(), e); 514 } 515 516 return false; 517 } 518 519 private boolean storeSelfInsert(ClusterConnection conn, String uniqueId, 520 ReadStream is, int length, 521 long expireInterval, 522 int primary, int secondary, int tertiary) 523 { 524 try { 525 PreparedStatement stmt = conn.prepareInsert(); 526 527 stmt.setString(1, uniqueId); 528 529 stmt.setBinaryStream(2, is, length); 530 531 int nowMinutes = (int) (Alarm.getCurrentTime() / 60000L); 532 533 stmt.setInt(3, nowMinutes); 534 stmt.setInt(4, nowMinutes); 535 stmt.setInt(5, (int) (expireInterval / 60000L)); 536 537 stmt.setInt(6, primary); 538 stmt.setInt(7, secondary); 539 stmt.setInt(8, tertiary); 540 541 stmt.executeUpdate(); 542 543 if (log.isLoggable(Level.FINE)) 544 log.fine("insert cluster: " + uniqueId + " length:" + length); 545 546 return true; 547 } catch (SQLException e) { 548 System.out.print(e); 549 550 log.log(Level.FINE, e.toString(), e); 551 } 552 553 return false; 554 } 555 556 560 public long getObjectCount() 561 throws SQLException 562 { 563 ClusterConnection conn = getConnection(); 564 565 try { 566 PreparedStatement stmt = conn.prepareCount(); 567 568 ResultSet rs = stmt.executeQuery(); 569 570 if (rs != null && rs.next()) { 571 long value = rs.getLong(1); 572 rs.close(); 573 return value; 574 } 575 576 return -1; 577 } catch (SQLException e) { 578 e.printStackTrace(); 579 log.log(Level.FINE, e.toString(), e); 580 } finally { 581 conn.close(); 582 } 583 584 return -1; 585 } 586 587 public void destroy() 588 { 589 _dataSource = null; 590 _freeConn = null; 591 } 592 593 private ClusterConnection getConnection() 594 throws SQLException 595 { 596 ClusterConnection cConn = _freeConn.allocate(); 597 598 if (cConn == null) { 599 Connection conn = _dataSource.getConnection(); 600 cConn = new ClusterConnection(conn); 601 } 602 603 return cConn; 604 } 605 606 public String serverNameToTableName(String serverName) 607 { 608 if (serverName == null) 609 return "srun"; 610 611 StringBuilder cb = new StringBuilder (); 612 cb.append("srun_"); 613 614 for (int i = 0; i < serverName.length(); i++) { 615 char ch = serverName.charAt(i); 616 617 if ('a' <= ch && ch <= 'z') { 618 cb.append(ch); 619 } 620 else if ('A' <= ch && ch <= 'Z') { 621 cb.append(ch); 622 } 623 else if ('0' <= ch && ch <= '9') { 624 cb.append(ch); 625 } 626 else if (ch == '_') { 627 cb.append(ch); 628 } 629 else 630 cb.append('_'); 631 } 632 633 return cb.toString(); 634 } 635 636 public String toString() 637 { 638 return "ClusterStore[" + _name + "]"; 639 } 640 641 class ClusterConnection { 642 private Connection _conn; 643 644 private PreparedStatement _loadStatement; 645 private PreparedStatement _updateStatement; 646 private PreparedStatement _insertStatement; 647 private PreparedStatement _accessStatement; 648 private PreparedStatement _setExpiresStatement; 649 private PreparedStatement _invalidateStatement; 650 private PreparedStatement _timeoutStatement; 651 private PreparedStatement _countStatement; 652 653 ClusterConnection(Connection conn) 654 { 655 _conn = conn; 656 } 657 658 PreparedStatement prepareLoad() 659 throws SQLException 660 { 661 if (_loadStatement == null) 662 _loadStatement = _conn.prepareStatement(_loadQuery); 663 664 return _loadStatement; 665 } 666 667 PreparedStatement prepareUpdate() 668 throws SQLException 669 { 670 if (_updateStatement == null) 671 _updateStatement = _conn.prepareStatement(_updateQuery); 672 673 return _updateStatement; 674 } 675 676 PreparedStatement prepareInsert() 677 throws SQLException 678 { 679 if (_insertStatement == null) 680 _insertStatement = _conn.prepareStatement(_insertQuery); 681 682 return _insertStatement; 683 } 684 685 PreparedStatement prepareAccess() 686 throws SQLException 687 { 688 if (_accessStatement == null) 689 _accessStatement = _conn.prepareStatement(_accessQuery); 690 691 return _accessStatement; 692 } 693 694 PreparedStatement prepareSetExpireInterval() 695 throws SQLException 696 { 697 if (_setExpiresStatement == null) 698 _setExpiresStatement = _conn.prepareStatement(_setExpiresQuery); 699 700 return _setExpiresStatement; 701 } 702 703 PreparedStatement prepareInvalidate() 704 throws SQLException 705 { 706 if (_invalidateStatement == null) 707 _invalidateStatement = _conn.prepareStatement(_invalidateQuery); 708 709 return _invalidateStatement; 710 } 711 712 PreparedStatement prepareCount() 713 throws SQLException 714 { 715 if (_countStatement == null) 716 _countStatement = _conn.prepareStatement(_countQuery); 717 718 return _countStatement; 719 } 720 721 void close() 722 { 723 _freeConn.free(this); 724 } 725 } 726 } 727 | Popular Tags |