KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > pbcast > FLUSH


1 package org.jgroups.protocols.pbcast;
2
3 import org.jgroups.*;
4 import org.jgroups.annotations.GuardedBy;
5 import org.jgroups.stack.Protocol;
6 import org.jgroups.util.Digest;
7 import org.jgroups.util.Promise;
8 import org.jgroups.util.Streamable;
9 import org.jgroups.util.Util;
10
11 import java.io.*;
12 import java.util.*;
13
14 /**
15  * Flush, as its name implies, forces group members to flush their pending
16  * messages while blocking them from sending any additional messages. The
17  * process of flushing quiesces the group so that state transfer or a join can
18  * be done. It is also called the stop-the-world model, as nobody will be able
19  * to send messages while a flush is in progress.
20  *
21  * <p>
22  * Flush is needed for:
23  * <p>
24  * (1) State transfer. When a member requests state transfer, the coordinator
25  * tells everyone to stop sending messages and waits for everyone's ack. Then it
26  * asks the application for its state and ships it back to the requester. After
27  * the requester has received and set the state successfully, the coordinator
28  * tells everyone to resume sending messages.
29  * <p>
30  * (2) View changes (e.g. a join). Before installing a new view V2, flushing
31  * would ensure that all messages *sent* in the current view V1 are indeed
32  * *delivered* in V1, rather than in V2 (in all non-faulty members). This is
33  * essentially Virtual Synchrony.
34  *
35  *
36  *
37  * @author Vladimir Blagojevic
38  * @version $Id$
39  * @since 2.4
40  */

