KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > TOTAL_TOKEN


1 //$Id: TOTAL_TOKEN.java,v 1.8 2004/09/23 16:29:42 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.blocks.GroupRequest;
7 import org.jgroups.protocols.pbcast.Digest;
8 import org.jgroups.protocols.ring.RingNodeFlowControl;
9 import org.jgroups.protocols.ring.RingToken;
10 import org.jgroups.protocols.ring.TokenLostException;
11 import org.jgroups.protocols.ring.UdpRingNode;
12 import org.jgroups.stack.IpAddress;
13 import org.jgroups.stack.RpcProtocol;
14 import org.jgroups.util.RspList;
15 import org.jgroups.util.Util;
16
17 import java.io.IOException JavaDoc;
18 import java.io.ObjectInput JavaDoc;
19 import java.io.ObjectOutput JavaDoc;
20 import java.util.*;
21
22 /**
23  * <p>
24  * Total order implementation based on <a HREF="http://citeseer.nj.nec.com/amir95totem.html">
25  * The Totem Single-Ring Ordering and Membership Protocol</a>
26  * <p>
27  *
28  * <p>
29  * However, this is an adaption of algorithm mentioned in the research paper above since we reuse
30  * our own membership protocol and failure detectors. Somewhat different flow control mechanism is
31  * also implemented.
32  *
33  * <p>
34  * Token passing is done through reliable point-to-point udp channels provided by UNICAST layer.
35  * Process groups nodes members are organized in a logical ring.
36  * </p>
37  *
38  * <p>
39  * Total token layer doesn't need NAKACK nor STABLE layer beneath it since it implements it's own
40  * retransmission and tracks stability of the messages from the information piggybacked on the
41  * token itself.
42  * </p>
43  *
44  * <p>
45  * For the typical protocol stack configuration used, see org.jgroups.demos.TotalTokenDemo and
46  * total-token.xml configuration file provided with this distribution of JGroups.
47  * </p>
48  *
49  *
50  *
51  *@author Vladimir Blagojevic vladimir@cs.yorku.ca
52  *@version $Revision: 1.8 $
53  *
54  *@see org.jgroups.protocols.ring.RingNodeFlowControl
55  *@see org.jgroups.protocols.ring.RingNode
56  *@see org.jgroups.protocols.ring.TcpRingNode
57  *@see org.jgroups.protocols.ring.UdpRingNode
58  *
59  **/

