KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > stack > NakReceiverWindow


1 // $Id: NakReceiverWindow.java,v 1.20 2005/04/18 15:31:37 belaban Exp $
2

3
4 package org.jgroups.stack;
5
6 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
7 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
8 import org.apache.commons.logging.Log;
9 import org.apache.commons.logging.LogFactory;
10 import org.jgroups.Address;
11 import org.jgroups.Message;
12 import org.jgroups.util.List;
13 import org.jgroups.util.TimeScheduler;
14
15 import java.util.*;
16
17
18
19 /**
20  * Keeps track of messages according to their sequence numbers. Allows
21  * messages to be added out of order, and with gaps between sequence numbers.
22  * Method <code>remove()</code> removes the first message with a sequence
23  * number that is 1 higher than <code>next_to_remove</code> (this variable is
24  * then incremented), or it returns null if no message is present, or if no
25  * message's sequence number is 1 higher.
26  * <p>
27  * When there is a gap upon adding a message, its seqno will be added to the
28  * Retransmitter, which (using a timer) requests retransmissions of missing
29  * messages and keeps on trying until the message has been received, or the
30  * member who sent the message is suspected.
31  * <p>
32  * Started out as a copy of SlidingWindow. Main diff: RetransmitCommand is
33  * different, and retransmission thread is only created upon detection of a
34  * gap.
35  * <p>
36  * Change Nov 24 2000 (bela): for PBCAST, which has its own retransmission
37  * (via gossip), the retransmitter thread can be turned off
38  * <p>
39  * Change April 25 2001 (igeorg):<br>
40  * i. Restructuring: placed all nested class definitions at the top, then
41  * class static/non-static variables, then class private/public methods.<br>
42  * ii. Class and all nested classes are thread safe. Readers/writer lock
43  * added on <tt>NakReceiverWindow</tt> for finer grained locking.<br>
44  * iii. Internal or externally provided retransmission scheduler thread.<br>
45  * iv. Exponential backoff in time for retransmissions.<br>
46  *
47  * @author Bela Ban May 27 1999, May 2004
48  * @author John Georgiadis May 8 2001
49  */

50 public class NakReceiverWindow {
51
52
53
54
55 // HashMap xmits=new HashMap(); // Long (seqno)/ XmitEntry
56
//
57
// class XmitEntry {
58
// long created=System.currentTimeMillis();
59
// long received;
60
// }
61

62
63     /** The big read/write lock */
64     private final ReadWriteLock lock=new WriterPreferenceReadWriteLock();
65     //private final ReadWriteLock lock=new NullReadWriteLock();
66

67     /** keep track of *next* seqno to remove and highest received */
68     private long head=0;
69     private long tail=0;
70
71     /** lowest seqno delivered so far */
72     private long lowest_seen=0;
73
74     /** highest deliverable (or delivered) seqno so far */
75     private long highest_seen=0;
76
77     /** TreeMap<Long,Message>. Maintains messages keyed by (sorted) sequence numbers */
78     private final TreeMap received_msgs=new TreeMap();
79
80     /** TreeMap<Long,Message>. Delivered (= seen by all members) messages. A remove() method causes a message to be
81      moved from received_msgs to delivered_msgs. Message garbage collection will gradually remove elements in this map */

82     private final TreeMap delivered_msgs=new TreeMap();
83
84     /**
85      * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered
86      * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where
87      * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never
88      * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message
89      * around, and don't need to wait for garbage collection to remove them.
90      */

91     private boolean discard_delivered_msgs=false;
92
93
94     /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept,
95      * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers
96      */

97     private int max_xmit_buf_size=0;
98
99     /** if not set, no retransmitter thread will be started. Useful if
100      * protocols do their own retransmission (e.g PBCAST) */

101     private Retransmitter retransmitter=null;
102
103     protected static final Log log=LogFactory.getLog(NakReceiverWindow.class);
104
105
106     /**
107      * Creates a new instance with the given retransmit command
108      *
109      * @param sender The sender associated with this instance
110      * @param cmd The command used to retransmit a missing message, will
111      * be invoked by the table. If null, the retransmit thread will not be
112      * started
113      * @param start_seqno The first sequence number to be received
114      * @param sched the external scheduler to use for retransmission
115      * requests of missing msgs. If it's not provided or is null, an internal
116      * one is created
117      */

118     public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd,
119                              long start_seqno, TimeScheduler sched) {
120         head=start_seqno;
121         tail=head;
122
123         if(cmd != null)
124             retransmitter=sched == null ?
125                     new Retransmitter(sender, cmd) :
126                     new Retransmitter(sender, cmd, sched);
127     }
128
129     /**
130      * Creates a new instance with the given retransmit command
131      *
132      * @param sender The sender associated with this instance
133      * @param cmd The command used to retransmit a missing message, will
134      * be invoked by the table. If null, the retransmit thread will not be
135      * started
136      * @param start_seqno The first sequence number to be received
137      */

