KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > pbcast > CoordGmsImpl


1 // $Id: CoordGmsImpl.java,v 1.21 2005/04/15 13:16:59 belaban Exp $
2

3 package org.jgroups.protocols.pbcast;
4
5
6 import org.jgroups.*;
7 import org.jgroups.util.TimeScheduler;
8
9 import java.io.Serializable JavaDoc;
10 import java.util.Iterator JavaDoc;
11 import java.util.Vector JavaDoc;
12
13
14
15
16 /**
17  * Coordinator role of the Group MemberShip (GMS) protocol. Accepts JOIN and LEAVE requests and emits view changes
18  * accordingly.
19  * @author Bela Ban
20  */

21 public class CoordGmsImpl extends GmsImpl {
/** True while this coordinator takes part in a merge (set on MERGE_REQ, cleared on completion/cancel). */
private boolean merging=false;

/** Task that runs the merge protocol on its own thread when this coordinator acts as merge leader. */
private final MergeTask merge_task=new MergeTask();

/** For MERGE_REQ/MERGE_RSP correlation, contains MergeData elements. */
private final Vector merge_rsps=new Vector(11);

/** Id of the merge currently in progress, or null when there is none. */
private Serializable merge_id=null;

/** Leader picked in merge(): the first address of the sorted coordinator list. */
private Address merge_leader=null;

/** Timer task (added to gms.timer) that cancels the merge identified by merge_id; null if none is pending. */
private MergeCanceller merge_canceller=null;


/**
 * Creates the coordinator implementation.
 * @param g The GMS protocol instance this implementation works on behalf of
 */
public CoordGmsImpl(GMS g) {
    gms=g;
}
37
38
39     void setMergeId(Serializable JavaDoc merge_id) {
40         this.merge_id=merge_id;
41         if(this.merge_id != null) {
42             stopMergeCanceller();
43             merge_canceller=new MergeCanceller(this.merge_id, gms.merge_timeout);
44             gms.timer.add(merge_canceller);
45         }
46         else { // merge completed
47
stopMergeCanceller();
48         }
49     }
50
51     private void stopMergeCanceller() {
52         if(merge_canceller != null) {
53             merge_canceller.cancel();
54             merge_canceller=null;
55         }
56     }
57
58     public void init() throws Exception JavaDoc {
59         super.init();
60         cancelMerge();
61     }
62
63     public void join(Address mbr) {
64         wrongMethod("join");
65     }
66
67     /** The coordinator itself wants to leave the group */
68     public void leave(Address mbr) {
69         if(mbr == null) {
70             if(log.isErrorEnabled()) log.error("member's address is null !");
71             return;
72         }
73         if(mbr.equals(gms.local_addr))
74             leaving=true;
75         handleLeave(mbr, false); // regular leave
76
}
77
78     public void handleJoinResponse(JoinRsp join_rsp) {
79         wrongMethod("handleJoinResponse");
80     }
81
82     public void handleLeaveResponse() {
83         ; // safely ignore this
84
}
85
86     public void suspect(Address mbr) {
87         handleSuspect(mbr);
88     }
89
90     public void unsuspect(Address mbr) {
91
92     }
93
94     /**
95      * Invoked upon receiving a MERGE event from the MERGE layer. Starts the merge protocol.
96      * See description of protocol in DESIGN.
97      * @param other_coords A list of coordinators (including myself) found by MERGE protocol
98      */

99     public void merge(Vector JavaDoc other_coords) {
100         Membership tmp;
101
102         if(merging) {
103             if(log.isWarnEnabled()) log.warn("merge already in progress, discarded MERGE event");
104             return;
105         }
106         merge_leader=null;
107         if(other_coords == null) {
108             if(log.isWarnEnabled()) log.warn("list of other coordinators is null. Will not start merge.");
109             return;
110         }
111
112         if(other_coords.size() <= 1) {
113             if(log.isErrorEnabled())
114                 log.error("number of coordinators found is " + other_coords.size() + "; will not perform merge");
115             return;
116         }
117
118         /* Establish deterministic order, so that coords can elect leader */
119         tmp=new Membership(other_coords);
120         tmp.sort();
121         merge_leader=(Address)tmp.elementAt(0);
122         if(log.isDebugEnabled()) log.debug("coordinators in merge protocol are: " + tmp);
123         if(merge_leader.equals(gms.local_addr) || gms.merge_leader) {
124             if(log.isTraceEnabled())
125                 log.trace("I (" + gms.local_addr + ", merge_leader=" + gms.merge_leader +
126                           ") will be the leader. Starting the merge task");
127             startMergeTask(other_coords);
128         }
129         else {
130             if(log.isTraceEnabled()) log.trace("I (" + gms.local_addr + ") am not the merge leader (" +
131                                                merge_leader + "), waiting for merge leader to initiate merge");
132         }
133     }
134
135     /**
136      * Get the view and digest and send back both (MergeData) in the form of a MERGE_RSP to the sender.
137      * If a merge is already in progress, send back a MergeData with the merge_rejected field set to true.
138      */

139     public void handleMergeRequest(Address sender, Object JavaDoc merge_id) {
140         Digest digest;
141         View view;
142
143         if(sender == null) {
144             if(log.isErrorEnabled()) log.error("sender == null; cannot send back a response");
145             return;
146         }
147         if(merging) {
148             if(log.isErrorEnabled()) log.error("merge already in progress");
149             sendMergeRejectedResponse(sender);
150             return;
151         }
152         merging=true;
153         setMergeId((Serializable JavaDoc)merge_id);
154         if(log.isDebugEnabled()) log.debug("sender=" + sender + ", merge_id=" + merge_id);
155         digest=gms.getDigest();
156         view=new View(gms.view_id.copy(), gms.members.getMembers());
157         sendMergeResponse(sender, view, digest);
158     }
159
160
161     MergeData getMergeResponse(Address sender, Object JavaDoc merge_id) {
162         Digest digest;
163         View view;
164         MergeData retval;
165
166         if(sender == null) {
167             if(log.isErrorEnabled()) log.error("sender == null; cannot send back a response");
168             return null;
169         }
170         if(merging) {
171             if(log.isErrorEnabled()) log.error("merge already in progress");
172             retval=new MergeData(sender, null, null);
173             retval.merge_rejected=true;
174             return retval;
175         }
176         merging=true;
177         setMergeId((Serializable JavaDoc)merge_id);
178         if(log.isDebugEnabled()) log.debug("sender=" + sender + ", merge_id=" + merge_id);
179
180         digest=gms.getDigest();
181         view=new View(gms.view_id.copy(), gms.members.getMembers());
182         retval=new MergeData(sender, view, digest);
183         retval.view=view;
184         retval.digest=digest;
185         return retval;
186     }
187
188
189     public void handleMergeResponse(MergeData data, Object JavaDoc merge_id) {
190         if(data == null) {
191             if(log.isErrorEnabled()) log.error("merge data is null");
192             return;
193         }
194         if(merge_id == null || this.merge_id == null) {
195             if(log.isErrorEnabled()) log.error("merge_id ("
196                     + merge_id
197                     + ") or this.merge_id ("
198                     + this.merge_id
199                     + ") == null (sender="
200                     + data.getSender()
201                     + ").");
202             return;
203         }
204
205         if(!this.merge_id.equals(merge_id)) {
206             if(log.isErrorEnabled()) log.error("this.merge_id ("
207                     + this.merge_id
208                     + ") is different from merge_id ("
209                     + merge_id
210                     + ')');
211             return;
212         }
213
214         synchronized(merge_rsps) {
215             if(!merge_rsps.contains(data)) {
216                 merge_rsps.addElement(data);
217                 merge_rsps.notifyAll();
218             }
219         }
220     }
221
222     /**
223      * If merge_id != this.merge_id --> discard
224      * Else cast the view/digest to all members of this group.
225      */

226     public void handleMergeView(MergeData data, Object JavaDoc merge_id) {
227         if(merge_id == null
228                 || this.merge_id == null
229                 || !this.merge_id.equals(merge_id)) {
230             if(log.isErrorEnabled()) log.error("merge_ids don't match (or are null); merge view discarded");
231             return;
232         }
233         gms.castViewChange(data.view, data.digest);
234         merging=false;
235         merge_id=null;
236     }
237
238     public void handleMergeCancelled(Object JavaDoc merge_id) {
239         if(merge_id != null
240                 && this.merge_id != null
241                 && this.merge_id.equals(merge_id)) {
242             if(log.isDebugEnabled()) log.debug("merge was cancelled (merge_id=" + merge_id + ')');
243             setMergeId(null);
244             this.merge_leader=null;
245             merging=false;
246         }
247     }
248
249
250     private void cancelMerge() {
251         if(merge_id != null && log.isDebugEnabled()) log.debug("cancelling merge (merge_id=" + merge_id + ')');
252         setMergeId(null);
253         this.merge_leader=null;
254         stopMergeTask();
255         merging=false;
256         synchronized(merge_rsps) {
257             merge_rsps.clear();
258         }
259     }
260
261     /**
262      * Computes the new view (including the newly joined member) and get the digest from PBCAST.
263      * Returns both in the form of a JoinRsp
264      */

265     public synchronized JoinRsp handleJoin(Address mbr) {
266         Vector JavaDoc new_mbrs=new Vector JavaDoc(1);
267         View v=null;
268         Digest d, tmp;
269
270         if(log.isDebugEnabled()) log.debug("mbr=" + mbr);
271         if(gms.local_addr.equals(mbr)) {
272             if(log.isErrorEnabled()) log.error("cannot join myself !");
273             return null;
274         }
275
276         if(gms.members.contains(mbr)) {
277             if(log.isErrorEnabled())
278                 log.error("member " + mbr + " already present; returning existing view " + gms.members.getMembers());
279             return new JoinRsp(new View(gms.view_id, gms.members.getMembers()), gms.getDigest());
280             // already joined: return current digest and membership
281
}
282         new_mbrs.addElement(mbr);
283         tmp=gms.getDigest(); // get existing digest
284
if(tmp == null) {
285             if(log.isErrorEnabled()) log.error("received null digest from GET_DIGEST: will cause JOIN to fail");
286             return null;
287         }
288         if(log.isDebugEnabled()) log.debug("got digest=" + tmp);
289
290         d=new Digest(tmp.size() + 1);
291         // create a new digest, which contains 1 more member
292
d.add(tmp); // add the existing digest to the new one
293
d.add(mbr, 0, 0);
294         // ... and add the new member. it's first seqno will be 1
295
v=gms.getNextView(new_mbrs, null, null);
296         if(log.isDebugEnabled()) log.debug("joined member " + mbr + ", view is " + v);
297         return new JoinRsp(v, d);
298     }
299
300     /**
301      Exclude <code>mbr</code> from the membership. If <code>suspected</code> is true, then
302      this member crashed and therefore is forced to leave, otherwise it is leaving voluntarily.
303      */

304     public synchronized void handleLeave(Address mbr, boolean suspected) {
305         Vector JavaDoc v=new Vector JavaDoc(1);
306         // contains either leaving mbrs or suspected mbrs
307
if(log.isDebugEnabled()) log.debug("mbr=" + mbr);
308         if(!gms.members.contains(mbr)) {
309             if(log.isErrorEnabled()) log.error("mbr " + mbr + " is not a member !");
310             return;
311         }
312
313         if(gms.view_id == null) {
314             // we're probably not the coord anymore (we just left ourselves), let someone else do it
315
// (client will retry when it doesn't get a response
316
if(log.isDebugEnabled())
317                 log.debug("gms.view_id is null, I'm not the coordinator anymore (leaving=" + leaving +
318                           "); the new coordinator will handle the leave request");
319             return;
320         }
321
322         sendLeaveResponse(mbr); // send an ack to the leaving member
323

324         v.addElement(mbr);
325         if(suspected)
326             gms.castViewChange(null, null, v);
327         else
328             gms.castViewChange(null, v, null);
329     }
330
331     void sendLeaveResponse(Address mbr) {
332         Message msg=new Message(mbr, null, null);
333         GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.LEAVE_RSP);
334         msg.putHeader(gms.getName(), hdr);
335         gms.passDown(new Event(Event.MSG, msg));
336     }
337
338     /**
339      * Called by the GMS when a VIEW is received.
340      * @param new_view The view to be installed
341      * @param digest If view is a MergeView, digest contains the seqno digest of all members and has to
342      * be set by GMS
343      */

