KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > TOTAL


1 // $Id: TOTAL.java,v 1.10 2005/04/08 08:10:54 belaban Exp $
2
package org.jgroups.protocols;
3
4
5 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
6 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
7 import org.jgroups.Address;
8 import org.jgroups.Event;
9 import org.jgroups.Message;
10 import org.jgroups.View;
11 import org.jgroups.stack.AckSenderWindow;
12 import org.jgroups.stack.Protocol;
13 import org.jgroups.util.TimeScheduler;
14
15 import java.io.IOException JavaDoc;
16 import java.io.ObjectInput JavaDoc;
17 import java.io.ObjectOutput JavaDoc;
18 import java.util.*;
19
20
21 /**
22  * Implements the total ordering layer using a message sequencer
23  * <p/>
24  * <p/>
25  * The protocol guarantees that all bcast sent messages will be delivered in
26  * the same order to all members. For that it uses a sequencer which assigns
27  * monotonically increasing sequence ID to broadcasts. Then all group members
28  * deliver the bcasts in ascending sequence ID order.
29  * <p/>
30  * <ul>
31  * <li>
32  * When a bcast message comes down to this layer, it is placed in the pending
33  * down queue. A bcast request is sent to the sequencer.</li>
34  * <li>
35  * When the sequencer receives a bcast request, it creates a bcast reply
36  * message and assigns to it a monotonically increasing seqID and sends it back
37  * to the source of the bcast request.</li>
38  * <li>
39  * When a broadcast reply is received, the corresponding bcast message is
40  * assigned the received seqID. Then it is broadcasted.</li>
41  * <li>
42  * Received bcasts are placed in the up queue. The queue is sorted according
43  * to the seqID of the bcast. Any message at the head of the up queue with a
44  * seqID equal to the next expected seqID is delivered to the layer above.</li>
45  * <li>
46  * Unicast messages coming from the layer below are forwarded above.</li>
47  * <li>
48  * Unicast messages coming from the layer above are forwarded below.</li>
49  * </ul>
50  * <p/>
51  * <i>Please note that once a <code>BLOCK_OK</code> is acknowledged messages
52  * coming from above are discarded!</i> Either the application must stop
53  * sending messages when a <code>BLOCK</code> event is received from the
54  * channel or a QUEUE layer should be placed above this one. Received messages
55  * are still delivered above though.
56  * <p/>
57  * bcast requests are retransmitted periodically until a bcast reply is
58  * received. In case a BCAST_REP is on its way during a BCAST_REQ
59  * retransmission, then the next BCAST_REP will be to a non-existing
60  * BCAST_REQ. So, a null BCAST message is sent to fill the created gap in
61  * the seqID of all members.
62  *
63  * @author i.georgiadis@doc.ic.ac.uk
64  */