138     public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long start_seqno) {
139         this(sender, cmd, start_seqno, null);
140     }
141
142     /**
143      * Creates a new instance without a retransmission thread
144      *
145      * @param sender The sender associated with this instance
146      * @param start_seqno The first sequence number to be received
147      */

148     public NakReceiverWindow(Address sender, long start_seqno) {
149         this(sender, null, start_seqno);
150     }
151
152
153     public void setRetransmitTimeouts(long[] timeouts) {
154         if(retransmitter != null)
155             retransmitter.setRetransmitTimeouts(timeouts);
156     }
157
158
159     public void setDiscardDeliveredMessages(boolean flag) {
160         this.discard_delivered_msgs=flag;
161     }
162
163     public int getMaxXmitBufSize() {
164         return max_xmit_buf_size;
165     }
166
167     public void setMaxXmitBufSize(int max_xmit_buf_size) {
168         this.max_xmit_buf_size=max_xmit_buf_size;
169     }
170
171
172     /**
173      * Adds a message according to its sequence number (ordered).
174      * <p>
175      * Variables <code>head</code> and <code>tail</code> mark the start and
176      * end of the messages received, but not delivered yet. When a message is
177      * received, if its seqno is smaller than <code>head</code>, it is
178      * discarded (already received). If it is bigger than <code>tail</code>,
179      * we advance <code>tail</code> and add empty elements. If it is between
180      * <code>head</code> and <code>tail</code>, we set the corresponding
181      * missing (or already present) element. If it is equal to
182      * <code>tail</code>, we advance the latter by 1 and add the message
183      * (default case).
184      */

