KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > TOTAL_OLD


1 // $Id: TOTAL_OLD.java,v 1.7 2004/09/23 16:29:42 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.Protocol;
7
8 import java.io.IOException JavaDoc;
9 import java.io.ObjectInput JavaDoc;
10 import java.io.ObjectOutput JavaDoc;
11 import java.util.Vector JavaDoc;
12
13
14
15
16
17 /**
18  * class SavedMessages
19  *
20  * Stores a set of messages along with their sequence id (assigned by the sequencer).
21  */

22 class SavedMessages {
23
24     /**
25      * class Entry (inner class)
26      *
27      * object type to store in the messages Vector (need to store sequence id in addition to message)
28      */

29     class Entry {
30     private final Message msg;
31     private final long seq;
32
33     public Entry( Message msg, long seq ) {
34         this.msg = msg;
35         this.seq = seq;
36     }
37
38     public Message getMsg() {
39         return msg;
40     }
41
42     public long getSeq() {
43         return seq;
44     }
45     } // class Entry
46

47
48     private final Vector JavaDoc messages; // vector of "Entry"s to store "Message"s, sorted by sequence id
49

50
51     /**
52      * Constructor - creates an empty space to store messages
53      */

54     public SavedMessages() {
55     messages = new Vector JavaDoc();
56     }
57
58     /**
59      * inserts the specified message and sequence id into the "list" of stored messages
60      * if the sequence id given is already stored, then nothing is stored
61      */

62     public void insertMessage( Message msg, long seq ) {
63     synchronized( messages ) {
64         int size = messages.size();
65         int index = 0;
66         long this_seq = -1; // used to prevent duplicate messages being stored
67

68         // find the index where this message should be inserted
69
try {
70         while( (index < size) &&
71                ((this_seq = ((Entry) (messages.elementAt(index))).getSeq()) < seq) ) {
72             index++;
73         }
74         } catch ( java.lang.ClassCastException JavaDoc e ) {
75         System.err.println( "Error: (TOTAL_OLD) SavedMessages.insertMessage() - ClassCastException: could not cast element of \"messages\" to an Entry (index " + index + ')' );
76         return;
77         }
78
79         // check that the sequences aren't the same (don't want duplicates)
80
if ( this_seq == seq ) {
81         System.err.println( "SavedMessages.insertMessage() - sequence " + seq + " already exists in saved messages. Message NOT saved." );
82         return;
83         }
84
85         messages.insertElementAt( new Entry( msg, seq ), index );
86     } // synchronized( messages )
87
}
88
89     /**
90      * returns a copy of the stored message with the given sequence id
91      * if delete_msg is true, then the message is removed from the
92      * the list of stored messages, otherwise the message is not
93      * removed from the list
94      * if no message is stored with this sequence id, null is returned
95      */

96     private Message getMessage( long seq, boolean delete_msg ) {
97     synchronized( messages ) {
98         int size = messages.size();
99         int index = 0;
100         long this_seq = -1;
101         try {
102         while( (index < size) &&
103                ((this_seq = (((Entry) (messages.elementAt(index))).getSeq())) < seq) ) {
104             index++;
105         }
106         } catch ( java.lang.ClassCastException JavaDoc e ) {
107         System.err.println( "Error: (TOTAL_OLD) SavedMessages.getMessage() - ClassCastException: could not cast element of \"messages\" to an Entry (index " + index + ')' );
108         return null;
109         }
110         // determine if we found the specified sequence
111
if ( this_seq == seq ) {
112         // we found the message at index
113
Object JavaDoc temp_obj = messages.elementAt(index);
114         if ( temp_obj instanceof Entry ) {
115             Message ret_val = ((Entry) temp_obj).getMsg().copy();
116
117             // should we delete
118
if ( delete_msg ) {
119             messages.removeElementAt(index);
120             }
121
122             return ret_val;
123         } else {
124             System.err.println( "Error: (TOTAL_OLD) SavedMessages.getMessage() - could not cast element of \"messages\" to an Entry (index " + index + ')' );
125             return null;
126         } // if ( temp_obj instanceof Entry )
127
} else {
128         // we didn't find this sequence number in the messages
129
return null;
130         }
131     } // synchronized( messages )
132
}
133
134     /**
135      * returns a stored message with the given sequence id
136      * the message is then removed from the list of stored messages
137      * if no message is stored with this sequence id, null is returned
138      */

139     public Message getMessage( long seq ) {
140     return getMessage( seq, true );
141     }
142
143     /**
144      * similar to GetMessage, except a copy of the message is returned
145      * and the message is not removed from the list
146      */

147     public Message peekMessage( long seq ) {
148     return getMessage( seq, false );
149     }
150
151     /**
152      * returns a copy of the stored message with the lowest sequence id
153      * if delete_msg is true, then the message is removed from the
154      * the list of stored messages, otherwise the message is not
155      * removed from the list
156      * if their are no messages stored, null is returned
157      */

158     private Message getFirstMessage( boolean delete_msg ) {
159     synchronized( messages ) {
160         if ( isEmpty() ) {
161         return null;
162         } else {
163         Object JavaDoc temp_obj = messages.firstElement();
164         if ( temp_obj instanceof Entry ) {
165             Message ret_val = ((Entry) temp_obj).getMsg().copy();
166             messages.removeElementAt(0);
167             return ret_val;
168         } else {
169             System.err.println( "Error: (TOTAL_OLD) SavedMessages.getFirstMessage() - could not cast element of \"messages\" to an Entry" );
170             return null;
171         } // if ( temp_obj instanceof Entry )
172
}
173     } // synchronized( messages )
174
}
175
176     /**
177      * returns the stored message with the lowest sequence id;
178      * the message is then removed from the list of stored messages
179      * if their are no messages stored, null is returned
180      */

181     public synchronized Message getFirstMessage() {
182     return getFirstMessage( true );
183     }
184
185     /**
186      * similar to GetFirstMessage, except a copy of the message is returned
187      * and the message is not removed from the list
188      */

189     public Message peekFirstMessage() {
190     return getFirstMessage( false );
191     }
192
193     /**
194      * returns the lowest sequence id of the messages stored
195      * if no messages are stored, -1 is returned
196      */

197     public long getFirstSeq() {
198     synchronized( messages ) {
199         if ( isEmpty() ) {
200         return -1;
201         } else {
202         Object JavaDoc temp_obj = messages.firstElement();
203         if ( temp_obj instanceof Entry ) {
204             return ((Entry) temp_obj).getSeq();
205         } else {
206             System.err.println( "Error: (TOTAL_OLD) SavedMessages.getFirstSeq() - could not cast element of \"messages\" to an Entry " );
207             return -1;
208         }
209         }
210     } // synchronized( messages )
211
}
212
213     /**
214      * returns true if there are messages stored
215      * returns false if there are no messages stored
216      */

217     public boolean isEmpty() {
218     return messages.isEmpty();
219     }
220
221     /**
222      * returns the number of messages stored
223      */

224     public int getSize() {
225     return messages.size();
226     }
227
228     /**
229      * clears all of the stored messages
230      */

231     public void clearMessages() {
232     synchronized( messages ) {
233         messages.removeAllElements();
234     }
235     }
236 } // class SavedMessages
237