41 public class FLUSH extends Protocol {
42     public static final String JavaDoc NAME = "FLUSH";
43
44     @GuardedBy ("sharedLock")
45     private View currentView;
46
47     private Address localAddress;
48
49     /**
50      * Group member that requested FLUSH. For view intallations flush
51      * coordinator is the group coordinator For state transfer flush coordinator
52      * is the state requesting member
53      */

54     @GuardedBy ("sharedLock")
55     private Address flushCoordinator;
56
57     @GuardedBy ("sharedLock")
58     private final List<Address> flushMembers;
59
60     @GuardedBy ("sharedLock")
61     private final Set<Address> flushOkSet;
62
63     @GuardedBy ("sharedLock")
64     private final Map<Address, Digest> flushCompletedMap;
65
66     @GuardedBy ("sharedLock")
67     private final Set<Address> stopFlushOkSet;
68
69     @GuardedBy ("sharedLock")
70     private final Set<Address> suspected;
71
72     private final Object JavaDoc sharedLock = new Object JavaDoc();
73
74     private final Object JavaDoc blockMutex = new Object JavaDoc();
75
76     /**
77      * Indicates if FLUSH.down() is currently blocking threads Condition
78      * predicate associated with blockMutex
79      */

80     @GuardedBy ("blockMutex")
81     private boolean isBlockingFlushDown = true;
82
83     /**
84      * Default timeout for a group member to be in
85      * <code>isBlockingFlushDown</code>
86      */

87     private long timeout = 8000;
88     
89     private boolean enable_reconciliation = true;
90
91     @GuardedBy ("sharedLock")
92     private boolean receivedFirstView = false;
93
94     @GuardedBy ("sharedLock")
95     private boolean receivedMoreThanOneView = false;
96
97     private long startFlushTime;
98
99     private long totalTimeInFlush;
100
101     private int numberOfFlushes;
102
103     private double averageFlushDuration;
104
105     private final Promise flush_promise = new Promise();
106
107     @GuardedBy ("sharedLock")
108     private final FlushPhase flushPhase = new FlushPhase();
109
110     @GuardedBy ("sharedLock")
111     private final List<Address> reconcileOks = new ArrayList<Address>();
112
/**
 * Creates the protocol with an empty initial view and empty flush
 * bookkeeping collections.
 */
public FLUSH() {
    super();
    flushMembers = new ArrayList<Address>();
    flushOkSet = new TreeSet<Address>();
    stopFlushOkSet = new TreeSet<Address>();
    suspected = new TreeSet<Address>();
    flushCompletedMap = new HashMap<Address, Digest>();
    currentView = new View(new ViewId(), new Vector<Address>());
}
122
123     public String JavaDoc getName() {
124         return NAME;
125     }
126
127     public boolean setProperties(Properties props) {
128         super.setProperties(props);
129
130         timeout = Util.parseLong(props, "timeout", timeout);
131         enable_reconciliation = Util.parseBoolean(props, "enable_reconciliation", enable_reconciliation);
132         String JavaDoc str = props.getProperty("auto_flush_conf");
133         if (str != null) {
134             log.warn("auto_flush_conf has been deprecated and its value will be ignored");
135             props.remove("auto_flush_conf");
136         }
137
138         if (!props.isEmpty()) {
139             log.error("the following properties are not recognized: " + props);
140             return false;
141         }
142         return true;
143     }
144
145     public void start() throws Exception JavaDoc {
146         Map<String JavaDoc,Boolean JavaDoc> map = new HashMap<String JavaDoc,Boolean JavaDoc>();
147         map.put("flush_supported", Boolean.TRUE);
148         up_prot.up(new Event(Event.CONFIG, map));
149         down_prot.down(new Event(Event.CONFIG, map));
150
151         synchronized (sharedLock) {
152             receivedFirstView = false;
153             receivedMoreThanOneView = false;
154         }
155         synchronized (blockMutex) {
156             isBlockingFlushDown = true;
157         }
158     }
159
160     public void stop() {
161         synchronized (sharedLock) {
162             currentView = new View(new ViewId(), new Vector<Address>());
163             flushCompletedMap.clear();
164             flushOkSet.clear();
165             stopFlushOkSet.clear();
166             flushMembers.clear();
167             suspected.clear();
168             flushCoordinator = null;
169         }
170     }
171
172     /* -------------------JMX attributes and operations --------------------- */
173
174     public double getAverageFlushDuration() {
175         return averageFlushDuration;
176     }
177
178     public long getTotalTimeInFlush() {
179         return totalTimeInFlush;
180     }
181
182     public int getNumberOfFlushes() {
183         return numberOfFlushes;
184     }
185
186     public boolean startFlush(long timeout) {
187         Map atts = new HashMap();
188         atts.put("timeout", new Long JavaDoc(timeout));
189         return startFlush(new Event(Event.SUSPEND,atts), 3, false);
190     }
191
192     private boolean startFlush(Event evt, int numberOfAttempts, boolean isRetry) {
193         boolean successfulFlush = false;
194         if (!flushPhase.isFlushInProgress() || isRetry) {
195             flush_promise.reset();
196             Map atts = (Map) evt.getArg();
197             long timeout = ((Long JavaDoc)atts.get("timeout")).longValue();
198             if (log.isDebugEnabled()){
199                 if(isRetry)
200                     log.debug("Retrying FLUSH at " + localAddress + ", "+ evt + ". Attempts left " + numberOfAttempts);
201                 else
202                     log.debug("Received " + evt+ " at " + localAddress + ". Running FLUSH...");
203             }
204
205             onSuspend((View)atts.get("view"));
206             try {
207                 Boolean JavaDoc r = (Boolean JavaDoc) flush_promise.getResultWithTimeout(timeout);
208                 successfulFlush = r.booleanValue();
209             } catch (TimeoutException e) {
210                 if (log.isTraceEnabled())
211                     log.trace("At " + localAddress
212                             + " timed out waiting for flush responses after "
213                             + timeout + " msec");
214             }
215         }
216
217         if (!successfulFlush && numberOfAttempts > 0) {
218             long backOffSleepTime = Util.random(5);
219             backOffSleepTime = backOffSleepTime < 2 ? backOffSleepTime + 2: backOffSleepTime;
220             if (log.isTraceEnabled())
221                 log.trace("At " + localAddress + ". Backing off for "
222                         + backOffSleepTime + " sec. Attempts left "
223                         + numberOfAttempts);
224
225             Util.sleep(backOffSleepTime * 1000);
226             Boolean JavaDoc succeededWhileWeSlept = (Boolean JavaDoc)flush_promise.getResult(1);
227             boolean shouldRetry = !(succeededWhileWeSlept !=null && succeededWhileWeSlept.booleanValue());
228             if(shouldRetry)
229                 successfulFlush = startFlush(evt, --numberOfAttempts, true);
230         }
231         return successfulFlush;
232     }
233
234     public void stopFlush() {
235         down(new Event(Event.RESUME));
236     }
237
238     /*
239      * ------------------- end JMX attributes and operations
240      * ---------------------
241      */

242
243     public Object JavaDoc down(Event evt) {
244         switch (evt.getType()) {
245         case Event.MSG:
246             Message msg = (Message) evt.getArg();
247             FlushHeader fh = (FlushHeader) msg.getHeader(getName());
248             if (fh != null && fh.type == FlushHeader.FLUSH_BYPASS) {
249                 return down_prot.down(evt);
250             } else {
251                 blockMessageDuringFlush();
252             }
253             break;
254         case Event.GET_STATE:
255             blockMessageDuringFlush();
256             break;
257
258         case Event.CONNECT:
259             sendBlockUpToChannel();
260             break;
261
262         case Event.SUSPEND:
263             return startFlush(evt, 3, false);
264
265         case Event.RESUME:
266             onResume();
267             return null;
268         }
269         return down_prot.down(evt);
270     }
271
272     private void blockMessageDuringFlush() {
273         boolean shouldSuspendByItself = false;
274         long start = 0, stop = 0;
275         synchronized (blockMutex) {
276             while (isBlockingFlushDown) {
277                 if (log.isDebugEnabled())
278                     log.debug("FLUSH block at " + localAddress + " for "
279                             + (timeout <= 0 ? "ever" : timeout + "ms"));
280                 try {
281                     start = System.currentTimeMillis();
282                     if (timeout <= 0)
283                         blockMutex.wait();
284                     else
285                         blockMutex.wait(timeout);
286                     stop = System.currentTimeMillis();
287                 } catch (InterruptedException JavaDoc e) {
288                     Thread.currentThread().interrupt(); // set interrupt flag
289
// again
290
}
291                 if (isBlockingFlushDown) {
292                     isBlockingFlushDown = false;
293                     shouldSuspendByItself = true;
294                     blockMutex.notifyAll();
295                 }
296             }
297         }
298         if (shouldSuspendByItself) {
299             log.warn("unblocking FLUSH.down() at " + localAddress
300                     + " after timeout of " + (stop - start) + "ms");
301             flush_promise.setResult(Boolean.TRUE);
302         }
303     }
304
305     public Object JavaDoc up(Event evt) {
306
307         switch (evt.getType()) {
308         case Event.MSG:
309             Message msg = (Message) evt.getArg();
310             FlushHeader fh = (FlushHeader) msg.getHeader(getName());
311             if (fh != null) {
312                 if (fh.type == FlushHeader.FLUSH_BYPASS) {
313                     return up_prot.up(evt);
314                 } else if (fh.type == FlushHeader.START_FLUSH) {
315                     handleStartFlush(msg, fh);
316                 } else if (fh.type == FlushHeader.FLUSH_RECONCILE) {
317                     handleFlushReconcile(msg, fh);
318                 } else if (fh.type == FlushHeader.FLUSH_RECONCILE_OK) {
319                     onFlushReconcileOK(msg);
320                 } else if (fh.type == FlushHeader.STOP_FLUSH) {
321                     onStopFlush();
322                 } else if (fh.type == FlushHeader.ABORT_FLUSH) {
323                     // abort current flush
324
flush_promise.setResult(Boolean.FALSE);
325                 } else if (isCurrentFlushMessage(fh)) {
326                     if (fh.type == FlushHeader.FLUSH_OK) {
327                         onFlushOk(msg.getSrc(), fh.viewID);
328                     } else if (fh.type == FlushHeader.STOP_FLUSH_OK) {
329                         onStopFlushOk(msg.getSrc());
330                     } else if (fh.type == FlushHeader.FLUSH_COMPLETED) {
331                         onFlushCompleted(msg.getSrc(), fh.digest);
332                     }
333                 } else {
334                     if (log.isDebugEnabled())
335                         log.debug(localAddress
336                                 + " received outdated FLUSH message " + fh
337                                 + ",ignoring it.");
338                 }
339                 return null; // do not pass FLUSH msg up
340
}
341             break;
342
343         case Event.VIEW_CHANGE:
344             // if this is channel's first view and its the only member of the
345
// group then the
346
// goal is to pass BLOCK,VIEW,UNBLOCK to application space on the
347
// same thread as VIEW.
348
View newView = (View) evt.getArg();
349             boolean firstView = onViewChange(newView);
350             boolean singletonMember = newView.size() == 1
351                     && newView.containsMember(localAddress);
352             if (firstView && singletonMember) {
353                 up_prot.up(evt);
354                 synchronized (blockMutex) {
355                     isBlockingFlushDown = false;
356                     blockMutex.notifyAll();
357                 }
358                 if (log.isDebugEnabled())
359                     log.debug("At "
360                             + localAddress
361                             + " unblocking FLUSH.down() and sending UNBLOCK up");
362
363                 up_prot.up(new Event(Event.UNBLOCK));
364                 return null;
365             }
366             break;
367             
368         case Event.TMP_VIEW:
369             /*
370              * April 25, 2007
371              *
372              * Accomodating current NAKACK (1.127)
373              *
374              * Updates field currentView of a leaving coordinator.
375              * Leaving coordinator, after it sends out the view,
376              * does not need to participate in second flush phase.
377              *
378              * see onStopFlush();
379              *
380              * TODO: revisit if still needed post NAKACK 1.127
381              *
382              */

383             View tmpView = (View) evt.getArg();
384             if(!tmpView.containsMember(localAddress)){
385                 onViewChange(tmpView);
386             }
387             break;
388
389         case Event.SET_LOCAL_ADDRESS:
390             localAddress = (Address) evt.getArg();
391             break;
392
393         case Event.SUSPECT:
394             onSuspect((Address) evt.getArg());
395             break;
396
397         case Event.SUSPEND:
398             return startFlush(evt, 3, false);
399
400         case Event.RESUME:
401             onResume();
402             return null;
403
404         }
405
406         return up_prot.up(evt);
407     }
408
409     private void onFlushReconcileOK(Message msg) {
410         if (log.isDebugEnabled())
411             log.debug(localAddress + " received reconcile ok from "
412                     + msg.getSrc());
413
414         synchronized (sharedLock) {
415             reconcileOks.add(msg.getSrc());
416             if (reconcileOks.size() >= flushMembers.size()) {
417                 flush_promise.setResult(Boolean.TRUE);
418                 if (log.isDebugEnabled())
419                     log.debug("All FLUSH_RECONCILE_OK received at "
420                             + localAddress);
421             }
422         }
423     }
424
425     private void handleFlushReconcile(Message msg, FlushHeader fh) {
426         Address requester = msg.getSrc();
427         Digest reconcileDigest = fh.digest;
428
429         if (log.isDebugEnabled())
430             log.debug("Received FLUSH_RECONCILE at " + localAddress
431                     + " passing digest to NAKACK " + reconcileDigest);
432
433         // Let NAKACK reconcile missing messages
434
down_prot.down(new Event(Event.REBROADCAST, reconcileDigest));
435
436         if (log.isDebugEnabled())
437             log.debug("Returned from FLUSH_RECONCILE at " + localAddress
438                     + " Sending RECONCILE_OK to " + requester + ", thread "
439                     + Thread.currentThread());
440
441         Message reconcileOk = new Message(requester);
442         reconcileOk.setFlag(Message.OOB);
443         reconcileOk.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_RECONCILE_OK));
444         down_prot.down(new Event(Event.MSG, reconcileOk));
445     }
446
447     private void handleStartFlush(Message msg, FlushHeader fh) {
448         byte oldPhase = flushPhase.transitionToFirstPhase();
449         if (oldPhase == FlushPhase.START_PHASE) {
450             sendBlockUpToChannel();
451             onStartFlush(msg.getSrc(), fh);
452         } else if (oldPhase == FlushPhase.FIRST_PHASE) {
453             Address flushRequester = msg.getSrc();
454             Address coordinator = null;
455             synchronized (sharedLock) {
456                 if(flushCoordinator != null)
457                     coordinator = flushCoordinator;
458                 else
459                     coordinator = flushRequester;
460             }
461
462             if (flushRequester.compareTo(coordinator) < 0) {
463                 rejectFlush(fh.viewID, coordinator);
464                 if (log.isDebugEnabled()) {
465                     log.debug("Rejecting flush at " + localAddress
466                             + " to current flush coordinator " + coordinator
467                             + " and switching flush coordinator to "
468                             + flushRequester);
469                 }
470                 synchronized (sharedLock) {
471                     flushCoordinator = flushRequester;
472                 }
473             } else if (flushRequester.compareTo(coordinator) > 0) {
474                 rejectFlush(fh.viewID, flushRequester);
475                 if (log.isDebugEnabled()) {
476                     log.debug("Rejecting flush at " + localAddress
477                             + " to flush requester " + flushRequester + " coordinator is " + coordinator);
478                 }
479             }
480             else if (flushRequester.equals(coordinator)){
481                 if (log.isDebugEnabled()) {
482                     log.debug("Accepting flush at " + localAddress
483                             + ", proceeding with flush");
484                 }
485                 onStartFlush(msg.getSrc(), fh);
486             }
487         } else if (oldPhase == FlushPhase.SECOND_PHASE) {
488             Address flushRequester = msg.getSrc();
489             rejectFlush(fh.viewID, flushRequester);
490             if (log.isDebugEnabled()) {
491                 log.debug("Rejecting flush in second phase at " + localAddress
492                         + " to flush requester " + flushRequester);
493             }
494         }
495     }
496
497     public Vector<Integer JavaDoc> providedDownServices() {
498         Vector<Integer JavaDoc> retval = new Vector<Integer JavaDoc>(2);
499         retval.addElement(new Integer JavaDoc(Event.SUSPEND));
500         retval.addElement(new Integer JavaDoc(Event.RESUME));
501         return retval;
502     }
503
504     private void rejectFlush(long viewId, Address flushRequester) {
505         Message reject = new Message(flushRequester, localAddress, null);
506         reject.putHeader(getName(), new FlushHeader(FlushHeader.ABORT_FLUSH,viewId));
507         down_prot.down(new Event(Event.MSG, reject));
508     }
509
510     private void sendBlockUpToChannel() {
511         up_prot.up(new Event(Event.BLOCK));
512     }
513
514     private boolean isCurrentFlushMessage(FlushHeader fh) {
515         return fh.viewID == currentViewId();
516     }
517
518     private long currentViewId() {
519         long viewId = -1;
520         synchronized (sharedLock) {
521             ViewId view = currentView.getVid();
522             if (view != null) {
523                 viewId = view.getId();
524             }
525         }
526         return viewId;
527     }
528
529     private boolean onViewChange(View view) {
530         boolean amINewCoordinator = false;
531         boolean isThisOurFirstView = false;
532         synchronized (sharedLock) {
533             if (receivedFirstView) {
534                 receivedMoreThanOneView = true;
535             }
536             if (!receivedFirstView) {
537                 receivedFirstView = true;
538             }
539             isThisOurFirstView = receivedFirstView && !receivedMoreThanOneView;
540             suspected.retainAll(view.getMembers());
541             currentView = view;
542             boolean coordinatorLeft = flushCoordinator != null && !view.containsMember(flushCoordinator);
543         
544             if(coordinatorLeft){
545                 flushCoordinator = view.getMembers().get(0);
546                 amINewCoordinator = localAddress.equals(flushCoordinator);
547             }
548         }
549
550         // If coordinator leaves, its STOP FLUSH message will be discarded by
551
// other members at NAKACK layer. Remaining members will be hung,
552
// waiting
553
// for STOP_FLUSH message. If I am new coordinator I will complete the
554
// FLUSH and send STOP_FLUSH on flush callers behalf.
555
if (amINewCoordinator) {
556             if (log.isDebugEnabled())
557                 log.debug("Coordinator left, " + localAddress
558                         + " will complete flush");
559             onResume();
560         }
561
562         if (log.isDebugEnabled())
563             log.debug("Installing view at " + localAddress + " view is "
564                     + view);
565
566         return isThisOurFirstView;
567     }
568
569     private void onStopFlush() {
570         flushPhase.transitionToSecondPhase();
571         if (stats) {
572             long stopFlushTime = System.currentTimeMillis();
573             totalTimeInFlush += (stopFlushTime - startFlushTime);
574             if (numberOfFlushes > 0) {
575                 averageFlushDuration = totalTimeInFlush / (double) numberOfFlushes;
576             }
577         }
578         
579         /*
580          * April 25, 2007
581          *
582          * Accomodating current NAKACK (1.127)
583          *
584          * ack this STOP_FLUSH only if we are surviving member
585          * otherwise we get runtime exception from NAKACK
586          *
587          * TODO: revisit if still needed post NAKACK 1.127
588          *
589          */

590         boolean amISurvivingMember = false;
591         synchronized(sharedLock){
592             amISurvivingMember = currentView.containsMember(localAddress);
593         }
594         if(amISurvivingMember){
595             Message msg = new Message(null, localAddress, null);
596             msg.putHeader(getName(), new FlushHeader(FlushHeader.STOP_FLUSH_OK,currentViewId()));
597             down_prot.down(new Event(Event.MSG, msg));
598             if (log.isDebugEnabled())
599                 log.debug("Received STOP_FLUSH and sent STOP_FLUSH_OK from "
600                         + localAddress);
601         }
602     }
603
604     private void onSuspend(View view) {
605         Message msg = null;
606         Collection<Address> participantsInFlush = null;
607         synchronized (sharedLock) {
608             // start FLUSH only on group members that we need to flush
609
if (view != null) {
610                 participantsInFlush = new ArrayList<Address>(view.getMembers());
611                 participantsInFlush.retainAll(currentView.getMembers());
612             } else {
613                 participantsInFlush = new ArrayList<Address>(currentView.getMembers());
614             }
615             msg = new Message(null, localAddress, null);
616             msg.putHeader(getName(), new FlushHeader(FlushHeader.START_FLUSH,
617                     currentViewId(), participantsInFlush));
618         }
619         if (participantsInFlush.isEmpty()) {
620             flush_promise.setResult(Boolean.TRUE);
621         } else {
622             down_prot.down(new Event(Event.MSG, msg));
623             if (log.isDebugEnabled())
624                 log.debug("Flush coordinator " + localAddress
625                         + " is starting FLUSH with participants " + participantsInFlush);
626         }
627     }
628
629     private void onResume() {
630         long viewID = currentViewId();
631         Message msg = new Message(null, localAddress, null);
632         msg.putHeader(getName(), new FlushHeader(FlushHeader.STOP_FLUSH, viewID));
633         down_prot.down(new Event(Event.MSG, msg));
634         if (log.isDebugEnabled())
635             log.debug("Received RESUME at " + localAddress
636                     + ", sent STOP_FLUSH to all");
637     }
638
639     private void onStartFlush(Address flushStarter, FlushHeader fh) {
640         if (stats) {
641             startFlushTime = System.currentTimeMillis();
642             numberOfFlushes += 1;
643         }
644         boolean amIParticipant = false;
645         synchronized (sharedLock) {
646             flushCoordinator = flushStarter;
647             flushMembers.clear();
648             if (fh.flushParticipants != null) {
649                 flushMembers.addAll(fh.flushParticipants);
650             }
651             flushMembers.removeAll(suspected);
652             amIParticipant = flushMembers.contains(localAddress);
653         }
654         if (amIParticipant) {
655             Message msg = new Message(null);
656             msg.putHeader(getName(), new FlushHeader(FlushHeader.FLUSH_OK,fh.viewID));
657             down_prot.down(new Event(Event.MSG, msg));
658             if (log.isDebugEnabled())
659                 log.debug("Received START_FLUSH at " + localAddress
660                         + " responded with FLUSH_OK");
661         }
662     }
663
664     private void onFlushOk(Address address, long viewID) {
665
666         boolean flushOkCompleted = false;
667         boolean amIParticipant = false;
668         Message m = null;
669         synchronized (sharedLock) {
670             amIParticipant = flushMembers.contains(address);
671             flushOkSet.add(address);
672             flushOkCompleted = flushOkSet.containsAll(flushMembers);
673             if (flushOkCompleted) {
674                 m = new Message(flushCoordinator);
675             }
676             if (log.isDebugEnabled())
677                 log.debug("At " + localAddress + " FLUSH_OK from " + address
678                         + ",completed " + flushOkCompleted + ", flushOkSet "
679                         + flushOkSet);
680         }
681
682         if (flushOkCompleted && amIParticipant) {
683             synchronized (blockMutex) {
684                 isBlockingFlushDown = true;
685             }
686             Digest digest = (Digest) down_prot.down(new Event(Event.GET_DIGEST));
687             FlushHeader fh = new FlushHeader(FlushHeader.FLUSH_COMPLETED,viewID);
688             fh.addDigest(digest);
689             m.putHeader(getName(), fh);
690             if (log.isDebugEnabled())
691                 log.debug(localAddress
692                         + " is blocking FLUSH.down(). Sending FLUSH_COMPLETED message to "
693                         + flushCoordinator);
694             down_prot.down(new Event(Event.MSG, m));
695             
696         }
697     }
698
699     private void onStopFlushOk(Address address) {
700
701         boolean stopFlushOkCompleted = false;
702         synchronized (sharedLock) {
703             stopFlushOkSet.add(address);
704             TreeSet<Address> membersCopy = new TreeSet<Address>(currentView.getMembers());
705             membersCopy.removeAll(suspected);
706             stopFlushOkCompleted = stopFlushOkSet.containsAll(membersCopy);
707             
708             if (log.isDebugEnabled())
709                 log.debug("At " + localAddress + " STOP_FLUSH_OK from " + address
710                         + ",completed " + stopFlushOkCompleted
711                         + ", stopFlushOkSet " + stopFlushOkSet);
712         }
713
714         if (stopFlushOkCompleted) {
715             synchronized (sharedLock) {
716                 flushPhase.transitionToStart();
717                 flushCompletedMap.clear();
718                 flushOkSet.clear();
719                 stopFlushOkSet.clear();
720                 flushMembers.clear();
721                 suspected.clear();
722                 flushCoordinator = null;
723             }
724
725             if (log.isDebugEnabled())
726                 log.debug("At " + localAddress
727                         + " unblocking FLUSH.down() and sending UNBLOCK up");
728
729             synchronized (blockMutex) {
730                 isBlockingFlushDown = false;
731                 blockMutex.notifyAll();
732             }
733             up_prot.up(new Event(Event.UNBLOCK));
734         }
735     }
736
737     private void onFlushCompleted(Address address, Digest digest) {
738         boolean flushCompleted = false;
739         Message msg = null;
740         boolean needsReconciliationPhase = false;
741         synchronized (sharedLock) {
742             flushCompletedMap.put(address, digest);
743             if (flushCompletedMap.size() >= flushMembers.size()) {
744                 flushCompleted = flushCompletedMap.keySet().containsAll(flushMembers);
745             }
746             
747             if (log.isDebugEnabled())
748                 log.debug("At " + localAddress + " FLUSH_COMPLETED from " + address
749                         + ",completed " + flushCompleted + ",flushCompleted "
750                         + flushCompletedMap.keySet());
751             
752             needsReconciliationPhase = enable_reconciliation && flushCompleted && hasVirtualSynchronyGaps();
753             if (needsReconciliationPhase){
754                 
755                 Digest d = findHighestSequences();
756                 msg = new Message();
757                 msg.setFlag(Message.OOB);
758                 FlushHeader fh = new FlushHeader(FlushHeader.FLUSH_RECONCILE, currentViewId(), flushMembers);
759                 reconcileOks.clear();
760                 fh.addDigest(d);
761                 msg.putHeader(getName(), fh);
762                 
763                 if (log.isTraceEnabled())
764                     log.trace("Reconciling flush mebers due to virtual synchrony gap, digest is "
765                                     + d + " flush members are " + flushMembers);
766                 
767                 flushCompletedMap.clear();
768             }
769         }
770         if(needsReconciliationPhase){
771             down_prot.down(new Event(Event.MSG, msg));
772         }else if(flushCompleted){
773             flush_promise.setResult(Boolean.TRUE);
774             if (log.isDebugEnabled())
775                 log.debug("All FLUSH_COMPLETED received at "
776                                 + localAddress);
777         }
778     }
779
780     private boolean hasVirtualSynchronyGaps() {
781         ArrayList <Digest> digests = new ArrayList<Digest>();
782         digests.addAll(flushCompletedMap.values());
783         Digest firstDigest = digests.get(0);
784         List<Digest> remainingDigests = digests.subList(1, digests.size());
785         for (Digest digest : remainingDigests) {
786             Digest diff = firstDigest.difference(digest);
787             if (diff != Digest.EMPTY_DIGEST) {
788                 return true;
789             }
790         }
791         return false;
792     }
793
794     private Digest findHighestSequences() {
795         Digest result = null;
796         List<Digest> digests = new ArrayList<Digest>(flushCompletedMap.values());
797
798         result =digests.get(0);
799         List<Digest> remainingDigests = digests.subList(1, digests.size());
800
801         for (Digest digestG : remainingDigests) {
802             result = result.highestSequence(digestG);
803         }
804         return result;
805     }
806
807     private void onSuspect(Address address) {
808         boolean flushOkCompleted = false;
809         Message m = null;
810         long viewID = 0;
811         synchronized (sharedLock) {
812             suspected.add(address);
813             flushMembers.removeAll(suspected);
814             viewID = currentViewId();
815             flushOkCompleted = !flushOkSet.isEmpty() && flushOkSet.containsAll(flushMembers);
816             if (flushOkCompleted) {
817                 m = new Message(flushCoordinator, localAddress, null);
818             }
819             if (log.isDebugEnabled())
820                 log.debug("Suspect is " + address + ",completed "
821                         + flushOkCompleted + ", flushOkSet " + flushOkSet
822                         + " flushMembers " + flushMembers);
823         }
824         if (flushOkCompleted) {
825             Digest digest = (Digest) down_prot.down(new Event(Event.GET_DIGEST));
826             FlushHeader fh = new FlushHeader(FlushHeader.FLUSH_COMPLETED,viewID);
827             fh.addDigest(digest);
828             m.putHeader(getName(), fh);
829             down_prot.down(new Event(Event.MSG, m));
830             if (log.isDebugEnabled())
831                 log.debug(localAddress + " sent FLUSH_COMPLETED message to "
832                         + flushCoordinator);
833         }
834     }
835
836     private class FlushPhase {
837         private byte phase = 0;
838
839         public static final byte START_PHASE = 0;
840
841         public static final byte FIRST_PHASE = 1;
842
843         public static final byte SECOND_PHASE = 2;
844
845         FlushPhase() {
846         }
847
848         public byte transitionToFirstPhase() {
849             byte oldPhase = -1;
850             synchronized(sharedLock){
851                 oldPhase = phase;
852                 if(oldPhase == START_PHASE){
853                     phase = FIRST_PHASE;
854                 }
855             }
856             return oldPhase;
857         }
858
859         public void transitionToStart() {
860             synchronized(sharedLock){
861                 phase = START_PHASE;
862             }
863         }
864
865         public void transitionToSecondPhase() {
866             synchronized(sharedLock){
867                 phase = SECOND_PHASE;
868             }
869         }
870
871         public boolean isFlushInProgress() {
872             synchronized(sharedLock){
873                 return phase != START_PHASE;
874             }
875         }
876     }
877
878     public static class FlushHeader extends Header implements Streamable {
879         public static final byte START_FLUSH = 0;
880
881         public static final byte FLUSH_OK = 1;
882
883         public static final byte STOP_FLUSH = 2;
884
885         public static final byte FLUSH_COMPLETED = 3;
886
887         public static final byte STOP_FLUSH_OK = 4;
888
889         public static final byte ABORT_FLUSH = 5;
890
891         public static final byte FLUSH_BYPASS = 6;
892
893         public static final byte FLUSH_RECONCILE = 7;
894
895         public static final byte FLUSH_RECONCILE_OK = 8;
896
897         byte type;
898
899         long viewID;
900
901         Collection<Address> flushParticipants;
902
903         Digest digest = null;
904
905         public FlushHeader() {
906             this(START_FLUSH, 0);
907         } // used for externalization
908

909         public FlushHeader(byte type) {
910             this(type, 0);
911         }
912
913         public FlushHeader(byte type, long viewID) {
914             this(type, viewID, null);
915         }
916
917         public FlushHeader(byte type, long viewID, Collection<Address> flushView) {
918             this.type = type;
919             this.viewID = viewID;
920             this.flushParticipants = flushView;
921         }
922
923         public void addDigest(Digest digest) {
924             this.digest = digest;
925         }
926
927         public String JavaDoc toString() {
928             switch (type) {
929             case START_FLUSH:
930                 return "FLUSH[type=START_FLUSH,viewId=" + viewID + ",members="
931                         + flushParticipants + "]";
932             case FLUSH_OK:
933                 return "FLUSH[type=FLUSH_OK,viewId=" + viewID + "]";
934             case STOP_FLUSH:
935                 return "FLUSH[type=STOP_FLUSH,viewId=" + viewID + "]";
936             case STOP_FLUSH_OK:
937                 return "FLUSH[type=STOP_FLUSH_OK,viewId=" + viewID + "]";
938             case ABORT_FLUSH:
939                 return "FLUSH[type=ABORT_FLUSH,viewId=" + viewID + "]";
940             case FLUSH_COMPLETED:
941                 return "FLUSH[type=FLUSH_COMPLETED,viewId=" + viewID + "]";
942             case FLUSH_BYPASS:
943                 return "FLUSH[type=FLUSH_BYPASS,viewId=" + viewID + "]";
944             case FLUSH_RECONCILE:
945                 return "FLUSH[type=FLUSH_RECONCILE,viewId=" + viewID
946                         + ",digest=" + digest + "]";
947             case FLUSH_RECONCILE_OK:
948                 return "FLUSH[type=FLUSH_RECONCILE_OK,viewId=" + viewID + "]";
949             default:
950                 return "[FLUSH: unknown type (" + type + ")]";
951             }
952         }
953
954         public void writeExternal(ObjectOutput out) throws IOException {
955             out.writeByte(type);
956             out.writeLong(viewID);
957             out.writeObject(flushParticipants);
958             out.writeObject(digest);
959         }
960
961         public void readExternal(ObjectInput in) throws IOException,
962                 ClassNotFoundException JavaDoc {
963             type = in.readByte();
964             viewID = in.readLong();
965             flushParticipants = (Collection) in.readObject();
966             digest = (Digest) in.readObject();
967         }
968
969         public void writeTo(DataOutputStream out) throws IOException {
970             out.writeByte(type);
971             out.writeLong(viewID);
972             if (flushParticipants != null && !flushParticipants.isEmpty()) {
973                 out.writeShort(flushParticipants.size());
974                 for (Iterator<Address> iter = flushParticipants.iterator(); iter.hasNext();) {
975                     Address address = iter.next();
976                     Util.writeAddress(address, out);
977                 }
978             } else {
979                 out.writeShort(0);
980             }
981             if (digest != null) {
982                 out.writeBoolean(true);
983                 Util.writeStreamable(digest, out);
984             } else {
985                 out.writeBoolean(false);
986             }
987         }
988
989         public void readFrom(DataInputStream in) throws IOException,
990                 IllegalAccessException JavaDoc, InstantiationException JavaDoc {
991             type = in.readByte();
992             viewID = in.readLong();
993             int flushParticipantsSize = in.readShort();
994             if (flushParticipantsSize > 0) {
995                 flushParticipants = new ArrayList<Address>(flushParticipantsSize);
996                 for (int i = 0; i < flushParticipantsSize; i++) {
997                     flushParticipants.add(Util.readAddress(in));
998                 }
999             }
1000            boolean hasDigest = in.readBoolean();
1001            if (hasDigest) {
1002                digest = (Digest) Util.readStreamable(Digest.class, in);
1003            }
1004        }
1005    }
1006}
1007
Popular Tags