65 public class TOTAL extends Protocol {
66     /**
67      * The header processed by the TOTAL layer and intended for TOTAL
68      * inter-stack communication
69      */

70     public static class Header extends org.jgroups.Header {
71         // Header types
72
/**
73          * Null value for the tag
74          */

75         public static final int NULL_TYPE=-1;
76         /**
77          * Request to broadcast by the source
78          */

79         public static final int REQ=0;
80         /**
81          * Reply to broadcast request.
82          */

83         public static final int REP=1;
84         /**
85          * Unicast message
86          */

87         public static final int UCAST=2;
88         /**
89          * Broadcast Message
90          */

91         public static final int BCAST=3;
92
93         /**
94          * The header's type tag
95          */

96         public int type;
97         /**
98          * The ID used by the message source to match replies from the
99          * sequencer
100          */

101         public long localSequenceID;
102         /**
103          * The ID imposing the total order of messages
104          */

105         public long sequenceID;
106
107         /**
108          * used for externalization
109          */

110         public Header() {
111         }
112
113         /**
114          * Create a header for the TOTAL layer
115          *
116          * @param type the header's type
117          * @param localSeqID the ID used by the sender of broadcasts to match
118          * requests with replies from the sequencer
119          * @param seqID the ID imposing the total order of messages
120          * @throws IllegalArgumentException if the provided header type is
121          * unknown
122          */

123         public Header(int type, long localSeqID, long seqID) {
124             super();
125             switch(type) {
126             case REQ:
127             case REP:
128             case UCAST:
129             case BCAST:
130                 this.type=type;
131                 break;
132             default:
133                 this.type=NULL_TYPE;
134                 throw new IllegalArgumentException JavaDoc("type");
135             }
136             this.localSequenceID=localSeqID;
137             this.sequenceID=seqID;
138         }
139
140         /**
141          * For debugging purposes
142          */

143         public String JavaDoc toString() {
144             StringBuffer JavaDoc buffer=new StringBuffer JavaDoc();
145             String JavaDoc typeName;
146             buffer.append("[TOTAL.Header");
147             switch(type) {
148             case REQ:
149                 typeName="REQ";
150                 break;
151             case REP:
152                 typeName="REP";
153                 break;
154             case UCAST:
155                 typeName="UCAST";
156                 break;
157             case BCAST:
158                 typeName="BCAST";
159                 break;
160             case NULL_TYPE:
161                 typeName="NULL_TYPE";
162                 break;
163             default:
164                 typeName="";
165                 break;
166             }
167             buffer.append(", type=" + typeName);
168             buffer.append(", " + "localID=" + localSequenceID);
169             buffer.append(", " + "seqID=" + sequenceID);
170             buffer.append(']');
171
172             return (buffer.toString());
173         }
174
175         /**
176          * Manual serialization
177          */

178         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
179             out.writeInt(type);
180             out.writeLong(localSequenceID);
181             out.writeLong(sequenceID);
182         }
183
184         /**
185          * Manual deserialization
186          */

187         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc,
188                                                         ClassNotFoundException JavaDoc {
189             type=in.readInt();
190             localSequenceID=in.readLong();
191             sequenceID=in.readLong();
192         }
193     }
194
195
196     /**
197      * The retransmission listener - It is called by the
198      * <code>AckSenderWindow</code> when a retransmission should occur
199      */

200     private class Command implements AckSenderWindow.RetransmitCommand {
201         public Command() {
202         }
203
204         public void retransmit(long seqNo, Message msg) {
205             _retransmitBcastRequest(seqNo);
206         }
207     }
208
209
210     /**
211      * Protocol name
212      */

213     private static final String JavaDoc PROT_NAME="TOTAL";
214     /**
215      * Property names
216      */

217     private static final String JavaDoc TRACE_PROP="trace";
218
219     /**
220      * Average time between broadcast request retransmissions
221      */

222     private final long[] AVG_RETRANSMIT_INTERVAL=new long[]{1000, 2000, 3000, 4000};
223
224     /**
225      * Null value for the IDs
226      */

227     private static final long NULL_ID=-1;
228     // Layer sending states
229
/**
230      * No group has been joined yet
231      */

232     private static final int NULL_STATE=-1;
233     /**
234      * When set, all messages are sent/received
235      */

236     private static final int RUN=0;
237     /**
238      * When set, only session-specific messages are sent/received, i.e. only
239      * messages essential to the session's integrity
240      */

241     private static final int FLUSH=1;
242     /**
243      * No message is sent to the layer below
244      */

245     private static final int BLOCK=2;
246
247
248     /**
249      * The state lock allowing multiple reads or a single write
250      */

251     private final ReadWriteLock stateLock=new WriterPreferenceReadWriteLock();
252     /**
253      * Protocol layer message-sending state
254      */

255     private int state=NULL_STATE;
256     /**
257      * The address of this stack
258      */

259     private Address addr=null;
260     /**
261      * The address of the sequencer
262      */

263     private Address sequencerAddr=null;
264     /**
265      * The sequencer's seq ID. The ID of the most recently broadcast reply
266      * message
267      */

268     private long sequencerSeqID=NULL_ID;
269     /**
270      * The local sequence ID, i.e. the ID sent with the last broadcast request
271      * message. This is increased with every broadcast request sent to the
272      * sequencer and it's used to match the requests with the sequencer's
273      * replies
274      */

275     private long localSeqID=NULL_ID;
276     /**
277      * The total order sequence ID. This is the ID of the most recently
278      * delivered broadcast message. As the sequence IDs are increasing without
279      * gaps, this is used to detect missing broadcast messages
280      */

281     private long seqID=NULL_ID;
282     /**
283      * The list of unanswered broadcast requests to the sequencer. The entries
284      * are stored in increasing local sequence ID, i.e. in the order they were
285      * <p/>
286      * sent localSeqID -> Broadcast msg to be sent.
287      */