238
239 /**
240  * class MessageAcks
241  *
242  * Used by sequencer to store cumulative acknowledgements of broadcast messages
243  * sent to the group in this view
244  */

245 class MessageAcks {
246
247     // TODO: may also want to store some sort of timestamp in each Entry (maybe)
248
/**
249      * class Entry (inner class)
250      *
251      * object type to store cumulative acknowledgements using a member's Address
252      * and the sequence id of a message
253      */

254     class Entry {
255     public final Address addr;
256     public long seq;
257
258     public Entry( Address addr, long seq ) {
259         this.addr = addr;
260         this.seq = seq;
261     }
262
263     public Entry ( Address addr ) {
264         this.addr = addr;
265         this.seq = -1; // means that no acknowledgements have been made yet
266
}
267     } // class Entry
268

269     // Vector of "Entry"s representing cumulative acknowledgements for each member of the group
270
private final Vector JavaDoc acks;
271
272     private final SavedMessages message_history; // history of broadcast messages sent
273

274
275     /**
276      * Constructor - creates a Vector of "Entry"s given a Vector of "Address"es for the members
277      */

278     public MessageAcks( Vector JavaDoc members ) {
279     acks = new Vector JavaDoc();
280
281     // initialize the message history to contain no messages
282
message_history = new SavedMessages();
283
284     // insert slots for each member in the acknowledgement Vector
285
reset( members );
286     }
287
288     /**
289      * resets acknowledgement Vector with "Entry"s using the given Vector of "Address"es
290      * also clears the message history
291      */

292     public synchronized void reset( Vector JavaDoc members ) {
293     clear();
294
295     // initialize Vector of acknowledgements (no acks for any member)
296
int num_members = members.size();
297     for( int i=0; i<num_members; i++ ) {
298         Object JavaDoc temp_obj = members.elementAt(i);
299         if ( temp_obj instanceof Address ) {
300         acks.addElement( new Entry( (Address) temp_obj ) );
301         } else {
302         System.err.println( "Error: (TOTAL_OLD) MessageAcks.reset() - could not cast element of \"members\" to an Address object" );
303         return;
304         }
305     }
306     }
307
308     /**
309      * clear all acknowledgements and the message history
310      */

311     private void clear() {
312     acks.removeAllElements();
313     message_history.clearMessages();
314     }
315
316     /**
317      * returns the Entry from the acknowledgement Vector with the given Address
318      * returns null if an Entry with the given Address is not found
319      */

320     private Entry getEntry( Address addr ) {
321     synchronized( acks ) {
322         // look for this addreess in the acknowledgement Vector
323
int size = acks.size();
324         for( int i=0; i<size; i++ ) {
325         Object JavaDoc temp_obj = acks.elementAt(i);
326         if ( temp_obj instanceof Entry ) {
327             Entry this_entry = (Entry) temp_obj;
328             if ( (this_entry.addr).equals(addr) ) {
329             // the given Address matches this entry
330
return this_entry;
331             }
332         } else {
333             System.err.println( "Error: (TOTAL_OLD) MessageAcks.getEntry() - could not cast element of \"acks\" to an Entry" );
334         } // if ( temp_obj instanceof Entry )
335
}
336
337         // if we get here, we didn't find this Address
338
return null;
339     }
340     }
341
342     /**
343      * sets the sequence id for the given Address to the given value
344      * note: if the current sequence value for this host is greater than
345      * the given value, the sequence for this member is NOT changed
346      * (i.e. it will only set it to a larger value)
347      * if the given Address is not found in the member list,
348      * nothing is changed
349      */

350     public void setSeq( Address addr, long seq ) {
351     Entry this_entry = getEntry( addr );
352     if ( (this_entry != null) && (this_entry.seq < seq) ) {
353         this_entry.seq = seq;
354
355         // try to remove any messages that we don't need anymore
356
truncateHistory();
357     }
358     }
359
360     /**
361      * returns the sequence id of the "latest" cumulative acknowledgement
362      * for the specified Address
363      * if the Address is not found in the member list, a negative value
364      * is returned
365      * note: the value returned may also be negative if their have been
366      * no acknowledgements from the given address
367      */

368     public long getSeq( Address addr ) {
369     Entry this_entry = getEntry( addr );
370     if ( this_entry == null ) {
371         return -2; // TODO: change this to something else (e.g. constant) later (maybe)
372
} else {
373         return this_entry.seq;
374     }
375     }
376
377     /**
378      * returns the message in the history that matches the given sequence id
379      * returns null if no message exists in the history with this sequence id
380      */

381     public Message getMessage( long seq ) {
382     return message_history.peekMessage( seq );
383     }
384
385     /**
386      * adds the given message (with the specified sequence id) to the
387      * message history
388      * if the given sequence id already exists in the message history,
389      * the message is NOT added
390      */

391     public void addMessage( Message msg, long seq ) {
392     message_history.insertMessage( msg, seq );
393     }
394
395     /**
396      * returns the minimum cumulative acknowledged sequence id from all the members
397      * (i.e. the greatest sequence id cumulatively acknowledged by all members)
398      */

399     private long getLowestSeqAck() {
400     synchronized( acks ) {
401         long ret_val = -10; // start with a negative value
402

403         int size = acks.size();
404         for( int i=0; i<size; i++ ) {
405         Object JavaDoc temp_obj = acks.elementAt(i);
406         if ( temp_obj instanceof Entry ) {
407             long this_seq = ((Entry) temp_obj).seq;
408             if ( this_seq < ret_val ) {
409             ret_val = this_seq;
410             }
411         } else {
412             System.err.println( "Error: (TOTAL_OLD) MessageAcks.getLowestSeqAck() - could not cast element of \"acks\" to an Entry (index=" + i + ')' );
413             return -1;
414         }
415         }
416
417         return ret_val;
418     }
419     }
420
421     /**
422      * removes messages from the history that have been acknowledged
423      * by all the members of the group
424      */

425     private synchronized void truncateHistory() {
426     long lowest_ack_seq = getLowestSeqAck();
427     if ( lowest_ack_seq < 0 ) {
428         // either no members, or someone has not received any messages yet
429
// either way, do nothing
430
return;
431     }
432
433     // don't want message_history being altered during this operation
434
synchronized( message_history ) {
435         long lowest_stored_seq;
436         // keep deleting the oldest stored message for as long as we can
437
while( ((lowest_stored_seq = message_history.getFirstSeq()) >=0) &&
438            (lowest_stored_seq > lowest_ack_seq) ) {
439         // we can delete the oldest stored message
440
message_history.getFirstMessage();
441         }
442     } // synchronized( message_history )
443
}
444 } // class MessageAcks
445