344     public void handleViewChange(View new_view, Digest digest) {
345         Vector JavaDoc mbrs=new_view.getMembers();
346         if(log.isDebugEnabled()) {
347             if(digest != null)
348                 log.debug("view=" + new_view + ", digest=" + digest);
349             else
350                 log.debug("view=" + new_view);
351         }
352
353         if(leaving && !mbrs.contains(gms.local_addr))
354             return;
355         gms.installView(new_view, digest);
356     }
357
358     public void handleSuspect(Address mbr) {
359         if(mbr.equals(gms.local_addr)) {
360             if(log.isWarnEnabled()) log.warn("I am the coord and I'm being am suspected -- will probably leave shortly");
361             return;
362         }
363         handleLeave(mbr, true); // irregular leave - forced
364
}
365
366     public void handleExit() {
367         cancelMerge();
368     }
369
370     public void stop() {
371         super.stop(); // sets leaving=false
372
stopMergeTask();
373     }
374
375     /* ------------------------------------------ Private methods ----------------------------------------- */
376
377     void startMergeTask(Vector JavaDoc coords) {
378         synchronized(merge_task) {
379             merge_task.start(coords);
380         }
381     }
382
383     void stopMergeTask() {
384         synchronized(merge_task) {
385             merge_task.stop();
386         }
387     }
388
389     /**
390      * Sends a MERGE_REQ to all coords and populates a list of MergeData (in merge_rsps). Returns after coords.size()
391      * response have been received, or timeout msecs have elapsed (whichever is first).<p>
392      * If a subgroup coordinator rejects the MERGE_REQ (e.g. because of participation in a different merge),
393      * <em>that member will be removed from coords !</em>
394      * @param coords A list of Addresses of subgroup coordinators (inluding myself)
395      * @param timeout Max number of msecs to wait for the merge responses from the subgroup coords
396      */

