1 18 package org.apache.activemq.store.jdbc; 19 20 26 public class Statements { 27 28 private String tablePrefix = ""; 29 protected String messageTableName = "ACTIVEMQ_MSGS"; 30 protected String durableSubAcksTableName = "ACTIVEMQ_ACKS"; 31 protected String lockTableName = "ACTIVEMQ_LOCK"; 32 33 protected String binaryDataType = "BLOB"; 34 protected String containerNameDataType = "VARCHAR(250)"; 35 protected String msgIdDataType = "VARCHAR(250)"; 36 protected String sequenceDataType = "INTEGER"; 37 protected String longDataType = "BIGINT"; 38 protected String stringIdDataType = "VARCHAR(250)"; 39 40 protected boolean useExternalMessageReferences = false; 41 42 private String addMessageStatement; 43 private String updateMessageStatement; 44 private String removeMessageStatment; 45 private String findMessageSequenceIdStatement; 46 private String findMessageStatement; 47 private String findAllMessagesStatement; 48 private String findLastSequenceIdInMsgsStatement; 49 private String findLastSequenceIdInAcksStatement; 50 private String createDurableSubStatement; 51 private String findDurableSubStatement; 52 private String findAllDurableSubsStatement; 53 private String updateLastAckOfDurableSubStatement; 54 private String deleteSubscriptionStatement; 55 private String findAllDurableSubMessagesStatement; 56 private String findDurableSubMessagesStatement; 57 private String findAllDestinationsStatement; 58 private String removeAllMessagesStatement; 59 private String removeAllSubscriptionsStatement; 60 private String deleteOldMessagesStatement; 61 private String [] createSchemaStatements; 62 private String [] dropSchemaStatements; 63 private String lockCreateStatement; 64 private String lockUpdateStatement; 65 private String nextDurableSubscriberMessageStatement; 66 private String durableSubscriberMessageCountStatement; 67 private String lastAckedDurableSubscriberMessageStatement; 68 private String destinationMessageCountStatement; 69 private String findNextMessagesStatement; 70 private boolean useLockCreateWhereClause; 71 72 public String [] getCreateSchemaStatements() { 73 if (createSchemaStatements == null) { 74 createSchemaStatements = new String [] { 75 "CREATE TABLE " + getFullMessageTableName() + "(" + "ID " + sequenceDataType + " NOT NULL" 76 + ", CONTAINER " + containerNameDataType + ", MSGID_PROD " + msgIdDataType + ", MSGID_SEQ " 77 + sequenceDataType + ", EXPIRATION " + longDataType + ", MSG " 78 + (useExternalMessageReferences ? stringIdDataType : binaryDataType) 79 + ", PRIMARY KEY ( ID ) )", 80 "CREATE INDEX " + getFullMessageTableName() + "_MIDX ON " + getFullMessageTableName() 81 + " (MSGID_PROD,MSGID_SEQ)", 82 "CREATE INDEX " + getFullMessageTableName() + "_CIDX ON " + getFullMessageTableName() 83 + " (CONTAINER)", 84 "CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName() 85 + " (EXPIRATION)", 86 "CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL" 87 + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType 88 + " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType 89 + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", 90 "CREATE TABLE " + getFullLockTableName() + "( ID " + longDataType + " NOT NULL, TIME " + longDataType 91 + ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )", 92 "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)", 93 }; 94 } 95 return createSchemaStatements; 96 } 97 98 public String [] getDropSchemaStatements() { 99 if (dropSchemaStatements == null) { 100 dropSchemaStatements = new String [] { "DROP TABLE " + getFullAckTableName() + "", 101 "DROP TABLE " + getFullMessageTableName() + "", }; 102 } 103 return dropSchemaStatements; 104 } 105 106 public String getAddMessageStatement() { 107 if (addMessageStatement == null) { 108 addMessageStatement = "INSERT INTO " + getFullMessageTableName() 109 + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, MSG) VALUES (?, ?, ?, ?, ?, ?)"; 110 } 111 return addMessageStatement; 112 } 113 114 public String getUpdateMessageStatement() { 115 if (updateMessageStatement == null) { 116 updateMessageStatement = "UPDATE " + getFullMessageTableName() + " SET MSG=? WHERE ID=?"; 117 } 118 return updateMessageStatement; 119 } 120 121 public String getRemoveMessageStatment() { 122 if (removeMessageStatment == null) { 123 removeMessageStatment = "DELETE FROM " + getFullMessageTableName() + " WHERE ID=?"; 124 } 125 return removeMessageStatment; 126 } 127 128 public String getFindMessageSequenceIdStatement() { 129 if (findMessageSequenceIdStatement == null) { 130 findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName() 131 + " WHERE MSGID_PROD=? AND MSGID_SEQ=?"; 132 } 133 return findMessageSequenceIdStatement; 134 } 135 136 public String getFindMessageStatement() { 137 if (findMessageStatement == null) { 138 findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?"; 139 } 140 return findMessageStatement; 141 } 142 143 public String getFindAllMessagesStatement() { 144 if (findAllMessagesStatement == null) { 145 findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() 146 + " WHERE CONTAINER=? ORDER BY ID"; 147 } 148 return findAllMessagesStatement; 149 } 150 151 public String getFindLastSequenceIdInMsgsStatement() { 152 if (findLastSequenceIdInMsgsStatement == null) { 153 findLastSequenceIdInMsgsStatement = "SELECT MAX(ID) FROM " + getFullMessageTableName(); 154 } 155 return findLastSequenceIdInMsgsStatement; 156 } 157 158 public String getFindLastSequenceIdInAcksStatement() { 159 if (findLastSequenceIdInAcksStatement == null) { 160 findLastSequenceIdInAcksStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName(); 161 } 162 return findLastSequenceIdInAcksStatement; 163 } 164 165 public String getCreateDurableSubStatement() { 166 if (createDurableSubStatement == null) { 167 createDurableSubStatement = "INSERT INTO " + getFullAckTableName() 168 + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID) " + "VALUES (?, ?, ?, ?, ?)"; 169 } 170 return createDurableSubStatement; 171 } 172 173 public String getFindDurableSubStatement() { 174 if (findDurableSubStatement == null) { 175 findDurableSubStatement = "SELECT SELECTOR, SUB_NAME " + "FROM " + getFullAckTableName() 176 + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; 177 } 178 return findDurableSubStatement; 179 } 180 181 public String getFindAllDurableSubsStatement() { 182 if (findAllDurableSubsStatement == null) { 183 findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID" + " FROM " + getFullAckTableName() 184 + " WHERE CONTAINER=?"; 185 } 186 return findAllDurableSubsStatement; 187 } 188 189 public String getUpdateLastAckOfDurableSubStatement() { 190 if (updateLastAckOfDurableSubStatement == null) { 191 updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?" 192 + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; 193 } 194 return updateLastAckOfDurableSubStatement; 195 } 196 197 public String getDeleteSubscriptionStatement() { 198 if (deleteSubscriptionStatement == null) { 199 deleteSubscriptionStatement = "DELETE FROM " + getFullAckTableName() 200 + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; 201 } 202 return deleteSubscriptionStatement; 203 } 204 205 public String getFindAllDurableSubMessagesStatement() { 206 if (findAllDurableSubMessagesStatement == null) { 207 findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " 208 + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 209 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID"; 210 } 211 return findAllDurableSubMessagesStatement; 212 } 213 214 public String getFindDurableSubMessagesStatement(){ 215 if(findDurableSubMessagesStatement==null){ 216 findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " 217 + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 218 + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID"; 219 } 220 return findDurableSubMessagesStatement; 221 } 222 223 public String findAllDurableSubMessagesStatement() { 224 if (findAllDurableSubMessagesStatement == null) { 225 findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " 226 + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 227 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID"; 228 } 229 return findAllDurableSubMessagesStatement; 230 } 231 232 public String getNextDurableSubscriberMessageStatement(){ 233 if (nextDurableSubscriberMessageStatement == null){ 234 nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " 235 + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 236 + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID "; 237 } 238 return nextDurableSubscriberMessageStatement; 239 } 240 241 244 245 246 public String getDurableSubscriberMessageCountStatement(){ 247 if (durableSubscriberMessageCountStatement==null){ 248 durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM " + getFullMessageTableName() + " M, " 249 + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 250 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"; 251 } 252 return durableSubscriberMessageCountStatement; 253 } 254 255 public String getFindAllDestinationsStatement() { 256 if (findAllDestinationsStatement == null) { 257 findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName(); 258 } 259 return findAllDestinationsStatement; 260 } 261 262 public String getRemoveAllMessagesStatement() { 263 if (removeAllMessagesStatement == null) { 264 removeAllMessagesStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE CONTAINER=?"; 265 } 266 return removeAllMessagesStatement; 267 } 268 269 public String getRemoveAllSubscriptionsStatement() { 270 if (removeAllSubscriptionsStatement == null) { 271 removeAllSubscriptionsStatement = "DELETE FROM " + getFullAckTableName() + " WHERE CONTAINER=?"; 272 } 273 return removeAllSubscriptionsStatement; 274 } 275 276 public String getDeleteOldMessagesStatement() { 277 if (deleteOldMessagesStatement == null) { 278 deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName() 279 + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <= " + "( SELECT min(" + getFullAckTableName() 280 + ".LAST_ACKED_ID) " + "FROM " + getFullAckTableName() + " WHERE " + getFullAckTableName() 281 + ".CONTAINER=" + getFullMessageTableName() + ".CONTAINER)"; 282 } 283 return deleteOldMessagesStatement; 284 } 285 286 public String getLockCreateStatement() { 287 if (lockCreateStatement == null) { 288 lockCreateStatement = "SELECT * FROM " + getFullLockTableName(); 289 if (useLockCreateWhereClause) { 290 lockCreateStatement += " WHERE ID = 1"; 291 } 292 lockCreateStatement += " FOR UPDATE"; 293 } 294 return lockCreateStatement; 295 } 296 297 public String getLockUpdateStatement() { 298 if (lockUpdateStatement == null) { 299 lockUpdateStatement = "UPDATE " + getFullLockTableName() + " SET time = ? WHERE ID = 1"; 300 } 301 return lockUpdateStatement; 302 } 303 304 307 public String getDestinationMessageCountStatement(){ 308 if (destinationMessageCountStatement==null) { 309 destinationMessageCountStatement= "SELECT COUNT(*) FROM " + getFullMessageTableName() 310 + " WHERE CONTAINER=?"; 311 } 312 return destinationMessageCountStatement; 313 } 314 315 318 public String getFindNextMessagesStatement(){ 319 if(findNextMessagesStatement == null) { 320 findNextMessagesStatement="SELECT ID, MSG FROM " + getFullMessageTableName() 321 + " WHERE CONTAINER=? AND ID > ? ORDER BY ID"; 322 } 323 return findNextMessagesStatement; 324 } 325 326 329 public String getLastAckedDurableSubscriberMessageStatement(){ 330 if(lastAckedDurableSubscriberMessageStatement==null) { 331 lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName() 332 + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; 333 } 334 return lastAckedDurableSubscriberMessageStatement; 335 } 336 337 338 339 public String getFullMessageTableName() { 340 return getTablePrefix() + getMessageTableName(); 341 } 342 343 public String getFullAckTableName() { 344 return getTablePrefix() + getDurableSubAcksTableName(); 345 } 346 347 public String getFullLockTableName() { 348 return getTablePrefix() + getLockTableName(); 349 } 350 351 352 355 public String getContainerNameDataType() { 356 return containerNameDataType; 357 } 358 359 363 public void setContainerNameDataType(String containerNameDataType) { 364 this.containerNameDataType = containerNameDataType; 365 } 366 367 370 public String getBinaryDataType() { 371 return binaryDataType; 372 } 373 374 378 public void setBinaryDataType(String messageDataType) { 379 this.binaryDataType = messageDataType; 380 } 381 382 385 public String getMessageTableName() { 386 return messageTableName; 387 } 388 389 393 public void setMessageTableName(String messageTableName) { 394 this.messageTableName = messageTableName; 395 } 396 397 400 public String getMsgIdDataType() { 401 return msgIdDataType; 402 } 403 404 408 public void setMsgIdDataType(String msgIdDataType) { 409 this.msgIdDataType = msgIdDataType; 410 } 411 412 415 public String getSequenceDataType() { 416 return sequenceDataType; 417 } 418 419 423 public void setSequenceDataType(String sequenceDataType) { 424 this.sequenceDataType = sequenceDataType; 425 } 426 427 430 public String getTablePrefix() { 431 return tablePrefix; 432 } 433 434 438 public void setTablePrefix(String tablePrefix) { 439 this.tablePrefix = tablePrefix; 440 } 441 442 445 public String getDurableSubAcksTableName() { 446 return durableSubAcksTableName; 447 } 448 449 453 public void setDurableSubAcksTableName(String durableSubAcksTableName) { 454 this.durableSubAcksTableName = durableSubAcksTableName; 455 } 456 457 public String getLockTableName() { 458 return lockTableName; 459 } 460 461 public void setLockTableName(String lockTableName) { 462 this.lockTableName = lockTableName; 463 } 464 465 public String getLongDataType() { 466 return longDataType; 467 } 468 469 public void setLongDataType(String longDataType) { 470 this.longDataType = longDataType; 471 } 472 473 public String getStringIdDataType() { 474 return stringIdDataType; 475 } 476 477 public void setStringIdDataType(String stringIdDataType) { 478 this.stringIdDataType = stringIdDataType; 479 } 480 481 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 482 this.useExternalMessageReferences = useExternalMessageReferences; 483 } 484 485 public boolean isUseExternalMessageReferences() { 486 return useExternalMessageReferences; 487 } 488 489 public void setAddMessageStatement(String addMessageStatment) { 490 this.addMessageStatement = addMessageStatment; 491 } 492 493 public void setCreateDurableSubStatement(String createDurableSubStatment) { 494 this.createDurableSubStatement = createDurableSubStatment; 495 } 496 497 public void setCreateSchemaStatements(String [] createSchemaStatments) { 498 this.createSchemaStatements = createSchemaStatments; 499 } 500 501 public void setDeleteOldMessagesStatement(String deleteOldMessagesStatment) { 502 this.deleteOldMessagesStatement = deleteOldMessagesStatment; 503 } 504 505 public void setDeleteSubscriptionStatement(String deleteSubscriptionStatment) { 506 this.deleteSubscriptionStatement = deleteSubscriptionStatment; 507 } 508 509 public void setDropSchemaStatements(String [] dropSchemaStatments) { 510 this.dropSchemaStatements = dropSchemaStatments; 511 } 512 513 public void setFindAllDestinationsStatement(String findAllDestinationsStatment) { 514 this.findAllDestinationsStatement = findAllDestinationsStatment; 515 } 516 517 public void setFindAllDurableSubMessagesStatement(String findAllDurableSubMessagesStatment) { 518 this.findAllDurableSubMessagesStatement = findAllDurableSubMessagesStatment; 519 } 520 521 public void setFindAllDurableSubsStatement(String findAllDurableSubsStatment) { 522 this.findAllDurableSubsStatement = findAllDurableSubsStatment; 523 } 524 525 public void setFindAllMessagesStatement(String findAllMessagesStatment) { 526 this.findAllMessagesStatement = findAllMessagesStatment; 527 } 528 529 public void setFindDurableSubStatement(String findDurableSubStatment) { 530 this.findDurableSubStatement = findDurableSubStatment; 531 } 532 533 public void setFindLastSequenceIdInAcksStatement(String findLastSequenceIdInAcks) { 534 this.findLastSequenceIdInAcksStatement = findLastSequenceIdInAcks; 535 } 536 537 public void setFindLastSequenceIdInMsgsStatement(String findLastSequenceIdInMsgs) { 538 this.findLastSequenceIdInMsgsStatement = findLastSequenceIdInMsgs; 539 } 540 541 public void setFindMessageSequenceIdStatement(String findMessageSequenceIdStatment) { 542 this.findMessageSequenceIdStatement = findMessageSequenceIdStatment; 543 } 544 545 public void setFindMessageStatement(String findMessageStatment) { 546 this.findMessageStatement = findMessageStatment; 547 } 548 549 public void setRemoveAllMessagesStatement(String removeAllMessagesStatment) { 550 this.removeAllMessagesStatement = removeAllMessagesStatment; 551 } 552 553 public void setRemoveAllSubscriptionsStatement(String removeAllSubscriptionsStatment) { 554 this.removeAllSubscriptionsStatement = removeAllSubscriptionsStatment; 555 } 556 557 public void setRemoveMessageStatment(String removeMessageStatment) { 558 this.removeMessageStatment = removeMessageStatment; 559 } 560 561 public void setUpdateLastAckOfDurableSubStatement(String updateLastAckOfDurableSub) { 562 this.updateLastAckOfDurableSubStatement = updateLastAckOfDurableSub; 563 } 564 565 public void setUpdateMessageStatement(String updateMessageStatment) { 566 this.updateMessageStatement = updateMessageStatment; 567 } 568 569 public boolean isUseLockCreateWhereClause() { 570 return useLockCreateWhereClause; 571 } 572 573 public void setUseLockCreateWhereClause(boolean useLockCreateWhereClause) { 574 this.useLockCreateWhereClause = useLockCreateWhereClause; 575 } 576 577 public void setLockCreateStatement(String lockCreateStatement) { 578 this.lockCreateStatement = lockCreateStatement; 579 } 580 581 public void setLockUpdateStatement(String lockUpdateStatement) { 582 this.lockUpdateStatement = lockUpdateStatement; 583 } 584 585 588 public void setFindDurableSubMessagesStatement(String findDurableSubMessagesStatement){ 589 this.findDurableSubMessagesStatement=findDurableSubMessagesStatement; 590 } 591 592 595 public void setNextDurableSubscriberMessageStatement(String nextDurableSubscriberMessageStatement){ 596 this.nextDurableSubscriberMessageStatement=nextDurableSubscriberMessageStatement; 597 } 598 599 600 603 public void setDurableSubscriberMessageCountStatement(String durableSubscriberMessageCountStatement){ 604 this.durableSubscriberMessageCountStatement=durableSubscriberMessageCountStatement; 605 } 606 607 610 public void setFindNextMessagesStatement(String findNextMessagesStatement){ 611 this.findNextMessagesStatement=findNextMessagesStatement; 612 } 613 614 617 public void setDestinationMessageCountStatement(String destinationMessageCountStatement){ 618 this.destinationMessageCountStatement=destinationMessageCountStatement; 619 } 620 621 622 623 624 627 public void setLastAckedDurableSubscriberMessageStatement(String lastAckedDurableSubscriberMessageStatement){ 628 this.lastAckedDurableSubscriberMessageStatement=lastAckedDurableSubscriberMessageStatement; 629 } 630 631 632 633 634 } 635 | Popular Tags |