446
447 /*****************************************************************************
448  * class TOTAL_OLD extends Protocol
449  *
450  * TODO: (more comments)
451  * Sequencer based total ordering protocol layer
452  * - requires the following layers "below" it in the stack
453  * (or layers with equivalent functionality):
454  * GMS, FD, PING, UDP, ...
455  @author Manish Sambhu mms21@cornell.edu Spring 1999
456  ****************************************************************************/

457 public class TOTAL_OLD extends Protocol {
458     // the unique name of the protocol
459
private final static String JavaDoc PROTOCOL_NAME = "TOTAL_OLD";
460
461     private Address local_addr = null;
462     private Vector JavaDoc members = new Vector JavaDoc(); // note: members should never be null
463
// (because of synchronized blocks)
464

465     /**
466      * next_seq_id
467      * the sequence id of the next message we expect to receive
468      * note: this value is only meaningful when non-negative
469      */

470     private long next_seq_id = -1;
471
472     /**
473      * next_seq_id_to_assign
474      * used only by the sequencer to assign sequence ids to requests
475      * and resend them to the group
476      * note: this value is only meaningful when non-negative
477      */

478     private long next_seq_id_to_assign = -1;
479
480     private final static long INIT_SEQ_ID = 10; // this value is pretty much arbitrary (should be positive though)
481

482     /**
483      * queued_messages
484      * broadcast messages that we received that we are storing so that we can
485      * deterministically order the messages based on their sequence ids
486      */

487     private final SavedMessages queued_messages = new SavedMessages();
488
489     /**
490      * ack_history
491      * used only by the sequencer
492      * stores the cumulative acks for each member of the group
493      * also stores messages that may be needed for resend requests
494      * (i.e. messages that have not been acked by all group members)
495      */

496     private MessageAcks ack_history = null;
497
498     /**
499      * retrans_thread
500      * thread that handles sending requests to the sequencer for messages
501      * that may not have been received but were expected to arrive
502      */

503     private final TotalRetransmissionThread retrans_thread = new TotalRetransmissionThread( this );
504
505
506     /**
507      * returns the unique name of this protocol
508      */

509     public String JavaDoc getName() {
510     return PROTOCOL_NAME;
511     }
512
513
514
515
516     public void start() throws Exception JavaDoc {
517         // Start work
518
retrans_thread.start();
519     }
520
521     public void stop() {
522         // stop the retransmission thread
523
retrans_thread.stopResendRequests();
524     }
525
526
527     /** Just remove if you don't need to reset any state */
528     public void reset() {
529     // TODO: find out when this would be called, maybe do more here
530

531     // don't accept any messages until we receive a TOTAL_NEW_VIEW message from the sequencer
532
next_seq_id = -1;
533     // clear (i.e. delete) any messages that did not get propagated up
534
queued_messages.clearMessages();
535
536     // reset the retransmission thread state
537
retrans_thread.reset();
538     }
539
540
541     /**
542      * returns the next sequence id expected to be received in this view
543      */

544     protected long getNextSeqID() {
545     return next_seq_id;
546     }
547
548
549     /**
550      * returns the sequence id of the "first" queued message
551      * (i.e. the lowest seq id queued)
552      * returns -1 if no messages are queued
553      */

554     protected long getFirstQueuedSeqID() {
555     return queued_messages.getFirstSeq();
556     }
557
558
559     /**
560      * handles an Event coming up the Protocol Stack
561      */

562     public void up(Event evt) {
563         Message msg;
564
565         //System.out.println("UP: " + evt);
566

567     Object JavaDoc temp_obj; // used for type checking before performing casts
568
switch( evt.getType() ) {
569
570     case Event.SET_LOCAL_ADDRESS:
571         temp_obj = evt.getArg();
572         if ( temp_obj instanceof Address ) {
573         local_addr = (Address) temp_obj;
574         } else {
575         System.err.println( "Error: Total.up() - could not cast local address to an Address object" );
576         }
577         break;
578
579         case Event.MSG:
580         // get the message and the header for the TOTAL_OLD layer
581
temp_obj = evt.getArg();
582         if ( temp_obj instanceof Message ) {
583         msg = (Message) temp_obj;
584         temp_obj = msg.removeHeader(getName());
585         if ( temp_obj instanceof TotalHeader ) {
586             TotalHeader hdr = (TotalHeader) temp_obj;
587
588             // switch on the "command" defined by the header
589
switch( hdr.total_header_type ) {
590
591             case TotalHeader.TOTAL_UNICAST:
592             // don't process this message, just pass it up (TotalHeader header already removed)
593
passUp(evt);
594             return;
595
596             case TotalHeader.TOTAL_BCAST:
597             handleBCastMessage( msg, hdr.seq_id );
598             break;
599
600             case TotalHeader.TOTAL_REQUEST:
601             // if we are the sequencer, respond to this request
602
if ( isSequencer() ) {
603                 handleRequestMessage( msg );
604             }
605             break;
606
607             case TotalHeader.TOTAL_NEW_VIEW:
608             // store the sequence id that we should expect next
609
next_seq_id = hdr.seq_id;
610
611             // TODO: need to send some sort of ACK or something to the sequencer (maybe)
612
break;
613
614             case TotalHeader.TOTAL_CUM_SEQ_ACK:
615             // if we are the sequencer, update state
616
if ( isSequencer() ) {
617                 temp_obj = msg.getSrc();
618                 if ( temp_obj instanceof Address ) {
619                 ack_history.setSeq( (Address) temp_obj, hdr.seq_id );
620                 } else {
621                 System.err.println( "Error: TOTAL_OLD.Up() - could not cast source of message to an Address object (case TotalHeader.TOTAL_CUM_SEQ_ACK)" );
622                 }
623             }
624             break;
625
626             case TotalHeader.TOTAL_RESEND:
627             // if we are the sequencer, respond to this request
628
if ( isSequencer() ) {
629                 handleResendRequest( msg, hdr.seq_id );
630             }
631             break;
632
633             default:
634             // unrecognized header type - discard message
635
System.err.println( "Error: TOTAL_OLD.up() - unrecognized TotalHeader in message - " + hdr.toString() );
636             return; // don't let it call passUp()
637
} // switch( hdr.total_header_type )
638
} else {
639             System.err.println( "Error: TOTAL_OLD.up() - could not cast message header to TotalHeader (case Event.MSG)" );
640         } // if ( temp_obj instanceof TotalHeader )
641
} else {
642         System.err.println( "Error: TOTAL_OLD.up() - could not cast argument of Event to a Message (case Event.MSG)" );
643         } // if ( temp_obj instanceof Address )
644

645             //System.out.println("The message is " + msg);
646
return; // don't blindly pass up messages immediately (if at all)
647

648         // begin mms21
649
/*
650         case Event.BECOME_SERVER:
651             System.out.println( "Become Server event passed up to TOTAL_OLD (debug - mms21)" );
652             break;
653             */

654
655     case Event.TMP_VIEW: // TODO: this may be temporary
656
case Event.VIEW_CHANGE:
657             System.out.println( "View Change event passed up to TOTAL_OLD (debug - mms21)" );
658             View new_view = (View) evt.getArg();
659         members = new_view.getMembers();
660          {
661         // print the members of this new view
662
System.out.println( "New view members (printed in TOTAL_OLD):" );
663         int view_size = members.size();
664         for( int i=0; i<view_size; i++ ) {
665             System.out.println( " " + members.elementAt(i).toString() );
666         }
667         }
668
669         // reset the state for total ordering for this new view
670
reset();
671
672         // if we are the sequencer in this new view, send a new
673
// TOTAL_NEW_VIEW message to the group
674
if ( isSequencer() ) {
675         // we are the sequencer in this new view
676
System.err.println( "TOTAL_OLD.up() - I am the sequencer of this new view" );
677
678         // we need to keep track of acknowledgements messages
679
ack_history = new MessageAcks( members );
680
681         // start assigning messages with this sequence id
682
next_seq_id_to_assign = INIT_SEQ_ID;
683
684         // send a message to the group with the initial sequence id to expect
685
Message new_view_msg = new Message( null, local_addr, null );
686         new_view_msg.putHeader(getName(), new TotalHeader( TotalHeader.TOTAL_NEW_VIEW, next_seq_id_to_assign ) );
687         passDown( new Event( Event.MSG, new_view_msg ) );
688         }
689
690             break;
691         // end mms21
692

693         default:
694             break;
695         } // switch( evt.getType() )
696

697         passUp(evt); // Pass up to the layer above us
698
}
699
700
701     /**
702      * passes up (calling passUp()) any stored messages eligible according to
703      * the total ordering property
704      */

705     private synchronized int passUpMessages() {
706     if ( next_seq_id < 0 ) {
707         // don't know what to pass up so don't pass up anything
708
return 0;
709     }
710
711     long lowest_seq_stored = queued_messages.getFirstSeq();
712     if ( lowest_seq_stored < 0 ) {
713         // there are no messages stored
714
return 0;
715     }
716     if ( lowest_seq_stored < next_seq_id ) {
717         // it is bad to have messages stored that have a lower sequence id than what
718
// we are expecting
719
System.err.println( "Error: TOTAL_OLD.passUpMessages() - next expected sequence id (" + next_seq_id + ") is greater than the sequence id of a stored message (" + lowest_seq_stored + ')' );
720         return 0;
721     } else if ( next_seq_id == lowest_seq_stored ) {
722         // we can pass this first message up the Protocol Stack
723
Message msg = queued_messages.getFirstMessage();
724         if ( msg == null ) {
725         System.err.println( "Error: TOTAL_OLD.passUpMessages() - unexpected null Message retrieved from stored messages" );
726         return 0;
727         }
728         passUp( new Event( Event.MSG, msg ) );
729
730         // increment the next expected sequence id
731
next_seq_id++;
732
733         return (1 + passUpMessages());
734     } else {
735         /* don't drop messages, it should be requesting resends
736         // all messages stored have sequence ids greater than expected
737         if ( queued_messages.getSize() > 10 ) {
738          {
739             System.err.println( "WARNING: TOTAL_OLD.passUpMessages() - more than 10 messages saved" );
740             System.err.println( "Dropping sequence id: " + next_seq_id );
741         }
742         next_seq_id++;
743         return passUpMessages();
744         }
745         */

746         return 0;
747     }
748     }
749
750
751     private final long last_request_time = -1;
752     /**
753      * stores the message in the list of messages. also passes up any messages
754      * if it can (i.e. if it satisfies total ordering).
755      * if the sequence for the next expected message is unknown, the message is
756      * discarded without being stored
757      */

758     private synchronized void handleBCastMessage( Message msg, long seq ) {
759     /* store the message anyway, hopefully we'll get a TOTAL_NEW_VIEW message later
760     if ( next_seq < 0 ) {
761         // don't know what sequence id to expect
762          System.err.println( "TOTAL_OLD.handleBCastMessage() - received broadcast message but don't know what sequence id to expect" );
763         return;
764     }
765     */

766
767     if ( seq < next_seq_id ) {
768         // we're expecting a message with a greater sequence id
769
// hopefully, we've already seen this message so just ignore it
770
return;
771     }
772
773     // save this message in the list of received broadcast messages
774
queued_messages.insertMessage( msg, seq );
775
776     // try to pass up any messages
777
int num_passed = passUpMessages();
778 // TODO: this if is temporary (debug)
779
if ( num_passed > 1 )
780      System.err.println( "TOTAL_OLD.handleBCastMessage() - " + num_passed + " message(s) passed up the Protocol Stack" );
781
782         /* this is handles by the retransmission thread now
783     // see if we may need to issue any resend requests
784     if ( queued_messages.getSize() > 1 ) { // TODO: magical constant N?
785         Address sequencer = getSequencer();
786         //Object sequencer = msg.makeReply().getSrc(); // test (debug)
787         if ( sequencer == null ) {
788         // couldn't get the sequencer of the group
789         System.err.println( "TOTAL_OLD.handleBCastMessage() - couldn't determine sequencer to send a TOTAL_RESEND request" );
790         return;
791         }
792
793         if ( local_addr == null ) {
794         // don't know local address, can't set source of message
795         System.err.println( "TOTAL_OLD.handleBCastMessage() - do not know local address so cannot send resend request for message " + seq );
796         return;
797         }
798
799         long time_now = System.currentTimeMillis();
800         if ( (last_request_time >= 0) && ((time_now - last_request_time) < 1000) ) {
801         return;
802         } else {
803         last_request_time = time_now;
804         }
805         // request a resend request for all missing sequence ids
806         // from the next one expected up to the "earliest" queued one
807         // TODO: (works a little different now)
808         long first_queued_seq = queued_messages.getFirstSeq();
809         long max_resend_seq = ((next_seq_id + 10) > first_queued_seq) ? first_queued_seq : (next_seq_id + 10);
810         for( long resend_seq=next_seq_id; resend_seq<=max_resend_seq ; resend_seq++ ) {
811         Message resend_msg = new Message( sequencer, local_addr, null );
812         resend_msg.putHeader(getName(), new TotalHeader( TotalHeader.TOTAL_RESEND, resend_seq ) );
813         passDown( new Event( Event.MSG, resend_msg ) );
814          System.err.println( "TOTAL_OLD.handleBCastMessage() - resend requested for message " + resend_seq );
815         }
816     }
817     */

818     }
819
820
821     /**
822      * respond to a request message by broadcasting a copy of the message to the group
823      * with the next sequence id assigned to it
824      * if we do not know what the next sequence id is to assign, discard the message
825      */

826     private synchronized void handleRequestMessage( Message msg ) {
827     if ( next_seq_id_to_assign < 0 ) {
828         // we cannot assign a valid sequence id
829
System.err.println( "Error: TOTAL_OLD.handleRequestMessage() - cannot handle request... do not know what sequence id to assign" );
830         return;
831     }
832
833     // make the message a broadcast message to the group
834
msg.setDest( null );
835
836     // set the source of the message to be me
837
msg.setSrc( local_addr );
838
839     // add the sequence id to the message
840
msg.putHeader(getName(), new TotalHeader( TotalHeader.TOTAL_BCAST, next_seq_id_to_assign ) );
841
842     // store a copy of this message is the history
843
Message msg_copy = msg.copy();
844     ack_history.addMessage( msg_copy, next_seq_id_to_assign );
845
846     // begin debug
847
{
848         Object JavaDoc header = msg_copy.getHeader(getName());
849         if ( !(header instanceof TotalHeader) ) {
850         System.err.println( "Error: TOTAL_OLD.handleRequestMessage() - BAD: stored message that did not contain a TotalHeader - " + next_seq_id_to_assign );
851         }
852     } //
853
// end debug
854

855     // increment the next sequence id to use
856
next_seq_id_to_assign++;
857
858     // pass this new Message (wrapped in an Event) down the Protocol Stack
859
passDown( new Event( Event.MSG, msg ) );
860     }
861
862
863     /**
864      * respond to a request to resend a message with the specified sequence id
865      */

866     private synchronized void handleResendRequest( Message msg, long seq ) {
867      System.err.println( "TOTAL_OLD.handleRequestMessage() - received resend request for message " + seq );
868
869     /* just rebroadcast for now because i can't get the source - this is bad (TODO: fix this)
870     Object requester = msg.makeReply().getSrc(); // Address? of requester - test (debug)
871     /*
872     Object temp_obj = msg.getSrc();
873     if ( temp_obj instanceof Address ) {
874         Address requester = (Address) temp_obj;
875     } else {
876         System.err.println( "Error: TOTAL_OLD.handleResendRequest() - could not cast source of message to an Address" );
877         return;
878     }
879     * /
880     if ( requester == null ) {
881         // don't know who to send this back to
882         System.err.println( "TOTAL_OLD.handleResendRequest() - do not know who requested this resend request for sequence " + seq );
883         return;
884     }
885     */

886     Address requester = null;
887 // System.err.println( "TOTAL_OLD: got here - 1" );
888
Message resend_msg = ack_history.getMessage( seq );
889 // System.err.println( "TOTAL_OLD: got here - 2" );
890
if ( resend_msg == null ) {
891         // couldn't find this message in the history
892
System.err.println( "TOTAL_OLD.handleResendRequest() - could not find the message " + seq + " in the history to resend" );
893         return;
894     }
895     resend_msg.setDest( requester );
896
897     // note: do not need to add a TotalHeader because it should already be a
898
// TOTAL_BCAST message
899
// begin debug
900
{
901         Object JavaDoc header = resend_msg.getHeader(getName());
902         if ( header instanceof TotalHeader ) {
903         //System.err.println( "TOTAL_OLD: resend msg GOOD (header is TotalHeader) - " + seq );
904
} else {
905         System.err.println( "TOTAL_OLD: resend msg BAD (header is NOT a TotalHeader) - " + seq );
906         }
907     } //
908
// end debug
909

910     passDown( new Event( Event.MSG, resend_msg ) );
911      System.err.println( "TOTAL_OLD.handleResendRequest() - responded to resend request for message " + seq );
912     }
913
914
915     /**
916      * handles an Event coming down the Protocol Stack
917      */

918     public void down(Event evt) {
919         Message msg;
920
921         //System.out.println("DOWN: " + evt);
922

923         switch( evt.getType() ) {
924
925         case Event.VIEW_CHANGE:
926         // this will probably never happen
927
System.err.println( "NOTE: VIEW_CHANGE Event going down through " + PROTOCOL_NAME );
928
929             Vector JavaDoc new_members=((View)evt.getArg()).getMembers();
930             synchronized(members) {
931                 members.removeAllElements();
932                 if(new_members != null && new_members.size() > 0)
933                     for(int i=0; i < new_members.size(); i++)
934                         members.addElement(new_members.elementAt(i));
935             }
936             break;
937
938         case Event.MSG:
939         Object JavaDoc temp_obj = evt.getArg();
940         if ( temp_obj instanceof Message ) {
941         msg = (Message) temp_obj;
942
943         // note: a TotalHeader is added to every message (Event.MSG)
944
// that is sent
945

946         // check if this is a broadcast message
947
if ( msg.getDest() == null ) {
948             // yes, this is a broadcast message
949

950             // send out a request for a message to be broadcast
951
// (the sequencer will handle this)
952
Address sequencer = getSequencer();
953             if ( sequencer != null ) {
954             // we only need to send the request to the sequencer (who will broadcast it)
955
msg.setDest( sequencer );
956             } else {
957             // couldn't find sequencer of the group
958
// for now, just send it to the original destination
959
// (don't need to do anything here)
960
}
961
962             //msg.putHeader(getName(), TotalHeader.getRequestHeader() );
963
msg.putHeader(getName(), new TotalHeader(TotalHeader.TOTAL_REQUEST, -1));
964
965
966         } else {
967             // this is a point to point unicast message so just send it to its original destination
968
msg.putHeader(getName(), new TotalHeader( TotalHeader.TOTAL_UNICAST, -1 ) ); // sequence id in header is irrelevant
969
}
970         } else {
971         System.err.println( "Error: TOTAL_OLD.down() - could not cast argument of Event to a Message (case Event.MSG)" );
972         } // if ( temp_obj instanceof Message )
973
break;
974             
975         default:
976             break;
977         } // switch( evt.getType() )
978

979         passDown(evt); // Pass on to the layer below us
980

981     }
982
983
984     /**
985      * returns true if we are currently the sequencer of the group;
986      * returns false otherwise
987      * note: returns false if our local address is unknown, or the list of members is
988      * empty
989      */

990     private boolean isSequencer() {
991     if ( local_addr == null ) {
992         // don't know my own local address
993
System.err.println( "TOTAL_OLD.isSequencer() - local address unknown!" );
994         return false;
995     }
996
997     synchronized( members ) {
998         if ( members.size() == 0 ) {
999         // there are no members listed for the group (not even myself)
1000
System.err.println( "TOTAL_OLD.isSequencer() - no members!" );
1001        return false;
1002        }
1003
1004        Object JavaDoc temp_obj = members.elementAt(0);
1005        if ( temp_obj instanceof Address ) {
1006        Address seq_addr = (Address) temp_obj;
1007        return local_addr.equals(seq_addr);
1008        } else {
1009        System.err.println( "Error: TOTAL_OLD.isSequencer() - could not cast element of \"members\" to an Address" );
1010        return false;
1011        } // if ( temp_obj instanceof Address )
1012
}
1013    }
1014
1015
1016    /**
1017     * returns the Address of the local machine
1018     * returns null if it is not known yet
1019     */

1020    protected Address getLocalAddr() {
1021    return local_addr;
1022    }
1023
1024
1025    /**
1026     * returns the address of the current sequencer of the group
1027     * returns null if the list of members is empty
1028     */

1029    protected Address getSequencer() {
1030    synchronized( members ) {
1031        if ( members.size() == 0 ) {
1032        System.err.println( "TOTAL_OLD.getSequencer() - no members" );
1033        return null;
1034        } else {
1035        Object JavaDoc temp_obj = members.elementAt(0);
1036        if ( temp_obj instanceof Address ) {
1037            return (Address) temp_obj;
1038        } else {
1039            System.err.println( "Error: TOTAL_OLD.getSequencer() - could not cast first element of \"members\" to an Address" );
1040            return null;
1041        }
1042        }
1043    }
1044    }
1045
1046
1047
1048
1049
1050    /**
1051 * class TotalHeader
1052 *
1053 * The header that is prepended to every message passed down through the TOTAL_OLD layer
1054 * and removed (and processed) from every message passed up through the TOTAL_OLD layer
1055 */

1056    public static class TotalHeader extends Header {
1057    // Total message types
1058
public final static int TOTAL_UNICAST = 0; // a point to point unicast message that should not be processed by TOTAL_OLD
1059
public final static int TOTAL_BCAST = 1; // message broadcast by the sequencer
1060
public final static int TOTAL_REQUEST = 2; // request for a message to be broadcast
1061
public final static int TOTAL_NEW_VIEW = 3; // reset with a view change, sequence number also reset
1062
public final static int TOTAL_NEW_VIEW_ACK = 4; // acknowledgement of new view and sequence id
1063
public final static int TOTAL_CUM_SEQ_ACK = 5; // cumulatively acknowledge the reception of messages up to a sequence id
1064
public final static int TOTAL_SEQ_ACK = 6; // acknowledge the reception of a message with a certain sequence id (probably won't be used)
1065
public final static int TOTAL_RESEND = 7; // request the message with a certain sequence id
1066

1067    public int total_header_type;
1068
1069    // TODO: finish commenting meaning of seq_id for different header types
1070
/**
1071     * seq_id
1072     * for TOTAL_BCAST messages, seq_id is used to determine the order of messages
1073     * in the view. seq_id is expected to increment by one for each new message
1074     * sent in the current view. this sequence id is reset with each new view.
1075     * the GMS layer should make sure that messages sent in one view are not
1076     * received in another view.
1077     * for TOTAL_REQUEST messages, seq_id is not used
1078     * for TOTAL_NEW_VIEW, seq_id is the sequence id that the sequencer of this
1079     * view will use for the first message broadcast to the group
1080     * (i.e. the expected sequence id is "reset" to this value)
1081     * for TOTAL_NEW_VIEW_ACK,
1082     * for TOTAL_CUM_SEQ_ACK messages, the seq_id is the cumulative sequence id
1083     * that the sender has received
1084     * for TOTAL_SEQ_ACK messages, seq_id is the sequence id that is being acknowledged
1085     * for TOTAL_RESEND, seq_id is the sequence id to be sent again
1086     */

1087    public long seq_id; // see use above (varies between types of headers)
1088

1089
1090    public TotalHeader() {} // used for externalization
1091

1092    public TotalHeader( int type, long seq ) {
1093        switch( type ) {
1094        case TOTAL_UNICAST:
1095        case TOTAL_BCAST:
1096        case TOTAL_REQUEST:
1097        case TOTAL_NEW_VIEW:
1098        case TOTAL_NEW_VIEW_ACK:
1099        case TOTAL_CUM_SEQ_ACK:
1100        case TOTAL_SEQ_ACK:
1101        case TOTAL_RESEND:
1102        // the given type is a known one
1103
total_header_type = type;
1104        break;
1105
1106        default:
1107        // this type is unknown
1108
System.err.println( "Error: TotalHeader.TotalHeader() - unknown TotalHeader type given: " + type );
1109        total_header_type = -1;
1110        break;
1111        }
1112
1113        seq_id = seq;
1114    }
1115
1116    //static TotalHeader getRequestHeader() {
1117
//return new TotalHeader( TOTAL_REQUEST, -1 ); // sequence id is irrelevant
1118
//}
1119

1120    public String JavaDoc toString() {
1121        String JavaDoc type = "";
1122        switch( total_header_type ) {
1123        case TOTAL_UNICAST:
1124        type = "TOTAL_UNICAST";
1125        break;
1126
1127        case TOTAL_BCAST:
1128        type = "TOTAL_BCAST";
1129        break;
1130
1131        case TOTAL_REQUEST:
1132        type = "TOTAL_REQUEST";
1133        break;
1134
1135        case TOTAL_NEW_VIEW:
1136        type = "NEW_VIEW";
1137        break;
1138
1139        case TOTAL_NEW_VIEW_ACK:
1140        type = "NEW_VIEW_ACK";
1141        break;
1142
1143        case TOTAL_CUM_SEQ_ACK:
1144        type = "TOTAL_CUM_SEQ_ACK";
1145        break;
1146
1147        case TOTAL_SEQ_ACK:
1148        type = "TOTAL_SEQ_ACK";
1149        break;
1150
1151        case TOTAL_RESEND:
1152        type = "TOTAL_RESEND";
1153        break;
1154
1155        default:
1156        type = "UNKNOWN TYPE (" + total_header_type + ')';
1157        break;
1158        }
1159
1160        return "[ TOTAL_OLD: type=" + type + ", seq=" + seq_id + " ]";
1161    }
1162
1163
1164
1165    public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
1166        out.writeInt(total_header_type);
1167        out.writeLong(seq_id);
1168    }
1169    
1170    
1171
1172    public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
1173        total_header_type=in.readInt();
1174        seq_id=in.readLong();
1175    }
1176
1177
1178
1179    } // class TotalHeader
1180