185     public void add(long seqno, Message msg) {
186         long old_tail;
187
188         try {
189             lock.writeLock().acquire();
190             try {
191                 old_tail=tail;
192                 if(seqno < head) {
193                     if(log.isTraceEnabled()) {
194                         StringBuffer JavaDoc sb=new StringBuffer JavaDoc("seqno ");
195                         sb.append(seqno).append(" is smaller than ").append(head).append("); discarding message");
196                         log.trace(sb.toString());
197                     }
198                     return;
199                 }
200
201                 // add at end (regular expected msg)
202
if(seqno == tail) {
203                     received_msgs.put(new Long JavaDoc(seqno), msg);
204                     tail++;
205                     highest_seen=seqno;
206                 }
207                 // gap detected
208
// i. add placeholders, creating gaps
209
// ii. add real msg
210
// iii. tell retransmitter to retrieve missing msgs
211
else if(seqno > tail) {
212                     for(long i=tail; i < seqno; i++) {
213                         received_msgs.put(new Long JavaDoc(i), null);
214                         // XmitEntry xmit_entry=new XmitEntry();
215
//xmits.put(new Long(i), xmit_entry);
216
tail++;
217                     }
218                     received_msgs.put(new Long JavaDoc(seqno), msg);
219                     tail=seqno + 1;
220                     if(retransmitter != null) {
221                         retransmitter.add(old_tail, seqno - 1);
222                     }
223                     // finally received missing message
224
}
225                 else if(seqno < tail) {
226                     if(log.isTraceEnabled()) {
227                         StringBuffer JavaDoc sb=new StringBuffer JavaDoc("added missing msg ");
228                         sb.append(msg.getSrc()).append('#').append(seqno);
229                         log.trace(sb.toString());
230                     }
231
232                     Object JavaDoc val=received_msgs.get(new Long JavaDoc(seqno));
233                     if(val == null) {
234                         // only set message if not yet received (bela July 23 2003)
235
received_msgs.put(new Long JavaDoc(seqno), msg);
236
237                         if(highest_seen +1 == seqno || seqno == head)
238                             updateHighestSeen();
239
240                         //XmitEntry xmit_entry=(XmitEntry)xmits.get(new Long(seqno));
241
//if(xmit_entry != null)
242
// xmit_entry.received=System.currentTimeMillis();
243
//long xmit_diff=xmit_entry == null? -1 : xmit_entry.received - xmit_entry.created;
244
//NAKACK.addXmitResponse(msg.getSrc(), seqno);
245
if(retransmitter != null) retransmitter.remove(seqno);
246                     }
247                 }
248                 updateLowestSeen();
249             }
250             finally {
251                 lock.writeLock().release();
252             }
253         }
254         catch(InterruptedException JavaDoc e) {
255             log.error("failed acquiring write lock", e);
256         }
257     }
258
259
260     /** Start from the current sequence number and set highest_seen until we find a gap (null value in the entry) */
261     void updateHighestSeen() {
262         SortedMap map=received_msgs.tailMap(new Long JavaDoc(highest_seen));
263         Map.Entry entry;
264         for(Iterator it=map.entrySet().iterator(); it.hasNext();) {
265             entry=(Map.Entry)it.next();
266             if(entry.getValue() != null)
267                 highest_seen=((Long JavaDoc)entry.getKey()).longValue();
268             else
269                 break;
270         }
271     }
272
273     public Message remove() {
274         Message retval=null;
275         Long JavaDoc key;
276         boolean bounded_buffer_enabled=max_xmit_buf_size > 0;
277
278         try {
279             lock.writeLock().acquire();
280             try {
281                 while(received_msgs.size() > 0) {
282                     if(log.isTraceEnabled()) {
283                         StringBuffer JavaDoc sb=new StringBuffer JavaDoc("received msgs=");
284                         sb.append(received_msgs.size()).append(", max_xmit_buf_size=").append(max_xmit_buf_size);
285                         log.trace(sb.toString());
286                     }
287
288                     key=(Long JavaDoc)received_msgs.firstKey();
289                     retval=(Message)received_msgs.get(key);
290                     if(retval != null) { // message exists and is ready for delivery
291
received_msgs.remove(key); // move from received_msgs to ...
292
if(discard_delivered_msgs == false) {
293                             delivered_msgs.put(key, retval); // delivered_msgs
294
}
295                         head++; // is removed from retransmitter somewhere else (when missing message is received)
296
return retval;
297                     }
298                     else { // message has not yet been received (gap in the message sequence stream)
299
if(bounded_buffer_enabled && received_msgs.size() > max_xmit_buf_size) {
300                             received_msgs.remove(key); // move from received_msgs to ...
301
head++;
302                             retransmitter.remove(key.longValue());
303                         }
304                         else {
305                             break;
306                         }
307                     }
308                 }
309                 return retval;
310             }
311             finally {
312                 lock.writeLock().release();
313             }
314         }
315         catch(InterruptedException JavaDoc e) {
316             log.error("failed acquiring write lock", e);
317             return null;
318         }
319     }
320
321
322
323     /**
324      * Delete all messages <= seqno (they are stable, that is, have been
325      * received at all members). Stop when a number > seqno is encountered
326      * (all messages are ordered on seqnos).
327      */

