| 1 package org.objectweb.celtix.bus.ws.rm.persistence.jdbc; 2 3 import java.io.File ; 4 import java.io.IOException ; 5 import java.io.InputStream ; 6 import java.math.BigDecimal ; 7 import java.math.BigInteger ; 8 import java.sql.Connection ; 9 import java.sql.DriverManager ; 10 import java.sql.PreparedStatement ; 11 import java.sql.ResultSet ; 12 import java.sql.SQLException ; 13 import java.sql.Statement ; 14 import java.text.MessageFormat ; 15 import java.util.ArrayList ; 16 import java.util.Collection ; 17 import java.util.Date ; 18 import java.util.HashMap ; 19 import java.util.Map ; 20 import java.util.logging.Level ; 21 import java.util.logging.Logger ; 22 23 import org.objectweb.celtix.bus.ws.rm.DestinationSequence; 24 import org.objectweb.celtix.bus.ws.rm.RMMessageImpl; 25 import org.objectweb.celtix.bus.ws.rm.RMUtils; 26 import org.objectweb.celtix.bus.ws.rm.SourceSequence; 27 import org.objectweb.celtix.bus.ws.rm.persistence.RMStoreException; 28 import org.objectweb.celtix.common.i18n.Message; 29 import org.objectweb.celtix.common.logging.LogUtils; 30 import org.objectweb.celtix.ws.addressing.v200408.EndpointReferenceType; 31 import org.objectweb.celtix.ws.rm.Identifier; 32 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement; 33 import org.objectweb.celtix.ws.rm.persistence.RMDestinationSequence; 34 import org.objectweb.celtix.ws.rm.persistence.RMMessage; 35 import org.objectweb.celtix.ws.rm.persistence.RMSourceSequence; 36 import org.objectweb.celtix.ws.rm.persistence.RMStore; 37 38 39 public class RMTxStore implements RMStore { 40 41 public static final String DRIVER_CLASS_NAME_PROPERTY = 42 "org.objectweb.celtix.rm.persistence.jdbc.driver"; 43 public static final String CONNECTION_URL_PROPERTY = 44 "org.objectweb.celtix.rm.persistence.jdbc.url"; 45 public static final String USER_NAME_PROPERTY = 46 "org.objectweb.celtix.rm.persistence.jdbc.user"; 47 public static final String PASSWORD_PROPERTY = 48 "org.objectweb.celtix.rm.persistence.jdbc.password"; 49 50 private static final String CREATE_DEST_SEQUENCES_TABLE_STMT = 51 "CREATE TABLE CELTIX_RM_DEST_SEQUENCES " 52 + "(SEQ_ID VARCHAR(256) NOT NULL, " 53 + "ACKS_TO VARCHAR(1024) NOT NULL, " 54 + "LAST_MSG_NO DECIMAL(31, 0), " 55 + "ENDPOINT_ID VARCHAR(1024), " 56 + "ACKNOWLEDGED BLOB, " 57 + "PRIMARY KEY (SEQ_ID))"; 58 private static final String CREATE_SRC_SEQUENCES_TABLE_STMT = 59 "CREATE TABLE CELTIX_RM_SRC_SEQUENCES " 60 + "(SEQ_ID VARCHAR(256) NOT NULL, " 61 + "CUR_MSG_NO DECIMAL(31, 0) NOT NULL DEFAULT 1, " 62 + "LAST_MSG CHAR(1), " 63 + "EXPIRY BIGINT, " 64 + "OFFERING_SEQ_ID VARCHAR(256), " 65 + "ENDPOINT_ID VARCHAR(1024), " 66 + "PRIMARY KEY (SEQ_ID))"; 67 private static final String CREATE_MESSAGES_TABLE_STMT = 68 "CREATE TABLE {0} " 69 + "(SEQ_ID VARCHAR(256) NOT NULL, " 70 + "MSG_NO DECIMAL(31, 0) NOT NULL, " 71 + "CONTEXT BLOB, " 72 + "PRIMARY KEY (SEQ_ID, MSG_NO))"; 73 private static final String INBOUND_MSGS_TABLE_NAME = "CELTIX_RM_INBOUND_MESSAGES"; 74 private static final String OUTBOUND_MSGS_TABLE_NAME = "CELTIX_RM_OUTBOUND_MESSAGES"; 75 76 77 private static final String CREATE_DEST_SEQUENCE_STMT_STR 78 = "INSERT INTO CELTIX_RM_DEST_SEQUENCES (SEQ_ID, ACKS_TO, ENDPOINT_ID) VALUES(?, ?, ?)"; 79 private static final String CREATE_SRC_SEQUENCE_STMT_STR 80 = "INSERT INTO CELTIX_RM_SRC_SEQUENCES VALUES(?, 1, '0', ?, ?, ?)"; 81 private static final String DELETE_DEST_SEQUENCE_STMT_STR = 82 "DELETE FROM CELTIX_RM_DEST_SEQUENCES WHERE SEQ_ID = ?"; 83 private static final String DELETE_SRC_SEQUENCE_STMT_STR = 84 "DELETE FROM CELTIX_RM_SRC_SEQUENCES WHERE SEQ_ID = ?"; 85 private static final String UPDATE_DEST_SEQUENCE_STMT_STR = 86 "UPDATE CELTIX_RM_DEST_SEQUENCES SET LAST_MSG_NO = ?, ACKNOWLEDGED = ? WHERE SEQ_ID = ?"; 87 private static final String UPDATE_SRC_SEQUENCE_STMT_STR = 88 "UPDATE CELTIX_RM_SRC_SEQUENCES SET CUR_MSG_NO = ?, LAST_MSG = ? WHERE SEQ_ID = ?"; 89 private static final String CREATE_MESSAGE_STMT_STR 90 = "INSERT INTO {0} VALUES(?, ?, ?)"; 91 private static final String DELETE_MESSAGE_STMT_STR = 92 "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?"; 93 94 private static final String SELECT_DEST_SEQUENCES_STMT_STR = 95 "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, ACKNOWLEDGED FROM CELTIX_RM_DEST_SEQUENCES " 96 + "WHERE ENDPOINT_ID = ?"; 97 private static final String SELECT_SRC_SEQUENCES_STMT_STR = 98 "SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID FROM CELTIX_RM_SRC_SEQUENCES " 99 + "WHERE ENDPOINT_ID = ?"; 100 private static final String SELECT_MESSAGES_STMT_STR = 101 "SELECT MSG_NO, CONTEXT FROM {0} WHERE SEQ_ID = ?"; 102 103 private static final Logger LOG = LogUtils.getL7dLogger(RMTxStore.class); 104 105 private static Map <String , Connection > connectionMap; 106 private Connection connection; 107 private PreparedStatement createDestSequenceStmt; 108 private PreparedStatement createSrcSequenceStmt; 109 private PreparedStatement deleteDestSequenceStmt; 110 private PreparedStatement deleteSrcSequenceStmt; 111 private PreparedStatement updateDestSequenceStmt; 112 private PreparedStatement updateSrcSequenceStmt; 113 private PreparedStatement selectDestSequencesStmt; 114 private PreparedStatement selectSrcSequencesStmt; 115 116 private PreparedStatement createInboundMessageStmt; 117 private PreparedStatement createOutboundMessageStmt; 118 private PreparedStatement deleteInboundMessageStmt; 119 private PreparedStatement deleteOutboundMessageStmt; 120 private PreparedStatement selectInboundMessagesStmt; 121 private PreparedStatement selectOutboundMessagesStmt; 122 123 125 public void init(Map <String , String > params) { 126 connect(params); 127 } 128 129 public void createSourceSequence(RMSourceSequence seq) { 130 String sequenceIdentifier = seq.getIdentifier().getValue(); 131 String endpointIdentifier = seq.getEndpointIdentifier(); 132 if (LOG.isLoggable(Level.FINE)) { 133 LOG.info("Creating source sequence: " + sequenceIdentifier + ", (endpoint: " 134 + endpointIdentifier + ")"); 135 } 136 137 try { 138 beginTransaction(); 139 140 if (null == createSrcSequenceStmt) { 141 createSrcSequenceStmt = connection.prepareStatement(CREATE_SRC_SEQUENCE_STMT_STR); 142 } 143 assert null != createSrcSequenceStmt; 144 createSrcSequenceStmt.setString(1, sequenceIdentifier); 145 Date expiry = seq.getExpiry(); 146 createSrcSequenceStmt.setLong(2, expiry == null ? 0 : expiry.getTime()); 147 Identifier osid = seq.getOfferingSequenceIdentifier(); 148 createSrcSequenceStmt.setString(3, osid == null ? null : osid.getValue()); 149 createSrcSequenceStmt.setString(4, endpointIdentifier); 150 createSrcSequenceStmt.execute(); 151 152 commit(); 153 154 } catch (SQLException ex) { 155 abort(); 156 throw new RMStoreException(ex); 157 } 158 } 159 160 public void createDestinationSequence(RMDestinationSequence seq) { 161 String sequenceIdentifier = seq.getIdentifier().getValue(); 162 String endpointIdentifier = seq.getEndpointIdentifier(); 163 if (LOG.isLoggable(Level.FINE)) { 164 LOG.info("Creating destination sequence: " + sequenceIdentifier + ", (endpoint: " 165 + endpointIdentifier + ")"); 166 } 167 try { 168 beginTransaction(); 169 170 if (null == createDestSequenceStmt) { 171 createDestSequenceStmt = connection.prepareStatement(CREATE_DEST_SEQUENCE_STMT_STR); 172 } 173 createDestSequenceStmt.setString(1, sequenceIdentifier); 174 String addr = seq.getAcksTo().getAddress().getValue(); 175 createDestSequenceStmt.setString(2, addr); 176 createDestSequenceStmt.setString(3, endpointIdentifier); 177 178 createDestSequenceStmt.execute(); 179 180 commit(); 181 182 } catch (SQLException ex) { 183 abort(); 184 throw new RMStoreException(ex); 185 } 186 } 187 188 public void removeDestinationSequence(Identifier sid) { 189 try { 190 beginTransaction(); 191 192 if (null == deleteDestSequenceStmt) { 193 deleteDestSequenceStmt = connection.prepareStatement(DELETE_DEST_SEQUENCE_STMT_STR); 194 } 195 deleteDestSequenceStmt.setString(1, sid.getValue()); 196 deleteDestSequenceStmt.execute(); 197 198 commit(); 199 200 } catch (SQLException ex) { 201 abort(); 202 throw new RMStoreException(ex); 203 } 204 } 205 206 207 public void removeSourceSequence(Identifier sid) { 208 try { 209 beginTransaction(); 210 211 if (null == deleteSrcSequenceStmt) { 212 deleteSrcSequenceStmt = connection.prepareStatement(DELETE_SRC_SEQUENCE_STMT_STR); 213 } 214 deleteSrcSequenceStmt.setString(1, sid.getValue()); 215 deleteSrcSequenceStmt.execute(); 216 217 commit(); 218 219 } catch (SQLException ex) { 220 abort(); 221 throw new RMStoreException(ex); 222 } 223 } 224 225 226 public Collection <RMDestinationSequence> getDestinationSequences(String endpointIdentifier) { 227 if (LOG.isLoggable(Level.FINE)) { 228 LOG.info("Getting destination sequences for endpoint: " + endpointIdentifier); 229 } 230 Collection <RMDestinationSequence> seqs = new ArrayList <RMDestinationSequence>(); 231 try { 232 if (null == selectDestSequencesStmt) { 233 selectDestSequencesStmt = 234 connection.prepareStatement(SELECT_DEST_SEQUENCES_STMT_STR); 235 } 236 selectDestSequencesStmt.setString(1, endpointIdentifier); 237 238 ResultSet res = selectDestSequencesStmt.executeQuery(); 239 while (res.next()) { 240 Identifier sid = RMUtils.getWSRMFactory().createIdentifier(); 242 sid.setValue(res.getString(1)); 243 EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2)); 244 BigDecimal lm = res.getBigDecimal(3); 245 InputStream is = res.getBinaryStream(4); 246 SequenceAcknowledgement ack = null; 247 if (null != is) { 248 ack = RMUtils.getPersistenceUtils().getSequenceAcknowledgment(is); 249 } 250 DestinationSequence seq = new DestinationSequence(sid, acksTo, 251 lm == null ? null : lm.toBigInteger(), ack); 252 seqs.add(seq); 253 } 254 } catch (SQLException ex) { 255 LOG.log(Level.WARNING, new Message("SELECT_DEST_SEQ_FAILED_MSG", LOG).toString(), ex); 256 } 257 return seqs; 258 } 259 260 public Collection <RMSourceSequence> getSourceSequences(String endpointIdentifier) { 261 if (LOG.isLoggable(Level.FINE)) { 262 LOG.info("Getting source sequences for endpoint: " + endpointIdentifier); 263 } 264 Collection <RMSourceSequence> seqs = new ArrayList <RMSourceSequence>(); 265 try { 266 if (null == selectSrcSequencesStmt) { 267 selectSrcSequencesStmt = 268 connection.prepareStatement(SELECT_SRC_SEQUENCES_STMT_STR); 269 } 270 selectSrcSequencesStmt.setString(1, endpointIdentifier); 271 ResultSet res = selectSrcSequencesStmt.executeQuery(); 272 273 while (res.next()) { 274 Identifier sid = RMUtils.getWSRMFactory().createIdentifier(); 275 sid.setValue(res.getString(1)); 276 BigInteger cmn = res.getBigDecimal(2).toBigInteger(); 277 boolean lm = res.getBoolean(3); 278 long lval = res.getLong(4); 279 Date expiry = 0 == lval ? null : new Date (lval); 280 String oidValue = res.getString(5); 281 Identifier oi = null; 282 if (null != oidValue) { 283 oi = RMUtils.getWSRMFactory().createIdentifier(); 284 oi.setValue(oidValue); 285 } 286 SourceSequence seq = new SourceSequence(sid, expiry, oi, cmn, lm); 287 seqs.add(seq); 288 } 289 } catch (SQLException ex) { 290 LOG.log(Level.WARNING, new Message("SELECT_SRC_SEQ_FAILED_MSG", LOG).toString(), ex); 292 } 293 return seqs; 294 } 295 296 297 public Collection <RMMessage> getMessages(Identifier sid, boolean outbound) { 298 Collection <RMMessage> msgs = new ArrayList <RMMessage>(); 299 try { 300 PreparedStatement stmt = outbound ? selectOutboundMessagesStmt : selectInboundMessagesStmt; 301 if (null == stmt) { 302 stmt = connection.prepareStatement(MessageFormat.format(SELECT_MESSAGES_STMT_STR, 303 outbound ? OUTBOUND_MSGS_TABLE_NAME : INBOUND_MSGS_TABLE_NAME)); 304 if (outbound) { 305 selectOutboundMessagesStmt = stmt; 306 } else { 307 selectInboundMessagesStmt = stmt; 308 } 309 } 310 311 stmt.setString(1, sid.getValue()); 312 ResultSet res = stmt.executeQuery(); 313 while (res.next()) { 314 BigInteger mn = res.getBigDecimal(1).toBigInteger(); 315 InputStream is = res.getBinaryStream(2); 316 RMMessageImpl msg = new RMMessageImpl(mn, is); 317 msgs.add(msg); 318 } 319 } catch (SQLException ex) { 320 LOG.log(Level.WARNING, new Message(outbound ? "SELECT_OUTBOUND_MSGS_FAILED_MSG" 321 : "SELECT_INBOUND_MSGS_FAILED_MSG", LOG).toString(), ex); 322 } 323 return msgs; 324 } 325 326 327 public void persistIncoming(RMDestinationSequence seq, RMMessage msg) { 328 try { 329 beginTransaction(); 330 331 updateDestinationSequence(seq); 332 333 storeMessage(seq.getIdentifier(), msg, false); 334 335 commit(); 336 337 } catch (SQLException ex) { 338 abort(); 339 throw new RMStoreException(ex); 340 } catch (IOException ex) { 341 abort(); 342 throw new RMStoreException(ex); 343 } 344 } 345 346 public void persistOutgoing(RMSourceSequence seq, RMMessage msg) { 347 try { 348 beginTransaction(); 349 350 updateSourceSequence(seq); 351 352 storeMessage(seq.getIdentifier(), msg, true); 353 354 commit(); 355 356 } catch (SQLException ex) { 357 abort(); 358 throw new RMStoreException(ex); 359 } catch (IOException ex) { 360 abort(); 361 throw new RMStoreException(ex); 362 } 363 364 } 365 366 public void removeMessages(Identifier sid, Collection <BigInteger > messageNrs, boolean outbound) { 367 try { 368 beginTransaction(); 369 PreparedStatement stmt = outbound ? deleteOutboundMessageStmt : deleteInboundMessageStmt; 370 if (null == stmt) { 371 stmt = connection.prepareStatement(MessageFormat.format(DELETE_MESSAGE_STMT_STR, 372 outbound ? OUTBOUND_MSGS_TABLE_NAME : INBOUND_MSGS_TABLE_NAME)); 373 if (outbound) { 374 deleteOutboundMessageStmt = stmt; 375 } else { 376 deleteInboundMessageStmt = stmt; 377 } 378 } 379 380 stmt.setString(1, sid.getValue()); 381 382 for (BigInteger messageNr : messageNrs) { 383 stmt.setBigDecimal(2, new BigDecimal (messageNr)); 384 stmt.execute(); 385 } 386 387 commit(); 388 389 } catch (SQLException ex) { 390 abort(); 391 throw new RMStoreException(ex); 392 } 393 } 394 395 397 protected void beginTransaction() { 398 } 400 401 protected void commit() throws SQLException { 402 connection.commit(); 403 } 404 405 protected void abort() { 406 try { 407 connection.rollback(); 408 } catch (SQLException ex) { 409 LOG.log(Level.SEVERE, new Message("ABORT_FAILED_MSG", LOG).toString(), ex); 410 } 411 } 412 413 415 protected void storeMessage(Identifier sid, RMMessage msg, boolean outbound) 416 throws IOException , SQLException { 417 PreparedStatement stmt = outbound ? createOutboundMessageStmt : createInboundMessageStmt; 418 if (null == stmt) { 419 stmt = connection.prepareStatement(MessageFormat.format(CREATE_MESSAGE_STMT_STR, 420 outbound ? OUTBOUND_MSGS_TABLE_NAME : INBOUND_MSGS_TABLE_NAME)); 421 if (outbound) { 422 createOutboundMessageStmt = stmt; 423 } else { 424 createInboundMessageStmt = stmt; 425 } 426 } 427 428 429 430 int i = 1; 431 stmt.setString(i++, sid.getValue()); 432 stmt.setBigDecimal(i++, new BigDecimal (msg.getMessageNr())); 433 InputStream is = msg.getContextAsStream(); 434 stmt.setBinaryStream(i++, is, is.available()); 435 stmt.execute(); 436 } 437 438 protected void updateSourceSequence(RMSourceSequence seq) 439 throws SQLException { 440 if (null == updateSrcSequenceStmt) { 441 updateSrcSequenceStmt = connection.prepareStatement(UPDATE_SRC_SEQUENCE_STMT_STR); 442 } 443 updateSrcSequenceStmt.setBigDecimal(1, new BigDecimal (seq.getCurrentMessageNr())); 444 updateSrcSequenceStmt.setBoolean(2, seq.isLastMessage()); 445 updateSrcSequenceStmt.setString(3, seq.getIdentifier().getValue()); 446 updateSrcSequenceStmt.execute(); 447 } 448 449 protected void updateDestinationSequence(RMDestinationSequence seq) 450 throws SQLException , IOException { 451 if (null == updateDestSequenceStmt) { 452 updateDestSequenceStmt = connection.prepareStatement(UPDATE_DEST_SEQUENCE_STMT_STR); 453 } 454 BigInteger lastMessageNr = seq.getLastMessageNr(); 455 updateDestSequenceStmt.setBigDecimal(1, lastMessageNr == null ? null 456 : new BigDecimal (lastMessageNr)); 457 InputStream is = seq.getAcknowledgmentAsStream(); 458 updateDestSequenceStmt.setBinaryStream(2, is, is.available()); 459 updateDestSequenceStmt.setString(3, seq.getIdentifier() .getValue()); 460 updateDestSequenceStmt.execute(); 461 } 462 463 protected void createTables() throws SQLException { 464 465 Statement stmt = null; 466 467 stmt = connection.createStatement(); 468 try { 469 stmt.executeUpdate(CREATE_SRC_SEQUENCES_TABLE_STMT); 470 } catch (SQLException ex) { 471 if (!"X0Y32".equals(ex.getSQLState())) { 472 throw ex; 473 } else { 474 LOG.fine("Table CELTIX_RM_SRC_SEQUENCES already exists."); 475 } 476 } 477 stmt.close(); 478 479 stmt = connection.createStatement(); 480 try { 481 stmt.executeUpdate(CREATE_DEST_SEQUENCES_TABLE_STMT); 482 } catch (SQLException ex) { 483 if (!"X0Y32".equals(ex.getSQLState())) { 484 throw ex; 485 } else { 486 LOG.fine("Table CELTIX_RM_DEST_SEQUENCES already exists."); 487 } 488 } 489 stmt.close(); 490 491 for (String tableName : new String [] {OUTBOUND_MSGS_TABLE_NAME, INBOUND_MSGS_TABLE_NAME}) { 492 stmt = connection.createStatement(); 493 try { 494 stmt.executeUpdate(MessageFormat.format(CREATE_MESSAGES_TABLE_STMT, tableName)); 495 } catch (SQLException ex) { 496 if (!"X0Y32".equals(ex.getSQLState())) { 497 throw ex; 498 } else { 499 if (LOG.isLoggable(Level.FINE)) { 500 LOG.fine("Table " + tableName + " already exists."); 501 } 502 } 503 } 504 stmt.close(); 505 } 506 } 507 508 synchronized void connect(Map <String , String > params) { 509 510 if (null == connectionMap) { 511 connectionMap = new HashMap <String , Connection >(); 512 } 513 String url = params.get(CONNECTION_URL_PROPERTY); 514 assert null != url; 515 connection = connectionMap.get(url); 516 if (null != connection) { 517 return; 518 } 519 520 String driverClassName = params.get(DRIVER_CLASS_NAME_PROPERTY); 521 assert null != driverClassName; 522 try { 523 Class.forName(driverClassName); 524 } catch (ClassNotFoundException ex) { 525 throw new RMStoreException(ex); 526 } 527 528 530 try { 531 connection = DriverManager.getConnection(url, 532 params.get(USER_NAME_PROPERTY), params.get(PASSWORD_PROPERTY)); 533 connection.setAutoCommit(false); 534 createTables(); 535 536 } catch (SQLException ex) { 537 throw new RMStoreException(ex); 538 } 539 540 connectionMap.put(url, connection); 541 assert connection == connectionMap.get(url); 542 } 543 544 548 Connection getConnection() { 549 return connection; 550 } 551 552 public static void deleteDatabaseFiles(String path, boolean now) { 553 File root = null; 554 String dsh = System.getProperty("derby.system.home"); 555 if (null == dsh) { 556 File log = new File ("derby.log"); 557 if (log.exists()) { 558 if (now) { 559 log.delete(); 560 } else { 561 log.deleteOnExit(); 562 } 563 } 564 root = new File (path); 565 } else { 566 root = new File (dsh); 567 } 568 if (root.exists()) { 569 recursiveDelete(root, now); 570 } 571 572 } 573 574 private static void recursiveDelete(File dir, boolean now) { 575 for (File f : dir.listFiles()) { 576 if (f.isDirectory()) { 577 recursiveDelete(f, now); 578 } else { 579 if (now) { 580 f.delete(); 581 } else { 582 f.deleteOnExit(); 583 } 584 } 585 } 586 if (now) { 587 dir.delete(); 588 } else { 589 dir.deleteOnExit(); 590 } 591 } 592 } 593 | Popular Tags |