1 21 22 package com.rift.coad.daemon.messageservice; 24 25 import java.lang.ThreadLocal ; 27 import java.util.Date ; 28 import java.util.Queue ; 29 import java.util.PriorityQueue ; 30 import java.util.HashMap ; 31 import java.util.Map ; 32 import java.util.ArrayList ; 33 import java.util.concurrent.ConcurrentHashMap ; 34 import javax.transaction.xa.XAException ; 35 import javax.transaction.xa.XAResource ; 36 import javax.transaction.xa.Xid ; 37 38 import org.apache.log4j.Logger; 40 41 import com.rift.coad.util.transaction.TransactionManager; 43 import com.rift.coad.util.lock.LockRef; 44 import com.rift.coad.util.lock.ObjectLockFactory; 45 46 51 public class MessageQueue implements XAResource { 52 53 56 public class TransactionChange { 57 58 private ArrayList addList = new ArrayList (); 60 private ArrayList updateList = new ArrayList (); 61 private ArrayList removeList = new ArrayList (); 62 63 66 public TransactionChange() { 67 68 } 69 70 71 76 public void add(MessageManager messageManager) { 77 addList.add(messageManager); 78 } 79 80 81 86 public ArrayList getAddList() { 87 return addList; 88 } 89 90 91 96 public void update(MessageManager messageManager) { 97 updateList.add(messageManager); 98 } 99 100 101 106 public ArrayList getUpdateList() { 107 return updateList; 108 } 109 110 111 117 public void remove(MessageManager messageManager) { 118 removeList.add(messageManager); 119 } 120 121 122 126 public ArrayList getRemoveList() { 127 return removeList; 128 } 129 } 130 131 132 135 public class IDIndex { 136 private Map baseIndex = new HashMap (); 137 private ThreadLocal threadIndex = new ThreadLocal (); 138 private Map transactionIndex = new HashMap (); 139 140 141 144 public IDIndex() { 145 146 } 147 148 149 155 public synchronized void commit(Xid xid) throws 156 MessageServiceException { 157 try { 158 TransactionChange changes = (TransactionChange)transactionChange. 159 get(xid); 160 ArrayList addedEntries = changes.getAddList(); 162 for (int index = 0; index < addedEntries.size(); index++) { 163 MessageManager messageManager = 164 (MessageManager)addedEntries.get(index); 165 baseIndex.put(messageManager.getID(),messageManager); 166 } 167 168 ArrayList removedEntries = changes.getRemoveList(); 170 for (int index = 0; index < removedEntries.size(); index++) { 171 MessageManager messageManager = 172 (MessageManager)removedEntries.get(index); 173 baseIndex.remove(messageManager.getID()); 174 } 175 transactionIndex.remove(xid); 176 } catch (Exception ex) { 177 log.error("Failed to commit the transaction : " + ex.getMessage() 178 ,ex); 179 throw new MessageServiceException("Failed to commit the " + 180 "transaction : " + ex.getMessage()); 181 } 182 } 183 184 185 191 public synchronized void forget(Xid xid) throws MessageServiceException { 192 transactionIndex.remove(xid); 193 } 194 195 196 202 public synchronized void rollback(Xid xid) throws MessageServiceException { 203 transactionIndex.remove(xid); 204 } 205 206 207 213 public synchronized void start(Xid xid) throws MessageServiceException { 214 Map transactionScopedIndex = null; 215 if (transactionIndex.containsKey(xid)) { 216 transactionScopedIndex = (Map )transactionIndex.get( 217 xid); 218 } else { 219 transactionScopedIndex = new HashMap (baseIndex); 220 transactionIndex.put(xid, 221 transactionScopedIndex); 222 } 223 threadIndex.set(transactionScopedIndex); 224 } 225 226 227 233 public void addMessage(MessageManager message) throws 234 MessageServiceException { 235 Map index = (Map )threadIndex.get(); 236 index.put(message.getID(),message); 237 } 238 239 240 246 public void removeMessage(String messageId) throws 247 MessageServiceException { 248 try { 249 Map index = (Map )threadIndex.get(); 250 if (index.containsKey(messageId)) { 251 MessageManager message = (MessageManager)index.get(messageId); 252 TransactionChange change = (TransactionChange) 253 transactionChange.get(transactionId.get()); 254 change.remove(message); 255 index.remove(messageId); 256 } else { 257 throw new MessageServiceException("The message [" + messageId 258 + "] was not found to remove"); 259 } 260 } catch (MessageServiceException ex) { 261 throw ex; 262 } catch (Exception ex) { 263 log.error("Failed to remove the message : " + 264 ex.getMessage(),ex); 265 throw new MessageServiceException( 266 "Failed to remove the message : " + ex.getMessage(),ex); 267 } 268 } 269 270 271 277 public MessageManager getMessage(String messageId) throws 278 MessageServiceException { 279 Map index = (Map )threadIndex.get(); 280 if (index.containsKey(messageId)) { 281 return (MessageManager)index.get(messageId); 282 } else { 283 throw new MessageServiceException("The message [" + messageId 284 + "] was not found."); 285 } 286 } 287 288 289 } 290 291 292 295 public class QueueIndex { 296 private PriorityQueue baseQueue = new PriorityQueue (); 298 private Map processingEntries = new HashMap (); 299 300 301 304 public QueueIndex() { 305 306 } 307 308 309 315 public synchronized void commit(Xid xid) throws 316 MessageServiceException { 317 try { 318 TransactionChange changes = (TransactionChange)transactionChange. 319 get(xid); 320 ArrayList addedEntries = changes.getAddList(); 322 for (int index = 0; index < addedEntries.size(); index++) { 323 MessageManager messageManager = 324 (MessageManager)addedEntries.get(index); 325 baseQueue.add(messageManager); 326 } 327 328 ArrayList removedEntries = changes.getRemoveList(); 330 for (int index = 0; index < removedEntries.size(); index++) { 331 MessageManager messageManager = 332 (MessageManager)removedEntries.get(index); 333 if (baseQueue.contains(messageManager)) { 334 baseQueue.remove(messageManager); 335 } else if (processingEntries.containsKey(messageManager)) { 336 LockRef lockRef = (LockRef)processingEntries.get( 337 messageManager); 338 processingEntries.remove(messageManager); 339 lockRef.release(); 340 } 341 } 342 } catch (Exception ex) { 343 log.error("Failed to commit the transaction : " + ex.getMessage() 344 ,ex); 345 throw new MessageServiceException("Failed to commit the " + 346 "transaction : " + ex.getMessage()); 347 } 348 } 349 350 351 357 public synchronized void forget(Xid xid) throws MessageServiceException { 358 359 } 360 361 362 368 public synchronized void rollback(Xid xid) throws MessageServiceException { 369 370 } 371 372 373 379 public synchronized void start(Xid xid) throws MessageServiceException { 380 381 } 382 383 384 391 public synchronized MessageManager popFrontMessage(Date nextRunTime) throws 392 MessageServiceException { 393 LockRef lockRef = null; 394 try { 395 MessageManager messageManager = (MessageManager)baseQueue.peek(); 396 if (messageManager == null) { 397 return null; 398 } 399 try { 400 lockRef = ObjectLockFactory.getInstance().acquireWriteLock( 401 messageManager,ObjectLockFactory.WAIT_ON_THREAD); 402 } catch (Exception ex) { 403 log.debug("Cannot aquire a lock on this object because : " + 404 ex.getMessage(),ex); 405 return null; 406 } 407 Date currentDate = new Date (); 408 Date nextProcessDate = messageManager.nextProcessTime(); 409 if (nextProcessDate == null) { 410 throw new MessageServiceException( 411 "The next process date is invalid cannot be null"); 412 } else if (nextProcessDate.getTime() <= currentDate.getTime()) { 413 baseQueue.poll(); 414 processingEntries.put(messageManager,lockRef); 415 lockRef = null; 416 return messageManager; 417 } 418 nextRunTime.setTime(nextProcessDate.getTime()); 419 return null; 420 } catch (Exception ex) { 421 log.error("Failed to pop a message off the queue : " + 422 ex.getMessage(),ex); 423 throw new MessageServiceException( 424 "Failed to pop a message off the queue : " + 425 ex.getMessage(),ex); 426 } finally { 427 try { 428 if (lockRef != null) { 429 lockRef.release(); 430 lockRef = null; 431 } 432 } catch (Exception ex2) { 433 log.error("Failed to release the lock :" + ex2.getMessage(), 434 ex2); 435 } 436 } 437 } 438 439 440 447 public synchronized void pushBackMessage(MessageManager messageManager) throws 448 MessageServiceException { 449 try { 450 LockRef lockRef = (LockRef)processingEntries.get(messageManager); 451 if (lockRef == null) { 452 log.error("This message is not locked : " + 453 messageManager.getID()); 454 throw new MessageServiceException( 455 "This message is not locked : " + 456 messageManager.getID()); 457 } 458 baseQueue.add(messageManager); 459 processingEntries.remove(messageManager); 460 lockRef.release(); 461 } catch (MessageServiceException ex) { 462 throw ex; 463 } catch (Exception ex) { 464 log.error("Failed to push a message back in the queue for " + 465 "processing : " + ex.getMessage(),ex); 466 throw new MessageServiceException( 467 "Failed to push a message back in the queue for " + 468 "processing : " + ex.getMessage(),ex); 469 } 470 } 471 472 } 473 474 protected static Logger log = 476 Logger.getLogger(MessageQueue.class.getName()); 477 478 private ThreadLocal transactionId = new ThreadLocal (); 480 private Map transactionChange = new ConcurrentHashMap (); 481 private IDIndex idIndex = new IDIndex(); 482 private QueueIndex queueIndex = new QueueIndex(); 483 private String name = null; 484 485 488 public MessageQueue(String name) { 489 this.name = name; 490 } 491 492 493 500 public synchronized void commit(Xid xid, boolean onePhase) throws 501 XAException { 502 try { 503 idIndex.commit(xid); 504 queueIndex.commit(xid); 505 transactionChange.remove(xid); 506 ProcessMonitor.getInstance().notifyProcessor(); 507 } catch (Exception ex) { 508 log.error("Failed to commit the changes : " + 509 ex.getMessage(),ex); 510 throw new XAException ("Failed to commit the changes : " + 511 ex.getMessage()); 512 } 513 } 514 515 516 523 public void end(Xid xid, int flags) throws XAException { 524 } 525 526 527 533 public void forget(Xid xid) throws XAException { 534 try { 535 idIndex.forget(xid); 536 queueIndex.forget(xid); 537 transactionChange.remove(xid); 538 } catch (Exception ex) { 539 log.error("Failed to forget the changes : " + 540 ex.getMessage(),ex); 541 throw new XAException ("Failed to forget the changes : " + 542 ex.getMessage()); 543 } 544 } 545 546 547 553 public int getTransactionTimeout() throws XAException { 554 return -1; 555 } 556 557 558 566 public boolean isSameRM(XAResource xAResource) throws XAException { 567 return this == xAResource; 568 } 569 570 571 578 public int prepare(Xid xid) throws XAException { 579 return XAResource.XA_OK; 580 } 581 582 583 591 public Xid [] recover(int flags) throws XAException { 592 return null; 593 } 594 595 596 602 public void rollback(Xid xid) throws XAException { 603 try { 604 idIndex.rollback(xid); 605 queueIndex.rollback(xid); 606 transactionChange.remove(xid); 607 } catch (Exception ex) { 608 log.error("Failed to rollback the changes : " + 609 ex.getMessage(),ex); 610 throw new XAException ("Failed to rollback the changes : " + 611 ex.getMessage()); 612 } 613 } 614 615 616 623 public boolean setTransactionTimeout(int transactionTimeout) throws 624 XAException { 625 return true; 626 } 627 628 629 636 public void start(Xid xid, int flags) throws XAException { 637 try { 638 if (!transactionChange.containsKey(xid)) { 639 transactionChange.put(xid,new TransactionChange()); 640 } 641 transactionId.set(xid); 642 idIndex.start(xid); 643 queueIndex.start(xid); 644 } catch (Exception ex) { 645 log.error("Failed to start the transaction : " + ex.getMessage(),ex); 646 throw new XAException ("Failed to start the transaction : " + 647 ex.getMessage()); 648 } 649 } 650 651 652 657 public String getName() { 658 return name; 659 } 660 661 662 667 public void addMessage(MessageManager message) throws 668 MessageServiceException { 669 try { 670 TransactionManager.getInstance().bindResource(this,false); 671 TransactionChange change = (TransactionChange)transactionChange.get( 672 transactionId.get()); 673 change.add(message); 674 idIndex.addMessage(message); 675 } catch (Exception ex) { 676 log.error("Failed to add a message : " + ex.getMessage(),ex); 677 throw new MessageServiceException("Failed to add a message : " + 678 ex.getMessage()); 679 } 680 } 681 682 683 688 public void removeMessage(String messageId) throws 689 MessageServiceException { 690 try { 691 TransactionManager.getInstance().bindResource(this,false); 692 idIndex.removeMessage(messageId); 693 } catch (Exception ex) { 694 log.error("Failed to remove a message : " + ex.getMessage(),ex); 695 throw new MessageServiceException("Failed to remove a message : " + 696 ex.getMessage()); 697 } 698 } 699 700 701 706 public MessageManager getMessage(String messageId) throws 707 MessageServiceException { 708 try { 709 TransactionManager.getInstance().bindResource(this,false); 710 return idIndex.getMessage(messageId); 711 } catch (Exception ex) { 712 log.error("Failed to get a message : " + ex.getMessage(),ex); 713 throw new MessageServiceException("Failed to get a message : " + 714 ex.getMessage()); 715 } 716 } 717 718 719 726 public MessageManager popFrontMessage(Date nextRunTime) throws 727 MessageServiceException { 728 return queueIndex.popFrontMessage(nextRunTime); 729 } 730 731 732 739 public void pushBackMessage(MessageManager messageManager) throws 740 MessageServiceException { 741 queueIndex.pushBackMessage(messageManager); 742 } 743 } 744 | Popular Tags |