397     void getMergeDataFromSubgroupCoordinators(Vector JavaDoc coords, long timeout) {
398         Message msg;
399         GMS.GmsHeader hdr;
400         Address coord;
401         long curr_time, time_to_wait=0, end_time;
402         int num_rsps_expected=0;
403
404         if(coords == null || coords.size() <= 1) {
405             if(log.isErrorEnabled()) log.error("coords == null or size <= 1");
406             return;
407         }
408
409         synchronized(merge_rsps) {
410             merge_rsps.removeAllElements();
411
412             if(log.isDebugEnabled()) log.debug("sending MERGE_REQ to " + coords);
413             for(int i=0; i < coords.size(); i++) {
414                 coord=(Address)coords.elementAt(i);
415
416                 if(gms.local_addr != null && gms.local_addr.equals(coord)) {
417                     merge_rsps.add(getMergeResponse(gms.local_addr, merge_id));
418                     continue;
419                 }
420
421                 msg=new Message(coord, null, null);
422                 hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_REQ);
423                 hdr.mbr=gms.local_addr;
424                 hdr.merge_id=merge_id;
425                 msg.putHeader(gms.getName(), hdr);
426                 gms.passDown(new Event(Event.MSG, msg));
427             }
428
429             // wait until num_rsps_expected >= num_rsps or timeout elapsed
430
num_rsps_expected=coords.size();
431             curr_time=System.currentTimeMillis();
432             end_time=curr_time + timeout;
433             while(end_time > curr_time) {
434                 time_to_wait=end_time - curr_time;
435                 if(log.isDebugEnabled()) log.debug("waiting " + time_to_wait + " msecs for merge responses");
436                 if(merge_rsps.size() < num_rsps_expected) {
437                     try {
438                         merge_rsps.wait(time_to_wait);
439                     }
440                     catch(Exception JavaDoc ex) {
441                     }
442                 }
443                 if(log.isDebugEnabled())
444                     log.debug("num_rsps_expected=" + num_rsps_expected + ", actual responses=" + merge_rsps.size());
445
446                 if(merge_rsps.size() >= num_rsps_expected)
447                     break;
448                 curr_time=System.currentTimeMillis();
449             }
450         }
451     }
452
453     /**
454      * Generates a unique merge id by taking the local address and the current time
455      */