1181
1182
1183} // class TOTAL_OLD
1184

1185
1186/*****************************************************************************
1187 * class TotalRetransmissionThread
1188 *
1189 * thread that handles retransmission for the TOTAL_OLD protocol
1190 ****************************************************************************/

1191class TotalRetransmissionThread extends Thread JavaDoc {
1192    // state variables to determine when and what to request
1193
private long last_retrans_request_time; // last time (in milliseconds) that we sent a resend request
1194
private long last_requested_seq; // latest sequence id that we have requested
1195

1196    // retransmission constants
1197
final private static long polling_delay = 1000; // how long (in milliseconds) to sleep before rechecking for resend
1198
final private static long resend_timeout = 2000; // amount of time (in milliseconds) to wait on a resend request before resending another request
1199
final private static int max_request = 10; // maximum number of resend request to send out in one iteration
1200

1201    // reference to the parent TOTAL_OLD protocol instance
1202
private TOTAL_OLD prot_ptr;
1203
1204    // flag to specify if the thread should continue running
1205
private boolean is_running;
1206
1207
1208    /**
1209     * constructor
1210     *
1211     * creates and initializes a retransmission thread for the
1212     * specified instance of a TOTAL_OLD protocol
1213     */

1214    public TotalRetransmissionThread( TOTAL_OLD parent_prot ) {
1215    if ( parent_prot != null ) {
1216        prot_ptr = parent_prot;
1217    } else {
1218        // parent thread not specified
1219
System.err.println( "Error: TotalRetransmissionThread.TotalRetransmissionThread() - given parent protocol reference is null\n (FATAL ERROR - TOTAL_OLD protocol will not function properly)" );
1220
1221        // prevent the run method from doing any work
1222
is_running = false;
1223    }
1224
1225    // initialize the state variables
1226
reset();
1227
1228    // let the thread make resend requests
1229
is_running = true;
1230    }
1231
1232    /**
1233     * resets the state of the thread as if it was just started
1234     * the thread will assume that there were no resend requests make
1235     */

1236    public void reset() {
1237    // we have not made any resend requests for any messages
1238
last_retrans_request_time = -1;
1239    last_requested_seq = -1;
1240    }
1241
1242
1243    /**
1244     * send a resend request to the given sequencer (from the given local_addr)
1245     * for the given sequence id
1246     */

1247    private void sendResendRequest( Address sequencer, Address local_addr, long seq_id ) {
1248    Message resend_msg = new Message( sequencer, local_addr, null );
1249    resend_msg.putHeader(getName(), new TOTAL_OLD.TotalHeader( TOTAL_OLD.TotalHeader.TOTAL_RESEND, seq_id ) );
1250    prot_ptr.passDown( new Event( Event.MSG, resend_msg ) );
1251
1252    // debug
1253
System.err.println( "TotalRetransmissionThread.resend() - resend requested for message " + seq_id );
1254    }
1255
1256
1257    /**
1258     * checks if a resend request should be made to the sequencer. if a request needs
1259     * to be made, it makes the appropriate requests with the parameters specified
1260     * by the constants in this class
1261     */

1262    private void checkForResend() {
1263    long first_seq_id = prot_ptr.getFirstQueuedSeqID(); // sequence id of first queued message
1264
/*
1265    // begin debug
1266    System.out.println( "DEBUG (TotalRetransmissionThread) - first_seq_id = " + first_seq_id );
1267    // end debug
1268    */

1269    if ( first_seq_id >= 0 ) {
1270        // there is at least one message in the queue
1271

1272        long next_seq_id = prot_ptr.getNextSeqID(); // next sequence id expected from the group
1273
if ( (next_seq_id < first_seq_id) ) { // TODO: handle case to resend TOTAL_NEW_VIEW message
1274
// there are messages that we received out of order
1275
//System.err.println( "DEBUG (TotalRetransmissionThread) - there are messages queued" ); // debug
1276

1277        // see if it is time to send a request
1278
long time_now = System.currentTimeMillis();
1279        if ( (next_seq_id > last_requested_seq) ||
1280             (time_now > (last_retrans_request_time + resend_timeout)) ||
1281             (last_retrans_request_time < 0) ) {
1282            // send a resend request to the sequencer
1283
//System.err.println( "DEBUG (TotalRetransmissionThread) - sending resend requests" ); // debug
1284
Address sequencer = prot_ptr.getSequencer();
1285            if ( sequencer == null ) {
1286            System.out.println( "Error: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - could not determine sequencer to send a TOTAL_RESEND request" );
1287
1288            return;
1289            }
1290
1291            Address local_addr = prot_ptr.getLocalAddr();
1292            if ( local_addr == null ) {
1293            System.out.println( "Warning: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - local address not specified in TOTAL_RESEND request... attempting to send requests anyway" );
1294            }
1295
1296            long temp_long = (next_seq_id + max_request); // potential max seq id to request (exclusive)
1297
long last_resend_seq_id = (temp_long > first_seq_id) ? first_seq_id : temp_long;
1298            for( long resend_seq=next_seq_id; resend_seq<last_resend_seq_id ; resend_seq++ ) {
1299            sendResendRequest( sequencer, local_addr, resend_seq );
1300            }
1301            // update state for this set of resend requests
1302
last_retrans_request_time = time_now;
1303            last_requested_seq = last_resend_seq_id;
1304        }
1305        } // if ( (next_seq_id < first_seq_id) )
1306
} // if ( first_seq_id >= 0 )
1307
// else there are no messages to request
1308
}
1309
1310
1311    /**
1312     * overloaded from Thread
1313     * method that executes when the thread is started
1314     */

1315    public void run() {
1316    while( is_running ) {
1317        // resend any requests if necessary
1318
//System.err.println( "DEBUG (TotalRetransmissionThread) - heartbeat" ); // debug
1319
checkForResend();
1320
1321        // wait before check again
1322
try {
1323        sleep( polling_delay );
1324        } catch( InterruptedException JavaDoc e ) {} // do nothing if interrupted
1325
}
1326    }
1327
1328    /**
1329     * stops the thread from making any further resend requests
1330     * note: the thread may not die immediately
1331     */

1332    public void stopResendRequests() {
1333    is_running = false;
1334    }
1335} // class TotalRetransmissionThread
1336
Popular Tags