288     private SortedMap reqTbl;
289     /**
290      * The list of received broadcast messages that haven't yet been delivered
291      * to the layer above. The entries are stored in increasing sequence ID,
292      * i.e. in the order they must be delivered above
293      * <p/>
294      * seqID -> Received broadcast msg
295      */

296     private SortedMap upTbl;
297     /**
298      * Retranmitter for pending broadcast requests
299      */

300     private AckSenderWindow retransmitter;
301
302
303     /**
304      * Print addresses in host_ip:port form to bypass DNS
305      */

306     private String JavaDoc _addrToString(Object JavaDoc addr) {
307         return (
308                    addr == null ? "<null>" :
309                 ((addr instanceof org.jgroups.stack.IpAddress) ?
310                 (((org.jgroups.stack.IpAddress)addr).getIpAddress(
311                 ).getHostAddress() + ':' +
312                 ((org.jgroups.stack.IpAddress)addr).getPort()) :
313                 addr.toString())
314                );
315     }
316
317
318     /**
319      * @return this protocol's name
320      */

321     private String JavaDoc _getName() {
322         return (PROT_NAME);
323     }
324
325     /**
326      * Configure the protocol based on the given list of properties
327      *
328      * @param properties the list of properties to use to setup this layer
329      * @return false if there was any unrecognized property or a property with
330      * an invalid value
331      */

332     private boolean _setProperties(Properties properties) {
333         String JavaDoc value;
334
335         // trace
336
// Parse & remove property but ignore it; use Trace.trace instead
337
value=properties.getProperty(TRACE_PROP);
338         if(value != null) properties.remove(TRACE_PROP);
339         if(properties.size() > 0) {
340             if(log.isErrorEnabled())
341                 log.error("The following properties are not " +
342                           "recognized: " + properties.toString());
343             return (false);
344         }
345         return (true);
346     }
347
348     /**
349      * Events that some layer below must handle
350      *
351      * @return the set of <code>Event</code>s that must be handled by some layer
352      * below
353      */

354     Vector _requiredDownServices() {
355         Vector services=new Vector();
356
357         return (services);
358     }
359
360     /**
361      * Events that some layer above must handle
362      *
363      * @return the set of <code>Event</code>s that must be handled by some
364      * layer above
365      */

366     Vector _requiredUpServices() {
367         Vector services=new Vector();
368
369         return (services);
370     }
371
372
373     /**
374      * Extract as many messages as possible from the pending up queue and send
375      * them to the layer above
376      */

377     private void _deliverBcast() {
378         Message msg;
379         Header header;
380
381         synchronized(upTbl) {
382             while((msg=(Message)upTbl.remove(new Long JavaDoc(seqID + 1))) != null) {
383                 header=(Header)msg.removeHeader(getName());
384                 if(header.localSequenceID != NULL_ID) passUp(new Event(Event.MSG, msg));
385                 ++seqID;
386             }
387         } // synchronized(upTbl)
388
}
389
390
391     /**
392      * Add all undelivered bcasts sent by this member in the req queue and then
393      * replay this queue
394      */

395     private void _replayBcast() {
396         Iterator it;
397         Message msg;
398         Header header;
399
400         // i. Remove all undelivered bcasts sent by this member and place them
401
// again in the pending bcast req queue
402

403         synchronized(upTbl) {
404             if(upTbl.size() > 0)
405                 if(log.isInfoEnabled()) log.info("Replaying undelivered bcasts");
406
407             it=upTbl.entrySet().iterator();
408             while(it.hasNext()) {
409                 msg=(Message)((Map.Entry)it.next()).getValue();
410                 it.remove();
411                 if(!msg.getSrc().equals(addr)) {
412                     if(log.isInfoEnabled())
413                         log.info("During replay: " +
414                                  "discarding BCAST[" +
415                                  ((TOTAL.Header)msg.getHeader(getName())).sequenceID +
416                                  "] from " + _addrToString(msg.getSrc()));
417                     continue;
418                 }
419                 header=(Header)msg.removeHeader(getName());
420                 if(header.localSequenceID == NULL_ID) continue;
421                 _sendBcastRequest(msg, header.localSequenceID);
422             }
423         } // synchronized(upTbl)
424
}
425
426
427     /**
428      * Send a unicast message: Add a <code>UCAST</code> header
429      *
430      * @param msg the message to unicast
431      * @return the message to send
432      */