456     Serializable JavaDoc generateMergeId() {
457         return new ViewId(gms.local_addr, System.currentTimeMillis());
458         // we're (ab)using ViewId as a merge id
459
}
460
461     /**
462      * Merge all MergeData. All MergeData elements should be disjunct (both views and digests). However,
463      * this method is prepared to resolve duplicate entries (for the same member). Resolution strategy for
464      * views is to merge only 1 of the duplicate members. Resolution strategy for digests is to take the higher
465      * seqnos for duplicate digests.<p>
466      * After merging all members into a Membership and subsequent sorting, the first member of the sorted membership
467      * will be the new coordinator.
468      * @param v A list of MergeData items. Elements with merge_rejected=true were removed before. Is guaranteed
469      * not to be null and to contain at least 1 member.
470      */

471     MergeData consolidateMergeData(Vector JavaDoc v) {
472         MergeData ret=null;
473         MergeData tmp_data;
474         long logical_time=0; // for new_vid
475
ViewId new_vid, tmp_vid;
476         MergeView new_view;
477         View tmp_view;
478         Membership new_mbrs=new Membership();
479         int num_mbrs=0;
480         Digest new_digest=null;
481         Address new_coord;
482         Vector JavaDoc subgroups=new Vector JavaDoc(11);
483         // contains a list of Views, each View is a subgroup
484

485         for(int i=0; i < v.size(); i++) {
486             tmp_data=(MergeData)v.elementAt(i);
487             if(log.isDebugEnabled()) log.debug("merge data is " + tmp_data);
488             tmp_view=tmp_data.getView();
489             if(tmp_view != null) {
490                 tmp_vid=tmp_view.getVid();
491                 if(tmp_vid != null) {
492                     // compute the new view id (max of all vids +1)
493
logical_time=Math.max(logical_time, tmp_vid.getId());
494                 }
495                 // merge all membership lists into one (prevent duplicates)
496
new_mbrs.add(tmp_view.getMembers());
497                 subgroups.addElement(tmp_view.clone());
498             }
499         }
500
501         // the new coordinator is the first member of the consolidated & sorted membership list
502
new_mbrs.sort();
503         num_mbrs=new_mbrs.size();
504         new_coord=num_mbrs > 0? (Address)new_mbrs.elementAt(0) : null;
505         if(new_coord == null) {
506             if(log.isErrorEnabled()) log.error("new_coord == null");
507             return null;
508         }
509         // should be the highest view ID seen up to now plus 1
510
new_vid=new ViewId(new_coord, logical_time + 1);
511
512         // determine the new view
513
new_view=new MergeView(new_vid, new_mbrs.getMembers(), subgroups);
514         if(log.isDebugEnabled()) log.debug("new merged view will be " + new_view);
515
516         // determine the new digest
517
new_digest=consolidateDigests(v, num_mbrs);
518         if(new_digest == null) {
519             if(log.isErrorEnabled()) log.error("digest could not be consolidated");
520             return null;
521         }
522         if(log.isDebugEnabled()) log.debug("consolidated digest=" + new_digest);
523         ret=new MergeData(gms.local_addr, new_view, new_digest);
524         return ret;
525     }
526
527     /**
528      * Merge all digests into one. For each sender, the new value is min(low_seqno), max(high_seqno),
529      * max(high_seqno_seen)
530      */

