KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > sandesha > storage > queue > SandeshaQueue


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */

17
18 package org.apache.sandesha.storage.queue;
19
20 import org.apache.axis.components.logger.LogFactory;
21 import org.apache.axis.components.uuid.UUIDGen;
22 import org.apache.axis.components.uuid.UUIDGenFactory;
23 import org.apache.commons.logging.Log;
24 import org.apache.sandesha.Constants;
25 import org.apache.sandesha.RMMessageContext;
26 import org.apache.sandesha.util.PolicyLoader;
27
28 import java.util.*;
29
30
31 /*
32  * Created on Aug 4, 2004 at 4:49:49 PM
33  */

34
35 /**
36  * @author Chamikara Jayalath
37  * @author Jaliya Ekanayaka
38  */

39
40 public class SandeshaQueue {
41
42     private static SandeshaQueue clientQueue = null;
43     private static SandeshaQueue serverQueue = null;
44     HashMap incomingMap; //In comming messages.
45
HashMap outgoingMap; //Response messages
46
ArrayList highPriorityQueue; // Acks and create seq. responses.
47
HashMap queueBin; // Messaged processed from out queue will be moved
48
ArrayList lowPriorityQueue;
49     private List requestedSequences;
50     HashMap acksToMap;
51     HashMap offerMap;
52     private static final Log log = LogFactory.getLog(SandeshaQueue.class.getName());
53
54     public static final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
55
56     private SandeshaQueue() {
57         incomingMap = new HashMap();
58         outgoingMap = new HashMap();
59         highPriorityQueue = new ArrayList();
60         queueBin = new HashMap();
61         lowPriorityQueue = new ArrayList();
62         requestedSequences = new ArrayList();
63         acksToMap = new HashMap();
64         offerMap = new HashMap();
65     }
66
67     public static SandeshaQueue getInstance(byte endPoint) {
68         if (endPoint == Constants.CLIENT) {
69             if (clientQueue == null) {
70                 clientQueue = new SandeshaQueue();
71             }
72             return clientQueue;
73         } else {
74             if (serverQueue == null) {
75                 serverQueue = new SandeshaQueue();
76             }
77             return serverQueue;
78         }
79
80     }
81
82     public boolean addMessageToIncomingSequence(String JavaDoc seqId, Long JavaDoc messageNo,
83                                                 RMMessageContext msgCon) throws QueueException {
84         boolean successful = false;
85
86         if (seqId == null || msgCon == null)
87             throw new QueueException(Constants.Queue.ADD_ERROR);
88
89         if (isIncomingSequenceExists(seqId)) {
90             IncomingSequence seqHash = (IncomingSequence) incomingMap.get(seqId);
91
92             synchronized (seqHash) {
93                 if (seqHash == null)
94                     throw new QueueException(Constants.Queue.QUEUE_INCONSIS);
95
96                 if (seqHash.hasMessage(messageNo))
97                     throw new QueueException(Constants.Queue.MESSAGE_EXISTS);
98
99                 if (msgCon.isLastMessage())
100                     seqHash.setLastMsg(msgCon.getMsgNumber());
101
102                 seqHash.setSequenceId(msgCon.getSequenceID());
103                 seqHash.putNewMessage(messageNo, msgCon);
104                 successful = true;
105             }
106         }
107
108         return successful;
109     }
110
111     public boolean addMessageToOutgoingSequence(String JavaDoc seqId, RMMessageContext msgCon)
112             throws QueueException {
113         boolean successful = false;
114
115         if (seqId == null || msgCon == null)
116             throw new QueueException(Constants.Queue.ADD_ERROR);
117
118         if (isOutgoingSequenceExists(seqId)) {
119             OutgoingSequence resSeqHash = (OutgoingSequence) outgoingMap.get(seqId);
120
121             synchronized (resSeqHash) {
122                 if (resSeqHash == null)
123                     throw new QueueException(Constants.Queue.QUEUE_INCONSIS);
124                 resSeqHash.putNewMessage(msgCon);
125                 successful = true;
126
127                 //if last message
128
if (msgCon.isLastMessage())
129                     resSeqHash.setLastMsg(msgCon.getMsgNumber());
130
131                 if (msgCon.isHasResponse())
132                     resSeqHash.setHasResponse(true);
133             }
134         }
135         return successful;
136     }
137
138     public boolean isIncomingSequenceExists(String JavaDoc seqId) {
139         synchronized (incomingMap) {
140             return incomingMap.containsKey(seqId);
141         }
142     }
143
144     public synchronized boolean isOutgoingSequenceExists(String JavaDoc resSeqId) {
145         synchronized (outgoingMap) {
146             return outgoingMap.containsKey(resSeqId);
147         }
148     }
149
150     public RMMessageContext nextIncomingMessageToProcess(Object JavaDoc sequence) throws QueueException {
151         if (sequence == null)
152             return null;
153
154         AbstractSequence absSeq = (AbstractSequence) sequence;
155
156         IncomingSequence sh = (IncomingSequence) incomingMap.get(absSeq.getSequenceId());
157         synchronized (sh) {
158             if (sh == null)
159                 throw new QueueException(Constants.Queue.SEQUENCE_ABSENT);
160
161             if (!sh.hasProcessableMessages())
162                 return null;
163
164             RMMessageContext msgCon = sh.getNextMessageToProcess();
165             return msgCon;
166         }
167     }
168
169     public RMMessageContext nextOutgoingMessageToSend() throws QueueException {
170         RMMessageContext msg = null;
171         synchronized (outgoingMap) {
172             Iterator it = outgoingMap.keySet().iterator();
173
174             whileLoop: while (it.hasNext()) {
175                 RMMessageContext tempMsg;
176                 String JavaDoc tempKey = (String JavaDoc) it.next();
177                 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(tempKey);
178                 if (rsh.isOutSeqApproved()) {
179                     tempMsg = rsh.getNextMessageToSend();
180                     if (tempMsg != null) {
181                         msg = tempMsg;
182                         msg.setSequenceID(rsh.getOutSequenceId());
183                         msg.setOldSequenceID(rsh.getSequenceId());
184                         break whileLoop;
185                     }
186                 }
187             }
188         }
189         return msg;
190     }
191
192     public void createNewIncomingSequence(String JavaDoc sequenceId) throws QueueException {
193         if (sequenceId == null)
194             throw new QueueException(Constants.Queue.SEQUENCE_ID_NULL);
195
196         synchronized (incomingMap) {
197             IncomingSequence sh = new IncomingSequence(sequenceId);
198             incomingMap.put(sequenceId, sh);
199
200         }
201     }
202
203     public void createNewOutgoingSequence(String JavaDoc sequenceId) throws QueueException {
204         if (sequenceId == null)
205             throw new QueueException(Constants.Queue.SEQUENCE_ID_NULL);
206
207         synchronized (outgoingMap) {
208             OutgoingSequence rsh = new OutgoingSequence(sequenceId);
209             outgoingMap.put(sequenceId, rsh);
210         }
211
212     }
213
214     /**
215      * Adds a new message to the responses queue.
216      */

217     public void addPriorityMessage(RMMessageContext msg) throws QueueException {
218         synchronized (highPriorityQueue) {
219             if (msg == null)
220                 throw new QueueException(Constants.Queue.MESSAGE_ID_NULL);
221
222             highPriorityQueue.add(msg);
223         }
224     }
225
226     public void addLowPriorityMessage(RMMessageContext msg) throws QueueException {
227         synchronized (lowPriorityQueue) {
228             if (msg == null)
229                 throw new QueueException(Constants.Queue.MESSAGE_ID_NULL);
230             lowPriorityQueue.add(msg);
231         }
232     }
233
234
235     public RMMessageContext nextPriorityMessageToSend() throws QueueException {
236
237         synchronized (highPriorityQueue) {
238
239
240             if (highPriorityQueue.size() <= 0)
241                 return null;
242
243             RMMessageContext msg = null;
244             int size = highPriorityQueue.size();
245             synchronized (highPriorityQueue) {
246                 forLoop: //Label
247
for (int i = 0; i < size; i++) {
248                     RMMessageContext tempMsg = (RMMessageContext) highPriorityQueue.get(i);
249                     if (tempMsg != null) {
250                         switch (tempMsg.getMessageType()) {
251                             //Create seq messages will not be removed.
252
case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
253                                 long lastSentTime = tempMsg.getLastSentTime();
254                                 Date d = new Date();
255                                 long currentTime = d.getTime();
256                                 if (currentTime >=
257                                         lastSentTime + Constants.RETRANSMISSION_INTERVAL) {
258
259                                     String JavaDoc newCreateSeqId = Constants.UUID + uuidGen.nextUUID();
260                                     tempMsg.setMessageID(newCreateSeqId);
261
262                                     tempMsg.setLastSentTime(currentTime);
263                                     msg = tempMsg;
264                                     break forLoop;
265
266
267                                 }
268                                 break;
269                             case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
270                                 
271                                 //acks are send in the folowing manner.
272
//If a ack the system has asked to send a ack (sequence.sendAck==true)
273
//then send it immediately.
274
//Also send a ack when a interval (ACKNOWLEDGEMENT_INTERVAL) has passed
275
//since last message arrived.
276

277                                 String JavaDoc sequenceId = tempMsg.getSequenceID();
278                                 if (sequenceId == null)
279                                     continue;
280
281                                 String JavaDoc key = getKeyFromIncomingSequenceId(sequenceId);
282                                 IncomingSequence sequence = (IncomingSequence) incomingMap.get(key);
283                                 if (sequence == null)
284                                     continue;
285
286                                 d = new Date();
287                                 currentTime = d.getTime();
288
289                                 if (sequence.isSendAck()) {
290
291                                     tempMsg.setLastSentTime(currentTime);
292                                     msg = tempMsg;
293                                     sequence.setSendAck(false);
294                                     sequence.setFinalAckedTime(currentTime);
295                                     break forLoop;
296
297                                 } else {
298                                     long ackInterval = PolicyLoader.getInstance()
299                                             .getAcknowledgementInterval();
300                                     long finalAckedTime = sequence.getFinalAckedTime();
301                                     long finalMsgArrivedTime = sequence.getFinalMsgArrivedTime();
302
303                                     if ((finalMsgArrivedTime > finalAckedTime) &&
304                                             (currentTime > finalMsgArrivedTime + ackInterval))
305                                         sequence.setSendAck(true);
306                                 }
307
308                                 break;
309                             default:
310                                 highPriorityQueue.remove(i);
311                                 queueBin.put(tempMsg.getMessageID(), tempMsg);
312                                 msg = tempMsg;
313                                 break forLoop;
314                         }
315                     }
316                 }
317             }
318
319
320             return msg;
321
322         }
323     }
324
325     public List nextAllSeqsToProcess() {
326         List seqs = new ArrayList();
327
328         synchronized (incomingMap) {
329             Iterator it = incomingMap.keySet().iterator();
330
331             while (it.hasNext()) {
332                 Object JavaDoc tempKey = it.next();
333                 IncomingSequence sh = (IncomingSequence) incomingMap.get(tempKey);
334                 if (sh.hasProcessableMessages() && !sh.isSequenceLocked())
335                     seqs.add(sh);
336             }
337             return seqs;
338         }
339     }
340
341     public List nextAllSeqIdsToProcess() {
342         List ids = new ArrayList();
343
344         synchronized (incomingMap) {
345             Iterator it = incomingMap.keySet().iterator();
346
347             while (it.hasNext()) {
348                 Object JavaDoc tempKey = it.next();
349                 IncomingSequence sh = (IncomingSequence) incomingMap.get(tempKey);
350                 if (sh.hasProcessableMessages() && !sh.isSequenceLocked())
351                     ids.add(sh.getSequenceId());
352             }
353             return ids;
354         }
355     }
356
357     public void clear(boolean yes) {
358         if (!yes)
359             return;
360         incomingMap.clear();
361         highPriorityQueue.clear();
362         outgoingMap.clear();
363         queueBin.clear();
364     }
365
366     // --Commented out by Inspection START (6/8/05 1:19 PM):
367
// public void removeIncomingSequence(String sequenceId, boolean yes) {
368
// if (!yes)
369
// return;
370
// incomingMap.remove(sequenceId);
371
// }
372
// --Commented out by Inspection STOP (6/8/05 1:19 PM)
373

374     public void setSequenceLock(String JavaDoc sequenceId, boolean lock) {
375         IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
376         sh.setProcessLock(lock);
377     }
378
379     public Set getAllReceivedMsgNumsOfIncomingSeq(String JavaDoc sequenceId) {
380         IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
381         if (sh != null)
382             return sh.getAllKeys();
383         else
384             return null;
385     }
386
387     public boolean isIncomingMessageExists(String JavaDoc sequenceId, Long JavaDoc messageNo) {
388         IncomingSequence sh = (IncomingSequence) incomingMap.get(sequenceId);
389         //sh can be null if there are no messages at the initial point.
390
if (sh != null)
391             return sh.hasMessage(messageNo);
392         else
393             return false;
394     }
395
396     public void setOutSequence(String JavaDoc seqId, String JavaDoc outSeqId) {
397         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
398         synchronized (rsh) {
399             if (rsh == null) {
400                 if (log.isDebugEnabled())
401                     log.debug("ERROR: RESPONSE SEQ IS NULL");
402                 return;
403             }
404             rsh.setOutSequenceId(outSeqId);
405         }
406     }
407
408     public void setOutSequenceApproved(String JavaDoc seqId, boolean approved) {
409         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
410         synchronized (rsh) {
411             if (rsh == null) {
412                 if (log.isDebugEnabled())
413                     log.debug("ERROR: RESPONSE SEQ IS NULL");
414                 return;
415             }
416             rsh.setOutSeqApproved(approved);
417         }
418     }
419
420     public String JavaDoc getSequenceOfOutSequence(String JavaDoc outSequence) {
421         synchronized (outgoingMap) {
422             if (outSequence == null) {
423                 return null;
424             }
425             String JavaDoc tempSeqId = null;
426             Iterator it = outgoingMap.keySet().iterator();
427             while (it.hasNext()) {
428                 tempSeqId = (String JavaDoc) it.next();
429                 OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(tempSeqId);
430                 String JavaDoc tempOutSequence = rsh.getOutSequenceId();
431                 if (outSequence.equals(tempOutSequence)) {
432                     break;
433                 }
434             }
435             return tempSeqId;
436         }
437
438     }
439
440     public void displayOutgoingMap() {
441         Iterator it = outgoingMap.keySet().iterator();
442         System.out.println("------------------------------------");
443         System.out.println(" DISPLAYING RESPONSE MAP");
444         System.out.println("------------------------------------");
445         while (it.hasNext()) {
446             String JavaDoc s = (String JavaDoc) it.next();
447             System.out.println("\n Sequence id - " + s);
448             OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(s);
449
450             System.out.println("out seq id:" + rsh.getOutSequenceId());
451             Iterator it1 = rsh.getAllKeys().iterator();
452             while (it1.hasNext()) {
453                 Long JavaDoc l = (Long JavaDoc) it1.next();
454                 String JavaDoc msgId = rsh.getMessageId(l);
455                 System.out.println("* key -" + l.longValue() + "- MessageID -" + msgId + "-");
456             }
457         }
458         System.out.println("\n");
459     }
460
461     public void displayIncomingMap() {
462         Iterator it = incomingMap.keySet().iterator();
463         System.out.println("------------------------------------");
464         System.out.println(" DISPLAYING SEQUENCE MAP");
465         System.out.println("------------------------------------");
466         while (it.hasNext()) {
467             String JavaDoc s = (String JavaDoc) it.next();
468             System.out.println("\n Sequence id - " + s);
469
470             IncomingSequence sh = (IncomingSequence) incomingMap.get(s);
471
472             Iterator it1 = sh.getAllKeys().iterator();
473             while (it1.hasNext()) {
474                 Long JavaDoc l = (Long JavaDoc) it1.next();
475                 String JavaDoc msgId = sh.getMessageId(l);
476                 System.out.println("* key -" + l.longValue() + "- MessageID -" + msgId + "-");
477             }
478         }
479         System.out.println("\n");
480     }
481
482     public void displayPriorityQueue() {
483
484         System.out.println("------------------------------------");
485         System.out.println(" DISPLAYING PRIORITY QUEUE");
486         System.out.println("------------------------------------");
487
488         Iterator it = highPriorityQueue.iterator();
489         while (it.hasNext()) {
490             RMMessageContext msg = (RMMessageContext) it.next();
491             String JavaDoc id = msg.getMessageID();
492             int type = msg.getMessageType();
493
494             System.out.println("Message " + id + " Type " + type);
495         }
496         System.out.println("\n");
497     }
498
499     public void markOutgoingMessageToDelete(String JavaDoc sequenceId, Long JavaDoc messageNo) {
500         String JavaDoc sequence = getSequenceOfOutSequence(sequenceId);
501         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(sequence);
502
503         if (rsh == null) {
504             log.error(Constants.Queue.RESPONSE_SEQ_NULL);
505             return;
506         }
507
508         synchronized (rsh) {
509             //Deleting retuns the deleted message.
510
rsh.markMessageDeleted(messageNo);
511             //If we jave already deleted then no message to return.
512
}
513
514     }
515
516     public void movePriorityMsgToBin(String JavaDoc messageId) {
517
518         synchronized (highPriorityQueue) {
519             int size = highPriorityQueue.size();
520             for (int i = 0; i < size; i++) {
521                 RMMessageContext msg = (RMMessageContext) highPriorityQueue.get(i);
522
523                 String JavaDoc tempMsgId;
524                 try {
525                     tempMsgId = (String JavaDoc) msg.getMessageIdList().get(0);
526                 } catch (Exception JavaDoc ex) {
527                     tempMsgId = msg.getMessageID();
528                 }
529                 if (tempMsgId.equals(messageId)) {
530                     highPriorityQueue.remove(i);
531                     queueBin.put(messageId, msg);
532                     return;
533                 }
534             }
535         }
536     }
537
538     public long getNextOutgoingMessageNumber(String JavaDoc seq) {
539         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seq);
540         if (rsh == null) { //saquence not created yet.
541
try {
542                 createNewOutgoingSequence(seq);
543             } catch (QueueException q) {
544                 log.error(q.getStackTrace());
545             }
546         }
547         rsh = (OutgoingSequence) outgoingMap.get(seq);
548         synchronized (rsh) {
549             Iterator keys = rsh.getAllKeys().iterator();
550
551             long msgNo = rsh.nextMessageNumber();
552             return (msgNo);
553         }
554     }
555
556     public synchronized RMMessageContext checkForResponseMessage(String JavaDoc requestId, String JavaDoc seqId) {
557         IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId);
558         if (sh == null) {
559             return null;
560         }
561         synchronized (sh) {
562             RMMessageContext msg = sh.getMessageRelatingTo(requestId);
563             return msg;
564         }
565     }
566
567     public String JavaDoc searchForSequenceId(String JavaDoc messageId) {
568         Iterator it = outgoingMap.keySet().iterator();
569
570         String JavaDoc key = null;
571         while (it.hasNext()) {
572             key = (String JavaDoc) it.next();
573             Object JavaDoc obj = outgoingMap.get(key);
574             if (obj != null) {
575                 OutgoingSequence hash = (OutgoingSequence) obj;
576                 boolean hasMsg = hash.hasMessageWithId(messageId);
577
578                 if (!hasMsg)
579                     key = null;
580                 else
581                     break;
582
583             }
584
585         }
586
587         return key;
588     }
589
590     public void setAckReceived(String JavaDoc seqId, long msgNo) {
591         Iterator it = outgoingMap.keySet().iterator();
592         String JavaDoc key = null;
593         while (it.hasNext()) {
594             key = (String JavaDoc) it.next();
595             Object JavaDoc obj = outgoingMap.get(key);
596
597             if (obj != null) {
598                 OutgoingSequence hash = (OutgoingSequence) obj;
599                 if (hash.getOutSequenceId().equals(seqId)) {
600                     hash.setAckReceived(msgNo);
601                 }
602             }
603         }
604
605     }
606
607     public RMMessageContext getLowPriorityMessageIfAcked() {
608         synchronized (lowPriorityQueue) {
609             int size = lowPriorityQueue.size();
610             RMMessageContext terminateMsg = null;
611             for (int i = 0; i < size; i++) {
612
613                 RMMessageContext temp;
614                 temp = (RMMessageContext) lowPriorityQueue.get(i);
615                 String JavaDoc seqId = temp.getSequenceID();
616                 OutgoingSequence hash = null;
617                 hash = (OutgoingSequence) outgoingMap.get(seqId);
618                 if (hash == null) {
619                     log.error("ERROR: HASH NOT FOUND SEQ ID " + seqId);
620                 }
621                 if (hash != null) {
622                     boolean complete = hash.isAckComplete();
623                     if (complete)
624                         terminateMsg = temp;
625                     if (terminateMsg != null) {
626                         terminateMsg.setSequenceID(hash.getOutSequenceId());
627                         lowPriorityQueue.remove(i);
628                         break;
629                     }
630                 }
631             }
632             return terminateMsg;
633         }
634
635     }
636
637     public void addSendMsgNo(String JavaDoc seqId, long msgNo) {
638         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
639         if (rsh != null) {
640
641             synchronized (rsh) {
642                 rsh.addMsgToSendList(msgNo);
643             }
644         }
645     }
646
647     public boolean isSentMsg(String JavaDoc seqId, long msgNo) {
648         OutgoingSequence rsh = (OutgoingSequence) outgoingMap.get(seqId);
649
650         if (rsh == null) {
651             return false;
652         }
653         synchronized (rsh) {
654             return rsh.isMsgInSentList(msgNo);
655         }
656
657
658     }
659
660     public boolean hasLastIncomingMsgReceived(String JavaDoc seqId) {
661
662         IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId);
663
664         if (sh == null) {
665             return false;
666         }
667         synchronized (sh) {
668             return sh.hasLastMsgReceived();
669         }
670     }
671
672     public long getLastIncomingMsgNo(String JavaDoc seqId) {
673         IncomingSequence sh = (IncomingSequence) incomingMap.get(seqId);
674         if (sh == null) {
675             return 0;
676         }
677         synchronized (sh) {
678             return sh.getLastMsgNumber();
679         }
680     }
681
682     public void addRequestedSequence(String JavaDoc seqId) {
683         requestedSequences.add(seqId);
684     }
685
686     public boolean isRequestedSeqPresent(String JavaDoc seqId) {
687         return requestedSequences.contains(seqId);
688     }
689
690     public String JavaDoc getKeyFromIncomingSequenceId(String JavaDoc seqId) {
691         synchronized (incomingMap) {
692             Iterator it = incomingMap.keySet().iterator();
693             while (it.hasNext()) {
694                 String JavaDoc key = (String JavaDoc) it.next();
695                 IncomingSequence is = (IncomingSequence) incomingMap.get(key);
696                 String JavaDoc seq = is.getSequenceId();
697                 if (seq == null)
698                     continue;
699
700                 if (seq.equals(seqId))
701                     return key;
702             }
703             return null;
704         }
705     }
706
707     /*public String getKeyFromOutgoingSequenceId(String seqId) {
708
709         synchronized (outgoingMap) {
710             System.out.println(" getKeyFromOutgoingSequenceId Received "+seqId);
711             String key = null;
712             Iterator it = outgoingMap.keySet().iterator();
713
714             while (it.hasNext()) {
715                 key = (String) it.next();
716                 OutgoingSequence os = (OutgoingSequence) outgoingMap.get(key);
717
718                 String seq = os.getSequenceId();
719                 if (seq == null)
720                     continue;
721
722                 if (seq.equals(seqId)) {
723                      System.out.println(" getKeyFromOutgoingSequenceId Found "+key);
724                     return key;
725
726                 }
727             }
728             System.out.println(" getKeyFromOutgoingSequenceId Found "+key);
729             return key;
730         }
731
732
733     }*/

