1 45 package org.exolab.jms.persistence; 46 47 48 import java.sql.Connection ; 49 import java.util.Enumeration ; 50 import java.util.HashMap ; 51 import java.util.Iterator ; 52 import java.util.LinkedList ; 53 import java.util.Vector ; 54 55 import org.apache.commons.logging.Log; 56 import org.apache.commons.logging.LogFactory; 57 58 import org.exolab.jms.authentication.User; 59 import org.exolab.jms.client.JmsDestination; 60 import org.exolab.jms.events.EventHandler; 61 import org.exolab.jms.message.MessageId; 62 import org.exolab.jms.message.MessageImpl; 63 import org.exolab.jms.messagemgr.MessageHandle; 64 65 66 82 public class BatchingRdbmsAdapter 83 extends PersistenceAdapter 84 implements EventHandler { 85 86 91 private int _maxStatementsToBatch = 500; 92 93 97 private String _logDirectory = "."; 98 99 103 private RDBMSAdapter _rdbms = null; 104 105 108 private LinkedList _batch = new LinkedList (); 109 110 113 private HashMap _handles = new HashMap (); 114 115 118 private HashMap _messages = new HashMap (); 119 120 123 private static final Log _log = 124 LogFactory.getLog(BatchingRdbmsAdapter.class); 125 126 127 137 BatchingRdbmsAdapter(String driver, String url, String userName, 138 String password, int batchSize) 139 throws PersistenceException { 140 _rdbms = new RDBMSAdapter(driver, url, userName, password); 142 _maxStatementsToBatch = batchSize; 143 } 144 145 149 public void close() { 150 if (_rdbms != null) { 151 try { 152 flush(); 153 } catch (PersistenceException exception) { 154 _log.error("Failed to flush statements", exception); 155 } 156 _rdbms.close(); 157 } 158 } 159 160 165 public void setMaxStatementsToBatch(int max) { 166 _maxStatementsToBatch = max; 167 } 168 169 174 public int getMaxStatementsToBatch() { 175 return _maxStatementsToBatch; 176 } 177 178 public long getLastId(Connection connection) 180 throws PersistenceException { 181 return _rdbms.getLastId(connection); 182 } 183 184 public void updateIds(Connection connection, long id) 186 throws PersistenceException { 187 _rdbms.updateIds(connection, id); 188 } 189 190 public void addMessage(Connection connection, MessageImpl message) 192 throws PersistenceException { 193 addToBatch(TransactionalObjectWrapper.ADD_MESSAGE, message); 194 } 195 196 public void updateMessage(Connection connection, MessageImpl message) 198 throws PersistenceException { 199 addToBatch(TransactionalObjectWrapper.UPDATE_MESSAGE, message); 200 } 201 202 public Vector getUnprocessedMessages(Connection connection) 204 throws PersistenceException { 205 flush(); 206 return _rdbms.getUnprocessedMessages(connection); 207 } 208 209 210 public void removeMessage(Connection connection, String id) 212 throws PersistenceException { 213 addToBatch(TransactionalObjectWrapper.DELETE_MESSAGE, id); 214 } 215 216 public MessageImpl getMessage(Connection connection, String id) 218 throws PersistenceException { 219 flush(); 220 return _rdbms.getMessage(connection, id); 221 } 222 223 public Vector getMessages(Connection connection, 225 MessageHandle handle) 226 throws PersistenceException { 227 flush(); 228 return _rdbms.getMessages(connection, handle); 229 } 230 231 public void addMessageHandle(Connection connection, 233 MessageHandle handle) 234 throws PersistenceException { 235 addToBatch(TransactionalObjectWrapper.ADD_HANDLE, handle); 236 } 237 238 public void updateMessageHandle(Connection connection, 240 MessageHandle handle) 241 throws PersistenceException { 242 addToBatch(TransactionalObjectWrapper.UPDATE_HANDLE, handle); 243 } 244 245 public void removeMessageHandle(Connection connection, 247 MessageHandle handle) 248 throws PersistenceException { 249 addToBatch(TransactionalObjectWrapper.DELETE_HANDLE, handle); 250 } 251 252 public Vector getMessageHandles(Connection connection, 254 JmsDestination destination, String name) 255 throws PersistenceException { 256 flush(); 257 return _rdbms.getMessageHandles(connection, destination, name); 258 } 259 260 public void addDurableConsumer(Connection connection, String topic, 262 String consumer) 263 throws PersistenceException { 264 flush(); 265 _rdbms.addDurableConsumer(connection, topic, consumer); 266 } 267 268 public void removeDurableConsumer(Connection connection, String consumer) 270 throws PersistenceException { 271 flush(); 272 _rdbms.removeDurableConsumer(connection, consumer); 273 } 274 275 public Enumeration getDurableConsumers(Connection connection, 277 String topic) 278 throws PersistenceException { 279 flush(); 280 return _rdbms.getDurableConsumers(connection, topic); 281 } 282 283 public HashMap getAllDurableConsumers(Connection connection) 285 throws PersistenceException { 286 flush(); 287 return _rdbms.getAllDurableConsumers(connection); 288 } 289 290 public boolean durableConsumerExists(Connection connection, String name) 292 throws PersistenceException { 293 flush(); 294 return _rdbms.durableConsumerExists(connection, name); 295 } 296 297 public void addDestination(Connection connection, String name, 299 boolean queue) 300 throws PersistenceException { 301 flush(); 302 _rdbms.addDestination(connection, name, queue); 303 } 304 305 public void removeDestination(Connection connection, String name) 307 throws PersistenceException { 308 flush(); 309 _rdbms.removeDestination(connection, name); 310 } 311 312 public Enumeration getAllDestinations(Connection connection) 314 throws PersistenceException { 315 flush(); 316 return _rdbms.getAllDestinations(connection); 317 } 318 319 public boolean checkDestination(Connection connection, String name) 321 throws PersistenceException { 322 flush(); 323 return _rdbms.checkDestination(connection, name); 324 } 325 326 public int getQueueMessageCount(Connection connection, String name) 328 throws PersistenceException { 329 flush(); 330 return _rdbms.getQueueMessageCount(connection, name); 331 } 332 333 public int getDurableConsumerMessageCount(Connection connection, 335 String destination, String name) 336 throws PersistenceException { 337 flush(); 338 return _rdbms.getDurableConsumerMessageCount(connection, destination, 339 name); 340 } 341 342 public void removeExpiredMessages(Connection connection) 344 throws PersistenceException { 345 flush(); 346 _rdbms.removeExpiredMessages(connection); 347 } 348 349 public void removeExpiredMessageHandles(Connection connection, 351 String consumer) 352 throws PersistenceException { 353 flush(); 354 _rdbms.removeExpiredMessageHandles(connection, consumer); 355 } 356 357 public Vector getNonExpiredMessages(Connection connection, 359 JmsDestination destination) 360 throws PersistenceException { 361 flush(); 362 return _rdbms.getNonExpiredMessages(connection, destination); 363 } 364 365 374 public Connection getConnection() 375 throws PersistenceException { 376 377 return _rdbms.getConnection(); 378 } 379 380 385 public synchronized int purgeMessages() { 386 try { 387 flush(); 388 } catch (PersistenceException exception) { 389 _log.error("Error in purgeMessages " + exception); 390 } 391 392 return _rdbms.purgeMessages(); 393 } 394 395 public void handleEvent(int event, Object callback, long time) { 397 _rdbms.handleEvent(event, callback, time); 398 } 399 400 410 private synchronized void flush() throws PersistenceException { 411 if (_batch.size() == 0) { 412 return; 413 } 414 415 Thread thread = new Thread (new Runnable () { 418 419 public void run() { 420 Connection connection = null; 421 try { 422 connection = _rdbms.getConnection(); 423 424 Iterator iter = _batch.iterator(); 425 while (iter.hasNext()) { 426 TransactionalObjectWrapper wrapper = 427 (TransactionalObjectWrapper) iter.next(); 428 switch (wrapper._action) { 429 case TransactionalObjectWrapper.ADD_MESSAGE: 430 _rdbms.addMessage(connection, 431 (MessageImpl) wrapper._object); 432 break; 433 434 case TransactionalObjectWrapper.UPDATE_MESSAGE: 435 _rdbms.updateMessage(connection, 436 (MessageImpl) wrapper._object); 437 break; 438 439 case TransactionalObjectWrapper.DELETE_MESSAGE: 440 _rdbms.removeMessage(connection, 441 (String ) wrapper._object); 442 break; 443 444 case TransactionalObjectWrapper.ADD_HANDLE: 445 _rdbms.addMessageHandle(connection, 446 (MessageHandle) wrapper._object); 447 break; 448 449 case TransactionalObjectWrapper.UPDATE_HANDLE: 450 _rdbms.updateMessageHandle(connection, 451 (MessageHandle) wrapper._object); 452 break; 453 454 case TransactionalObjectWrapper.DELETE_HANDLE: 455 _rdbms.removeMessageHandle(connection, 456 (MessageHandle) wrapper._object); 457 break; 458 } 459 } 460 connection.commit(); 461 462 _batch.clear(); 464 _messages.clear(); 465 _handles.clear(); 466 } catch (PersistenceException exception) { 467 SQLHelper.rollback(connection); 468 _log.error("Failure in flush()", exception); 469 } catch (Exception exception) { 470 _log.error("Failure in flush()", exception); 471 } finally { 472 if (connection != null) { 473 try { 474 connection.close(); 475 } catch (Exception nested) { 476 _log.error("Failure in flush()", nested); 477 } 478 } 479 } 480 } 481 }); 482 483 thread.start(); 485 486 try { 488 thread.join(); 489 } catch (InterruptedException exception) { 490 } 492 } 493 494 502 private synchronized void addToBatch(int action, Object object) 503 throws PersistenceException { 504 if (_batch.size() >= _maxStatementsToBatch) { 505 flush(); 506 } 507 508 switch (action) { 509 case TransactionalObjectWrapper.ADD_MESSAGE: 510 { 511 TransactionalObjectWrapper txobj = 512 new TransactionalObjectWrapper(action, object); 513 MessageImpl message = (MessageImpl) object; 514 MessageId id = message.getMessageId(); 515 if (_messages.containsKey(id)) { 516 throw new PersistenceException("Inconsistency in cache " + 517 id + " is present when it shouldn't be."); 518 } 519 _messages.put(id, txobj); 520 _batch.addLast(txobj); 521 break; 522 } 523 524 case TransactionalObjectWrapper.UPDATE_MESSAGE: 525 { 526 MessageImpl message = (MessageImpl) object; 527 MessageId id = message.getMessageId(); 528 TransactionalObjectWrapper txobj = 529 (TransactionalObjectWrapper) _messages.get(id); 530 TransactionalObjectWrapper newtxobj = 531 new TransactionalObjectWrapper(action, object); 532 533 if (txobj != null) { 534 _batch.remove(txobj); 538 if (txobj._action == TransactionalObjectWrapper.ADD_MESSAGE) { 539 newtxobj._action = TransactionalObjectWrapper.ADD_MESSAGE; 540 _batch.addLast(newtxobj); 541 } else if (txobj._action == TransactionalObjectWrapper.UPDATE_MESSAGE) { 542 _batch.addLast(newtxobj); 543 } else { 544 throw new PersistenceException("Inconsistency in cache." + 546 " Cannot update a deleted message."); 547 } 548 } else { 549 _batch.addLast(newtxobj); 550 } 551 _messages.put(id, newtxobj); 552 break; 553 } 554 555 case TransactionalObjectWrapper.DELETE_MESSAGE: 556 { 557 MessageImpl message = (MessageImpl) object; 558 MessageId id = message.getMessageId(); 559 TransactionalObjectWrapper txobj = 560 (TransactionalObjectWrapper) _messages.get(id); 561 TransactionalObjectWrapper newtxobj = 562 new TransactionalObjectWrapper(action, object); 563 564 if (txobj != null) { 565 _batch.remove(txobj); 569 if (txobj._action == TransactionalObjectWrapper.ADD_MESSAGE) { 570 } else if (txobj._action == TransactionalObjectWrapper.UPDATE_MESSAGE) { 573 _batch.addLast(newtxobj); 576 } else { 577 } 579 } else { 580 _batch.addLast(newtxobj); 581 } 582 _messages.put(id, newtxobj); 583 break; 584 } 585 586 case TransactionalObjectWrapper.ADD_HANDLE: 587 { 588 TransactionalObjectWrapper txobj = 589 new TransactionalObjectWrapper(action, object); 590 MessageHandle handle = (MessageHandle) object; 591 if (_handles.containsKey(handle)) { 592 throw new PersistenceException("Inconsistency in cache " + 593 handle + " is present when it shouldn't be."); 594 } 595 _handles.put(handle, txobj); 596 _batch.addLast(txobj); 597 break; 598 } 599 600 case TransactionalObjectWrapper.UPDATE_HANDLE: 601 { 602 MessageHandle handle = (MessageHandle) object; 603 TransactionalObjectWrapper txobj = 604 (TransactionalObjectWrapper) _handles.get(handle); 605 TransactionalObjectWrapper newtxobj = 606 new TransactionalObjectWrapper(action, object); 607 608 if (txobj != null) { 609 _batch.remove(txobj); 612 if (txobj._action == TransactionalObjectWrapper.ADD_HANDLE) { 613 newtxobj._action = TransactionalObjectWrapper.ADD_HANDLE; 614 _batch.addLast(newtxobj); 615 } else if (txobj._action == TransactionalObjectWrapper.UPDATE_HANDLE) { 616 _batch.addLast(newtxobj); 617 } else { 618 throw new PersistenceException("Inconsistency in cache." + 620 " Cannot update a deleted handle."); 621 } 622 } else { 623 _batch.addLast(newtxobj); 624 } 625 _handles.put(handle, newtxobj); 626 break; 627 } 628 629 case TransactionalObjectWrapper.DELETE_HANDLE: 630 { 631 MessageHandle handle = (MessageHandle) object; 632 TransactionalObjectWrapper txobj = 633 (TransactionalObjectWrapper) _handles.get(handle); 634 TransactionalObjectWrapper newtxobj = 635 new TransactionalObjectWrapper(action, object); 636 637 if (txobj != null) { 638 _batch.remove(txobj); 642 if (txobj._action == TransactionalObjectWrapper.ADD_HANDLE) { 643 } else if (txobj._action == TransactionalObjectWrapper.UPDATE_HANDLE) { 646 _batch.addLast(newtxobj); 649 } else { 650 } 652 } else { 653 _batch.addLast(newtxobj); 654 } 655 _handles.put(handle, newtxobj); 656 break; 657 } 658 } 659 660 } 661 662 public void addUser(Connection connection, User user) 663 throws PersistenceException { 664 } 665 666 public Enumeration getAllUsers(Connection connection) 667 throws PersistenceException { 668 return null; 669 } 670 671 public User getUser(Connection connection, User user) 672 throws PersistenceException { 673 return null; 674 } 675 676 public void removeUser(Connection connection, User user) 677 throws PersistenceException { 678 } 679 680 public void updateUser(Connection connection, User user) 681 throws PersistenceException { 682 } 683 684 688 private class TransactionalObjectWrapper { 689 690 693 public final static int ADD_MESSAGE = 1; 694 public final static int UPDATE_MESSAGE = 2; 695 public final static int DELETE_MESSAGE = 3; 696 public final static int ADD_HANDLE = 4; 697 public final static int UPDATE_HANDLE = 5; 698 public final static int DELETE_HANDLE = 6; 699 700 703 public int _action; 704 705 708 public Object _object; 709 710 713 public TransactionalObjectWrapper(int action, Object object) { 714 _action = action; 715 _object = object; 716 } 717 } 718 } 719 | Popular Tags |