1 43 package org.exolab.jms.persistence; 44 45 import java.sql.Connection ; 46 import java.sql.PreparedStatement ; 47 import java.sql.ResultSet ; 48 import java.sql.SQLException ; 49 import java.util.Vector ; 50 51 import org.apache.commons.logging.Log; 52 import org.apache.commons.logging.LogFactory; 53 54 import org.exolab.jms.client.JmsDestination; 55 import org.exolab.jms.client.JmsTopic; 56 import org.exolab.jms.messagemgr.PersistentMessageHandle; 57 import org.exolab.jms.messagemgr.MessageHandle; 58 59 60 67 class MessageHandles { 68 69 72 private static final String INSERT_MSG_HANDLE_STMT = 73 "insert into message_handles (messageid, destinationid, consumerid, " 74 + "priority, acceptedtime, sequencenumber, expirytime, delivered) " 75 + "values (?,?,?,?,?,?,?,?)"; 76 77 80 private static final String DELETE_MSG_HANDLE_STMT1 = 81 "delete from message_handles where messageId=? and consumerId=?"; 82 private static final String DELETE_MSG_HANDLE_STMT2 = 83 "delete from message_handles where messageId=? and destinationId=? " + 84 "and consumerId=?"; 85 86 89 private static final String DELETE_MSG_HANDLES_STMT = 90 "delete from message_handles where messageId=?"; 91 92 95 private static final String UPDATE_MSG_HANDLE_STMT = 96 "update message_handles set delivered=? where messageId=? and " + 97 "destinationId=? and consumerId=?"; 98 99 102 private static final String DELETE_MSG_HANDLES_FOR_DEST = 103 "delete from message_handles where destinationId=?"; 104 105 108 private static final String GET_MSG_HANDLES_FOR_DEST = 109 "select messageid, destinationid, consumerid, priority, acceptedtime, " 110 + "sequencenumber, expirytime, delivered from message_handles " 111 + "where consumerId=? order by acceptedTime asc"; 112 113 116 private static final String GET_MESSAGE_HANDLES_IN_RANGE = 117 "select distinct messageId from message_handles where " + 118 " acceptedTime >= ? and acceptedTime <=?"; 119 120 123 private static final String GET_MESSAGE_HANDLE_WITH_ID = 124 "select distinct messageId from message_handles where messageId=?"; 125 126 129 private static final String GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER = 130 "select count(messageId) from message_handles where destinationId=? " + 131 "and consumerId=?"; 132 133 136 private static final String GET_MSG_HANDLE_COUNT_FOR_CONSUMER = 137 "select count(messageId) from message_handles where consumerId=?"; 138 139 142 private static final String DELETE_EXPIRED_MESSAGES = 143 "delete from message_handles where consumerId=? and expiryTime != 0 " + 144 "and expiryTime<?"; 145 146 149 private static MessageHandles _instance; 150 151 154 private static final Object _block = new Object (); 155 156 159 private static final Log _log = LogFactory.getLog(MessageHandles.class); 160 161 162 170 public static MessageHandles instance() { 171 return _instance; 172 } 173 174 177 protected MessageHandles() { 178 } 179 180 185 public static MessageHandles initialise() { 186 if (_instance == null) { 187 synchronized (_block) { 188 if (_instance == null) { 189 _instance = new MessageHandles(); 190 } 191 } 192 } 193 194 return _instance; 195 } 196 197 204 public void addMessageHandle(Connection connection, 205 MessageHandle handle) 206 throws PersistenceException { 207 208 if (_log.isDebugEnabled()) { 209 _log.debug("addMessageHandle(handle=[consumer=" 210 + handle.getConsumerPersistentId() 211 + ", destination=" + handle.getDestination() 212 + ", id=" + handle.getMessageId() + "])"); 213 } 214 215 PreparedStatement insert = null; 216 try { 217 long destinationId = Destinations.instance().getId( 219 handle.getDestination().getName()); 220 if (destinationId == 0) { 221 throw new PersistenceException( 222 "Cannot add message handle id=" + handle.getMessageId() + 223 " for destination=" + handle.getDestination().getName() + 224 " and consumer=" + handle.getConsumerPersistentId() + 225 " since the destination cannot be mapped to an id"); 226 } 227 228 long consumerId = Consumers.instance().getConsumerId( 230 handle.getConsumerPersistentId()); 231 if (consumerId == 0) { 232 throw new PersistenceException( 233 "Cannot add message handle id=" + handle.getMessageId() + 234 " for destination=" + handle.getDestination().getName() + 235 " and consumer=" + handle.getConsumerPersistentId() + 236 " since the consumer cannot be mapped to an id"); 237 } 238 239 insert = connection.prepareStatement(INSERT_MSG_HANDLE_STMT); 240 insert.setString(1, handle.getMessageId()); 241 insert.setLong(2, destinationId); 242 insert.setLong(3, consumerId); 243 insert.setInt(4, handle.getPriority()); 244 insert.setLong(5, handle.getAcceptedTime()); 245 insert.setLong(6, handle.getSequenceNumber()); 246 insert.setLong(7, handle.getExpiryTime()); 247 insert.setInt(8, (handle.getDelivered()) ? 1 : 0); 248 249 if (insert.executeUpdate() != 1) { 251 _log.error( 252 "Failed to execute addMessageHandle for handle=" 253 + handle.getMessageId() + ", destination Id=" 254 + destinationId); 255 } 256 } catch (SQLException exception) { 257 throw new PersistenceException("Failed to add message handle=" + 258 handle, exception); 259 } finally { 260 SQLHelper.close(insert); 261 } 262 } 263 264 274 public void removeMessageHandle(Connection connection, 275 MessageHandle handle) 276 throws PersistenceException { 277 278 if (_log.isDebugEnabled()) { 279 _log.debug("removeMessageHandle(handle=[consumer=" 280 + handle.getConsumerPersistentId() 281 + ", destination=" + handle.getDestination() 282 + ", id=" + handle.getMessageId() + "])"); 283 } 284 285 PreparedStatement delete = null; 286 PreparedStatement select = null; 287 ResultSet rs = null; 288 289 try { 290 long consumerId = Consumers.instance().getConsumerId( 293 handle.getConsumerPersistentId()); 294 if (consumerId != 0) { 295 String id = handle.getMessageId(); 297 298 long destinationId = Destinations.instance().getId( 302 handle.getDestination().getName()); 303 304 if (destinationId == 0) { 305 delete = connection.prepareStatement( 306 DELETE_MSG_HANDLE_STMT1); 307 delete.setString(1, id); 308 delete.setLong(2, consumerId); 309 310 } else { 311 delete = connection.prepareStatement( 312 DELETE_MSG_HANDLE_STMT2); 313 delete.setString(1, id); 314 delete.setLong(2, destinationId); 315 delete.setLong(3, consumerId); 316 } 317 318 if (delete.executeUpdate() != 1 && !handle.hasExpired()) { 320 _log.error("Failed to execute removeMessageHandle for " 323 + "handle=" + id + " destination id=" 324 + destinationId + " consumer id=" + consumerId); 325 } 326 } 327 } catch (SQLException exception) { 328 throw new PersistenceException("Failed to remove message handle=" + 329 handle, exception); 330 } finally { 331 SQLHelper.close(rs); 332 SQLHelper.close(delete); 333 SQLHelper.close(select); 334 } 335 } 336 337 344 public void updateMessageHandle(Connection connection, 345 MessageHandle handle) 346 throws PersistenceException { 347 PreparedStatement update = null; 348 349 if (_log.isDebugEnabled()) { 350 _log.debug("updateMessageHandle(handle=[consumer=" 351 + handle.getConsumerPersistentId() 352 + ", destination=" + handle.getDestination() 353 + ", id=" + handle.getMessageId() + "])"); 354 } 355 356 try { 357 String id = handle.getMessageId(); 359 360 long destinationId = Destinations.instance().getId( 362 handle.getDestination().getName()); 363 if (destinationId == 0) { 364 throw new PersistenceException( 365 "Cannot update message handle id=" + 366 handle.getMessageId() + " for destination=" + 367 handle.getDestination().getName() + " and consumer=" + 368 handle.getConsumerPersistentId() + 369 " since the destination cannot be mapped to an id"); 370 } 371 372 long consumerId = Consumers.instance().getConsumerId( 374 handle.getConsumerPersistentId()); 375 if (consumerId == 0) { 376 throw new PersistenceException( 377 "Cannot update message handle id=" + 378 handle.getMessageId() + " for destination=" + 379 handle.getDestination().getName() + " and consumer=" + 380 handle.getConsumerPersistentId() + 381 " since the consumer cannot be mapped to an id"); 382 } 383 384 update = connection.prepareStatement(UPDATE_MSG_HANDLE_STMT); 385 update.setInt(1, handle.getDelivered() ? 1 : 0); 386 update.setString(2, id); 387 update.setLong(3, destinationId); 388 update.setLong(4, consumerId); 389 390 if (update.executeUpdate() != 1 && !handle.hasExpired()) { 392 _log.error( 394 "Failed to execute updateMessageHandle for handle=" + 395 id + ", destination id=" + destinationId + 396 ", consumer id=" + consumerId); 397 } 398 } catch (SQLException exception) { 399 throw new PersistenceException("Failed to update message handle=" + 400 handle, exception); 401 } finally { 402 SQLHelper.close(update); 403 } 404 } 405 406 413 public void removeMessageHandles(Connection connection, String destination) 414 throws PersistenceException { 415 416 PreparedStatement delete = null; 417 418 try { 419 long destinationId = Destinations.instance().getId(destination); 421 if (destinationId == 0) { 422 throw new PersistenceException( 423 "Cannot remove message handles for destination=" + 424 destination + " since the destination cannot be " + 425 "mapped to an id"); 426 } 427 428 delete = connection.prepareStatement(DELETE_MSG_HANDLES_FOR_DEST); 429 delete.setLong(1, destinationId); 430 delete.executeUpdate(); 431 } catch (SQLException exception) { 432 throw new PersistenceException( 433 "Failed to remove message handles for destination=" + 434 destination, exception); 435 } finally { 436 SQLHelper.close(delete); 437 } 438 } 439 440 447 public void removeMessageHandles(Connection connection, long messageId) 448 throws PersistenceException { 449 450 PreparedStatement delete = null; 451 452 try { 453 delete = connection.prepareStatement(DELETE_MSG_HANDLES_STMT); 454 delete.setLong(1, messageId); 455 delete.executeUpdate(); 456 } catch (SQLException exception) { 457 throw new PersistenceException( 458 "Failed to remove message handles for message id=" + messageId, 459 exception); 460 } finally { 461 SQLHelper.close(delete); 462 } 463 } 464 465 475 public Vector getMessageHandles(Connection connection, String destination, 476 String name) 477 throws PersistenceException { 478 479 Vector result = new Vector (); 480 PreparedStatement select = null; 481 ResultSet set = null; 482 483 long destinationId = Destinations.instance().getId(destination); 486 long consumerId = Consumers.instance().getConsumerId(name); 487 if ((consumerId == 0) || 488 (destinationId == 0)) { 489 return result; 490 } 491 492 try { 495 select = connection.prepareStatement(GET_MSG_HANDLES_FOR_DEST); 496 select.setLong(1, consumerId); 497 498 set = select.executeQuery(); 501 while (set.next()) { 502 JmsDestination dest = Destinations.instance().get( 504 set.getLong(2)); 505 if (dest == null) { 506 throw new PersistenceException( 507 "Cannot create persistent handle, because " + 508 "destination mapping failed for " + set.getLong(2)); 509 } 510 511 String consumer = Consumers.instance().getConsumerName( 512 set.getLong(3)); 513 if (name == null) { 514 throw new PersistenceException( 515 "Cannot create persistent handle because " + 516 "consumer mapping failed for " + set.getLong(3)); 517 } 518 519 String messageId = set.getString(1); 520 int priority = set.getInt(4); 521 long acceptedTime = set.getLong(5); 522 long sequenceNumber = set.getLong(6); 523 long expiryTime = set.getLong(7); 524 boolean delivered = (set.getInt(8) == 0) ? false : true; 525 MessageHandle handle = new PersistentMessageHandle( 526 messageId, priority, acceptedTime, sequenceNumber, 527 expiryTime, dest, consumer); 528 handle.setDelivered(delivered); 529 result.add(handle); 530 } 531 } catch (SQLException exception) { 532 throw new PersistenceException( 533 "Failed to get message handles for destination=" + 534 destination + ", consumer=" + name, exception); 535 } finally { 536 SQLHelper.close(set); 537 SQLHelper.close(select); 538 } 539 540 return result; 541 } 542 543 553 public Vector getMessageIds(Connection connection, long min, long max) 554 throws PersistenceException { 555 556 Vector result = new Vector (); 557 PreparedStatement select = null; 558 ResultSet set = null; 559 560 try { 561 select = connection.prepareStatement(GET_MESSAGE_HANDLES_IN_RANGE); 562 select.setLong(1, min); 563 select.setLong(2, max); 564 565 set = select.executeQuery(); 568 while (set.next()) { 569 result.add(set.getString(1)); 570 } 571 572 573 } catch (SQLException exception) { 574 throw new PersistenceException("Failed to retrieve message ids", 575 exception); 576 } finally { 577 SQLHelper.close(set); 578 SQLHelper.close(select); 579 } 580 581 return result; 582 } 583 584 593 public boolean messageExists(Connection connection, long messageId) 594 throws PersistenceException { 595 596 boolean result = false; 597 PreparedStatement select = null; 598 ResultSet set = null; 599 600 try { 601 select = connection.prepareStatement(GET_MESSAGE_HANDLE_WITH_ID); 602 select.setLong(1, messageId); 603 set = select.executeQuery(); 604 605 if (set.next()) { 606 result = true; 607 } 608 609 } catch (SQLException exception) { 610 throw new PersistenceException( 611 "Failed to determine if message exists, id=" + messageId, 612 exception); 613 } finally { 614 SQLHelper.close(set); 615 SQLHelper.close(select); 616 } 617 return result; 618 } 619 620 630 public int getMessageCount(Connection connection, String destination, 631 String name) 632 throws PersistenceException { 633 634 int result = -1; 635 boolean destinationIsWildCard = false; 636 637 long destinationId = Destinations.instance().getId(destination); 639 if (destinationId == 0) { 640 if (JmsTopic.isWildCard(destination)) { 641 destinationIsWildCard = true; 642 } else { 643 throw new PersistenceException( 644 "Cannot get message handle count for destination=" + 645 destination + " and consumer=" + name + 646 " since the destination cannot be mapped to an id"); 647 } 648 } 649 650 long consumerId = Consumers.instance().getConsumerId(name); 652 if (consumerId == 0) { 653 throw new PersistenceException( 654 "Cannot get message handle count for destination=" + 655 destination + " and consumer=" + name + 656 " since the consumer cannot be mapped to an id"); 657 } 658 659 PreparedStatement select = null; 660 ResultSet set = null; 661 662 try { 663 if (destinationIsWildCard) { 664 select = connection.prepareStatement( 665 GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER); 666 select.setLong(1, destinationId); 667 select.setLong(2, consumerId); 668 } else { 669 select = connection.prepareStatement( 670 GET_MSG_HANDLE_COUNT_FOR_CONSUMER); 671 select.setLong(1, consumerId); 672 } 673 674 set = select.executeQuery(); 675 if (set.next()) { 676 result = set.getInt(1); 677 } 678 } catch (SQLException exception) { 679 throw new PersistenceException( 680 "Failed to count messages for destination=" + destination + 681 ", consumer=" + name, exception); 682 } finally { 683 SQLHelper.close(set); 684 SQLHelper.close(select); 685 } 686 687 return result; 688 } 689 690 697 public void removeExpiredMessageHandles(Connection connection, 698 String consumer) 699 throws PersistenceException { 700 701 PreparedStatement delete = null; 702 703 long consumerId = Consumers.instance().getConsumerId(consumer); 705 if (consumerId != 0) { 706 try { 707 delete = connection.prepareStatement(DELETE_EXPIRED_MESSAGES); 708 delete.setLong(1, consumerId); 709 delete.setLong(2, System.currentTimeMillis()); 710 delete.executeUpdate(); 711 } catch (SQLException exception) { 712 throw new PersistenceException( 713 "Failed to remove expired message handles", 714 exception); 715 } finally { 716 SQLHelper.close(delete); 717 } 718 } 719 } 720 721 724 public void close() { 725 _instance = null; 726 } 727 728 } 729 | Popular Tags |