1 22 package org.jboss.mq.sm.jdbc; 23 24 import java.io.ByteArrayInputStream ; 25 import java.io.ByteArrayOutputStream ; 26 import java.io.IOException ; 27 import java.sql.Connection ; 28 import java.sql.PreparedStatement ; 29 import java.sql.ResultSet ; 30 import java.sql.SQLException ; 31 import java.sql.Statement ; 32 import java.util.ArrayList ; 33 import java.util.Collection ; 34 import java.util.HashSet ; 35 import java.util.Iterator ; 36 import java.util.List ; 37 import java.util.Map ; 38 import java.util.Properties ; 39 40 import javax.jms.InvalidClientIDException ; 41 import javax.jms.JMSException ; 42 import javax.jms.JMSSecurityException ; 43 import javax.management.ObjectName ; 44 import javax.naming.InitialContext ; 45 import javax.sql.DataSource ; 46 import javax.transaction.Status ; 47 import javax.transaction.Transaction ; 48 import javax.transaction.TransactionManager ; 49 50 import org.jboss.logging.Logger; 51 import org.jboss.mq.DurableSubscriptionID; 52 import org.jboss.mq.SpyJMSException; 53 import org.jboss.mq.SpyTopic; 54 import org.jboss.mq.sm.AbstractStateManager; 55 import org.jboss.mq.sm.StateManager; 56 import org.jboss.tm.TransactionManagerService; 57 58 69 public class JDBCStateManager extends AbstractStateManager implements JDBCStateManagerMBean 70 { 71 static final Logger log = Logger.getLogger(JDBCStateManager.class); 72 73 74 private ObjectName connectionManagerName; 75 76 77 protected DataSource dataSource; 78 79 80 protected int connectionRetryAttempts = 5; 81 82 83 private boolean hasSecurityManager = true; 84 85 86 protected TransactionManager tm; 87 88 89 private Properties sqlProperties = new Properties (); 90 91 92 private boolean createTables = true; 93 94 95 private String CREATE_USER_TABLE = "CREATE TABLE JMS_USERS (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, CLIENTID VARCHAR(128)," 96 + " PRIMARY KEY(USERID))"; 97 98 99 private String CREATE_ROLE_TABLE = "CREATE TABLE JMS_ROLES (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL," 100 + " PRIMARY KEY(USERID, ROLEID))"; 101 102 private String CREATE_SUBSCRIPTION_TABLE = "CREATE TABLE JMS_SUBSCRIPTIONS (CLIENTID VARCHAR(128) NOT NULL, NAME VARCHAR(128) NOT NULL," 103 + " TOPIC VARCHAR(255) NOT NULL, SELECTOR VARCHAR(255)," + " PRIMARY KEY(CLIENTID, NAME))"; 104 105 106 private String GET_SUBSCRIPTION = "SELECT TOPIC, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND NAME=?"; 107 108 109 private String GET_SUBSCRIPTIONS_FOR_TOPIC = "SELECT CLIENTID, NAME, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE TOPIC=?"; 110 111 112 private String LOCK_SUBSCRIPTION = "SELECT TOPIC, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND NAME=?"; 113 114 115 private String INSERT_SUBSCRIPTION = "INSERT INTO JMS_SUBSCRIPTIONS (CLIENTID, NAME, TOPIC, SELECTOR) VALUES(?,?,?,?)"; 116 117 118 private String UPDATE_SUBSCRIPTION = "UPDATE JMS_SUBSCRIPTIONS SET TOPIC=?, SELECTOR=? WHERE CLIENTID=? AND NAME=?"; 119 120 121 private String REMOVE_SUBSCRIPTION = "DELETE FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND NAME=?"; 122 123 124 private String GET_USER_BY_CLIENTID = "SELECT USERID, PASSWD, CLIENTID FROM JMS_USERS WHERE CLIENTID=?"; 125 126 127 private String GET_USER = "SELECT PASSWD, CLIENTID FROM JMS_USERS WHERE USERID=?"; 128 129 130 private List POPULATE_TABLES = new ArrayList (); 131 132 public ObjectName getConnectionManager() 133 { 134 return connectionManagerName; 135 } 136 137 public void setConnectionManager(ObjectName connectionManagerName) 138 { 139 this.connectionManagerName = connectionManagerName; 140 } 141 142 public boolean hasSecurityManager() 143 { 144 return hasSecurityManager; 145 } 146 147 public void setHasSecurityManager(boolean hasSecurityManager) 148 { 149 this.hasSecurityManager = hasSecurityManager; 150 } 151 152 public String getSqlProperties() 153 { 154 try 155 { 156 ByteArrayOutputStream boa = new ByteArrayOutputStream (); 157 sqlProperties.store(boa, ""); 158 return new String (boa.toByteArray()); 159 } 160 catch (IOException shouldnothappen) 161 { 162 return ""; 163 } 164 } 165 166 public void setSqlProperties(String value) 167 { 168 try 169 { 170 171 ByteArrayInputStream is = new ByteArrayInputStream (value.getBytes()); 172 sqlProperties = new Properties (); 173 sqlProperties.load(is); 174 175 } 176 catch (IOException shouldnothappen) 177 { 178 } 179 } 180 181 public void setConnectionRetryAttempts(int value) 182 { 183 this.connectionRetryAttempts = value; 184 } 185 186 public int getConnectionRetryAttempts() 187 { 188 return this.connectionRetryAttempts; 189 } 190 191 protected DurableSubscription getDurableSubscription(DurableSubscriptionID sub) throws JMSException 192 { 193 JDBCSession session = new JDBCSession(); 194 try 195 { 196 PreparedStatement statement = session.prepareStatement(GET_SUBSCRIPTION); 197 statement.setString(1, sub.getClientID()); 198 statement.setString(2, sub.getSubscriptionName()); 199 ResultSet rs = statement.executeQuery(); 200 session.addResultSet(rs); 201 if (rs.next() == false) 202 return null; 203 204 return new DurableSubscription(sub.getClientID(), sub.getSubscriptionName(), rs.getString(1), rs.getString(2)); 205 } 206 catch (SQLException e) 207 { 208 session.setRollbackOnly(); 209 throw new SpyJMSException("Error getting durable subscription " + sub, e); 210 } 211 finally 212 { 213 session.close(); 214 } 215 } 216 217 protected void saveDurableSubscription(DurableSubscription ds) throws JMSException 218 { 219 JDBCSession session = new JDBCSession(); 220 try 221 { 222 PreparedStatement statement = session.prepareStatement(LOCK_SUBSCRIPTION); 223 statement.setString(1, ds.getClientID()); 224 statement.setString(2, ds.getName()); 225 ResultSet rs = statement.executeQuery(); 226 session.addResultSet(rs); 227 if (rs.next() == false) 228 { 229 statement = session.prepareStatement(INSERT_SUBSCRIPTION); 230 statement.setString(1, ds.getClientID()); 231 statement.setString(2, ds.getName()); 232 statement.setString(3, ds.getTopic()); 233 statement.setString(4, ds.getSelector()); 234 } 235 else 236 { 237 statement = session.prepareStatement(UPDATE_SUBSCRIPTION); 238 statement.setString(1, ds.getTopic()); 239 statement.setString(2, ds.getSelector()); 240 statement.setString(3, ds.getClientID()); 241 statement.setString(4, ds.getName()); 242 } 243 if (statement.executeUpdate() != 1) 244 { 245 session.setRollbackOnly(); 246 throw new SpyJMSException("Insert subscription failed " + ds); 247 } 248 } 249 catch (SQLException e) 250 { 251 session.setRollbackOnly(); 252 throw new SpyJMSException("Error saving durable subscription " + ds, e); 253 } 254 finally 255 { 256 session.close(); 257 } 258 } 259 260 protected void removeDurableSubscription(DurableSubscription ds) throws JMSException 261 { 262 JDBCSession session = new JDBCSession(); 263 try 264 { 265 PreparedStatement statement = session.prepareStatement(REMOVE_SUBSCRIPTION); 266 statement.setString(1, ds.getClientID()); 267 statement.setString(2, ds.getName()); 268 if (statement.executeUpdate() != 1) 269 throw new JMSException ("Durable subscription does not exist " + ds); 270 } 271 catch (SQLException e) 272 { 273 session.setRollbackOnly(); 274 throw new SpyJMSException("Error removing durable subscription " + ds, e); 275 } 276 finally 277 { 278 session.close(); 279 } 280 } 281 282 public Collection getDurableSubscriptionIdsForTopic(SpyTopic topic) throws JMSException 283 { 284 ArrayList result = new ArrayList (); 285 286 JDBCSession session = new JDBCSession(); 287 try 288 { 289 PreparedStatement statement = session.prepareStatement(GET_SUBSCRIPTIONS_FOR_TOPIC); 290 statement.setString(1, topic.getName()); 291 ResultSet rs = statement.executeQuery(); 292 session.addResultSet(rs); 293 while (rs.next()) 294 { 295 result.add(new DurableSubscriptionID(rs.getString(1), rs.getString(2), rs.getString(3))); 296 } 297 298 return result; 299 } 300 catch (SQLException e) 301 { 302 session.setRollbackOnly(); 303 throw new SpyJMSException("Error getting durable subscriptions for topic " + topic, e); 304 } 305 finally 306 { 307 session.close(); 308 } 309 } 310 311 protected void checkLoggedOnClientId(String clientID) throws JMSException 312 { 313 JDBCSession session = new JDBCSession(); 314 try 315 { 316 PreparedStatement statement = session.prepareStatement(GET_USER_BY_CLIENTID); 317 statement.setString(1, clientID); 318 ResultSet rs = statement.executeQuery(); 319 session.addResultSet(rs); 320 if (rs.next()) 321 throw new InvalidClientIDException ("This client id is password protected " + clientID); 322 } 323 catch (SQLException e) 324 { 325 session.setRollbackOnly(); 326 throw new SpyJMSException("Error checking logged on client id " + clientID, e); 327 } 328 finally 329 { 330 session.close(); 331 } 332 } 333 334 protected String getPreconfClientId(String logon, String passwd) throws JMSException 335 { 336 JDBCSession session = new JDBCSession(); 337 try 338 { 339 PreparedStatement statement = session.prepareStatement(GET_USER); 340 statement.setString(1, logon); 341 ResultSet rs = statement.executeQuery(); 342 session.addResultSet(rs); 343 if (rs.next() == false) 344 { 345 if (hasSecurityManager) 346 return null; 347 else 348 throw new JMSSecurityException ("This user does not exist " + logon); 349 } 350 351 if (hasSecurityManager == false && passwd.equals(rs.getString(1)) == false) 352 throw new JMSSecurityException ("Bad password for user " + logon); 353 354 return rs.getString(2); 355 } 356 catch (SQLException e) 357 { 358 session.setRollbackOnly(); 359 throw new SpyJMSException("Error retrieving preconfigured user " + logon, e); 360 } 361 finally 362 { 363 session.close(); 364 } 365 } 366 367 public StateManager getInstance() 368 { 369 return this; 370 } 371 372 protected void startService() throws Exception 373 { 374 if (connectionManagerName == null) 375 throw new IllegalStateException ("No connection manager configured"); 376 377 String dsName = (String ) getServer().getAttribute(connectionManagerName, "BindName"); 379 380 InitialContext ctx = new InitialContext (); 381 try 382 { 383 dataSource = (DataSource ) ctx.lookup(dsName); 384 tm = (TransactionManager ) ctx.lookup(TransactionManagerService.JNDI_NAME); 385 } 386 finally 387 { 388 ctx.close(); 389 } 390 391 try 392 { 393 initDB(); 394 } 395 catch (Exception e) 396 { 397 log.warn("Error initialising state manager db", e); 398 } 399 } 400 401 protected void initDB() throws Exception 402 { 403 CREATE_USER_TABLE = sqlProperties.getProperty("CREATE_USER_TABLE", CREATE_USER_TABLE); 404 CREATE_ROLE_TABLE = sqlProperties.getProperty("CREATE_ROLE_TABLE", CREATE_ROLE_TABLE); 405 CREATE_SUBSCRIPTION_TABLE = sqlProperties.getProperty("CREATE_SUBSCRIPTION_TABLE", CREATE_SUBSCRIPTION_TABLE); 406 GET_SUBSCRIPTION = sqlProperties.getProperty("GET_SUBSCRIPTION", GET_SUBSCRIPTION); 407 GET_SUBSCRIPTIONS_FOR_TOPIC = sqlProperties.getProperty("GET_SUBSCRIPTIONS_FOR_TOPIC", 408 GET_SUBSCRIPTIONS_FOR_TOPIC); 409 LOCK_SUBSCRIPTION = sqlProperties.getProperty("LOCK_SUBSCRIPTION", LOCK_SUBSCRIPTION); 410 INSERT_SUBSCRIPTION = sqlProperties.getProperty("INSERT_SUBSCRIPTION", INSERT_SUBSCRIPTION); 411 UPDATE_SUBSCRIPTION = sqlProperties.getProperty("UPDATE_SUBSCRIPTION", UPDATE_SUBSCRIPTION); 412 REMOVE_SUBSCRIPTION = sqlProperties.getProperty("REMOVE_SUBSCRIPTION", REMOVE_SUBSCRIPTION); 413 GET_USER_BY_CLIENTID = sqlProperties.getProperty("GET_USER_BY_CLIENTID", GET_USER_BY_CLIENTID); 414 GET_USER = sqlProperties.getProperty("GET_USER", GET_USER); 415 416 for (Iterator i = sqlProperties.entrySet().iterator(); i.hasNext();) 418 { 419 Map.Entry entry = (Map.Entry ) i.next(); 420 String key = (String ) entry.getKey(); 421 if (key.startsWith("POPULATE.TABLES.")) 422 POPULATE_TABLES.add(entry.getValue()); 423 } 424 425 String createString = sqlProperties.getProperty("CREATE_TABLES_ON_START_UP"); 426 if (createString == null) 427 createString = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP"); 428 if (createString == null) 429 createTables = true; 430 else 431 createTables = createString.trim().equalsIgnoreCase("true"); 432 433 if (createTables) 434 { 435 JDBCSession session = new JDBCSession(); 436 try 437 { 438 PreparedStatement statement; 439 try 440 { 441 statement = session.prepareStatement(CREATE_USER_TABLE); 442 statement.executeUpdate(); 443 } 444 catch (SQLException ignored) 445 { 446 log.trace("Error creating table: " + CREATE_USER_TABLE, ignored); 447 } 448 try 449 { 450 statement = session.prepareStatement(CREATE_ROLE_TABLE); 451 statement.executeUpdate(); 452 } 453 catch (SQLException ignored) 454 { 455 log.trace("Error creating table: " + CREATE_ROLE_TABLE, ignored); 456 } 457 try 458 { 459 statement = session.prepareStatement(CREATE_SUBSCRIPTION_TABLE); 460 statement.executeUpdate(); 461 } 462 catch (SQLException ignored) 463 { 464 log.trace("Error creating table: " + CREATE_SUBSCRIPTION_TABLE, ignored); 465 } 466 467 Iterator iter = POPULATE_TABLES.iterator(); 468 String nextQry = null; 469 while (iter.hasNext()) 470 { 471 try 472 { 473 nextQry = (String ) iter.next(); 474 statement = session.prepareStatement(nextQry); 475 statement.execute(); 476 } 477 catch (SQLException ignored) 478 { 479 log.trace("Error populating tables: " + nextQry, ignored); 480 } 481 } 482 } 483 finally 484 { 485 session.close(); 486 } 487 } 488 } 489 490 493 class JDBCSession 494 { 495 boolean trace = log.isTraceEnabled(); 496 497 Transaction threadTx; 498 499 Connection connection; 500 501 HashSet statements = new HashSet (); 502 503 HashSet resultSets = null; 504 505 JDBCSession() throws JMSException 506 { 507 try 508 { 509 threadTx = tm.suspend(); 511 try 512 { 513 tm.begin(); 515 try 516 { 517 connection = getConnection(); 519 } 520 catch (Throwable t) 521 { 522 try 524 { 525 tm.rollback(); 526 } 527 catch (Throwable ignored) 528 { 529 log.warn("Unable to rollback transaction", ignored); 530 } 531 throw t; 532 } 533 } 534 catch (Throwable t) 535 { 536 try 538 { 539 if (threadTx != null) 540 tm.resume(threadTx); 541 } 542 catch (Throwable ignored) 543 { 544 log.warn("Unable to resume transaction " + threadTx, ignored); 545 } 546 throw t; 547 } 548 } 549 catch (Throwable t) 550 { 551 throw new SpyJMSException("Error creating connection to the database.", t); 552 } 553 } 554 555 PreparedStatement prepareStatement(String sql) throws SQLException 556 { 557 PreparedStatement result = connection.prepareStatement(sql); 558 statements.add(result); 559 return result; 560 } 561 562 void setRollbackOnly() throws JMSException 563 { 564 try 565 { 566 tm.setRollbackOnly(); 567 } 568 catch (Exception e) 569 { 570 throw new SpyJMSException("Could not mark the transaction for rollback.", e); 571 } 572 } 573 574 void addResultSet(ResultSet rs) 575 { 576 if (resultSets == null) 577 resultSets = new HashSet (); 578 resultSets.add(rs); 579 } 580 581 void close() throws JMSException 582 { 583 if (resultSets != null) 584 { 585 for (Iterator i = resultSets.iterator(); i.hasNext();) 586 { 587 ResultSet rs = (ResultSet ) i.next(); 588 try 589 { 590 rs.close(); 591 } 592 catch (Throwable ignored) 593 { 594 if (trace) 595 log.trace("Unable to close result set", ignored); 596 } 597 } 598 } 599 600 for (Iterator i = statements.iterator(); i.hasNext();) 601 { 602 Statement s = (Statement ) i.next(); 603 try 604 { 605 s.close(); 606 } 607 catch (Throwable ignored) 608 { 609 if (trace) 610 log.trace("Unable to close statement", ignored); 611 } 612 } 613 614 try 615 { 616 if (connection != null) 617 connection.close(); 618 } 619 catch (Throwable ignored) 620 { 621 if (trace) 622 log.trace("Unable to close connection", ignored); 623 } 624 625 try 626 { 627 if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) 628 { 629 tm.rollback(); 630 } 631 else 632 { 633 tm.commit(); 634 } 635 } 636 catch (Exception e) 637 { 638 throw new SpyJMSException("Could not commit/rollback a transaction with the transaction manager.", e); 639 } 640 finally 641 { 642 try 643 { 644 if (threadTx != null) 645 tm.resume(threadTx); 646 } 647 catch (Throwable ignored) 648 { 649 log.warn("Unable to resume transaction " + threadTx, ignored); 650 } 651 } 652 } 653 654 663 protected Connection getConnection() throws SQLException 664 { 665 int attempts = connectionRetryAttempts; 666 int attemptCount = 0; 667 SQLException sqlException = null; 668 while (attempts-- > 0) 669 { 670 if (++attemptCount > 1) 671 { 672 log.debug("Retrying connection: attempt # " + attemptCount); 673 } 674 try 675 { 676 sqlException = null; 677 return dataSource.getConnection(); 678 } 679 catch (SQLException exception) 680 { 681 log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception); 682 sqlException = exception; 683 } 684 finally 685 { 686 if (sqlException == null && attemptCount > 1) 687 { 688 log.debug("Connection succeeded on attempt # " + attemptCount); 689 } 690 } 691 692 if (attempts > 0) 693 { 694 try 695 { 696 Thread.sleep(1500); 697 } 698 catch (InterruptedException interruptedException) 699 { 700 break; 701 } 702 } 703 } 704 if (sqlException != null) 705 { 706 throw sqlException; 707 } 708 throw new SQLException ("connection attempt interrupted"); 709 } 710 } 711 } 712 | Popular Tags |