734
735     public String JavaDoc getKeyFromOutgoingSequenceId(String JavaDoc seqId) {
736         synchronized (outgoingMap) {
737             Iterator it = outgoingMap.keySet().iterator();
738             while (it.hasNext()) {
739                 String JavaDoc key = (String JavaDoc) it.next();
740                 OutgoingSequence is = (OutgoingSequence) outgoingMap.get(key);
741                 String JavaDoc seq = is.getOutSequenceId();
742                 if (seq == null)
743                     continue;
744
745                 if (seq.equals(seqId))
746                     return key;
747             }
748             return null;
749         }
750     }
751
752     public boolean isAllOutgoingTerminateSent() {
753         synchronized (outgoingMap) {
754             Iterator keys = outgoingMap.keySet().iterator();
755             boolean found = false;
756
757             while (keys.hasNext()) {
758                 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(keys.next());
759                 if (ogs.isTerminateSent()) {
760                     found = true;
761                     break;
762                 }
763             }
764
765             return found;
766         }
767     }
768
769     public boolean isAllIncommingTerminateReceived() {
770         synchronized (incomingMap) {
771             Iterator keys = incomingMap.keySet().iterator();
772
773             while (keys.hasNext()) {
774                 Object JavaDoc key = keys.next();
775                 IncomingSequence ics = (IncomingSequence) incomingMap.get(key);
776                 OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(key);
777
778                 boolean hasResponse = ogs.hasResponse();
779
780                 if (hasResponse && !ics.isTerminateReceived())
781                     return false;
782             }
783
784             return true;
785         }
786     }
787
788     public void setTerminateSend(String JavaDoc seqId) {
789         synchronized (outgoingMap) {
790             OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
791             ogs.setTerminateSent(true);
792         }
793     }
794
795     public void setTerminateReceived(String JavaDoc seqId) {
796         IncomingSequence ics = (IncomingSequence) incomingMap.get(getKeyFromIncomingSequenceId(seqId));
797         ics.setTerminateReceived(true);
798     }
799
800     public void setAcksTo(String JavaDoc seqId, String JavaDoc acksTo) {
801
802         if (seqId == null) {
803             log.error("ERROR: seq is null in setAcksTo");
804             return;
805         }
806
807         acksToMap.put(seqId, acksTo);
808     }
809
810     public String JavaDoc getAcksTo(String JavaDoc seqId) {
811
812         if (seqId == null) {
813             log.error("ERROR: seq is null in getAcksTo");
814             return null;
815         }
816
817         return (String JavaDoc) acksToMap.get(seqId);
818     }
819
820
821     public void addOffer(String JavaDoc msgID, String JavaDoc offerID) {
822         if (msgID == null) {
823             log.error(" MessageID is null in addOffer");
824         }
825         offerMap.put(msgID, offerID);
826     }
827
828     public String JavaDoc getOffer(String JavaDoc msgID) {
829         if (msgID == null) {
830             log.error(" MessageID is null in getOffer");
831             return null;
832         }
833         return (String JavaDoc) offerMap.get(msgID);
834     }
835
836     public boolean isOutgoingTerminateSent(String JavaDoc seqId) {
837         synchronized (outgoingMap) {
838             OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
839             if (ogs != null) {
840                 if (ogs.isTerminateSent())
841                     return true;
842                 else
843                     return false;
844             }
845             return false;
846         }
847
848     }
849
850     public boolean isIncommingTerminateReceived(String JavaDoc seqId) {
851         synchronized (incomingMap) {
852
853             IncomingSequence ics = (IncomingSequence) incomingMap.get(seqId);
854             OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
855
856             boolean hasResponse = false;
857             if (ogs != null) {
858                 hasResponse = ogs.hasResponse();
859             }
860
861             if (hasResponse && ics != null && !ics.isTerminateReceived())
862                 return false;
863             else
864                 return true;
865         }
866
867     }
868
869     public void updateFinalMessageArrivedTime(String JavaDoc sequenceId) {
870         synchronized (incomingMap) {
871             IncomingSequence ics = (IncomingSequence) incomingMap.get(sequenceId);
872             if (ics == null)
873                 return;
874
875             Date d = new Date();
876             long time = d.getTime();
877             ics.setFinalMsgArrivedTime(time);
878         }
879     }
880
881     public void sendAck(String JavaDoc sequenceId) {
882         synchronized (incomingMap) {
883             IncomingSequence ics = (IncomingSequence) incomingMap.get(sequenceId);
884             if (ics == null)
885                 return;
886
887             ics.setSendAck(true);
888         }
889     }
890
891     public void removeAllAcks(String JavaDoc sequenceID) {
892         synchronized (highPriorityQueue) {
893             int size = highPriorityQueue.size();
894
895             ArrayList remLst = new ArrayList();
896
897             for (int i = 0; i < size; i++) {
898                 RMMessageContext msg = (RMMessageContext) highPriorityQueue.get(i);
899                 if (msg.getSequenceID() != null)
900                     if (msg.getSequenceID().equals(sequenceID) && msg.getMessageType() == Constants.MSG_TYPE_ACKNOWLEDGEMENT)
901                         remLst.add(new Integer JavaDoc(i));
902             }
903
904             for (int i = 0; i < remLst.size(); i++) {
905                 Integer JavaDoc in = (Integer JavaDoc) remLst.get(i);
906                 highPriorityQueue.remove(in.intValue());
907             }
908         }
909     }
910
911
912 }
913
914
Popular Tags