433     private Message _sendUcast(Message msg) {
434         msg.putHeader(getName(), new Header(Header.UCAST, NULL_ID, NULL_ID));
435         return (msg);
436     }
437
438
439     /**
440      * Replace the original message with a broadcast request sent to the
441      * sequencer. The original bcast message is stored locally until a reply to
442      * bcast is received from the sequencer. This function has the side-effect
443      * of increasing the <code>localSeqID</code>
444      *
445      * @param msg the message to broadcast
446      */

447     private void _sendBcastRequest(Message msg) {
448         _sendBcastRequest(msg, ++localSeqID);
449     }
450
451
452     /**
453      * Replace the original message with a broadcast request sent to the
454      * sequencer. The original bcast message is stored locally until a reply
455      * to bcast is received from the sequencer
456      *
457      * @param msg the message to broadcast
458      * @param id the local sequence ID to use
459      */

460     private void _sendBcastRequest(Message msg, long id) {
461
462         // i. Store away the message while waiting for the sequencer's reply
463
// ii. Send a bcast request immediatelly and also schedule a
464
// retransmission
465
synchronized(reqTbl) {
466             reqTbl.put(new Long JavaDoc(id), msg);
467         }
468         _transmitBcastRequest(id);
469         retransmitter.add(id, msg);
470     }
471
472
473     /**
474      * Send the bcast request with the given localSeqID
475      *
476      * @param seqID the local sequence id of the
477      */

478     private void _transmitBcastRequest(long seqID) {
479         Message reqMsg;
480
481         // i. If NULL_STATE, then ignore, just transient state before
482
// shutting down the retransmission thread
483
// ii. If blocked, be patient - reschedule
484
// iii. If the request is not pending any more, acknowledge it
485
// iv. Create a broadcast request and send it to the sequencer
486

487         if(state == NULL_STATE) {
488             if(log.isInfoEnabled()) log.info("Transmit BCAST_REQ[" + seqID + "] in NULL_STATE");
489             return;
490         }
491         if(state == BLOCK) return;
492
493         synchronized(reqTbl) {
494             if(!reqTbl.containsKey(new Long JavaDoc(seqID))) {
495                 retransmitter.ack(seqID);
496                 return;
497             }
498         }
499         reqMsg=new Message(sequencerAddr, addr, new byte[0]);
500         reqMsg.putHeader(getName(), new Header(Header.REQ, seqID, NULL_ID));
501
502         passDown(new Event(Event.MSG, reqMsg));
503     }
504
505
506     /**
507      * Receive a unicast message: Remove the <code>UCAST</code> header
508      *
509      * @param msg the received unicast message
510      */

511     private void _recvUcast(Message msg) {
512         msg.removeHeader(getName());
513     }
514
515     /**
516      * Receive a broadcast message: Put it in the pending up queue and then
517      * try to deliver above as many messages as possible
518      *
519      * @param msg the received broadcast message
520      */

521     private void _recvBcast(Message msg) {
522         Header header=(Header)msg.getHeader(getName());
523
524         // i. Put the message in the up pending queue only if it's not
525
// already there, as it seems that the event may be received
526
// multiple times before a view change when all members are
527
// negotiating a common set of stable msgs
528
//
529
// ii. Deliver as many messages as possible
530

531         synchronized(upTbl) {
532             if(header.sequenceID <= seqID)
533                 return;
534             upTbl.put(new Long JavaDoc(header.sequenceID), msg);
535         }
536
537         _deliverBcast();
538     }
539
540
541     /**
542      * Received a bcast request - Ignore if not the sequencer, else send a
543      * bcast reply
544      *
545      * @param msg the broadcast request message
546      */

547     private void _recvBcastRequest(Message msg) {
548         Header header;
549         Message repMsg;
550
551         // i. If blocked, discard the bcast request
552
// ii. Assign a seqID to the message and send it back to the requestor
553

554         if(!addr.equals(sequencerAddr)) {
555             if(log.isErrorEnabled())
556                 log.error("Received bcast request " +
557                           "but not a sequencer");
558             return;
559         }
560         if(state == BLOCK) {
561             if(log.isInfoEnabled()) log.info("Blocked, discard bcast req");
562             return;
563         }
564         header=(Header)msg.getHeader(getName());
565         ++sequencerSeqID;
566         repMsg=new Message(msg.getSrc(), addr, new byte[0]);
567         repMsg.putHeader(getName(), new Header(Header.REP, header.localSequenceID,
568                                                sequencerSeqID));
569
570         passDown(new Event(Event.MSG, repMsg));
571     }
572
573
574     /**
575      * Received a bcast reply - Match with the pending bcast request and move
576      * the message in the list of messages to be delivered above
577      *
578      * @param header the header of the bcast reply
579      */