60
61
62 public class TOTAL_TOKEN extends RpcProtocol
63 {
64
65
66    public static class TotalTokenHeader extends Header
67    {
68
69
70       /**
71        * sequence number of the message
72        */

73       private long seq;
74
75
76       /**
77        *used for externalization
78        */

79       public TotalTokenHeader()
80       {
81       }
82
83       public TotalTokenHeader(long seq)
84       {
85          this.seq = seq;
86       }
87
88       public TotalTokenHeader(Long JavaDoc seq)
89       {
90          this.seq = seq.longValue();
91       }
92
93
94       /**
95        *Returns sequence number of the message that owns this header
96        *@return sequence number
97        */

98       public long getSeq()
99       {
100          return seq;
101       }
102
103       /**
104        *Returns size of the header
105        * @return headersize in bytes
106        */

107       public long size()
108       {
109          //calculated using Util.SizeOf(Object)
110
return 121;
111       }
112
113       /**
114        * Manual serialization
115        *
116        *
117        */

118       public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc
119       {
120          out.writeLong(seq);
121       }
122
123       /**
124        * Manual deserialization
125        *
126        */

127       public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc
128       {
129          seq = in.readLong();
130       }
131
132       public String JavaDoc toString()
133       {
134          return "[TotalTokenHeader=" + seq + ']';
135       }
136    }
137
138    public static class RingTokenHeader extends Header
139    {
140       public RingTokenHeader()
141       {
142       }
143
144       public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc
145       {
146       }
147
148       public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc
149       {
150       }
151
152       public long size()
153       {
154          //calculated using Util.SizeOf(Object)
155
return 110;
156       }
157    }
158
159
160    private static final int OPERATIONAL_STATE = 0;
161    private static final int RECOVERY_STATE = 1;
162
163    UdpRingNode node;
164    RingNodeFlowControl flowControl;
165    Address localAddress;
166    final TokenTransmitter tokenRetransmitter=new TokenTransmitter();
167    List newMessagesQueue;
168    SortedSet liveMembersInRecovery,suspects;
169
170    final Object JavaDoc mutex = new Object JavaDoc();
171    TreeMap receivedMessagesQueue;
172    long myAru = 0;
173
174    final Object JavaDoc threadCoordinationMutex = new Object JavaDoc();
175    final boolean tokenInStack = false;
176    final boolean threadDeliveringMessage = false;
177    boolean tokenSeen = false;
178
179
180    volatile boolean isRecoveryLeader = false;
181    volatile int state;
182    volatile int sleepTime = 10;
183
184    long highestSeenSeq = 0;
185    long lastRoundTokensAru = 0;
186    int lastRoundTransmitCount,lastRoundRebroadcastCount = 0;
187    int blockSendingBacklogThreshold = Integer.MAX_VALUE;
188    int unblockSendingBacklogThreshold = Integer.MIN_VALUE;
189    boolean tokenCirculating = false;
190    boolean senderBlocked = false;
191    public static final String JavaDoc prot_name = "TOTAL_TOKEN";
192
193
194    public String JavaDoc getName()
195    {
196       return prot_name;
197    }
198
199    private String JavaDoc getState()
200    {
201       if (state == OPERATIONAL_STATE)
202       {
203          return "OPERATIONAL";
204       }
205       else
206          return "RECOVERY";
207    }
208
209
210     public void start() throws Exception JavaDoc {
211         super.start();
212         newMessagesQueue = Collections.synchronizedList(new ArrayList());
213         receivedMessagesQueue = new TreeMap();
214         tokenRetransmitter.start();
215     }
216
217    /**
218     * Overrides @org.jgroups.stack.MessageProtocol#stop().
219     */

220    public void stop()
221    {
222        super.stop();
223        tokenRetransmitter.shutDown();
224    }
225
226
227
228
229    /**
230     * Setup the Protocol instance acording to the configuration string
231     *
232     */

233    public boolean setProperties(Properties props)
234    {
235       String JavaDoc str;
236
237        super.setProperties(props);
238       str = props.getProperty("block_sending");
239       if (str != null)
240       {
241          blockSendingBacklogThreshold = Integer.parseInt(str);
242          props.remove("block_sending");
243       }
244
245       str = props.getProperty("unblock_sending");
246       if (str != null)
247       {
248          unblockSendingBacklogThreshold = Integer.parseInt(str);
249          props.remove("unblock_sending");
250       }
251
252       if (props.size() > 0)
253       {
254          System.err.println("UDP.setProperties(): the following properties are not recognized:");
255          props.list(System.out);
256          return false;
257       }
258       return true;
259    }
260
261    public IpAddress getTokenReceiverAddress()
262    {
263       return node != null? node.getTokenReceiverAddress() : null;
264    }
265
266    public Vector providedUpServices()
267    {
268       Vector retval = new Vector();
269       retval.addElement(new Integer JavaDoc(Event.GET_DIGEST));
270       retval.addElement(new Integer JavaDoc(Event.GET_DIGEST_STATE));
271       retval.addElement(new Integer JavaDoc(Event.SET_DIGEST));
272       return retval;
273    }
274
275    public boolean handleUpEvent(Event evt)
276    {
277       Message msg;
278       Header h;
279       switch (evt.getType())
280       {
281
282          case Event.SET_LOCAL_ADDRESS:
283             localAddress = (Address) evt.getArg();
284             node = new UdpRingNode(this, localAddress);
285             flowControl = new RingNodeFlowControl();
286             break;
287
288          case Event.SUSPECT:
289             Address suspect = (Address) evt.getArg();
290             onSuspectMessage(suspect);
291             break;
292
293          case Event.MSG:
294             msg = (Message) evt.getArg();
295             h = msg.getHeader(getName());
296             if (h instanceof TotalTokenHeader)
297             {
298                messageArrived(msg);
299                return false;
300             }
301             else if (h instanceof RingTokenHeader)
302             {
303                 if(node != null) {
304                     Object JavaDoc tmp=msg.getObject();
305                     node.tokenArrived(tmp);
306                 }
307                return false;
308             }
309       }
310       return true;
311    }
312
313    public boolean handleDownEvent(Event evt)
314    {
315       switch (evt.getType())
316       {
317          case Event.GET_DIGEST:
318          case Event.GET_DIGEST_STATE:
319
320             Digest d = new Digest(members.size());
321             Address sender = null;
322             //all members have same digest :)
323
for (int j = 0; j < members.size(); j++)
324             {
325                sender = (Address) members.elementAt(j);
326                d.add(sender, highestSeenSeq, highestSeenSeq);
327             }
328             passUp(new Event(Event.GET_DIGEST_OK, d));
329             return false;
330          case Event.SET_DIGEST:
331             Digest receivedDigest = (Digest) evt.getArg();
332             myAru = receivedDigest.highSeqnoAt(0);
333             return false;
334
335          case Event.VIEW_CHANGE:
336             onViewChange();
337             return true;
338
339             /*
340          case Event.CLEANUP:
341             // do not pass cleanup event
342             //further down. This is a hack to enable
343             // sucessfull leave from group when using pbcast.GMS.
344             // It just buys us 5 seconds to imminent STOP
345             // event following CLEANUP. We hope that the moment
346             // this node disconnect up until new view is installed
347             // at other members is less than 5 seconds.
348
349             //The proper way would be to:
350             //trap DISCONNECT event on the way down, do not pass it further.
351             //wait for the new view to be installed (effectively excluding this node out of
352             //ring) , wait for one token roundtrip time, and then send that trapped
353             //DISCONNECT event down furhter to generate DISCONNECT_OK on the way up.
354             // CLEANUP and STOP are generated after DISCONNECT.
355
356             //However, as the things stand right now pbcast.GMS stops working immediately
357             //when it receives DISCONNECT thus the new view is never generated in node that is
358             //leaving the group.
359
360             //pbcsat.GMS should still generate new view and stop working when
361             //it receives STOP event.
362
363             //In timeline DISCONNECT < CLEANUP < STOP
364             return false;
365             */

366
367          case Event.MSG:
368             Message msg = (Message) evt.getArg();
369             //handle only multicasts
370
if (msg == null) return false;
371             if (msg.getDest() == null || msg.getDest().isMulticastAddress())
372             {
373                newMessagesQueue.add(msg);
374                return false;
375             }
376       }
377       return true;
378    }
379
380    private void onViewChange()
381    {
382       isRecoveryLeader = false;
383
384       if (suspects != null)
385       {
386          suspects.clear();
387          suspects = null;
388       }
389       if (liveMembersInRecovery != null)
390       {
391          liveMembersInRecovery.clear();
392          liveMembersInRecovery = null;
393       }
394    }
395
396    private void onSuspectMessage(Address suspect)
397    {
398       state = RECOVERY_STATE;
399       if (suspects == null || suspects.size() == 0)
400       {
401          suspects = Collections.synchronizedSortedSet(new TreeSet());
402          liveMembersInRecovery = Collections.synchronizedSortedSet(new TreeSet(members));
403       }
404       suspects.add(suspect);
405       liveMembersInRecovery.removeAll(suspects);
406       isRecoveryLeader = isRecoveryLeader(liveMembersInRecovery);
407    }
408
409    /**
410     * Given a set of surviving members in the transitioanl view
411     * returns true if this stack is elected to be recovery leader.
412     *
413     */

414    private boolean isRecoveryLeader(SortedSet liveMembers)
415    {
416       boolean recoveryLeader = false;
417       if (liveMembers.size() > 0)
418       {
419          recoveryLeader = localAddress.equals(liveMembers.first());
420       }
421
422       {
423          if(log.isInfoEnabled()) log.info("live memebers are " + liveMembers);
424          if(log.isInfoEnabled()) log.info("I am recovery leader?" + recoveryLeader);
425       }
426       return recoveryLeader;
427
428    }
429
430    public long getAllReceivedUpTo()
431    {
432       return myAru;
433    }
434
435    public void installTransitionalView(Vector members)
436    {
437        if(node != null)
438            node.reconfigure(members);
439    }
440
441    /**
442     * Total Token recovery protocol (TTRP)
443     *
444     *
445     *
446     * Upon transition to recovery state, coordinator sends multiple reliable
447     * unicasts message requesting each ring member to report it's allReceivedUpto
448     * value. When all replies are received, a response list of allReceivedUpto
449     * values is sorted and transformed into a set while dropping the lowest
450     * allReceivedUpto value. For example , received response list [4,4,5,6,7,7,8]
451     * is transformed into [5,6,7,8] thus not including the lowest value 4.
452     *
453     * The objective of the recovery protocol is to have each member receive all
454     * messages up to maximum sequence value M from the response list, thus
455     * satisfying virtual synchrony properties.
456     *
457     * Note that if all members report the same allReceivedUpto values, then
458     * virtual synchrony is satisfied (since all surviving members have seen
459     * the same set of messages) and we can immediately inject operational
460     * token which will enable installment of the new view.
461     *
462     * Otherwise, a constructed set S of all allReceivedUpto values represent sequence ids
463     * of messages that have to be received by all mebers prior to installing new
464     * view thus satisfying virtual synchrony properties.
465     *
466     * A transitional view, visible only to TOTAL_TOKEN layer is then installed
467     * on the ring by a coordinator. Again a multiple unicast are used. A
468     * transitional view is deduced from current view by excluding suspected members.
469     * Coordinator creates a recovery token by appending the set S of sequence ids to
470     * token retransmission request list. Recovery token is then inserted into
471     * transitional ring view.
472     *
473     * Upon reception of recovery token, ring members are not allowed to transmit
474     * any additional new messages but only to retransmit messages from the
475     * specified token retransmission request list.
476     *
477     * When all member detect that they have received all messages upto sequence
478     * value M , the next member that first receives token converts it to operatioanl
479     * token and normal operational state is restored in all nodes.
480     *
481     * If token is lost during recovery stage, recovery protocol is restarted.
482     *
483     * */

484    private void recover()
485    {
486
487       if (isRecoveryLeader && state == RECOVERY_STATE)
488       {
489
490          {
491             if(log.isInfoEnabled()) log.info("I am starting recovery now");
492          }
493
494          Vector m = new Vector(liveMembersInRecovery);
495
496           RspList list=callRemoteMethods(m, "getAllReceivedUpTo", new Object JavaDoc[]{}, new Class JavaDoc[]{}, GroupRequest.GET_ALL, 0);
497          //RspList list = callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);
498
Vector myAllReceivedUpTos = list.getResults();
499
500           callRemoteMethods(m, "getAllReceivedUpTo", new Object JavaDoc[]{}, new Class JavaDoc[]{}, GroupRequest.GET_ALL, 0);
501          //callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);
502
Vector myAllReceivedUpTosConfirm = list.getResults();
503
504
505          while (!myAllReceivedUpTos.equals(myAllReceivedUpTosConfirm))
506          {
507             myAllReceivedUpTos = myAllReceivedUpTosConfirm;
508              callRemoteMethods(m, "getAllReceivedUpTo", new Object JavaDoc[]{}, new Class JavaDoc[]{}, GroupRequest.GET_ALL, 0);
509             // callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);
510
myAllReceivedUpTosConfirm = list.getResults();
511
512             {
513                if(log.isInfoEnabled()) log.info("myAllReceivedUpto values are" + myAllReceivedUpTos);
514                if(log.isInfoEnabled()) log.info("myAllReceivedUpto confirm values are " + myAllReceivedUpTosConfirm);
515             }
516          }
517
518
519          {
520             if(log.isInfoEnabled()) log.info("myAllReceivedUpto stabilized values are" + myAllReceivedUpTos);
521             if(log.isInfoEnabled()) log.info("installing transitional view to repair the ring...");
522          }
523
524           callRemoteMethods(m, "installTransitionalView", new Object JavaDoc[]{m}, new String JavaDoc[]{Vector.class.getName()},
525                             GroupRequest.GET_ALL, 0);
526          //callRemoteMethods(m, "installTransitionalView", m, GroupRequest.GET_ALL, 0);
527

528          Vector xmits = prepareRecoveryRetransmissionList(myAllReceivedUpTos);
529          RingToken injectToken = null;
530          if (xmits.size() > 1)
531          {
532
533             {
534                if(log.isInfoEnabled()) log.info("VS not satisfied, injecting recovery token...");
535             }
536             long aru = ((Long JavaDoc) xmits.firstElement()).longValue();
537             long highest = ((Long JavaDoc) xmits.lastElement()).longValue();
538
539             injectToken = new RingToken(RingToken.RECOVERY);
540             injectToken.setHighestSequence(highest);
541             injectToken.setAllReceivedUpto(aru);
542
543
544             Collection rtr = injectToken.getRetransmissionRequests();
545             rtr.addAll(xmits);
546          }
547          else
548          {
549
550             {
551                if(log.isInfoEnabled()) log.info("VS satisfied, injecting operational token...");
552             }
553             injectToken = new RingToken();
554             long sequence = ((Long JavaDoc) xmits.firstElement()).longValue();
555             injectToken.setHighestSequence(sequence);
556             injectToken.setAllReceivedUpto(sequence);
557          }
558           if(node != null)
559               node.passToken(injectToken);
560          tokenRetransmitter.resetTimeout();
561       }
562    }
563
564    /**
565     * Prepares a retransmissions list for recovery protocol
566     * given a collection of all myReceivedUpTo values as
567     * reported by polled surviving members.
568     *
569     *
570     *
571     */

572    private Vector prepareRecoveryRetransmissionList(Vector sequences)
573    {
574       Collections.sort(sequences);
575       Long JavaDoc first = (Long JavaDoc) sequences.firstElement();
576       Long JavaDoc last = (Long JavaDoc) sequences.lastElement();
577
578
579       Vector retransmissions = new Vector();
580       if (first.equals(last))
581       {
582          retransmissions.add(new Long JavaDoc(first.longValue()));
583       }
584       else
585       {
586          for (long j = first.longValue() + 1; j <= last.longValue(); j++)
587          {
588             retransmissions.add(new Long JavaDoc(j));
589          }
590       }
591       return retransmissions;
592    }
593
594
595    protected void updateView(View newMembers)
596    {
597       super.updateView(newMembers);
598       Vector newViewMembers = newMembers.getMembers();
599       flowControl.viewChanged(newViewMembers.size());
600        if(node != null)
601            node.reconfigure(newViewMembers);
602       boolean isCoordinator = localAddress.equals(newViewMembers.firstElement());
603       int memberSize = newViewMembers.size();
604
605       if (memberSize == 1 && isCoordinator && !tokenCirculating)
606       {
607          //create token for the first time , lets roll
608
tokenCirculating = true;
609          RingToken token = new RingToken();
610           if(node != null)
611               node.passToken(token);
612          tokenRetransmitter.resetTimeout();
613       }
614       sleepTime = (20/memberSize);
615    }
616
617
618    /**
619     * TOTAL_TOKEN's up-handler thread invokes this method after multicast
620     * message originating from some other TOTAL_TOKEN stack layer arrives at
621     * this stack layer.
622     *
623     * Up-handler thread coordinates it's access to a shared variables
624     * with TokenTransmitter thread.
625     *
626     * See tokenReceived() for details.
627     *
628     */

629    private void messageArrived(Message m)
630    {
631       TotalTokenHeader h = (TotalTokenHeader) m.getHeader(getName());
632       long seq = h.getSeq();
633
634
635       synchronized (mutex)
636       {
637          if ((myAru + 1) <= seq)
638          {
639             if (seq > highestSeenSeq)
640             {
641                highestSeenSeq = seq;
642             }
643
644             receivedMessagesQueue.put(new Long JavaDoc(seq), m);
645             if ((myAru + 1) == seq)
646             {
647                myAru = seq;
648                passUp(new Event(Event.MSG, m));
649             }
650             if (isReceiveQueueHolePlugged())
651             {
652                myAru = deliverMissingMessages();
653             }
654          }
655       }
656    }
657
658    /**
659     * Returns true if there is a hole in receive queue and at
660     * least one messages with sequence id consecutive to myAru.
661     *
662     *
663     */

664    private boolean isReceiveQueueHolePlugged()
665    {
666       return ((myAru < highestSeenSeq) && receivedMessagesQueue.containsKey(new Long JavaDoc(myAru + 1)));
667    }
668
669
670    /**
671     * Delivers as much as possible messages from receive
672     * message queue as long as they are consecutive with
673     * respect to their sequence ids.
674     *
675     */

676    private long deliverMissingMessages()
677    {
678       Map.Entry entry = null;
679       boolean inOrder = true;
680       long lastDelivered = myAru;
681       Set deliverySet = receivedMessagesQueue.tailMap(new Long JavaDoc(myAru + 1)).entrySet();
682
683
684       {
685          if(log.isInfoEnabled()) log.info("hole getting plugged, prior muAru " + myAru);
686       }
687
688
689       for (Iterator iterator = deliverySet.iterator();inOrder && iterator.hasNext();)
690       {
691          entry = (Map.Entry) iterator.next();
692          long nextInQueue = ((Long JavaDoc) entry.getKey()).longValue();
693          if (lastDelivered + 1 == nextInQueue)
694          {
695             Message m = (Message) entry.getValue();
696             passUp(new Event(Event.MSG, m));
697             lastDelivered++;
698          }
699          else
700          {
701             inOrder = false;
702          }
703       }
704
705
706       {
707          if(log.isInfoEnabled()) log.info("hole getting plugged, post muAru " + lastDelivered);
708       }
709       return lastDelivered;
710    }
711
712    /**
713     * Checks if the receivedMessageQueue has any missing sequence
714     * numbers in it, and if it does it finds holes in sequences from
715     * this stack's receivedMessageQueue and adds them to token retransmission
716     * list, thus informing other group members about messages missing
717     * from this stack.
718     *
719     *
720     */

721    private void updateTokenRtR(RingToken token)
722    {
723       long holeLowerBound = 0;
724       long holeUpperBound = 0;
725       Long JavaDoc missingSequence = null;
726       Collection retransmissionList = null;
727
728
729       //any holes?
730
if (myAru < token.getHighestSequence())
731       {
732          retransmissionList = token.getRetransmissionRequests();
733          Set received = receivedMessagesQueue.tailMap(new Long JavaDoc(myAru + 1)).keySet();
734          Iterator nonMissing = received.iterator();
735          holeLowerBound = myAru;
736
737
738             if(log.isDebugEnabled()) log.debug("retransmission request prior" + retransmissionList);
739
740          while (nonMissing.hasNext())
741          {
742             Long JavaDoc seq = (Long JavaDoc) nonMissing.next();
743             holeUpperBound = seq.longValue();
744
745             while (holeLowerBound < holeUpperBound)
746             {
747                missingSequence = new Long JavaDoc(++holeLowerBound);
748                retransmissionList.add(missingSequence);
749             }
750             holeLowerBound = holeUpperBound;
751          }
752
753          holeUpperBound = token.getHighestSequence();
754          while (holeLowerBound < holeUpperBound)
755          {
756             missingSequence = new Long JavaDoc(++holeLowerBound);
757             retransmissionList.add(missingSequence);
758          }
759
760
761             if(log.isDebugEnabled()) log.debug("retransmission request after" + retransmissionList);
762       }
763    }
764
765
766    /**
767     * Sends messages in this stacks's outgoing queue and
768     * saves a copy of each outgoing message in case they got lost.
769     * If messages get lost it is thus guaranteed that each stack
770     * that sent any message has a copy of it ready for retransmitting.
771     *
772     * Each sent message is stamped by monotonically increasing
773     * sequence number starting from the highest sequence "seen"
774     * on the ring.
775     *
776     * Returns number of messages actually sent. The number of
777     * sent messages is bounded above by the flow control
778     * algorithm (allowedCount) and bounded below by the number
779     * of pending messages in newMessagesQueue.
780     */

781    private int broadcastMessages(int allowedCount, RingToken token)
782    {
783       List sendList = null;
784       synchronized (newMessagesQueue)
785       {
786          int queueSize = newMessagesQueue.size();
787
788          if (queueSize <= 0)
789          {
790             return 0;
791          }
792
793          else if (queueSize > allowedCount)
794          {
795             sendList = new ArrayList(newMessagesQueue.subList(0, allowedCount));
796             newMessagesQueue.removeAll(sendList);
797          }
798
799          else
800          {
801             sendList = new ArrayList();
802             sendList.addAll(newMessagesQueue);
803             newMessagesQueue.clear();
804          }
805       }
806
807       long tokenSeq = token.getHighestSequence();
808
809       for (Iterator iterator = sendList.iterator(); iterator.hasNext();)
810       {
811          Message m = (Message) iterator.next();
812          m.setSrc(localAddress);
813          m.setDest(null); // mcast address
814
m.putHeader(getName(), new TotalTokenHeader(++tokenSeq));
815          receivedMessagesQueue.put(new Long JavaDoc(tokenSeq), m);
816          passDown(new Event(Event.MSG, m));
817       }
818
819       if (token.getHighestSequence() == token.getAllReceivedUpto())
820       {
821          token.setAllReceivedUpto(tokenSeq);
822       }
823       token.setHighestSequence(tokenSeq);
824       return sendList.size();
825    }
826
827
828    /**
829     * TokenTransmitter thread runs this method after receiving token.
830     * Thread is possibly blocked if up-handler thread is currently running
831     * through this stack i.e delivering an Event. Up-hanlder thread will
832     * notify blocked TokenTransmitter thread when it has delivered current
833     * Event so TokenTransmitter can proceed.
834     * TokenTransmitter thread in turn notifies possibly blocked up-handler thread
835     * when token has left the stack. Thus TokenTransmitter and up-hadler thread
836     * coordinate their access to shared variables(receivedMessageQueue and myAru).
837     *
838     * tokenReceived method and subsequent methods called from tokenReceived represent
839     * in some parts the totaly ordered algorithm presented in Amir's paper (see class
840     * header for link)
841     *
842     *
843     *
844     */

845    private void tokenReceived(RingToken token)
846    {
847
848
849       {
850          if(log.isInfoEnabled()) log.info(token.toString());
851          if(log.isDebugEnabled()) log.debug(getState());
852       }
853
854
855       flowControl.setBacklog(newMessagesQueue.size());
856       flowControl.updateWindow(token);
857
858
859       blockSenderIfRequired();
860       unBlockSenderIfAcceptable();
861
862
863       long tokensAru = 0;
864       int broadcastCount = 0;
865       int rebroadcastCount = 0;
866       synchronized (mutex)
867       {
868          if (!tokenSeen)
869          {
870             long lastRoundAru = token.getHighestSequence() - token.getLastRoundBroadcastCount();
871             if (myAru < token.getAllReceivedUpto())
872             {
873                myAru = lastRoundAru;
874             }
875             //if(log.isInfoEnabled()) log.info("TOTAL_TOKEN.tokenReceived()", "tokenSeen " + myAru);
876
tokenSeen = true;
877          }
878
879          if (token.getType() == RingToken.RECOVERY)
880          {
881             highestSeenSeq = token.getHighestSequence();
882             if (highestSeenSeq == myAru)
883             {
884                if(log.isInfoEnabled()) log.info("member node recovered");
885                token.addRecoveredMember(localAddress);
886             }
887          }
888
889          updateTokenRtR(token);
890
891          int allowedToBroadcast = flowControl.getAllowedToBroadcast(token);
892          rebroadcastCount = rebroadcastMessages(token);
893          allowedToBroadcast -= rebroadcastCount;
894
895
896          {
897             if(log.isInfoEnabled()) log.info("myAllReceivedUpto" + myAru);
898             if(log.isInfoEnabled()) log.info("allowedToBroadcast" + allowedToBroadcast);
899             if(log.isInfoEnabled()) log.info("newMessagesQueue.size()" + newMessagesQueue.size());
900          }
901
902          tokensAru = token.getAllReceivedUpto();
903
904          if (myAru < tokensAru ||
905                  localAddress.equals(token.getAruId()) ||
906                  token.getAruId() == null)
907          {
908             token.setAllReceivedUpto(myAru);
909             if (token.getAllReceivedUpto() == token.getHighestSequence())
910             {
911                token.setAruId(null);
912             }
913             else
914             {
915                token.setAruId(localAddress);
916             }
917          }
918          if (allowedToBroadcast > 0 && token.getType() == RingToken.OPERATIONAL)
919          {
920             broadcastCount = broadcastMessages(allowedToBroadcast, token);
921          }
922
923          if (tokensAru > lastRoundTokensAru)
924          {
925             removeStableMessages(receivedMessagesQueue, lastRoundTokensAru);
926          }
927
928       } //end synchronized
929

930       //give CPU some breath
931
Util.sleep(sleepTime);
932
933       token.incrementTokenSequence();
934       token.addLastRoundBroadcastCount(broadcastCount - lastRoundTransmitCount);
935       token.addBacklog(flowControl.getBacklogDifference());
936       flowControl.setPreviousBacklog();
937       lastRoundTransmitCount = broadcastCount;
938       lastRoundRebroadcastCount = rebroadcastCount;
939       lastRoundTokensAru = tokensAru;
940    }
941
942    /**
943     *
944     * Rebroadcasts messages specified in token's retransmission
945     * request list if those messages are available in this stack.
946     * Returns number of rebroadcasted messages.
947     *
948     */

949    private int rebroadcastMessages(RingToken token)
950    {
951       int rebroadCastCount = 0;
952       Collection rexmitRequests = token.getRetransmissionRequests();
953       if (rexmitRequests.size() > 0)
954       {
955          Collection rbl = getRebroadcastList(rexmitRequests);
956          rebroadCastCount = rbl.size();
957          if (rebroadCastCount > 0)
958          {
959
960             {
961                if(log.isInfoEnabled()) log.info("rebroadcasting " + rbl);
962             }
963
964             Long JavaDoc s = null;
965             for (Iterator iterator = rbl.iterator(); iterator.hasNext();)
966             {
967                s = (Long JavaDoc) iterator.next();
968                Message m = (Message) receivedMessagesQueue.get(s);
969                passDown(new Event(Event.MSG, m));
970             }
971          }
972       }
973       return rebroadCastCount;
974    }
975
976
977    private void invalidateOnTokenloss()
978    {
979       lastRoundTransmitCount = 0;
980       flowControl.invalidate();
981    }
982
983    /**
984     * Checks if the down pending queue's (newMessagesQueue) size is
985     * greater than blockSendingBacklogThreshold specified in the properties.
986     * If it is, client's sending thread is effectively blocked until
987     * down pending queue's size drops below unblockSendingBacklogThreshold.
988     *
989     *
990     */

991    private void blockSenderIfRequired()
992    {
993       if (!senderBlocked && flowControl.getBacklog() > blockSendingBacklogThreshold)
994       {
995          passUp(new Event(Event.BLOCK_SEND));
996          senderBlocked = true;
997       }
998    }
999
1000   /**
1001    * Checks if the down pending queue's (newMessagesQueue) size is
1002    * smaller than unblockSendingBacklogThreshold specified in the properties.
1003    * If it is, client's sending thread is effectively unblocked enabling
1004    * new messages to be queued for transmission.
1005    *
1006    *
1007    */

1008   private void unBlockSenderIfAcceptable()
1009   {
1010      if (senderBlocked && flowControl.getBacklog() < unblockSendingBacklogThreshold)
1011      {
1012         passUp(new Event(Event.UNBLOCK_SEND));
1013         senderBlocked = false;
1014      }
1015
1016   }
1017
1018   /**
1019    * Removes messages determined to be stable(i.e seen by all members)
1020    * from the specified queue. If the client also clears all reference
1021    * to these messages (in application space) they become eligible for garabge
1022    * collection.
1023    *
1024    *
1025    */

1026
1027   private void removeStableMessages(TreeMap m, long upToSeq)
1028   {
1029
1030      if (m.size() > 0)
1031      {
1032         long first = ((Long JavaDoc) m.firstKey()).longValue();
1033         if (first > upToSeq)
1034         {
1035            upToSeq = first;
1036         }
1037
1038
1039         {
1040            if(log.isDebugEnabled()) log.debug("cutting queue first key " + m.firstKey()
1041                        + " cut at " + upToSeq + " last key " + m.lastKey());
1042         }
1043         SortedMap stable = m.headMap(new Long JavaDoc(upToSeq));
1044         stable.clear();
1045      }
1046   }
1047
1048   /**
1049    * Determines a subset of message sequence numbers
1050    * available for retransmission from this stack.
1051    *
1052    */

1053   private Collection getRebroadcastList(Collection rtr)
1054   {
1055      ArrayList rebroadcastList = new ArrayList(rtr);
1056      rebroadcastList.retainAll(receivedMessagesQueue.keySet());
1057      rtr.removeAll(rebroadcastList);
1058      Collections.sort(rebroadcastList);
1059      return rebroadcastList;
1060   }
1061
1062   /**
1063    * TokenTransimitter thread transmits the token to the next member
1064    * in the logical ring as well as it receives token from the previous
1065    * member in the ring. Smoothed ring roundtrip time is computed
1066    * in order to detect token loss. If the timeout expires AND this
1067    * stack has received SUSPECT message, recovery protocol is invoked.
1068    * See recover method for details.
1069    *
1070    */

1071   private class TokenTransmitter extends Thread JavaDoc
1072   {
1073      long rtt = 0;
1074      long timer;
1075      double srtt = 1000; //1 second to start
1076
final double a = 0.09;
1077      final int timeoutFactor = 10;
1078      volatile boolean running = false;
1079
1080      private TokenTransmitter()
1081      {
1082         super("TokenTransmitter");
1083         resetTimeout();
1084         running = true;
1085      }
1086
1087      private void shutDown()
1088      {
1089          running = false;
1090      }
1091
1092      private void recalculateTimeout()
1093      {
1094         long now = System.currentTimeMillis();
1095         if (timer > 0)
1096         {
1097            rtt = now - timer;
1098            srtt = (1 - a) * srtt + a * rtt;
1099         }
1100      }
1101
1102      private double getTimeout()
1103      {
1104         return srtt * timeoutFactor;
1105      }
1106
1107      private void resetTimeout()
1108      {
1109         timer = System.currentTimeMillis();
1110      }
1111
1112      private boolean isRecoveryCompleted(RingToken token)
1113      {
1114         if (liveMembersInRecovery.equals(token.getRecoveredMembers()))
1115         {
1116            return true;
1117         }
1118         return false;
1119      }
1120
1121      public void run()
1122      {
1123         while (running)
1124         {
1125            RingToken token = null;
1126            int timeout = 0;
1127
1128             if(node == null) {
1129                 // sleep some time, then retry
1130
Util.sleep(500);
1131                 continue;
1132             }
1133
1134            try
1135            {
1136               timeout = (int) getTimeout();
1137
1138                  if(log.isInfoEnabled()) log.info("timeout(ms)=" + timeout);
1139
1140               token = (RingToken) node.receiveToken(timeout);
1141
1142               if (token.getType() == RingToken.OPERATIONAL &&
1143                       state == RECOVERY_STATE)
1144               {
1145                  state = OPERATIONAL_STATE;
1146               }
1147
1148               tokenReceived(token);
1149               recalculateTimeout();
1150
1151               if (token.getType() == RingToken.RECOVERY &&
1152                       isRecoveryCompleted(token))
1153               {
1154
1155                     if(log.isInfoEnabled()) log.info("all members recovered, injecting operational token");
1156                  token.setType(RingToken.OPERATIONAL);
1157               }
1158               node.passToken(token);
1159               resetTimeout();
1160            }
1161            catch (TokenLostException tle)
1162            {
1163               invalidateOnTokenloss();
1164               state = RECOVERY_STATE;
1165               recover();
1166            }
1167         }
1168      }
1169   }
1170}
1171
1172
Popular Tags