1 package org.jgroups.persistence; 2 3 12 13 14 import org.apache.commons.logging.Log; 15 import org.apache.commons.logging.LogFactory; 16 17 import java.io.*; 18 import java.sql.*; 19 import java.util.*; 20 21 22 25 public class DBPersistenceManager implements PersistenceManager { 26 27 protected final Log log=LogFactory.getLog(this.getClass()); 28 29 34 public DBPersistenceManager(String filename) throws Exception { 35 String home_dir = null; 36 37 try { 39 home_dir = System.getProperty("user.home"); 40 } 41 catch (SecurityException ex1) { 42 } 43 44 try { 46 home_dir=home_dir + '/' + filename; 47 init(new FileInputStream(home_dir)); 48 return; 49 } 50 catch(Exception ex) { 51 ; 52 } 53 54 try { 56 InputStream in=DBPersistenceManager.class.getResourceAsStream('/' + filename); 57 if(in != null) { 58 init(in); 59 return; 60 } 61 } 62 catch(Exception x) { 63 if(log.isErrorEnabled()) log.error("failed reading database properties from " + filename + ", exception=" + x); 64 } 65 66 try { 68 home_dir=System.getProperty("persist.properties"); 69 init(new FileInputStream(home_dir)); 70 return; 71 } 72 catch(Exception ex) { 73 ; 74 } 75 76 throw new Exception ("DBPersistenceManager.DBPersistenceManager(): " + 78 "failed reading database properties from " + filename); 79 } 80 81 82 87 public DBPersistenceManager(InputStream input) throws Exception { 88 init(input); 89 } 90 91 92 99 protected void init(InputStream in) throws Exception { 100 list=new Vector(); 101 readProps(in); 102 loadDriver(); 103 104 Connection conn=this.getConnection(); 106 this.closeConnection(conn); 107 createDBTables(); 108 retrieveAll(); System.err.println(" Done constructing DB Persist Manager"); 110 } 111 112 113 117 118 125 public void save(Serializable key, Serializable val) throws CannotPersistException { 126 if(!entryExists(key)) { 128 System.err.println(" entry doesnt exist for " + key.toString()); 129 try { 130 addNewEntry(key, val); 131 list.add(key.toString()); 132 return; 133 } 134 catch(Throwable t1) { 135 t1.printStackTrace(); 136 throw new CannotPersistException(t1, " error adding a completely new entry in to DB "); 138 } 139 } 141 Connection conn=null; 143 PreparedStatement prepStat=null; 144 try { 145 conn=this.getConnection(); 146 String keyStr=null; 147 keyStr=key.toString(); 148 byte[] keyBytes=getBytes(key); 149 byte[] valBytes=getBytes(val); 150 System.err.println(" value is " + val); 151 prepStat=conn.prepareStatement(updateStat); 153 prepStat.setString(3, keyStr); 154 prepStat.setBytes(1, keyBytes); 155 prepStat.setBytes(2, valBytes); 156 prepStat.executeQuery(); 157 } 158 catch(Throwable t) { 159 t.printStackTrace(); 161 throw new CannotPersistException(t, "error updating an existing entry in to the database "); 163 } 164 finally { 166 try { 167 if(prepStat != null) prepStat.close(); 168 this.closeConnection(conn); 169 } 170 catch(Throwable t) { 171 conn=null; 173 prepStat=null; 174 } 175 } 176 } 177 178 179 185 public Serializable remove(Serializable key) throws CannotRemoveException { 186 Connection conn=null; 187 Statement stat=null; 188 PreparedStatement prepStat=null; 189 ResultSet set=null; 190 Serializable val=null; 191 192 try { 193 conn=this.getConnection(); 194 stat=conn.createStatement(); 195 String exQuery=" select * from replhashmap where key like '" + key.toString() + '\''; 196 set=stat.executeQuery(exQuery); 197 set.next(); 198 val=getSerializable(set.getBinaryStream(3)); 199 } 200 catch(Throwable t3) { 201 t3.printStackTrace(); 203 throw new CannotRemoveException(t3, " Error retrieving value for given key"); 204 } 205 finally { 206 try { 207 if(prepStat != null) prepStat.close(); 208 this.closeConnection(conn); 209 } 210 catch(Throwable t) { 211 conn=null; 213 prepStat=null; 214 } 215 } 216 217 218 try { 219 conn=this.getConnection(); 220 prepStat=conn.prepareStatement(removeStat); 221 prepStat.setString(1, key.toString()); 222 prepStat.executeQuery(); 223 list.remove(key.toString()); 224 } 225 catch(Throwable t) { 226 t.printStackTrace(); 228 throw new CannotRemoveException(t, "Could not remove existing entry due to error in jdbc transaction"); 230 } 231 232 finally { 234 try { 235 set.close(); 236 stat.close(); 237 if(prepStat != null) prepStat.close(); 238 this.closeConnection(conn); 239 } 240 catch(Throwable t) { 241 conn=null; 243 stat=null; 244 } } return val; 247 } 249 250 255 public synchronized void saveAll(Map map) throws CannotPersistException { 256 Iterator iter=null; 257 try { 258 Set keySet=map.keySet(); 259 iter=keySet.iterator(); 260 } 261 catch(Throwable t) { 262 t.printStackTrace(); 263 throw new CannotPersistException(t, "Error with the map entered to saveAll"); 265 } 266 267 while(iter.hasNext()) { 269 try { 270 Serializable key=(Serializable) iter.next(); 271 Serializable val=(Serializable) map.get(key); 272 273 this.save(key, val); 275 } 276 catch(Throwable t2) { 277 t2.printStackTrace(); 278 continue; 280 } 281 } } 284 285 290 public synchronized Map retrieveAll() throws CannotRetrieveException { 291 Connection conn=null; 292 Statement stat=null; 293 ResultSet set=null; 294 Map map=null; 295 try { 296 conn=this.getConnection(); 297 stat=conn.createStatement(); 298 set=stat.executeQuery(" select * from replhashmap"); 299 map=retrieveAll(set); 300 } 301 catch(Throwable t) { 302 throw new CannotRetrieveException(t, "Error happened while querying the database for bulk retrieve, try starting DB manually"); 304 } 305 306 307 try { 309 stat.close(); 310 this.closeConnection(conn); 311 } 312 catch(Throwable t1) { 313 } 316 317 return map; 318 } 320 321 326 private Map retrieveAll(ResultSet result) throws Exception { 327 HashMap map=new HashMap(); 328 while(result.next()) { 329 InputStream inputStrKey=result.getBinaryStream(2); 330 InputStream inputStrVal=result.getBinaryStream(3); 331 Serializable key=getSerializable(inputStrKey); 332 Serializable val=getSerializable(inputStrVal); 333 map.put(key, val); 334 list.add(key.toString()); 335 } return map; 337 } 338 339 340 344 public void clear() throws CannotRemoveException { 345 Connection conn=null; 346 Statement stat=null; 347 try { 348 conn=this.getConnection(); 349 stat=conn.createStatement(); 350 stat.executeQuery("delete from replhashmap"); 351 } 352 catch(Throwable t) { 353 throw new CannotRemoveException(t, " delete all query failed with existing database"); 355 } 356 357 try { 359 stat.close(); 360 this.closeConnection(conn); 361 } 362 catch(Throwable t) { 363 conn=null; 364 stat=null; 365 } 366 } 367 368 369 372 public void shutDown() { 373 } 376 377 378 379 382 383 389 private void addNewEntry(Serializable key, Serializable val) throws CannotPersistException, CannotConnectException { 390 Connection conn=getConnection(); 391 try { 392 PreparedStatement prepStat=conn.prepareStatement(insertStat); 393 prepStat.setString(1, key.toString()); 394 byte[] keyBytes=getBytes(key); 395 byte[] valBytes=getBytes(val); 396 prepStat.setBytes(2, keyBytes); 399 prepStat.setBytes(3, valBytes); 400 prepStat.executeQuery(); 403 conn.commit(); 404 System.err.println(" executing insert " + insertStat); 405 } 406 catch(Throwable t) { 407 t.printStackTrace(); 409 throw new CannotPersistException(t, "error adding new entry using creating Db connection and schema"); 411 } 412 } 414 415 421 private java.io.InputStream getBinaryInputStream(Serializable ser) throws Exception { 422 ByteArrayOutputStream stream=new ByteArrayOutputStream(); 423 ObjectOutputStream keyoos=new ObjectOutputStream(stream); 424 keyoos.writeObject(ser); 425 ByteArrayInputStream pipe=new ByteArrayInputStream(stream.toByteArray()); 426 return pipe; 427 } 429 430 436 private Serializable getSerializable(java.io.InputStream stream) throws Exception { 437 ObjectInputStream ooStr=new ObjectInputStream(stream); 438 Serializable tmp=(Serializable) ooStr.readObject(); 439 return tmp; 440 } 441 442 443 449 private void addNewEntryGen(Serializable key, Serializable val) throws CannotPersistException, CannotConnectException { 450 Connection conn=getConnection(); 451 try { 452 PreparedStatement prepStat=conn.prepareStatement(insertStat); 453 prepStat.setString(1, key.toString()); 454 prepStat.setBytes(2, getBytes(key)); 455 prepStat.setBytes(3, getBytes(val)); 456 prepStat.executeUpdate(); 457 } 458 catch(Throwable t) { 459 throw new CannotPersistException(t, "error adding new entry using creating Db connection and schema"); 461 } 462 } 464 470 private void addNewEntryOra(Serializable key, Serializable val) throws CannotPersistException, CannotConnectException { 471 Connection conn=getConnection(); 472 try { 473 PreparedStatement prepStat=conn.prepareStatement(insertStat); 474 prepStat.setString(1, key.toString()); 475 InputStream keyBin=getBinaryInputStream(key); 476 InputStream keyVal=getBinaryInputStream(val); 477 byte[] keyBytes=getBytes(key); 478 byte[] valBytes=getBytes(val); 479 prepStat.setBytes(2, keyBytes); 480 prepStat.setBytes(3, valBytes); 481 prepStat.executeBatch(); 482 } 483 catch(Throwable t) { 484 throw new CannotPersistException(t, "error adding new entry using creating Db connection and schema"); 486 } 487 } 489 490 495 private boolean entryExists(Serializable key) { 496 return list.contains(key.toString()); 497 } 498 499 500 505 private byte[] getBytes(Serializable ser) throws Exception { 506 ByteArrayOutputStream stream=new ByteArrayOutputStream(); 507 ObjectOutputStream keyoos=new ObjectOutputStream(stream); 508 keyoos.writeObject(ser); 509 byte[] keyBytes=stream.toByteArray(); 510 return keyBytes; 511 } 513 514 515 516 519 520 527 private void readProps(String filePath) throws Exception { 528 FileInputStream _stream=new FileInputStream(filePath); 529 props=new Properties(); 530 props.load(_stream); 531 532 driverName=props.getProperty("jdbc.Driver"); 534 connStr=props.getProperty("jdbc.Conn").trim(); 535 userName=props.getProperty("jdbc.User").trim(); 536 userPass=props.getProperty("jdbc.Pass").trim(); 537 createTable=props.getProperty("jdbc.table").trim(); 538 } 539 540 541 546 private void readProps(InputStream input) throws Exception { 547 props=new Properties(); 548 props.load(input); 549 550 driverName=props.getProperty("jdbc.Driver"); 552 connStr=props.getProperty("jdbc.Conn"); 553 userName=props.getProperty("jdbc.User"); 554 userPass=props.getProperty("jdbc.Pass"); 555 createTable=props.getProperty("jdbc.table"); 556 } 557 558 559 566 private void loadDriver() throws Exception { 567 Class.forName(driverName); 569 } 570 571 572 578 private Connection getConnection() throws CannotConnectException { 579 try { 580 connStr=connStr.trim(); 581 Connection conn=DriverManager.getConnection(connStr, userName, userPass); 582 if(log.isInfoEnabled()) log.info("userName=" + userName + 583 ", userPass=" + userPass + ", connStr=" + connStr); 584 return conn; 585 } 586 catch(Throwable t) { 587 t.printStackTrace(); 588 throw new CannotConnectException(t, "Error in creating connection using provided properties "); 590 } 591 } 593 594 600 private void closeConnection(Connection conn) { 601 try { 602 if(conn != null) { 603 conn.close(); 604 conn=null; 605 } 606 } 607 catch(Throwable t) { 608 conn=null; 610 } 611 } 613 614 619 private void createDBTables() throws CannotCreateSchemaException, CannotConnectException { 620 Connection conn=this.getConnection(); 621 Statement stat=null; 622 try { 623 stat=conn.createStatement(); 624 } 625 catch(Exception e) { 626 e.printStackTrace(); 628 throw new CannotConnectException(e, "there was an error in creating statements for persisting data using created connection"); 629 } 630 try { 631 ResultSet set=stat.executeQuery("select * from replhashmap"); 632 } 633 catch(Throwable t) { 634 t.printStackTrace(); 635 addSchemaToDB(conn); 637 } } 640 641 646 private void addSchemaToDB(Connection conn) throws CannotCreateSchemaException { 647 Statement stat=null; 648 Statement stat2=null; 649 try { 650 651 stat=conn.createStatement(); 652 System.err.println(" executing query for oracle " + createTable); 653 stat.executeQuery(createTable); 654 } 655 catch(Throwable t) { 656 t.printStackTrace(); 657 throw new CannotCreateSchemaException(t, "error was using schema with blobs"); 659 } 661 finally { 663 try { 664 if(stat != null) stat.close(); 665 this.closeConnection(conn); 666 } 667 catch(Throwable t3) { 668 } 669 } } 672 private Properties props=null; 673 private String driverName=null; 674 private String userName=null; 675 private String userPass=null; 676 private String connStr=null; 677 private String createTable=null; 678 private final boolean oracleDB=false; 679 private Vector list=null; 680 681 682 private static final String tabName="replhashmap"; 683 private static final String insertStat="insert into replhashmap(key, keyBin, valBin) values (?, ?, ?)"; 684 private static final String updateStat="update replhashmap set keyBin = ?, valBin = ? where key like ?"; 685 private static final String removeStat=" delete from replhashmap where key like ?"; 686 private static final String createTableGen=" create table replhashmap(key varchar, keyBin varbinary, valBin varbinary)"; 687 private static final String createTableOra=" create table replhashmap ( key varchar2(100), keyBin blob, valBin blob)"; 688 } 689 | Popular Tags |