580     private void _recvBcastReply(Header header) {
581         Message msg;
582         long id;
583
584         // i. If blocked, discard the bcast reply
585
//
586
// ii. Assign the received seqID to the message and broadcast it
587
//
588
// iii.
589
// - Acknowledge the message to the retransmitter
590
// - If non-existent BCAST_REQ, send a fake bcast to avoid seqID gaps
591
// - If localID == NULL_ID, it's a null BCAST, else normal BCAST
592
// - Set the seq ID of the message to the one sent by the sequencer
593

594         if(state == BLOCK) {
595             if(log.isInfoEnabled()) log.info("Blocked, discard bcast rep");
596             return;
597         }
598
599         synchronized(reqTbl) {
600             msg=(Message)reqTbl.remove(new Long JavaDoc(header.localSequenceID));
601         }
602
603         if(msg != null) {
604             retransmitter.ack(header.localSequenceID);
605             id=header.localSequenceID;
606         }
607         else {
608             if(log.isInfoEnabled())
609                 log.info("Bcast reply to " +
610                          "non-existent BCAST_REQ[" + header.localSequenceID +
611                          "], Sending NULL bcast");
612             id=NULL_ID;
613             msg=new Message(null, addr, new byte[0]);
614         }
615         msg.putHeader(getName(), new Header(Header.BCAST, id, header.sequenceID));
616
617         passDown(new Event(Event.MSG, msg));
618     }
619
620
621     /**
622      * Resend the bcast request with the given localSeqID
623      *
624      * @param seqID the local sequence id of the
625      */

626     private void _retransmitBcastRequest(long seqID) {
627         // *** Get a shared lock
628
try {
629             stateLock.readLock().acquire();
630             try {
631                 if(log.isInfoEnabled()) log.info("Retransmit BCAST_REQ[" + seqID + ']');
632                 _transmitBcastRequest(seqID);
633             }
634             finally {
635                 stateLock.readLock().release();
636             }
637         }
638         catch(InterruptedException JavaDoc e) {
639             log.error("failed acquiring a read lock", e);
640         }
641     }
642
643
644     /* Up event handlers
645      * If the return value is true the event travels further up the stack
646      * else it won't be forwarded
647      */

648
649     /**
650      * Prepare for a VIEW_CHANGE: switch to flushing state
651      *
652      * @return true if the event is to be forwarded further up
653      */

654     private boolean _upBlock() {
655         // *** Get an exclusive lock
656
try {
657             stateLock.writeLock().acquire();
658             try {
659                 state=FLUSH;
660                 // *** Revoke the exclusive lock
661
}
662             finally {
663                 stateLock.writeLock().release();
664             }
665         }
666         catch(InterruptedException JavaDoc e) {
667             log.error("failed acquiring the write lock", e);
668         }
669
670         return (true);
671     }
672
673
674     /**
675      * Handle an up MSG event
676      *
677      * @param event the MSG event
678      * @return true if the event is to be forwarded further up
679      */