531     Digest consolidateDigests(Vector JavaDoc v, int num_mbrs) {
532         MergeData data;
533         Digest tmp_digest, retval=new Digest(num_mbrs);
534
535         for(int i=0; i < v.size(); i++) {
536             data=(MergeData)v.elementAt(i);
537             tmp_digest=data.getDigest();
538             if(tmp_digest == null) {
539                 if(log.isErrorEnabled()) log.error("tmp_digest == null; skipping");
540                 continue;
541             }
542             retval.merge(tmp_digest);
543         }
544         return retval;
545     }
546
547     /**
548      * Sends the new view and digest to all subgroup coordinors in coords. Each coord will in turn
549      * <ol>
550      * <li>cast the new view and digest to all the members of its subgroup (MergeView)
551      * <li>on reception of the view, if it is a MergeView, each member will set the digest and install
552      * the new view
553      * </ol>
554      */

555     void sendMergeView(Vector JavaDoc coords, MergeData combined_merge_data) {
556         Message msg;
557         GMS.GmsHeader hdr;
558         Address coord;
559         View v;
560         Digest d;
561
562         if(coords == null || combined_merge_data == null)
563             return;
564         v=combined_merge_data.view;
565         d=combined_merge_data.digest;
566         if(v == null || d == null) {
567             if(log.isErrorEnabled()) log.error("view or digest is null, cannot send consolidated merge view/digest");
568             return;
569         }
570
571         for(int i=0; i < coords.size(); i++) {
572             coord=(Address)coords.elementAt(i);
573             msg=new Message(coord, null, null);
574             hdr=new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW);
575             hdr.view=v;
576             hdr.my_digest=d;
577             hdr.merge_id=merge_id;
578             msg.putHeader(gms.getName(), hdr);
579             gms.passDown(new Event(Event.MSG, msg));
580         }
581     }
582
583     /**
584      * Send back a response containing view and digest to sender
585      */