328     public void stable(long seqno) {
329         try {
330             lock.writeLock().acquire();
331             try {
332                 // we need to remove all seqnos *including* seqno: because headMap() *excludes* seqno, we
333
// simply increment it, so we have to correct behavior
334
SortedMap m=delivered_msgs.headMap(new Long JavaDoc(seqno +1));
335                 if(m.size() > 0)
336                     lowest_seen=Math.max(lowest_seen, ((Long JavaDoc)m.lastKey()).longValue());
337                 m.clear(); // removes entries from delivered_msgs
338
}
339             finally {
340                 lock.writeLock().release();
341             }
342         }
343         catch(InterruptedException JavaDoc e) {
344             log.error("failed acquiring write lock", e);
345         }
346     }
347
348
349     /**
350      * Reset the retransmitter and the nak window<br>
351      */

352     public void reset() {
353         try {
354             lock.writeLock().acquire();
355             try {
356                 if(retransmitter != null)
357                     retransmitter.reset();
358                 _reset();
359             }
360             finally {
361                 lock.writeLock().release();
362             }
363         }
364         catch(InterruptedException JavaDoc e) {
365             log.error("failed acquiring write lock", e);
366         }
367     }
368
369
370     /**
371      * Stop the retransmitter and reset the nak window<br>
372      */

373     public void destroy() {
374         try {
375             lock.writeLock().acquire();
376             try {
377                 if(retransmitter != null)
378                     retransmitter.stop();
379                 _reset();
380             }
381             finally {
382                 lock.writeLock().release();
383             }
384         }
385         catch(InterruptedException JavaDoc e) {
386             log.error("failed acquiring write lock", e);
387         }
388     }
389
390
391     /**
392      * @return the highest sequence number of a message consumed by the
393      * application (by <code>remove()</code>)
394      */

395     public long getHighestDelivered() {
396         try {
397             lock.readLock().acquire();
398             try {
399                 return (Math.max(head - 1, -1));
400             }
401             finally {
402                 lock.readLock().release();
403             }
404         }
405         catch(InterruptedException JavaDoc e) {
406             log.error("failed acquiring read lock", e);
407             return -1;
408         }
409     }
410
411
412     /**
413      * @return the lowest sequence number of a message that has been
414      * delivered or is a candidate for delivery (by the next call to
415      * <code>remove()</code>)
416      */

417     public long getLowestSeen() {
418         try {
419             lock.readLock().acquire();
420             try {
421                 return (lowest_seen);
422             }
423             finally {
424                 lock.readLock().release();
425             }
426         }
427         catch(InterruptedException JavaDoc e) {
428             log.error("failed acquiring read lock", e);
429             return -1;
430         }
431     }
432
433
434     /**
435      * Returns the highest deliverable seqno, e.g. for 1,2,3,5,6 it would
436      * be 3.
437      *
438      * @see NakReceiverWindow#getHighestReceived
439      */

440     public long getHighestSeen() {
441         try {
442             lock.readLock().acquire();
443             try {
444                 return (highest_seen);
445             }
446             finally {
447                 lock.readLock().release();
448             }
449         }
450         catch(InterruptedException JavaDoc e) {
451             log.error("failed acquiring read lock", e);
452             return -1;
453         }
454     }
455
456
457     /**
458      * Find all messages between 'low' and 'high' (including 'low' and
459      * 'high') that have a null msg.
460      * Return them as a list of longs
461      *
462      * @return List<Long>. A list of seqnos, sorted in ascending order.
463      * E.g. [1, 4, 7, 8]
464      */