680     private boolean _upMsg(Event event) {
681         Message msg;
682         Object JavaDoc obj;
683         Header header;
684
685         // *** Get a shared lock
686
try {
687             stateLock.readLock().acquire();
688             try {
689
690                 // If NULL_STATE, shouldn't receive any msg on the up queue!
691
if(state == NULL_STATE) {
692                     if(log.isErrorEnabled()) log.error("Up msg in NULL_STATE");
693                     return (false);
694                 }
695
696                 // Peek the header:
697
//
698
// (UCAST) A unicast message - Send up the stack
699
// (BCAST) A broadcast message - Handle specially
700
// (REQ) A broadcast request - Handle specially
701
// (REP) A broadcast reply from the sequencer - Handle specially
702
msg=(Message)event.getArg();
703                 if(!((obj=msg.getHeader(getName())) instanceof TOTAL.Header)) {
704                     if(log.isErrorEnabled()) log.error("No TOTAL.Header found");
705                     return (false);
706                 }
707                 header=(Header)obj;
708
709                 switch(header.type) {
710                 case Header.UCAST:
711                     _recvUcast(msg);
712                     return (true);
713                 case Header.BCAST:
714                     _recvBcast(msg);
715                     return (false);
716                 case Header.REQ:
717                     _recvBcastRequest(msg);
718                     return (false);
719                 case Header.REP:
720                     _recvBcastReply(header);
721                     return (false);
722                 default:
723                     if(log.isErrorEnabled()) log.error("Unknown header type");
724                     return (false);
725                 }
726
727                 // ** Revoke the shared lock
728
}
729             finally {
730                 stateLock.readLock().release();
731             }
732         }
733         catch(InterruptedException JavaDoc e) {
734             if(log.isErrorEnabled()) log.error(e.getMessage());
735         }
736
737         return (true);
738     }
739
740
741     /**
742      * Set the address of this group member
743      *
744      * @param event the SET_LOCAL_ADDRESS event
745      * @return true if event should be forwarded further up
746      */

747     private boolean _upSetLocalAddress(Event event) {
748         // *** Get an exclusive lock
749
try {
750             stateLock.writeLock().acquire();
751             try {
752                 addr=(Address)event.getArg();
753             }
754             finally {
755                 stateLock.writeLock().release();
756             }
757         }
758         catch(InterruptedException JavaDoc e) {
759             log.error(e.getMessage());
760         }
761         return (true);
762     }
763
764
765     /**
766      * Handle view changes
767      * <p/>
768      * param event the VIEW_CHANGE event
769      *
770      * @return true if the event should be forwarded to the layer above
771      */

772     private boolean _upViewChange(Event event) {
773         Object JavaDoc oldSequencerAddr;
774
775         // *** Get an exclusive lock
776
try {
777             stateLock.writeLock().acquire();
778             try {
779
780                 state=RUN;
781
782                 // i. See if this member is the sequencer
783
// ii. If this is the sequencer, reset the sequencer's sequence ID
784
// iii. Reset the last received sequence ID
785
//
786
// iv. Replay undelivered bcasts: Put all the undelivered bcasts
787
// sent by us back to the req queue and discard the rest
788
oldSequencerAddr=sequencerAddr;
789                 sequencerAddr=
790                         (Address)((View)event.getArg()).getMembers().elementAt(0);
791                 if(addr.equals(sequencerAddr)) {
792                     sequencerSeqID=NULL_ID;
793                     if((oldSequencerAddr == null) ||
794                             (!addr.equals(oldSequencerAddr)))
795                         if(log.isInfoEnabled()) log.info("I'm the new sequencer");
796                 }
797                 seqID=NULL_ID;
798                 _replayBcast();
799
800                 // *** Revoke the exclusive lock
801
}
802             finally {
803                 stateLock.writeLock().release();
804             }
805         }
806         catch(InterruptedException JavaDoc e) {
807             log.error(e.getMessage());
808         }
809
810         return (true);
811     }
812
813
814     /*
815      * Down event handlers
816      * If the return value is true the event travels further down the stack
817      * else it won't be forwarded
818      */

819
820
821     /**
822      * Blocking confirmed - No messages should come from above until a
823      * VIEW_CHANGE event is received. Switch to blocking state.
824      *
825      * @return true if event should travel further down
826      */

827     private boolean _downBlockOk() {
828         // *** Get an exclusive lock
829
try {
830             stateLock.writeLock().acquire();
831             try {
832                 state=BLOCK;
833             }
834             finally {
835                 stateLock.writeLock().release();
836             }
837         }
838         catch(InterruptedException JavaDoc e) {
839             log.error(e.getMessage());
840         }
841
842         return (true);
843     }
844
845
846     /**
847      * A MSG event travelling down the stack. Forward unicast messages, treat
848      * specially the broadcast messages.<br>
849      * <p/>
850      * If in <code>BLOCK</code> state, i.e. it has replied to a
851      * <code>BLOCk_OK</code> and hasn't yet received a
852      * <code>VIEW_CHANGE</code> event, messages are discarded<br>
853      * <p/>
854      * If in <code>FLUSH</code> state, forward unicast but queue broadcasts
855      *
856      * @param event the MSG event
857      * @return true if event should travel further down
858      */

