KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > protocols > HTOTAL


1 // $Id: HTOTAL.java,v 1.1 2005/04/05 14:35:09 belaban Exp $
2

3 package org.jgroups.protocols;
4
5 import org.jgroups.*;
6 import org.jgroups.stack.Protocol;
7 import org.jgroups.util.Streamable;
8 import org.jgroups.util.Util;
9
10 import java.io.*;
11 import java.util.Properties JavaDoc;
12 import java.util.Vector JavaDoc;
13
14
15 /**
16  * Implementation of UTO-TCP as designed by EPFL. Implements chaining algorithm: each sender sends the message
17  * to a coordinator who then forwards it to its neighbor on the right, who then forwards it to its neighbor to the right
18  * etc.
19  * @author Bela Ban
20  * @version $Id: HTOTAL.java,v 1.1 2005/04/05 14:35:09 belaban Exp $
21  */

22 public class HTOTAL extends Protocol {
23     Address coord=null;
24     Address neighbor=null; // to whom do we forward the message (member to the right, or null if we're at the tail)
25
Address local_addr=null;
26     Vector JavaDoc mbrs=new Vector JavaDoc();
27     boolean is_coord=false;
28     private boolean use_multipoint_forwarding=false;
29
30
31     public static class HTotalHeader extends Header implements Streamable {
32         Address dest, src;
33         boolean forward=true;
34
35         public HTotalHeader() {
36         }
37
38         public HTotalHeader(Address dest, Address src) {
39             this.dest=dest;
40             this.src=src;
41         }
42
43         public void writeExternal(ObjectOutput out) throws IOException {
44             out.writeObject(dest);
45             out.writeObject(src);
46             out.writeBoolean(forward);
47         }
48
49         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
50             dest=(Address)in.readObject();
51             src=(Address)in.readObject();
52             forward=in.readBoolean();
53         }
54
55         public void writeTo(DataOutputStream out) throws IOException {
56             Util.writeAddress(dest, out);
57             Util.writeAddress(src, out);
58             out.writeBoolean(forward);
59         }
60
61         public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
62             dest=Util.readAddress(in);
63             src=Util.readAddress(in);
64             forward=in.readBoolean();
65         }
66
67         public String JavaDoc toString() {
68             return "dest=" + dest + ", SRC=" + src + ", forward=" + forward;
69         }
70     }
71
72     public HTOTAL() {
73     }
74
75     public String JavaDoc getName() {
76         return "HTOTAL";
77     }
78
79      public boolean setProperties(Properties JavaDoc props) {
80         String JavaDoc str;
81
82         super.setProperties(props);
83         str=props.getProperty("use_multipoint_forwarding");
84         if(str != null) {
85             use_multipoint_forwarding=Boolean.valueOf(str).booleanValue();
86             props.remove("use_multipoint_forwarding");
87         }
88
89         if(props.size() > 0) {
90             System.err.println("TCP.setProperties(): the following properties are not recognized:");
91             props.list(System.out);
92             return false;
93         }
94         return true;
95     }
96
97     public void down(Event evt) {
98         switch(evt.getType()) {
99         case Event.VIEW_CHANGE:
100             determineCoordinatorAndNextMember((View)evt.getArg());
101             break;
102         case Event.MSG:
103             Message msg=(Message)evt.getArg();
104             Address dest=msg.getDest();
105             if(dest == null || dest.isMulticastAddress()) { // only process multipoint messages
106
if(coord == null)
107                     log.error("coordinator is null, cannot send message to coordinator");
108                 else
109                     forwardTo(coord, msg);
110                 return; // handled here, don't pass down by default
111
}
112             break;
113         }
114         passDown(evt);
115     }
116
117     public void up(Event evt) {
118         switch(evt.getType()) {
119         case Event.SET_LOCAL_ADDRESS:
120             local_addr=(Address)evt.getArg();
121             break;
122         case Event.VIEW_CHANGE:
123             determineCoordinatorAndNextMember((View)evt.getArg());
124             break;
125         case Event.MSG:
126             Message msg=(Message)evt.getArg();
127             HTotalHeader hdr=(HTotalHeader)msg.getHeader(getName());
128
129             if(hdr == null)
130                 break; // probably a unicast message, just pass it up
131

132             if(hdr.forward) {
133                 Message copy=msg.copy();
134                 if(use_multipoint_forwarding) {
135                     copy.setDest(null);
136                     passDown(new Event(Event.MSG, copy));
137                 }
138                 else {
139                     if(neighbor != null) {
140                         forwardTo(neighbor, copy);
141                     }
142                 }
143             }
144
145             msg.setDest(hdr.dest); // set destination to be the original destination
146
msg.setSrc(hdr.src); // set sender to be the original sender (important for retransmission requests)
147

148             passUp(evt); // <-- we modify msg directly inside evt
149
return;
150         }
151         passUp(evt);
152     }
153
154     private void forwardTo(Address destination, Message msg) {
155         HTotalHeader hdr=(HTotalHeader)msg.getHeader(getName());
156
157         if(hdr == null) {
158             hdr=new HTotalHeader(msg.getDest(), local_addr);
159             msg.putHeader(getName(), hdr);
160         }
161         msg.setDest(destination);
162         if(log.isTraceEnabled())
163             log.trace("forwarding message to " + destination + ", hdr=" + hdr);
164         passDown(new Event(Event.MSG, msg));
165     }
166
167
168     private void determineCoordinatorAndNextMember(View v) {
169         Object JavaDoc tmp;
170         Address retval=null;
171
172         mbrs.clear();
173         mbrs.addAll(v.getMembers());
174
175         coord=(Address)(mbrs != null && mbrs.size() > 0? mbrs.firstElement() : null);
176         is_coord=coord != null && local_addr != null && coord.equals(local_addr);
177
178         if(mbrs == null || mbrs.size() < 2 || local_addr == null)
179             neighbor=null;
180         else {
181             for(int i=0; i < mbrs.size(); i++) {
182                 tmp=mbrs.elementAt(i);
183                 if(local_addr.equals(tmp)) {
184                     if(i + 1 >= mbrs.size())
185                         retval=null; // we don't wrap, last member is null
186
else
187                         retval=(Address)mbrs.elementAt(i + 1);
188                     break;
189                 }
190             }
191         }
192         neighbor=retval;
193         if(log.isTraceEnabled())
194             log.trace("coord=" + coord + ", neighbor=" + neighbor);
195     }
196
197
198 }
199
Popular Tags