KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > MERGEFAST


1 package org.jgroups.protocols;
2
3 import org.jgroups.*;
4 import org.jgroups.stack.Protocol;
5
6 import java.io.IOException JavaDoc;
7 import java.io.ObjectInput JavaDoc;
8 import java.io.ObjectOutput JavaDoc;
9 import java.util.Vector JavaDoc;
10
11 /**
12  * The coordinator attaches a small header to each (or every nth) message. If another coordinator <em>in the
13  * same group</em> sees the message, it will initiate the merge protocol immediately by sending a MERGE
14  * event up the stack.
15  * @author Bela Ban, Aug 25 2003
16  */

17 public class MERGEFAST extends Protocol {
18     Address local_addr=null;
19     boolean is_coord=false;
20     static final String JavaDoc name="MERGEFAST";
21
22     public String JavaDoc getName() {
23         return name;
24     }
25
26
27     public void down(Event evt) {
28         if(is_coord == true && evt.getType() == Event.MSG && local_addr != null) {
29             Message msg=(Message)evt.getArg();
30             Address dest=msg.getDest();
31             if(dest == null || dest.isMulticastAddress()) {
32                 msg.putHeader(getName(), new MergefastHeader(local_addr));
33             }
34         }
35
36         if(evt.getType() == Event.VIEW_CHANGE) {
37             handleViewChange((View)evt.getArg());
38         }
39
40         passDown(evt);
41     }
42
43
44
45     public void up(Event evt) {
46         switch(evt.getType()) {
47             case Event.SET_LOCAL_ADDRESS:
48                 local_addr=(Address)evt.getArg();
49                 break;
50             case Event.MSG:
51                 if(is_coord == false) // only handle message if we are coordinator
52
break;
53                 Message msg=(Message)evt.getArg();
54                 MergefastHeader hdr=(MergefastHeader)msg.removeHeader(name);
55                 passUp(evt);
56                 if(hdr != null && local_addr != null) {
57                     Address other_coord=hdr.coord;
58                     if(!local_addr.equals(other_coord)) {
59                         sendUpMerge(new Address[]{local_addr, other_coord});
60                     }
61                 }
62                 return; // event was already passed up
63
case Event.VIEW_CHANGE:
64                 handleViewChange((View)evt.getArg());
65                 break;
66         }
67         passUp(evt);
68     }
69
70
71     void handleViewChange(View v) {
72         Vector JavaDoc mbrs;
73         if(local_addr == null)
74             return;
75         mbrs=v.getMembers();
76         if(mbrs != null && mbrs.size() > 0 && local_addr.equals(mbrs.firstElement())) {
77             is_coord=true;
78         }
79         else
80             is_coord=false;
81     }
82
83     // todo: avoid sending up too many MERGE events
84
void sendUpMerge(Address[] addresses) {
85         Vector JavaDoc v=new Vector JavaDoc(11);
86         for(int i=0; i < addresses.length; i++) {
87             Address addr=addresses[i];
88             v.add(addr);
89         }
90         passUp(new Event(Event.MERGE, v));
91     }
92
93
94     public static class MergefastHeader extends Header {
95         Address coord=null;
96
97         public MergefastHeader() {
98         }
99
100         public MergefastHeader(Address coord) {
101             this.coord=coord;
102         }
103
104         public void writeExternal(ObjectOutput JavaDoc out) throws IOException JavaDoc {
105             out.writeObject(coord);
106         }
107
108         public void readExternal(ObjectInput JavaDoc in) throws IOException JavaDoc, ClassNotFoundException JavaDoc {
109             coord=(Address)in.readObject();
110         }
111
112     }
113
114 }
115
Popular Tags