586     void sendMergeResponse(Address sender, View view, Digest digest) {
587         Message msg=new Message(sender, null, null);
588         GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP);
589         hdr.merge_id=merge_id;
590         hdr.view=view;
591         hdr.my_digest=digest;
592         msg.putHeader(gms.getName(), hdr);
593         if(log.isDebugEnabled()) log.debug("response=" + hdr);
594         gms.passDown(new Event(Event.MSG, msg));
595     }
596
597     void sendMergeRejectedResponse(Address sender) {
598         Message msg=new Message(sender, null, null);
599         GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP);
600         hdr.merge_rejected=true;
601         hdr.merge_id=merge_id;
602         msg.putHeader(gms.getName(), hdr);
603         if(log.isDebugEnabled()) log.debug("response=" + hdr);
604         gms.passDown(new Event(Event.MSG, msg));
605     }
606
607     void sendMergeCancelledMessage(Vector JavaDoc coords, Serializable JavaDoc merge_id) {
608         Message msg;
609         GMS.GmsHeader hdr;
610         Address coord;
611
612         if(coords == null || merge_id == null) {
613             if(log.isErrorEnabled()) log.error("coords or merge_id == null");
614             return;
615         }
616         for(int i=0; i < coords.size(); i++) {
617             coord=(Address)coords.elementAt(i);
618             msg=new Message(coord, null, null);
619             hdr=new GMS.GmsHeader(GMS.GmsHeader.CANCEL_MERGE);
620             hdr.merge_id=merge_id;
621             msg.putHeader(gms.getName(), hdr);
622             gms.passDown(new Event(Event.MSG, msg));
623         }
624     }
625
626     /** Removed rejected merge requests from merge_rsps and coords */
627     void removeRejectedMergeRequests(Vector JavaDoc coords) {
628         MergeData data;
629         for(Iterator JavaDoc it=merge_rsps.iterator(); it.hasNext();) {
630             data=(MergeData)it.next();
631             if(data.merge_rejected) {
632                 if(data.getSender() != null && coords != null)
633                     coords.removeElement(data.getSender());
634                 it.remove();
635                 if(log.isDebugEnabled()) log.debug("removed element " + data);
636             }
637         }
638     }
639
640     /* --------------------------------------- End of Private methods ------------------------------------- */
641
642     /**
643      * Starts the merge protocol (only run by the merge leader). Essentially sends a MERGE_REQ to all
644      * coordinators of all subgroups found. Each coord receives its digest and view and returns it.
645      * The leader then computes the digest and view for the new group from the return values. Finally, it
646      * sends this merged view/digest to all subgroup coordinators; each coordinator will install it in their
647      * subgroup.
648      */

