1 17 18 package org.apache.sandesha.storage.queue; 19 20 import org.apache.axis.components.logger.LogFactory; 21 import org.apache.axis.components.uuid.UUIDGen; 22 import org.apache.axis.components.uuid.UUIDGenFactory; 23 import org.apache.commons.logging.Log; 24 import org.apache.sandesha.Constants; 25 import org.apache.sandesha.RMMessageContext; 26 import org.apache.sandesha.util.PolicyLoader; 27 28 import java.util.*; 29 30 31 34 35 39 40 public class SandeshaQueue { 41 42 private static SandeshaQueue clientQueue = null; 43 private static SandeshaQueue serverQueue = null; 44 HashMap incomingMap; HashMap outgoingMap; ArrayList highPriorityQueue; HashMap queueBin; ArrayList lowPriorityQueue; 49 private List requestedSequences; 50 HashMap acksToMap; 51 HashMap offerMap; 52 private static final Log log = LogFactory.getLog(SandeshaQueue.class.getName()); 53 54 public static final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen(); 55 56 private SandeshaQueue() { 57 incomingMap = new HashMap(); 58 outgoingMap = new HashMap(); 59 highPriorityQueue = new ArrayList(); 60 queueBin = new HashMap(); 61 lowPriorityQueue = new ArrayList(); 62 requestedSequences = new ArrayList(); 63 acksToMap = new HashMap(); 64 offerMap = new HashMap(); 65 } 66 67 public static SandeshaQueue getInstance(byte endPoint) { 68 if (endPoint == Constants.CLIENT) { 69 if (clientQueue == null) { 70 clientQueue = new SandeshaQueue(); 71 } 72 return clientQueue; 73 } else { 74 if (serverQueue == null) { 75 serverQueue = new SandeshaQueue(); 76 } 77 return serverQueue; 78 } 79 80 } 81 82 public boolean addMessageToIncomingSequence(String seqId, Long messageNo, 83 RMMessageContext msgCon) throws QueueException { 84 boolean successful = false; 85 86 if (seqId == null || msgCon == null) 87 throw new QueueException(Constants.Queue.ADD_ERROR); 88 89 if (isIncomingSequenceExists(seqId)) { 90 IncomingSequence seqHash = (IncomingSequence) incomingMap.get(seqId); 91 92 synchronized (seqHash) { 93 if (seqHash == null) 94 throw new QueueException(Constants.Queue.QUEUE_INCONSIS); 95 96 if (seqHash.hasMessage(messageNo)) 97 throw new QueueException(Constants.Queue.MESSAGE_EXISTS); 98 99 if (msgCon.isLastMessage()) 100 seqHash.setLastMsg(msgCon.getMsgNumber()); 101 102 seqHash.setSequenceId(msgCon.getSequenceID()); 103 seqHash.putNewMessage(messageNo, msgCon); 104 successful = true; 105 } 106 } 107 108 return successful; 109 } 110 111 public boolean addMessageToOutgoingSequence(String seqId, RMMessageContext msgCon) 112 throws QueueException { 113 boolean successful = false; 114 115 if (seqId == null || msgCon == null) 116 throw new QueueException(Constants.Queue.ADD_ERROR); 117 118 if (isOutgoingSequenceExists(seqId)) { 119 OutgoingSequence resSeqHash = (OutgoingSequence) outgoingMap.get(seqId); 120 121 synchronized (resSeqHash) { 122 if (resSeqHash == null) 123 throw new QueueException(Constants.Queue.QUEUE_INCONSIS); 124 resSeqHash.putNewMessage(msgCon); 125 successful = true; 126 127 if (msgCon.isLastMessage()) 129 resSeqHash.setLastMsg(msgCon.getMsgNumber()); 130 131 if (msgCon.isHasResponse()) 132 resSeqHash.setHasResponse(true); 133 } 134 } 135 return successful; 136 } 137 138 public boolean isIncomingSequenceExists(String seqId) { 139 synchronized (incomingMap) { 140 return incomingMap.containsKey(seqId); 141 } 142 } 143 144 public synchronized boolean isOutgoingSequenceExists(String resSeqId) { 145 synchronized (outgoingMap) { 146 return outgoingMap.containsKey(resSeqId); 147 } 148 } 149 150 public RMMessageContext nextIncomingMessageToProcess(Object sequence) throws QueueException { 151 if (sequence == null) 152 return null; 153 154 AbstractSequence absSeq = (AbstractSequence) sequence; 155 156 IncomingSequence sh = (IncomingSequence) incomingMap.get(absSeq.getSequenceId()); 157 synchronized (sh) { 158 if (sh == null) 159 throw new QueueException(Constants.Queue.SEQUENCE_ABSENT); 160 161 if (!sh.hasProcessableMessages()) 162 return null; 163 164 RMMessageContext msgCon = sh.getNextMessageToProcess(); 165 return msgCon; 166 } 167 } 168 169 public RMMessageContext nextOutgoingMessageToSend() throws QueueException { 170 RMMessageContext msg = null; 171 synchronized (outgoingMap) { 172 Iterator it = outgoingMap.keySet().iterator(); 173 174 whileLoop: while (it.hasNext()) { 175 RMMessageContext tempMsg; 176 String tempKey = (String ) it.next(); 177 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(tempKey); 178 if (rsh.isOutSeqApproved()) { 179 tempMsg = rsh.getNextMessageToSend(); 180 if (tempMsg != null) { 181 msg = tempMsg; 182 msg.setSequenceID(rsh.getOutSequenceId()); 183 msg.setOldSequenceID(rsh.getSequenceId()); 184 break whileLoop; 185 } 186 } 187 } 188 } 189 return msg; 190 } 191 192 public void createNewIncomingSequence(String sequenceId) throws QueueException { 193 if (sequenceId == null) 194 throw new QueueException(Constants.Queue.SEQUENCE_ID_NULL); 195 196 synchronized (incomingMap) { 197 IncomingSequence sh = new IncomingSequence(sequenceId); 198 incomingMap.put(sequenceId, sh); 199 200 } 201 } 202 203 public void createNewOutgoingSequence(String sequenceId) throws QueueException { 204 if (sequenceId == null) 205 throw new QueueException(Constants.Queue.SEQUENCE_ID_NULL); 206 207 synchronized (outgoingMap) { 208 OutgoingSequence rsh = new OutgoingSequence(sequenceId); 209 outgoingMap.put(sequenceId, rsh); 210 } 211 212 } 213 214 217 public void addPriorityMessage(RMMessageContext msg) throws QueueException { 218 synchronized (highPriorityQueue) { 219 if (msg == null) 220 throw new QueueException(Constants.Queue.MESSAGE_ID_NULL); 221 222 highPriorityQueue.add(msg); 223 } 224 } 225 226 public void addLowPriorityMessage(RMMessageContext msg) throws QueueException { 227 synchronized (lowPriorityQueue) { 228 if (msg == null) 229 throw new QueueException(Constants.Queue.MESSAGE_ID_NULL); 230 lowPriorityQueue.add(msg); 231 } 232 } 233 234 235 public RMMessageContext nextPriorityMessageToSend() throws QueueException { 236 237 synchronized (highPriorityQueue) { 238 239 240 if (highPriorityQueue.size() <= 0) 241 return null; 242 243 RMMessageContext msg = null; 244 int size = highPriorityQueue.size(); 245 synchronized (highPriorityQueue) { 246 forLoop: for (int i = 0; i < size; i++) { 248 RMMessageContext tempMsg = (RMMessageContext) highPriorityQueue.get(i); 249 if (tempMsg != null) { 250 switch (tempMsg.getMessageType()) { 251 case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST: 253 long lastSentTime = tempMsg.getLastSentTime(); 254 Date d = new Date(); 255 long currentTime = d.getTime(); 256 if (currentTime >= 257 lastSentTime + Constants.RETRANSMISSION_INTERVAL) { 258 259 String newCreateSeqId = Constants.UUID + uuidGen.nextUUID(); 260 tempMsg.setMessageID(newCreateSeqId); 261 262 tempMsg.setLastSentTime(currentTime); 263 msg = tempMsg; 264 break forLoop; 265 266 267 } 268 break; 269 case Constants.MSG_TYPE_ACKNOWLEDGEMENT: 270 271 277 String sequenceId = tempMsg.getSequenceID(); 278 if (sequenceId == null) 279 continue; 280 281 String key = getKeyFromIncomingSequenceId(sequenceId); 282 IncomingSequence sequence = (IncomingSequence) incomingMap.get(key); 283 if (sequence == null) 284 continue; 285 286 d = new Date(); 287 currentTime = d.getTime(); 288 289 if (sequence.isSendAck()) { 290 291 tempMsg.setLastSentTime(currentTime); 292 msg = tempMsg; 293 sequence.setSendAck(false); 294 sequence.setFinalAckedTime(currentTime); 295 break forLoop; 296 297 } else { 298 long ackInterval = PolicyLoader.getInstance() 299 .getAcknowledgementInterval(); 300 long finalAckedTime = sequence.getFinalAckedTime(); 301 long finalMsgArrivedTime = sequence.getFinalMsgArrivedTime(); 302 303 if ((finalMsgArrivedTime > finalAckedTime) && 304 (currentTime > finalMsgArrivedTime + ackInterval)) 305 sequence.setSendAck(true); 306 } 307 308 break; 309 default: 310 highPriorityQueue.remove(i); 311 queueBin.put(tempMsg.getMessageID(), tempMsg); 312 msg = tempMsg; 313 break forLoop; 314 } 315 } 316 } 317 } 318 319 320 return msg; 321 322 } 323 } 324 325 public List nextAllSeqsToProcess() { 326 List seqs = new ArrayList(); 327 328 synchronized (incomingMap) { 329 Iterator it = incomingMap.keySet().iterator(); 330 331 while (it.hasNext()) { 332 Object tempKey = it.next(); 333 IncomingSequence sh = (IncomingSequence) incomingMap.get(tempKey); 334 if (sh.hasProcessableMessages() && !sh.isSequenceLocked()) 335 seqs.add(sh); 336 } 337 return seqs; 338 } 339 } 340 341 public List nextAllSeqIdsToProcess() { 342 List ids = new ArrayList(); 343 344 synchronized (incomingMap) { 345 Iterator it = incomingMap.keySet().iterator(); 346 347 while (it.hasNext()) { 348 Object tempKey = it.next(); 349 IncomingSequence sh = (IncomingSequence) incomingMap.get(tempKey); 350 if (sh.hasProcessableMessages() && !sh.isSequenceLocked()) 351 ids.add(sh.getSequenceId()); 352 } 353 return ids; 354 } 355 } 356 357 public void clear(boolean yes) { 358 if (!yes) 359 return; 360 incomingMap.clear(); 361 highPriorityQueue.clear(); 362 outgoingMap.clear(); 363 queueBin.clear(); 364 } 365 366 374 public void setSequenceLock(String sequenceId, boolean lock) { 375 IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId); 376 sh.setProcessLock(lock); 377 } 378 379 public Set getAllReceivedMsgNumsOfIncomingSeq(String sequenceId) { 380 IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId); 381 if (sh != null) 382 return sh.getAllKeys(); 383 else 384 return null; 385 } 386 387 public boolean isIncomingMessageExists(String sequenceId, Long messageNo) { 388 IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId); 389 if (sh != null) 391 return sh.hasMessage(messageNo); 392 else 393 return false; 394 } 395 396 public void setOutSequence(String seqId, String outSeqId) { 397 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId); 398 synchronized (rsh) { 399 if (rsh == null) { 400 if (log.isDebugEnabled()) 401 log.debug("ERROR: RESPONSE SEQ IS NULL"); 402 return; 403 } 404 rsh.setOutSequenceId(outSeqId); 405 } 406 } 407 408 public void setOutSequenceApproved(String seqId, boolean approved) { 409 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId); 410 synchronized (rsh) { 411 if (rsh == null) { 412 if (log.isDebugEnabled()) 413 log.debug("ERROR: RESPONSE SEQ IS NULL"); 414 return; 415 } 416 rsh.setOutSeqApproved(approved); 417 } 418 } 419 420 public String getSequenceOfOutSequence(String outSequence) { 421 synchronized (outgoingMap) { 422 if (outSequence == null) { 423 return null; 424 } 425 String tempSeqId = null; 426 Iterator it = outgoingMap.keySet().iterator(); 427 while (it.hasNext()) { 428 tempSeqId = (String ) it.next(); 429 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(tempSeqId); 430 String tempOutSequence = rsh.getOutSequenceId(); 431 if (outSequence.equals(tempOutSequence)) { 432 break; 433 } 434 } 435 return tempSeqId; 436 } 437 438 } 439 440 public void displayOutgoingMap() { 441 Iterator it = outgoingMap.keySet().iterator(); 442 System.out.println("------------------------------------"); 443 System.out.println(" DISPLAYING RESPONSE MAP"); 444 System.out.println("------------------------------------"); 445 while (it.hasNext()) { 446 String s = (String ) it.next(); 447 System.out.println("\n Sequence id - " + s); 448 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(s); 449 450 System.out.println("out seq id:" + rsh.getOutSequenceId()); 451 Iterator it1 = rsh.getAllKeys().iterator(); 452 while (it1.hasNext()) { 453 Long l = (Long ) it1.next(); 454 String msgId = rsh.getMessageId(l); 455 System.out.println("* key -" + l.longValue() + "- MessageID -" + msgId + "-"); 456 } 457 } 458 System.out.println("\n"); 459 } 460 461 public void displayIncomingMap() { 462 Iterator it = incomingMap.keySet().iterator(); 463 System.out.println("------------------------------------"); 464 System.out.println(" DISPLAYING SEQUENCE MAP"); 465 System.out.println("------------------------------------"); 466 while (it.hasNext()) { 467 String s = (String ) it.next(); 468 System.out.println("\n Sequence id - " + s); 469 470 IncomingSequence sh = (IncomingSequence) incomingMap.get(s); 471 472 Iterator it1 = sh.getAllKeys().iterator(); 473 while (it1.hasNext()) { 474 Long l = (Long ) it1.next(); 475 String msgId = sh.getMessageId(l); 476 System.out.println("* key -" + l.longValue() + "- MessageID -" + msgId + "-"); 477 } 478 } 479 System.out.println("\n"); 480 } 481 482 public void displayPriorityQueue() { 483 484 System.out.println("------------------------------------"); 485 System.out.println(" DISPLAYING PRIORITY QUEUE"); 486 System.out.println("------------------------------------"); 487 488 Iterator it = highPriorityQueue.iterator(); 489 while (it.hasNext()) { 490 RMMessageContext msg = (RMMessageContext) it.next(); 491 String id = msg.getMessageID(); 492 int type = msg.getMessageType(); 493 494 System.out.println("Message " + id + " Type " + type); 495 } 496 System.out.println("\n"); 497 } 498 499 public void markOutgoingMessageToDelete(String sequenceId, Long messageNo) { 500 String sequence = getSequenceOfOutSequence(sequenceId); 501 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(sequence); 502 503 if (rsh == null) { 504 log.error(Constants.Queue.RESPONSE_SEQ_NULL); 505 return; 506 } 507 508 synchronized (rsh) { 509 rsh.markMessageDeleted(messageNo); 511 } 513 514 } 515 516 public void movePriorityMsgToBin(String messageId) { 517 518 synchronized (highPriorityQueue) { 519 int size = highPriorityQueue.size(); 520 for (int i = 0; i < size; i++) { 521 RMMessageContext msg = (RMMessageContext) highPriorityQueue.get(i); 522 523 String tempMsgId; 524 try { 525 tempMsgId = (String ) msg.getMessageIdList().get(0); 526 } catch (Exception ex) { 527 tempMsgId = msg.getMessageID(); 528 } 529 if (tempMsgId.equals(messageId)) { 530 highPriorityQueue.remove(i); 531 queueBin.put(messageId, msg); 532 return; 533 } 534 } 535 } 536 } 537 538 public long getNextOutgoingMessageNumber(String seq) { 539 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seq); 540 if (rsh == null) { try { 542 createNewOutgoingSequence(seq); 543 } catch (QueueException q) { 544 log.error(q.getStackTrace()); 545 } 546 } 547 rsh = (OutgoingSequence) outgoingMap.get(seq); 548 synchronized (rsh) { 549 Iterator keys = rsh.getAllKeys().iterator(); 550 551 long msgNo = rsh.nextMessageNumber(); 552 return (msgNo); 553 } 554 } 555 556 public synchronized RMMessageContext checkForResponseMessage(String requestId, String seqId) { 557 IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId); 558 if (sh == null) { 559 return null; 560 } 561 synchronized (sh) { 562 RMMessageContext msg = sh.getMessageRelatingTo(requestId); 563 return msg; 564 } 565 } 566 567 public String searchForSequenceId(String messageId) { 568 Iterator it = outgoingMap.keySet().iterator(); 569 570 String key = null; 571 while (it.hasNext()) { 572 key = (String ) it.next(); 573 Object obj = outgoingMap.get(key); 574 if (obj != null) { 575 OutgoingSequence hash = (OutgoingSequence) obj; 576 boolean hasMsg = hash.hasMessageWithId(messageId); 577 578 if (!hasMsg) 579 key = null; 580 else 581 break; 582 583 } 584 585 } 586 587 return key; 588 } 589 590 public void setAckReceived(String seqId, long msgNo) { 591 Iterator it = outgoingMap.keySet().iterator(); 592 String key = null; 593 while (it.hasNext()) { 594 key = (String ) it.next(); 595 Object obj = outgoingMap.get(key); 596 597 if (obj != null) { 598 OutgoingSequence hash = (OutgoingSequence) obj; 599 if (hash.getOutSequenceId().equals(seqId)) { 600 hash.setAckReceived(msgNo); 601 } 602 } 603 } 604 605 } 606 607 public RMMessageContext getLowPriorityMessageIfAcked() { 608 synchronized (lowPriorityQueue) { 609 int size = lowPriorityQueue.size(); 610 RMMessageContext terminateMsg = null; 611 for (int i = 0; i < size; i++) { 612 613 RMMessageContext temp; 614 temp = (RMMessageContext) lowPriorityQueue.get(i); 615 String seqId = temp.getSequenceID(); 616 OutgoingSequence hash = null; 617 hash = (OutgoingSequence) outgoingMap.get(seqId); 618 if (hash == null) { 619 log.error("ERROR: HASH NOT FOUND SEQ ID " + seqId); 620 } 621 if (hash != null) { 622 boolean complete = hash.isAckComplete(); 623 if (complete) 624 terminateMsg = temp; 625 if (terminateMsg != null) { 626 terminateMsg.setSequenceID(hash.getOutSequenceId()); 627 lowPriorityQueue.remove(i); 628 break; 629 } 630 } 631 } 632 return terminateMsg; 633 } 634 635 } 636 637 public void addSendMsgNo(String seqId, long msgNo) { 638 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId); 639 if (rsh != null) { 640 641 synchronized (rsh) { 642 rsh.addMsgToSendList(msgNo); 643 } 644 } 645 } 646 647 public boolean isSentMsg(String seqId, long msgNo) { 648 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId); 649 650 if (rsh == null) { 651 return false; 652 } 653 synchronized (rsh) { 654 return rsh.isMsgInSentList(msgNo); 655 } 656 657 658 } 659 660 public boolean hasLastIncomingMsgReceived(String seqId) { 661 662 IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId); 663 664 if (sh == null) { 665 return false; 666 } 667 synchronized (sh) { 668 return sh.hasLastMsgReceived(); 669 } 670 } 671 672 public long getLastIncomingMsgNo(String seqId) { 673 IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId); 674 if (sh == null) { 675 return 0; 676 } 677 synchronized (sh) { 678 return sh.getLastMsgNumber(); 679 } 680 } 681 682 public void addRequestedSequence(String seqId) { 683 requestedSequences.add(seqId); 684 } 685 686 public boolean isRequestedSeqPresent(String seqId) { 687 return requestedSequences.contains(seqId); 688 } 689 690 public String getKeyFromIncomingSequenceId(String seqId) { 691 synchronized (incomingMap) { 692 Iterator it = incomingMap.keySet().iterator(); 693 while (it.hasNext()) { 694 String key = (String ) it.next(); 695 IncomingSequence is = (IncomingSequence) incomingMap.get(key); 696 String seq = is.getSequenceId(); 697 if (seq == null) 698 continue; 699 700 if (seq.equals(seqId)) 701 return key; 702 } 703 return null; 704 } 705 } 706 707 734 735 public String getKeyFromOutgoingSequenceId(String seqId) { 736 synchronized (outgoingMap) { 737 Iterator it = outgoingMap.keySet().iterator(); 738 while (it.hasNext()) { 739 String key = (String ) it.next(); 740 OutgoingSequence is = (OutgoingSequence) outgoingMap.get(key); 741 String seq = is.getOutSequenceId(); 742 if (seq == null) 743 continue; 744 745 if (seq.equals(seqId)) 746 return key; 747 } 748 return null; 749 } 750 } 751 752 public boolean isAllOutgoingTerminateSent() { 753 synchronized (outgoingMap) { 754 Iterator keys = outgoingMap.keySet().iterator(); 755 boolean found = false; 756 757 while (keys.hasNext()) { 758 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(keys.next()); 759 if (ogs.isTerminateSent()) { 760 found = true; 761 break; 762 } 763 } 764 765 return found; 766 } 767 } 768 769 public boolean isAllIncommingTerminateReceived() { 770 synchronized (incomingMap) { 771 Iterator keys = incomingMap.keySet().iterator(); 772 773 while (keys.hasNext()) { 774 Object key = keys.next(); 775 IncomingSequence ics = (IncomingSequence) incomingMap.get(key); 776 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(key); 777 778 boolean hasResponse = ogs.hasResponse(); 779 780 if (hasResponse && !ics.isTerminateReceived()) 781 return false; 782 } 783 784 return true; 785 } 786 } 787 788 public void setTerminateSend(String seqId) { 789 synchronized (outgoingMap) { 790 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId); 791 ogs.setTerminateSent(true); 792 } 793 } 794 795 public void setTerminateReceived(String seqId) { 796 IncomingSequence ics = (IncomingSequence) incomingMap.get(getKeyFromIncomingSequenceId(seqId)); 797 ics.setTerminateReceived(true); 798 } 799 800 public void setAcksTo(String seqId, String acksTo) { 801 802 if (seqId == null) { 803 log.error("ERROR: seq is null in setAcksTo"); 804 return; 805 } 806 807 acksToMap.put(seqId, acksTo); 808 } 809 810 public String getAcksTo(String seqId) { 811 812 if (seqId == null) { 813 log.error("ERROR: seq is null in getAcksTo"); 814 return null; 815 } 816 817 return (String ) acksToMap.get(seqId); 818 } 819 820 821 public void addOffer(String msgID, String offerID) { 822 if (msgID == null) { 823 log.error(" MessageID is null in addOffer"); 824 } 825 offerMap.put(msgID, offerID); 826 } 827 828 public String getOffer(String msgID) { 829 if (msgID == null) { 830 log.error(" MessageID is null in getOffer"); 831 return null; 832 } 833 return (String ) offerMap.get(msgID); 834 } 835 836 public boolean isOutgoingTerminateSent(String seqId) { 837 synchronized (outgoingMap) { 838 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId); 839 if (ogs != null) { 840 if (ogs.isTerminateSent()) 841 return true; 842 else 843 return false; 844 } 845 return false; 846 } 847 848 } 849 850 public boolean isIncommingTerminateReceived(String seqId) { 851 synchronized (incomingMap) { 852 853 IncomingSequence ics = (IncomingSequence) incomingMap.get(seqId); 854 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId); 855 856 boolean hasResponse = false; 857 if (ogs != null) { 858 hasResponse = ogs.hasResponse(); 859 } 860 861 if (hasResponse && ics != null && !ics.isTerminateReceived()) 862 return false; 863 else 864 return true; 865 } 866 867 } 868 869 public void updateFinalMessageArrivedTime(String sequenceId) { 870 synchronized (incomingMap) { 871 IncomingSequence ics = (IncomingSequence) incomingMap.get(sequenceId); 872 if (ics == null) 873 return; 874 875 Date d = new Date(); 876 long time = d.getTime(); 877 ics.setFinalMsgArrivedTime(time); 878 } 879 } 880 881 public void sendAck(String sequenceId) { 882 synchronized (incomingMap) { 883 IncomingSequence ics = (IncomingSequence) incomingMap.get(sequenceId); 884 if (ics == null) 885 return; 886 887 ics.setSendAck(true); 888 } 889 } 890 891 public void removeAllAcks(String sequenceID) { 892 synchronized (highPriorityQueue) { 893 int size = highPriorityQueue.size(); 894 895 ArrayList remLst = new ArrayList(); 896 897 for (int i = 0; i < size; i++) { 898 RMMessageContext msg = (RMMessageContext) highPriorityQueue.get(i); 899 if (msg.getSequenceID() != null) 900 if (msg.getSequenceID().equals(sequenceID) && msg.getMessageType() == Constants.MSG_TYPE_ACKNOWLEDGEMENT) 901 remLst.add(new Integer (i)); 902 } 903 904 for (int i = 0; i < remLst.size(); i++) { 905 Integer in = (Integer ) remLst.get(i); 906 highPriorityQueue.remove(in.intValue()); 907 } 908 } 909 } 910 911 912 } 913 914 | Popular Tags |