859     private boolean _downMsg(Event event) {
860         Message msg;
861
862         // *** Get a shared lock
863
try {
864             stateLock.readLock().acquire();
865             try {
866
867                 // i. Discard all msgs, if in NULL_STATE
868
// ii. Discard all msgs, if blocked
869
if(state == NULL_STATE) {
870                     if(log.isErrorEnabled()) log.error("Discard msg in NULL_STATE");
871                     return (false);
872                 }
873                 if(state == BLOCK) {
874                     if(log.isErrorEnabled()) log.error("Blocked, discard msg");
875                     return (false);
876                 }
877
878                 msg=(Message)event.getArg();
879                 if(msg.getDest() == null) {
880                     _sendBcastRequest(msg);
881                     return (false);
882                 }
883                 else {
884                     msg=_sendUcast(msg);
885                     event.setArg(msg);
886                 }
887
888                 // ** Revoke the shared lock
889
}
890             finally {
891                 stateLock.readLock().release();
892             }
893         }
894         catch(InterruptedException JavaDoc e) {
895             log.error(e.getMessage());
896         }
897
898         return (true);
899     }
900
901
902     /**
903      * Prepare this layer to receive messages from above
904      */

905     public void start() throws Exception JavaDoc {
906         TimeScheduler timer;
907
908         timer=stack != null ? stack.timer : null;
909         if(timer == null)
910             throw new Exception JavaDoc("TOTAL.start(): timer is null");
911
912         reqTbl=new TreeMap();
913         upTbl=new TreeMap();
914         retransmitter=new AckSenderWindow(new Command(), AVG_RETRANSMIT_INTERVAL);
915     }
916
917
918     /**
919      * Handle the stop() method travelling down the stack.
920      * <p/>
921      * The local addr is set to null, since after a Start->Stop->Start
922      * sequence this member's addr is not guaranteed to be the same
923      */

924     public void stop() {
925         try {
926             stateLock.writeLock().acquire();
927             try {
928                 state=NULL_STATE;
929                 retransmitter.reset();
930                 reqTbl.clear();
931                 upTbl.clear();
932                 addr=null;
933             }
934             finally {
935                 stateLock.writeLock().release();
936             }
937         }
938         catch(InterruptedException JavaDoc e) {
939             log.error(e.getMessage());
940         }
941     }
942
943
944     /**
945      * Process an event coming from the layer below
946      *
947      * @param event the event to process
948      */

949     private void _up(Event event) {
950         switch(event.getType()) {
951         case Event.BLOCK:
952             if(!_upBlock()) return;
953             break;
954         case Event.MSG:
955             if(!_upMsg(event)) return;
956             break;
957         case Event.SET_LOCAL_ADDRESS:
958             if(!_upSetLocalAddress(event)) return;
959             break;
960         case Event.VIEW_CHANGE:
961             if(!_upViewChange(event)) return;
962             break;
963         default:
964             break;
965         }
966
967         passUp(event);
968     }
969
970
971     /**
972      * Process an event coming from the layer above
973      *
974      * @param event the event to process
975      */

976     private void _down(Event event) {
977         switch(event.getType()) {
978         case Event.BLOCK_OK:
979             if(!_downBlockOk()) return;
980             break;
981         case Event.MSG:
982             if(!_downMsg(event)) return;
983             break;
984         default:
985             break;
986         }
987
988         passDown(event);
989     }
990
991
/**
 * Create the TOTAL layer. Nothing is set up here: the queues and the
 * retransmitter are created in <code>start()</code>.
 */
public TOTAL() {
    super();
}
997
998
999     // Methods deriving from <code>Protocol</code>
1000
// javadoc inherited from superclass
1001
public String JavaDoc getName() {
1002        return (_getName());
1003    }
1004
1005    // javadoc inherited from superclass
1006
public boolean setProperties(Properties properties) {
1007        return (_setProperties(properties));
1008    }
1009
1010    // javadoc inherited from superclass
1011
public Vector requiredDownServices() {
1012        return (_requiredDownServices());
1013    }
1014
1015    // javadoc inherited from superclass
1016
public Vector requiredUpServices() {
1017        return (_requiredUpServices());
1018    }
1019
1020    // javadoc inherited from superclass
1021
public void up(Event event) {
1022        _up(event);
1023    }
1024
1025    // javadoc inherited from superclass
1026
public void down(Event event) {
1027        _down(event);
1028    }
1029}
1030
Popular Tags