465     public List getMissingMessages(long low, long high) {
466         List retval=new List();
467         // long my_high;
468

469         if(low > high) {
470             if(log.isErrorEnabled()) log.error("invalid range: low (" + low +
471                     ") is higher than high (" + high + ')');
472             return null;
473         }
474
475         try {
476             lock.readLock().acquire();
477             try {
478
479                 // my_high=Math.max(head - 1, 0);
480
// check only received messages, because delivered messages *must* have a non-null msg
481
SortedMap m=received_msgs.subMap(new Long JavaDoc(low), new Long JavaDoc(high+1));
482                 for(Iterator it=m.keySet().iterator(); it.hasNext();) {
483                     retval.add(it.next());
484                 }
485
486 // if(received_msgs.size() > 0) {
487
// entry=(Entry)received_msgs.peek();
488
// if(entry != null) my_high=entry.seqno;
489
// }
490
// for(long i=my_high + 1; i <= high; i++)
491
// retval.add(new Long(i));
492

493                 return retval;
494             }
495             finally {
496                 lock.readLock().release();
497             }
498         }
499         catch(InterruptedException JavaDoc e) {
500             log.error("failed acquiring read lock", e);
501             return null;
502         }
503     }
504
505
506     /**
507      * Returns the highest sequence number received so far (which may be
508      * higher than the highest seqno <em>delivered</em> so far, e.g. for
509      * 1,2,3,5,6 it would be 6
510      *
511      * @see NakReceiverWindow#getHighestSeen
512      */

513     public long getHighestReceived() {
514         try {
515             lock.readLock().acquire();
516             try {
517                 return Math.max(tail - 1, -1);
518             }
519             finally {
520                 lock.readLock().release();
521             }
522         }
523         catch(InterruptedException JavaDoc e) {
524             log.error("failed acquiring read lock", e);
525             return -1;
526         }
527     }
528
529
530     /**
531      * Return messages that are higher than <code>seqno</code> (excluding
532      * <code>seqno</code>). Check both received <em>and</em> delivered
533      * messages.
534      * @return List<Message>. All messages that have a seqno greater than <code>seqno</code>
535      */

536     public List getMessagesHigherThan(long seqno) {
537         List retval=new List();
538
539         try {
540             lock.readLock().acquire();
541             try {
542                 // check received messages
543
SortedMap m=received_msgs.tailMap(new Long JavaDoc(seqno+1));
544                 for(Iterator it=m.values().iterator(); it.hasNext();) {
545                     retval.add((it.next()));
546                 }
547
548                 // we retrieve all msgs whose seqno is strictly greater than seqno (tailMap() *includes* seqno,
549
// but we need to exclude seqno, that's why we increment it
550
m=delivered_msgs.tailMap(new Long JavaDoc(seqno +1));
551                 for(Iterator it=m.values().iterator(); it.hasNext();) {
552                     retval.add(((Message)it.next()).copy());
553                 }
554                 return (retval);
555
556             }
557             finally {
558                 lock.readLock().release();
559             }
560         }
561         catch(InterruptedException JavaDoc e) {
562             log.error("failed acquiring read lock", e);
563             return null;
564         }
565     }
566
567
568     /**
569      * Return all messages m for which the following holds:
570      * m > lower && m <= upper (excluding lower, including upper). Check both
571      * <code>received_msgs</code> and <code>delivered_msgs</code>.
572      */

573     public List getMessagesInRange(long lower, long upper) {
574         List retval=new List();
575
576         try {
577             lock.readLock().acquire();
578             try {
579                 // check received messages
580
SortedMap m=received_msgs.subMap(new Long JavaDoc(lower +1), new Long JavaDoc(upper +1));
581                 for(Iterator it=m.values().iterator(); it.hasNext();) {
582                     retval.add(it.next());
583                 }
584
585                 m=delivered_msgs.subMap(new Long JavaDoc(lower +1), new Long JavaDoc(upper +1));
586                 for(Iterator it=m.values().iterator(); it.hasNext();) {
587                     retval.add(((Message)it.next()).copy());
588                 }
589                 return retval;
590
591             }
592             finally {
593                 lock.readLock().release();
594             }
595         }
596         catch(InterruptedException JavaDoc e) {
597             log.error("failed acquiring read lock", e);
598             return null;
599         }
600     }
601
602
603     /**
604      * Return a list of all messages for which there is a seqno in
605      * <code>missing_msgs</code>. The seqnos of the argument list are
606      * supposed to be in ascending order
607      * @param missing_msgs A List<Long> of seqnos
608      * @return List<Message>
609      */

