1 43 package org.exolab.jms.persistence; 44 45 import java.io.ByteArrayInputStream ; 46 import java.io.ByteArrayOutputStream ; 47 import java.io.ObjectInputStream ; 48 import java.io.ObjectOutputStream ; 49 import java.sql.Connection ; 50 import java.sql.PreparedStatement ; 51 import java.sql.ResultSet ; 52 import java.sql.SQLException ; 53 import java.util.HashMap ; 54 import java.util.Vector ; 55 56 import javax.jms.JMSException ; 57 58 import org.apache.commons.logging.Log; 59 import org.apache.commons.logging.LogFactory; 60 61 import org.exolab.jms.client.JmsDestination; 62 import org.exolab.jms.client.JmsTopic; 63 import org.exolab.jms.message.MessageImpl; 64 import org.exolab.jms.messagemgr.PersistentMessageHandle; 65 66 67 73 public class Messages { 74 75 78 private static Messages _instance; 79 80 84 private static final Object _block = new Object (); 85 86 89 private static final Log _log = LogFactory.getLog(Messages.class); 90 91 92 100 public static Messages instance() { 101 return _instance; 102 } 103 104 109 public static Messages initialise() { 110 if (_instance == null) { 111 synchronized (_block) { 112 if (_instance == null) { 113 _instance = new Messages(); 114 } 115 } 116 } 117 return _instance; 118 } 119 120 128 public void add(Connection connection, MessageImpl message) 129 throws PersistenceException { 130 131 PreparedStatement insert = null; 132 133 String messageId = message.getMessageId().getId(); 135 136 String name; 139 try { 140 name = ((JmsDestination) message.getJMSDestination()).getName(); 141 } catch (JMSException exception) { 142 throw new PersistenceException( 143 "Failed to get destination for message=" + 144 message.getMessageId(), exception); 145 } 146 147 long destinationId = Destinations.instance().getId(name); 148 if (destinationId == 0) { 149 throw new PersistenceException( 150 "Cannot add message=" + message.getMessageId() + 151 ", destination=" + name + " (" + destinationId + 152 "): destination does not exist"); 153 } 154 155 try { 156 insert = connection.prepareStatement( 158 "insert into messages (messageid, destinationid, priority, " 159 + "createtime, expirytime, processed, messageblob) values " 160 + "(?,?,?,?,?,?,?)"); 161 insert.setString(1, messageId); 162 insert.setLong(2, destinationId); 163 insert.setInt(3, message.getJMSPriority()); 164 insert.setLong(4, message.getAcceptedTime()); 165 insert.setLong(5, message.getJMSExpiration()); 166 insert.setInt(6, (message.getProcessed()) ? 1 : 0); 167 168 byte[] bytes = serialize(message); 170 insert.setBinaryStream(7, new ByteArrayInputStream (bytes), 171 bytes.length); 172 174 if (insert.executeUpdate() != 1) { 176 throw new PersistenceException( 177 "Failed to add message=" + message.getMessageId() + 178 ", destination=" + name + " (" + destinationId + ")"); 179 } 180 } catch (PersistenceException exception) { 181 throw exception; 182 } catch (Exception exception) { 183 throw new PersistenceException( 184 "Failed to add message=" + message.getMessageId() + 185 ", destination=" + name + " (" + destinationId + ")", 186 exception); 187 } finally { 188 SQLHelper.close(insert); 189 } 190 } 191 192 200 public void update(Connection connection, MessageImpl message) 201 throws PersistenceException { 202 203 PreparedStatement update = null; 204 205 String messageId = message.getMessageId().getId(); 207 208 try { 209 update = connection.prepareStatement( 210 "update messages set processed=? where messageId=?"); 211 update.setInt(1, message.getProcessed() ? 1 : 0); 212 update.setString(2, messageId); 213 214 if (update.executeUpdate() != 1) { 216 _log.error("Cannot update message=" + messageId); 217 } 218 } catch (SQLException exception) { 219 throw new PersistenceException( 220 "Failed to update message, id=" + messageId, exception); 221 } finally { 222 SQLHelper.close(update); 223 } 224 } 225 226 233 public void remove(Connection connection, String messageId) 234 throws PersistenceException { 235 236 PreparedStatement delete = null; 237 try { 238 delete = connection.prepareStatement( 239 "delete from messages where messageId=?"); 240 delete.setString(1, messageId); 241 242 if (delete.executeUpdate() != 1) { 244 _log.error("Cannot remove message=" + messageId); 245 } 246 } catch (SQLException exception) { 247 throw new PersistenceException( 248 "Failed to remove message, id=" + messageId, exception); 249 } finally { 250 SQLHelper.close(delete); 251 } 252 } 253 254 262 public MessageImpl get(Connection connection, String messageId) 263 throws PersistenceException { 264 265 MessageImpl result = null; 266 PreparedStatement select = null; 267 ResultSet set = null; 268 try { 269 select = connection.prepareStatement( 270 "select messageBlob, processed from messages where messageId=?"); 271 272 select.setString(1, messageId); 273 set = select.executeQuery(); 274 if (set.next()) { 275 result = deserialize(set.getBytes(1)); 276 result.setProcessed((set.getInt(2) == 1 ? true : false)); 277 } 278 } catch (SQLException exception) { 279 throw new PersistenceException( 280 "Failed to retrieve message, id=" + messageId, exception); 281 } finally { 282 SQLHelper.close(set); 283 SQLHelper.close(select); 284 } 285 286 return result; 287 } 288 289 297 public int removeMessages(Connection connection, String destination) 298 throws PersistenceException { 299 300 int result = 0; 301 PreparedStatement delete = null; 302 303 long destinationId = Destinations.instance().getId(destination); 305 if (destinationId == 0) { 306 throw new PersistenceException("Cannot delete messages for " + 307 "destination=" + destination + 308 ": destination does not exist"); 309 } 310 311 try { 312 delete = connection.prepareStatement( 313 "delete from messages where destinationId = ?"); 314 delete.setLong(1, destinationId); 315 result = delete.executeUpdate(); 316 } catch (SQLException exception) { 317 throw new PersistenceException( 318 "Failed to remove messages for destination=" + destination, 319 exception); 320 } finally { 321 SQLHelper.close(delete); 322 } 323 324 return result; 325 } 326 327 339 public Vector getMessages(Connection connection, String destination, 340 int priority, long time) 341 throws PersistenceException { 342 343 PreparedStatement select = null; 344 ResultSet set = null; 345 Vector messages = new Vector (); 346 347 try { 348 JmsDestination dest = Destinations.instance().get(destination); 349 if (dest == null) { 350 throw new PersistenceException( 351 "Cannot getMessages for destination=" + destination 352 + ": destination does not exist"); 353 } 354 355 long destinationId = Destinations.instance().getId(destination); 356 if (destinationId == 0) { 357 throw new PersistenceException( 358 "Cannot getMessages for destination=" + destination 359 + ": destination does not exist"); 360 } 361 362 if ((dest instanceof JmsTopic) && 363 (((JmsTopic) dest).isWildCard())) { 364 select = connection.prepareStatement( 368 "select createtime,processed,messageblob from messages " 369 + "where priority=? and createTime>=? " 370 + "order by createTime asc"); 371 select.setInt(1, priority); 372 select.setLong(2, time); 373 } else { 374 select = connection.prepareStatement( 378 "select createtime,processed,messageblob from messages " 379 + "where destinationId=? and priority=? and createTime>=? " 380 + "order by createTime asc"); 381 select.setLong(1, destinationId); 382 select.setInt(2, priority); 383 select.setLong(3, time); 384 } 385 set = select.executeQuery(); 386 387 int count = 0; 389 long lastTimeStamp = time; 390 while (set.next()) { 391 MessageImpl m = deserialize(set.getBytes(3)); 392 m.setProcessed((set.getInt(2) == 1 ? true : false)); 393 messages.add(m); 394 if (++count > 200) { 395 if (set.getLong(1) > lastTimeStamp) { 399 break; 400 } 401 } else { 402 lastTimeStamp = set.getLong(1); 403 } 404 } 405 } catch (SQLException exception) { 406 throw new PersistenceException( 407 "Failed to retrieve messages", exception); 408 } finally { 409 SQLHelper.close(set); 410 SQLHelper.close(select); 411 } 412 413 return messages; 414 } 415 416 428 public HashMap getMessageIds(Connection connection, long time, int hint) 429 throws PersistenceException { 430 431 PreparedStatement select = null; 432 ResultSet set = null; 433 HashMap messages = new HashMap (); 434 435 try { 436 select = connection.prepareStatement( 437 "select messageId,createTime from messages where createTime>? " 438 + "order by createTime asc"); 439 select.setLong(1, time); 440 set = select.executeQuery(); 441 442 int count = 0; 444 long lastTimeStamp = time; 445 while (set.next()) { 446 messages.put(set.getString(1), new Long (set.getLong(2))); 447 if (++count > hint) { 448 if (set.getLong(2) > lastTimeStamp) { 449 break; 450 } 451 } else { 452 lastTimeStamp = set.getLong("createTime"); 453 } 454 455 } 456 } catch (SQLException exception) { 457 throw new PersistenceException( 458 "Failed to retrieve message identifiers", exception); 459 } finally { 460 SQLHelper.close(set); 461 SQLHelper.close(select); 462 } 463 464 return messages; 465 } 466 467 476 public Vector getUnprocessedMessages(Connection connection) 477 throws PersistenceException { 478 479 PreparedStatement select = null; 480 ResultSet set = null; 481 Vector messages = new Vector (); 482 483 try { 484 select = connection.prepareStatement( 485 "select messageblob from messages where processed=0"); 486 set = select.executeQuery(); 487 while (set.next()) { 489 MessageImpl m = deserialize(set.getBytes(1)); 490 m.setProcessed(false); 491 messages.add(m); 492 } 493 } catch (SQLException exception) { 494 throw new PersistenceException( 495 "Failed to retrieve unprocessed messages", exception); 496 } finally { 497 SQLHelper.close(set); 498 SQLHelper.close(select); 499 } 500 501 return messages; 502 } 503 504 512 public Vector getNonExpiredMessages(Connection connection, 513 JmsDestination destination) 514 throws PersistenceException { 515 516 Vector result = new Vector (); 517 PreparedStatement select = null; 518 ResultSet set = null; 519 520 try { 521 long destinationId = Destinations.instance().getId( 522 destination.getName()); 523 524 if (destinationId == 0) { 525 throw new PersistenceException( 526 "Cannot getMessages for destination=" + destination 527 + ": destination does not exist"); 528 } 529 530 select = connection.prepareStatement( 531 "select messageId,destinationId,priority,createTime," 532 + "sequenceNumber,expiryTime " 533 + "from messages " 534 + "where expiryTime>0 and destinationId=? " 535 + "order by expiryTime asc"); 536 select.setLong(1, destinationId); 537 set = select.executeQuery(); 538 539 while (set.next()) { 540 String messageId = set.getString(1); 541 int priority = set.getInt(3); 542 long acceptedTime = set.getLong(4); 543 long sequenceNumber = set.getLong(5); 544 long expiryTime = set.getLong(6); 545 PersistentMessageHandle handle = new PersistentMessageHandle( 546 messageId, priority, acceptedTime, sequenceNumber, 547 expiryTime, destination); 548 result.add(handle); 549 } 550 } catch (SQLException exception) { 551 throw new PersistenceException( 552 "Failed to retrieve non-expired messages", exception); 553 } finally { 554 SQLHelper.close(set); 555 SQLHelper.close(select); 556 } 557 558 return result; 559 } 560 561 567 public void removeExpiredMessages(Connection connection) 568 throws PersistenceException { 569 570 PreparedStatement delete = null; 571 try { 572 long time = System.currentTimeMillis(); 573 574 delete = connection.prepareStatement( 576 "delete from messages where expiryTime > 0 and expiryTime < ?"); 577 delete.setLong(1, time); 578 delete.executeUpdate(); 579 delete.close(); 580 581 delete = connection.prepareStatement( 583 "delete from message_handles where expiryTime > 0 and expiryTime < ?"); 584 delete.setLong(1, time); 585 delete.executeUpdate(); 586 } catch (SQLException exception) { 587 throw new PersistenceException( 588 "Failed to remove expired messages", exception); 589 } finally { 590 SQLHelper.close(delete); 591 } 592 } 593 594 598 public void close() { 599 _instance = null; 600 } 601 602 605 protected Messages() { 606 } 607 608 614 public byte[] serialize(MessageImpl message) 615 throws PersistenceException { 616 617 byte[] result = null; 618 ObjectOutputStream ostream = null; 619 try { 620 ByteArrayOutputStream bstream = new ByteArrayOutputStream (); 621 ostream = new ObjectOutputStream (bstream); 622 ostream.writeObject(message); 623 result = bstream.toByteArray(); 624 } catch (Exception exception) { 625 throw new PersistenceException("Failed to serialize message", 626 exception); 627 } finally { 628 SQLHelper.close(ostream); 629 } 630 631 return result; 632 } 633 634 640 public MessageImpl deserialize(byte[] blob) throws PersistenceException { 641 MessageImpl message = null; 642 643 if (blob != null) { 644 ObjectInputStream istream = null; 645 try { 646 ByteArrayInputStream bstream = new ByteArrayInputStream (blob); 647 istream = new ObjectInputStream (bstream); 648 message = (MessageImpl) istream.readObject(); 649 } catch (Exception exception) { 650 throw new PersistenceException( 651 "Failed to de-serialize message", exception); 652 } finally { 653 SQLHelper.close(istream); 654 } 655 } else { 656 throw new PersistenceException( 657 "Cannot de-serialize null message blob"); 658 } 659 660 return message; 661 } 662 663 } 664 | Popular Tags |