649     private class MergeTask implements Runnable JavaDoc {
650         Thread JavaDoc t=null;
651         Vector JavaDoc coords=null; // list of subgroup coordinators to be contacted
652

653         public void start(Vector JavaDoc coords) {
654             if(t == null || !t.isAlive()) {
655                 this.coords=(Vector JavaDoc)(coords != null? coords.clone() : null);
656                 t=new Thread JavaDoc(this, "MergeTask thread");
657                 t.setDaemon(true);
658                 t.start();
659             }
660         }
661
662         public void stop() {
663             Thread JavaDoc tmp=t;
664             if(isRunning()) {
665                 t=null;
666                 tmp.interrupt();
667             }
668             t=null;
669             coords=null;
670         }
671
672         public boolean isRunning() {
673             return t != null && t.isAlive();
674         }
675
676         /**
677          * Runs the merge protocol as a leader
678          */

679         public void run() {
680             MergeData combined_merge_data=null;
681
682             if(merging == true) {
683                 if(log.isWarnEnabled()) log.warn("merge is already in progress, terminating");
684                 return;
685             }
686
687             if(log.isDebugEnabled()) log.debug("merge task started");
688             try {
689
690                 /* 1. Generate a merge_id that uniquely identifies the merge in progress */
691                 setMergeId(generateMergeId());
692
693                 /* 2. Fetch the current Views/Digests from all subgroup coordinators */
694                 getMergeDataFromSubgroupCoordinators(coords, gms.merge_timeout);
695
696                 /* 3. Remove rejected MergeData elements from merge_rsp and coords (so we'll send the new view only
697                    to members who accepted the merge request) */

698                 removeRejectedMergeRequests(coords);
699
700                 if(merge_rsps.size() <= 1) {
701                     if(log.isWarnEnabled())
702                         log.warn("merge responses from subgroup coordinators <= 1 (" + merge_rsps + "). Cancelling merge");
703                     sendMergeCancelledMessage(coords, merge_id);
704                     return;
705                 }
706
707                 /* 4. Combine all views and digests into 1 View/1 Digest */
708                 combined_merge_data=consolidateMergeData(merge_rsps);
709                 if(combined_merge_data == null) {
710                     if(log.isErrorEnabled()) log.error("combined_merge_data == null");
711                     sendMergeCancelledMessage(coords, merge_id);
712                     return;
713                 }
714
715                 /* 5. Send the new View/Digest to all coordinators (including myself). On reception, they will
716                    install the digest and view in all of their subgroup members */

717                 sendMergeView(coords, combined_merge_data);
718             }
719             catch(Throwable JavaDoc ex) {
720                 if(log.isErrorEnabled()) log.error("exception=" + ex);
721             }
722             finally {
723                 merging=false;
724                 merge_leader=null;
725                 if(log.isDebugEnabled()) log.debug("merge task terminated");
726                 t=null;
727             }
728         }
729     }
730
731
732     private class MergeCanceller implements TimeScheduler.Task {
733         private Object JavaDoc my_merge_id=null;
734         private long timeout;
735         private boolean cancelled=false;
736
737         public MergeCanceller(Object JavaDoc my_merge_id, long timeout) {
738             this.my_merge_id=my_merge_id;
739             this.timeout=timeout;
740         }
741
742         public boolean cancelled() {
743             return cancelled;
744         }
745
746         public void cancel() {
747             cancelled=true;
748         }
749
750         public long nextInterval() {
751             return timeout;
752         }
753
754         public void run() {
755             if(merge_id != null && my_merge_id.equals(merge_id)) {
756                 if(log.isTraceEnabled())
757                     log.trace("cancelling merge due to timer timeout (" + timeout + " ms)");
758                 cancelMerge();
759                 cancelled=true;
760             }
761             else {
762                 if(log.isTraceEnabled())
763                     log.trace("timer kicked in after " + timeout + " ms, but no (or different) merge was in progress: " +
764                               "merge_id=" + merge_id + ", my_merge_id=" + my_merge_id);
765             }
766         }
767     }
768
769 }
770
Popular Tags