610     public List getMessagesInList(List missing_msgs) {
611         List ret=new List();
612
613         if(missing_msgs == null) {
614             if(log.isErrorEnabled()) log.error("argument list is null");
615             return ret;
616         }
617
618         try {
619             lock.readLock().acquire();
620             try {
621                 Long JavaDoc seqno;
622                 Message msg;
623                 for(Enumeration en=missing_msgs.elements(); en.hasMoreElements();) {
624                     seqno=(Long JavaDoc)en.nextElement();
625                     msg=(Message)delivered_msgs.get(seqno);
626                     if(msg != null)
627                         ret.add(msg.copy());
628                     msg=(Message)received_msgs.get(seqno);
629                     if(msg != null)
630                         ret.add(msg.copy());
631                 }
632                 return ret;
633             }
634             finally {
635                 lock.readLock().release();
636             }
637         }
638         catch(InterruptedException JavaDoc e) {
639             log.error("failed acquiring read lock", e);
640             return null;
641         }
642     }
643
644
645     public int size() {
646         boolean acquired=false;
647         try {
648             lock.readLock().acquire();
649             acquired=true;
650         }
651         catch(InterruptedException JavaDoc e) {}
652         try {
653             return received_msgs.size();
654         }
655         finally {
656             if(acquired)
657                 lock.readLock().release();
658         }
659     }
660
661
662     public String JavaDoc toString() {
663         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
664         try {
665             lock.readLock().acquire();
666             try {
667                 sb.append("received_msgs: " + printReceivedMessages());
668                 sb.append(", delivered_msgs: " + printDeliveredMessages());
669             }
670             finally {
671                 lock.readLock().release();
672             }
673         }
674         catch(InterruptedException JavaDoc e) {
675             log.error("failed acquiring read lock", e);
676             return "";
677         }
678
679         return sb.toString();
680     }
681
682
683     /**
684      * Prints delivered_msgs. Requires read lock present.
685      * @return
686      */

687     String JavaDoc printDeliveredMessages() {
688         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
689         Long JavaDoc min=null, max=null;
690
691         if(delivered_msgs.size() > 0) {
692             try {min=(Long JavaDoc)delivered_msgs.firstKey();} catch(NoSuchElementException ex) {}
693             try {max=(Long JavaDoc)delivered_msgs.lastKey();} catch(NoSuchElementException ex) {}
694         }
695         sb.append('[').append(min).append(" - ").append(max).append(']');
696         return sb.toString();
697     }
698
699
700     /**
701      * Prints received_msgs. Requires read lock to be present
702      * @return
703      */

704     String JavaDoc printReceivedMessages() {
705         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
706         sb.append('[');
707         if(received_msgs.size() > 0) {
708             Long JavaDoc first=null, last=null;
709             try {first=(Long JavaDoc)received_msgs.firstKey();} catch(NoSuchElementException ex) {}
710             try {last=(Long JavaDoc)received_msgs.lastKey();} catch(NoSuchElementException ex) {}
711             sb.append(first).append(" - ").append(last);
712             int non_received=0;
713             Map.Entry entry;
714
715             for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {
716                 entry=(Map.Entry)it.next();
717                 if(entry.getValue() == null)
718                     non_received++;
719             }
720             sb.append(" (size=").append(received_msgs.size()).append(", missing=").append(non_received).append(')');
721         }
722         sb.append(']');
723         return sb.toString();
724     }
725
726     /* ------------------------------- Private Methods -------------------------------------- */
727
728
729     /**
730      * Sets the value of lowest_seen to the lowest seqno of the delivered messages (if available), otherwise
731      * to the lowest seqno of received messages.
732      */

