1 45 package org.exolab.jms.persistence; 46 47 import java.sql.Connection ; 48 import java.sql.Date ; 49 import java.sql.PreparedStatement ; 50 import java.sql.ResultSet ; 51 import java.sql.SQLException ; 52 import java.util.Enumeration ; 53 import java.util.HashMap ; 54 import java.util.Vector ; 55 56 import EDU.oswego.cs.dl.util.concurrent.FIFOReadWriteLock; 57 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; 58 59 import org.apache.commons.logging.Log; 60 import org.apache.commons.logging.LogFactory; 61 62 import org.exolab.jms.authentication.User; 63 import org.exolab.jms.client.JmsDestination; 64 import org.exolab.jms.client.JmsQueue; 65 import org.exolab.jms.client.JmsTopic; 66 import org.exolab.jms.config.ConfigurationManager; 67 import org.exolab.jms.config.DatabaseConfiguration; 68 import org.exolab.jms.config.RdbmsDatabaseConfiguration; 69 import org.exolab.jms.events.EventHandler; 70 import org.exolab.jms.events.BasicEventManager; 71 import org.exolab.jms.message.MessageImpl; 72 import org.exolab.jms.messagemgr.MessageHandle; 73 74 75 84 85 public class RDBMSAdapter 86 extends PersistenceAdapter 87 implements EventHandler { 88 89 93 public static final String SCHEMA_VERSION = "V0.7.6"; 94 95 98 private static DBConnectionManager _connectionManager = null; 99 100 104 private int _gcInterval = 600; 105 106 109 private int _gcBlockSize = 500; 110 111 115 private long _lastTime = 0; 116 117 120 private int _gcThreadPriority = Thread.NORM_PRIORITY; 121 122 129 private ReadWriteLock _destinationLock = new FIFOReadWriteLock(); 130 131 135 private static final int COLLECT_DATABASE_GARBAGE_EVENT = 1; 136 137 140 private static final Log _log = LogFactory.getLog(RDBMSAdapter.class); 141 142 143 149 RDBMSAdapter(String driver, String url, String userName, String password) 150 throws PersistenceException { 151 152 DatabaseConfiguration dbConfig = 153 ConfigurationManager.getConfig().getDatabaseConfiguration(); 154 RdbmsDatabaseConfiguration config = 155 dbConfig.getRdbmsDatabaseConfiguration(); 156 157 _connectionManager = getConnectionManager(config.getClazz()); 159 _connectionManager.setUser(userName); 160 _connectionManager.setPassword(password); 161 _connectionManager.setDriver(driver); 162 _connectionManager.setURL(url); 163 _connectionManager.setMaxActive(config.getMaxActive()); 164 _connectionManager.setMaxIdle(config.getMaxIdle()); 165 _connectionManager.setMinIdleTime(config.getMinIdleTime()); 166 _connectionManager.setEvictionInterval(config.getEvictionInterval()); 167 _connectionManager.setTestQuery(config.getTestQuery()); 168 _connectionManager.setTestBeforeUse(config.getTestBeforeUse()); 169 170 _connectionManager.init(); 172 173 Connection connection = null; 174 try { 175 connection = getConnection(); 178 179 String version = getSchemaVersion(connection); 180 if (version == null) { 181 initSchemaVersion(connection); 182 } else if (!version.equals(SCHEMA_VERSION)) { 183 throw new PersistenceException( 184 "Schema needs to be converted from version=" + version 185 + " to version=" + SCHEMA_VERSION 186 + "\nBack up your database, and run 'dbtool -migrate'" 187 + "to convert the schema"); 188 } 189 190 SeedGenerator.initialise(); 191 Destinations.initialise(connection); 192 Consumers.initialise(connection); 193 Messages.initialise(); 194 MessageHandles.initialise(); 195 connection.commit(); 196 Users.initialise(); 197 } catch (PersistenceException exception) { 198 SQLHelper.rollback(connection); 199 throw exception; 200 } catch (Exception exception) { 201 throw new PersistenceException( 202 "Failed to initialise database adapter", exception); 203 } finally { 204 SQLHelper.close(connection); 205 206 } 207 208 228 } 229 230 233 public void close() { 234 if (SeedGenerator.instance() != null) { 235 SeedGenerator.instance().close(); 236 } 237 238 if (Destinations.instance() != null) { 239 Destinations.instance().close(); 240 } 241 242 if (Consumers.instance() != null) { 243 Consumers.instance().close(); 244 } 245 246 if (Messages.instance() != null) { 247 Messages.instance().close(); 248 } 249 250 if (MessageHandles.instance() != null) { 251 MessageHandles.instance().close(); 252 } 253 254 if (Users.instance() != null) { 255 Users.instance().close(); 256 } 257 } 258 259 public long getLastId(Connection connection) 261 throws PersistenceException { 262 263 long lastId = -1; 264 boolean successful = false; 265 PreparedStatement query = null; 266 ResultSet result = null; 267 PreparedStatement insert = null; 268 try { 269 query = connection.prepareStatement( 270 "select maxid from message_id where id = 1"); 271 result = query.executeQuery(); 272 273 if (result.next()) { 274 lastId = result.getInt(1); 275 } else { 276 insert = connection.prepareStatement( 278 "insert into message_id values (?,?)"); 279 insert.setInt(1, 1); 280 insert.setLong(2, 0); 281 insert.executeUpdate(); 282 lastId = 0; 283 } 284 } catch (Exception exception) { 285 throw new PersistenceException("Failed to get last message id", 286 exception); 287 } finally { 288 SQLHelper.close(result); 289 SQLHelper.close(insert); 290 SQLHelper.close(query); 291 } 292 293 return lastId; 294 } 295 296 public void updateIds(Connection connection, long id) 298 throws PersistenceException { 299 PreparedStatement insert = null; 300 try { 301 insert = connection.prepareStatement( 302 "update message_id set maxId = ? where id = 1"); 303 304 insert.setLong(1, id); 305 insert.executeUpdate(); 306 } catch (Exception exception) { 307 throw new PersistenceException("Failed to update message id", 308 exception); 309 } finally { 310 SQLHelper.close(insert); 311 } 312 } 313 314 public void addMessage(Connection connection, MessageImpl message) 316 throws PersistenceException { 317 318 long start = 0; 319 320 if (_log.isDebugEnabled()) { 321 start = System.currentTimeMillis(); 322 } 323 324 try { 325 _destinationLock.readLock().acquire(); 326 Messages.instance().add(connection, message); 327 } catch (InterruptedException exception) { 328 throw new PersistenceException("Failed to acquire lock", 329 exception); 330 } finally { 331 _destinationLock.readLock().release(); 332 333 if (_log.isDebugEnabled()) { 334 _log.debug("addMessage," + 335 (System.currentTimeMillis() - start)); 336 } 337 } 338 } 339 340 public void updateMessage(Connection connection, MessageImpl message) 342 throws PersistenceException { 343 long start = 0; 344 if (_log.isDebugEnabled()) { 345 start = System.currentTimeMillis(); 346 } 347 348 try { 349 _destinationLock.readLock().acquire(); 350 Messages.instance().update(connection, message); 351 } catch (InterruptedException exception) { 352 throw new PersistenceException("Failed to acquire lock", 353 exception); 354 } finally { 355 _destinationLock.readLock().release(); 356 if (_log.isDebugEnabled()) { 357 _log.debug("updateMessage," + 358 (System.currentTimeMillis() - start)); 359 } 360 } 361 } 362 363 public Vector getUnprocessedMessages(Connection connection) 365 throws PersistenceException { 366 long start = 0; 367 if (_log.isDebugEnabled()) { 368 start = System.currentTimeMillis(); 369 } 370 371 try { 372 return Messages.instance().getUnprocessedMessages(connection); 373 } finally { 374 if (_log.isDebugEnabled()) { 375 _log.debug("getUnprocessedMessages," + (System.currentTimeMillis() - start)); 376 } 377 } 378 } 379 380 381 public void removeMessage(Connection connection, String id) 383 throws PersistenceException { 384 long start = 0; 385 if (_log.isDebugEnabled()) { 386 start = System.currentTimeMillis(); 387 } 388 389 try { 390 _destinationLock.readLock().acquire(); 391 Messages.instance().remove(connection, id); 392 } catch (InterruptedException exception) { 393 throw new PersistenceException("Failed to acquire lock", 394 exception); 395 } finally { 396 _destinationLock.readLock().release(); 397 if (_log.isDebugEnabled()) { 398 _log.debug("removeMessage," + 399 (System.currentTimeMillis() - start)); 400 } 401 } 402 } 403 404 public MessageImpl getMessage(Connection connection, String id) 406 throws PersistenceException { 407 long start = 0; 408 if (_log.isDebugEnabled()) { 409 start = System.currentTimeMillis(); 410 } 411 412 try { 413 return Messages.instance().get(connection, id); 414 } finally { 415 if (_log.isDebugEnabled()) { 416 _log.debug("getMessage," + (System.currentTimeMillis() - start)); 417 } 418 } 419 } 420 421 public Vector getMessages(Connection connection, MessageHandle handle) 423 throws PersistenceException { 424 long start = 0; 425 if (_log.isDebugEnabled()) { 426 start = System.currentTimeMillis(); 427 } 428 429 try { 430 return Messages.instance().getMessages(connection, 431 handle.getDestination().getName(), handle.getPriority(), 432 handle.getAcceptedTime()); 433 } finally { 434 if (_log.isDebugEnabled()) { 435 _log.debug("getMessages," + (System.currentTimeMillis() - start)); 436 } 437 } 438 } 439 440 public void addMessageHandle(Connection connection, MessageHandle handle) 442 throws PersistenceException { 443 long start = 0; 444 if (_log.isDebugEnabled()) { 445 start = System.currentTimeMillis(); 446 } 447 448 try { 449 _destinationLock.readLock().acquire(); 450 MessageHandles.instance().addMessageHandle(connection, handle); 451 } catch (InterruptedException exception) { 452 throw new PersistenceException("Failed to acquire lock", 453 exception); 454 } finally { 455 _destinationLock.readLock().release(); 456 if (_log.isDebugEnabled()) { 457 _log.debug("addMessageHandle," + (System.currentTimeMillis() - start)); 458 } 459 } 460 } 461 462 public void updateMessageHandle(Connection connection, MessageHandle handle) 464 throws PersistenceException { 465 long start = 0; 466 if (_log.isDebugEnabled()) { 467 start = System.currentTimeMillis(); 468 } 469 470 try { 471 _destinationLock.readLock().acquire(); 472 MessageHandles.instance().updateMessageHandle(connection, handle); 473 } catch (InterruptedException exception) { 474 throw new PersistenceException("Failed to acquire lock", 475 exception); 476 } finally { 477 _destinationLock.readLock().release(); 478 if (_log.isDebugEnabled()) { 479 _log.debug("updateMessageHandle," + (System.currentTimeMillis() - start)); 480 } 481 } 482 } 483 484 public void removeMessageHandle(Connection connection, MessageHandle handle) 486 throws PersistenceException { 487 long start = 0; 488 if (_log.isDebugEnabled()) { 489 start = System.currentTimeMillis(); 490 } 491 492 try { 493 _destinationLock.readLock().acquire(); 494 MessageHandles.instance().removeMessageHandle(connection, handle); 495 } catch (InterruptedException exception) { 496 throw new PersistenceException("Failed to acquire lock", 497 exception); 498 } finally { 499 _destinationLock.readLock().release(); 500 if (_log.isDebugEnabled()) { 501 _log.debug("removeMessageHandle," + (System.currentTimeMillis() - start)); 502 } 503 } 504 } 505 506 public Vector getMessageHandles(Connection connection, 508 JmsDestination destination, String name) 509 throws PersistenceException { 510 long start = 0; 511 if (_log.isDebugEnabled()) { 512 start = System.currentTimeMillis(); 513 } 514 515 try { 516 return MessageHandles.instance().getMessageHandles(connection, 517 destination.getName(), name); 518 } finally { 519 if (_log.isDebugEnabled()) { 520 _log.debug("getMessageHandles," 521 + (System.currentTimeMillis() - start)); 522 } 523 } 524 } 525 526 public void addDurableConsumer(Connection connection, String topic, 528 String consumer) 529 throws PersistenceException { 530 531 try { 532 _destinationLock.readLock().acquire(); 533 Consumers.instance().add(connection, topic, consumer); 534 } catch (InterruptedException exception) { 535 throw new PersistenceException("Failed to acquire lock", 536 exception); 537 } finally { 538 _destinationLock.readLock().release(); 539 } 540 } 541 542 public void removeDurableConsumer(Connection connection, String consumer) 544 throws PersistenceException { 545 546 try { 547 _destinationLock.readLock().acquire(); 548 Consumers.instance().remove(connection, consumer); 549 } catch (InterruptedException exception) { 550 throw new PersistenceException("Failed to acquire lock", 551 exception); 552 } finally { 553 _destinationLock.readLock().release(); 554 } 555 } 556 557 public Enumeration getDurableConsumers(Connection connection, String topic) 559 throws PersistenceException { 560 return Consumers.instance().getDurableConsumers(topic).elements(); 561 } 562 563 public HashMap getAllDurableConsumers(Connection connection) 565 throws PersistenceException { 566 567 return Consumers.instance().getAllDurableConsumers(); 568 } 569 570 public boolean durableConsumerExists(Connection connection, String name) 572 throws PersistenceException { 573 574 return Consumers.instance().exists(name); 575 } 576 577 public void addDestination(Connection connection, String name, 579 boolean queue) 580 throws PersistenceException { 581 582 JmsDestination destination = (queue) 583 ? (JmsDestination) new JmsQueue(name) 584 : (JmsDestination) new JmsTopic(name); 585 586 try { 589 _destinationLock.readLock().acquire(); 590 Destinations.instance().add(connection, destination); 591 if (queue) { 592 Consumers.instance().add(connection, name, name); 593 } 594 } catch (InterruptedException exception) { 595 throw new PersistenceException("Failed to acquire lock", 596 exception); 597 } finally { 598 _destinationLock.readLock().release(); 599 } 600 } 601 602 public void removeDestination(Connection connection, String name) 604 throws PersistenceException { 605 606 JmsDestination destination = Destinations.instance().get(name); 607 if (destination != null) { 608 try { 609 _destinationLock.writeLock().acquire(); 610 Destinations.instance().remove(connection, destination); 611 } catch (InterruptedException exception) { 612 throw new PersistenceException("Failed to acquire lock", 613 exception); 614 } finally { 615 _destinationLock.writeLock().release(); 616 } 617 } 618 } 619 620 public Enumeration getAllDestinations(Connection connection) 622 throws PersistenceException { 623 624 return Destinations.instance().getDestinations().elements(); 625 } 626 627 public boolean checkDestination(Connection connection, String name) 629 throws PersistenceException { 630 631 return (Destinations.instance().get(name) != null); 632 } 633 634 public int getQueueMessageCount(Connection connection, String name) 636 throws PersistenceException { 637 638 return MessageHandles.instance().getMessageCount( 639 connection, name, name); 640 } 641 642 public int getDurableConsumerMessageCount(Connection connection, 644 String destination, String name) 645 throws PersistenceException { 646 647 return MessageHandles.instance().getMessageCount(connection, 648 destination, name); 649 } 650 651 public void removeExpiredMessages(Connection connection) 653 throws PersistenceException { 654 655 Messages.instance().removeExpiredMessages(connection); 656 } 657 658 public void removeExpiredMessageHandles(Connection connection, 660 String consumer) 661 throws PersistenceException { 662 663 MessageHandles.instance().removeExpiredMessageHandles(connection, 664 consumer); 665 } 666 667 public Vector getNonExpiredMessages(Connection connection, 669 JmsDestination destination) 670 throws PersistenceException { 671 672 return Messages.instance().getNonExpiredMessages( 673 connection, destination); 674 } 675 676 public void handleEvent(int event, Object callback, long time) { 678 } 691 692 701 public Connection getConnection() 702 throws PersistenceException { 703 return _connectionManager.getConnection(); 704 } 705 706 711 public DBConnectionManager getDBConnectionManager() { 712 return _connectionManager; 713 } 714 715 public void addUser(Connection connection, User user) 716 throws PersistenceException { 717 Users.instance().add(connection, user); 718 } 719 720 public Enumeration getAllUsers(Connection connection) 721 throws PersistenceException { 722 return Users.instance().getAllUsers(connection).elements(); 723 } 724 725 public User getUser(Connection connection, User user) 726 throws PersistenceException { 727 return Users.instance().get(connection, user); 728 } 729 730 public void removeUser(Connection connection, User user) 731 throws PersistenceException { 732 Users.instance().remove(connection, user); 733 } 734 735 public void updateUser(Connection connection, User user) 736 throws PersistenceException { 737 Users.instance().update(connection, user); 738 } 739 740 755 public synchronized int purgeMessages() { 756 int deleted = 0; 757 758 Connection connection = null; 759 try { 760 connection = getConnection(); 761 removeExpiredMessages(connection); 762 connection.commit(); 763 } catch (Exception exception) { 764 _log.error("Exception in purgeMessages", exception); 765 } finally { 766 SQLHelper.close(connection); 767 } 768 return 0; 769 770 774 780 789 797 804 815 } 854 855 862 private String getSchemaVersion(Connection connection) 863 throws PersistenceException { 864 865 String version = null; 866 PreparedStatement query = null; 867 ResultSet result = null; 868 try { 869 query = connection.prepareStatement( 870 "select version from system_data where id = 1"); 871 result = query.executeQuery(); 872 if (result.next()) { 873 version = result.getString(1); 874 } 875 } catch (SQLException exception) { 876 throw new PersistenceException( 877 "Failed to get the schema version", exception); 878 } finally { 879 SQLHelper.close(result); 880 SQLHelper.close(query); 881 882 } 883 return version; 884 } 885 886 891 private void initSchemaVersion(Connection connection) 892 throws PersistenceException { 893 894 _log.info("Initialising schema version " + SCHEMA_VERSION); 895 PreparedStatement insert = null; 896 try { 897 insert = connection.prepareStatement( 898 "insert into system_data (id, version, creationdate) " 899 + "values (?,?,?)"); 900 insert.setInt(1, 1); 901 insert.setString(2, SCHEMA_VERSION); 902 insert.setDate(3, new Date (System.currentTimeMillis())); 903 insert.executeUpdate(); 904 905 } catch (SQLException exception) { 906 throw new PersistenceException( 907 "Failed to initialise schema version", exception); 908 } finally{ 909 SQLHelper.close(insert); 910 } 911 } 912 913 917 private void registerEvent() { 918 } 927 928 935 private DBConnectionManager getConnectionManager(String className) 936 throws PersistenceException { 937 938 DBConnectionManager result = null; 939 Class clazz = null; 940 ClassLoader loader = Thread.currentThread().getContextClassLoader(); 941 try { 942 if (loader != null) { 943 clazz = loader.loadClass(className); 944 } 945 } catch (ClassNotFoundException ignore) { 946 } 947 try { 948 if (clazz == null) { 949 clazz = Class.forName(className); 950 } 951 } catch (ClassNotFoundException exception) { 952 throw new PersistenceException( 953 "Failed to locate connection manager implementation: " 954 + className, exception); 955 } 956 957 try { 958 result = (DBConnectionManager) clazz.newInstance(); 959 } catch (Exception exception) { 960 throw new PersistenceException( 961 "Failed to create connection manager", exception); 962 } 963 964 return result; 965 } 966 967 } 968 | Popular Tags |