733     private void updateLowestSeen() {
734         Long JavaDoc lowest_seqno=null;
735
736         // If both delivered and received messages are empty, let the highest
737
// seen seqno be the one *before* the one which is expected to be
738
// received next by the NakReceiverWindow (head-1)
739

740         // incorrect: if received and delivered msgs are empty, don't do anything: we may have initial values,
741
// but both lists are cleaned after some time of inactivity
742
// (bela April 19 2004)
743
/*
744         if((delivered_msgs.size() == 0) && (msgs.size() == 0)) {
745             lowest_seen=0;
746             return;
747         }
748         */

749
750         // The lowest seqno is the first seqno of the delivered messages
751
if(delivered_msgs.size() > 0) {
752             try {
753                 lowest_seqno=(Long JavaDoc)delivered_msgs.firstKey();
754                 if(lowest_seqno != null)
755                     lowest_seen=lowest_seqno.longValue();
756             }
757             catch(NoSuchElementException ex) {
758             }
759         }
760         // If no elements in delivered messages (e.g. due to message garbage collection), use the received messages
761
else {
762             if(received_msgs.size() > 0) {
763                 try {
764                     lowest_seqno=(Long JavaDoc)received_msgs.firstKey();
765                     if(received_msgs.get(lowest_seqno) != null) { // only set lowest_seen if we *have* a msg
766
lowest_seen=lowest_seqno.longValue();
767                     }
768                 }
769                 catch(NoSuchElementException ex) {}
770             }
771         }
772     }
773
774
775     /**
776      * Find the highest seqno that is deliverable or was actually delivered.
777      * Returns seqno-1 if there are no messages in the queues (the first
778      * message to be expected is always seqno).
779      */

780 // private void updateHighestSeen() {
781
// long ret=0;
782
// Map.Entry entry=null;
783
//
784
// // If both delivered and received messages are empty, let the highest
785
// // seen seqno be the one *before* the one which is expected to be
786
// // received next by the NakReceiverWindow (head-1)
787
//
788
// // changed by bela (April 19 2004): we don't change the value if received and delivered msgs are empty
789
// /*if((delivered_msgs.size() == 0) && (msgs.size() == 0)) {
790
// highest_seen=0;
791
// return;
792
// }*/
793
//
794
//
795
// // The highest seqno is the last of the delivered messages, to start with,
796
// // or again the one before the first seqno expected (if no delivered
797
// // msgs). Then iterate through the received messages, and find the highest seqno *before* a gap
798
// Long highest_seqno=null;
799
// if(delivered_msgs.size() > 0) {
800
// try {
801
// highest_seqno=(Long)delivered_msgs.lastKey();
802
// ret=highest_seqno.longValue();
803
// }
804
// catch(NoSuchElementException ex) {
805
// }
806
// }
807
// else {
808
// ret=Math.max(head - 1, 0);
809
// }
810
//
811
// // Now check the received msgs head to tail. if there is an entry
812
// // with a non-null msg, increment ret until we find an entry with
813
// // a null msg
814
// for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {
815
// entry=(Map.Entry)it.next();
816
// if(entry.getValue() != null)
817
// ret=((Long)entry.getKey()).longValue();
818
// else
819
// break;
820
// }
821
// highest_seen=Math.max(ret, 0);
822
// }
823

824
825     /**
826      * Reset the Nak window. Should be called from within a writeLock() context.
827      * <p>
828      * i. Delete all received entries<br>
829      * ii. Delete alll delivered entries<br>
830      * iii. Reset all indices (head, tail, etc.)<br>
831      */

832     private void _reset() {
833         received_msgs.clear();
834         delivered_msgs.clear();
835         head=0;
836         tail=0;
837         lowest_seen=0;
838         highest_seen=0;
839     }
840     /* --------------------------- End of Private Methods ----------------------------------- */
841
842
843 }
844
Popular Tags