1 3 package org.jgroups.protocols; 4 5 import org.jgroups.*; 6 import org.jgroups.stack.Protocol; 7 import org.jgroups.util.Streamable; 8 import org.jgroups.util.Util; 9 10 import java.io.*; 11 import java.util.Properties ; 12 import java.util.Vector ; 13 14 15 22 public class HTOTAL extends Protocol { 23 Address coord=null; 24 Address neighbor=null; Address local_addr=null; 26 Vector mbrs=new Vector (); 27 boolean is_coord=false; 28 private boolean use_multipoint_forwarding=false; 29 30 31 public static class HTotalHeader extends Header implements Streamable { 32 Address dest, src; 33 boolean forward=true; 34 35 public HTotalHeader() { 36 } 37 38 public HTotalHeader(Address dest, Address src) { 39 this.dest=dest; 40 this.src=src; 41 } 42 43 public void writeExternal(ObjectOutput out) throws IOException { 44 out.writeObject(dest); 45 out.writeObject(src); 46 out.writeBoolean(forward); 47 } 48 49 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 50 dest=(Address)in.readObject(); 51 src=(Address)in.readObject(); 52 forward=in.readBoolean(); 53 } 54 55 public void writeTo(DataOutputStream out) throws IOException { 56 Util.writeAddress(dest, out); 57 Util.writeAddress(src, out); 58 out.writeBoolean(forward); 59 } 60 61 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 62 dest=Util.readAddress(in); 63 src=Util.readAddress(in); 64 forward=in.readBoolean(); 65 } 66 67 public String toString() { 68 return "dest=" + dest + ", SRC=" + src + ", forward=" + forward; 69 } 70 } 71 72 public HTOTAL() { 73 } 74 75 public String getName() { 76 return "HTOTAL"; 77 } 78 79 public boolean setProperties(Properties props) { 80 String str; 81 82 super.setProperties(props); 83 str=props.getProperty("use_multipoint_forwarding"); 84 if(str != null) { 85 use_multipoint_forwarding=Boolean.valueOf(str).booleanValue(); 86 props.remove("use_multipoint_forwarding"); 87 } 88 89 if(props.size() > 0) { 90 System.err.println("TCP.setProperties(): the following properties are not recognized:"); 91 props.list(System.out); 92 return false; 93 } 94 return true; 95 } 96 97 public void down(Event evt) { 98 switch(evt.getType()) { 99 case Event.VIEW_CHANGE: 100 determineCoordinatorAndNextMember((View)evt.getArg()); 101 break; 102 case Event.MSG: 103 Message msg=(Message)evt.getArg(); 104 Address dest=msg.getDest(); 105 if(dest == null || dest.isMulticastAddress()) { if(coord == null) 107 log.error("coordinator is null, cannot send message to coordinator"); 108 else 109 forwardTo(coord, msg); 110 return; } 112 break; 113 } 114 passDown(evt); 115 } 116 117 public void up(Event evt) { 118 switch(evt.getType()) { 119 case Event.SET_LOCAL_ADDRESS: 120 local_addr=(Address)evt.getArg(); 121 break; 122 case Event.VIEW_CHANGE: 123 determineCoordinatorAndNextMember((View)evt.getArg()); 124 break; 125 case Event.MSG: 126 Message msg=(Message)evt.getArg(); 127 HTotalHeader hdr=(HTotalHeader)msg.getHeader(getName()); 128 129 if(hdr == null) 130 break; 132 if(hdr.forward) { 133 Message copy=msg.copy(); 134 if(use_multipoint_forwarding) { 135 copy.setDest(null); 136 passDown(new Event(Event.MSG, copy)); 137 } 138 else { 139 if(neighbor != null) { 140 forwardTo(neighbor, copy); 141 } 142 } 143 } 144 145 msg.setDest(hdr.dest); msg.setSrc(hdr.src); 148 passUp(evt); return; 150 } 151 passUp(evt); 152 } 153 154 private void forwardTo(Address destination, Message msg) { 155 HTotalHeader hdr=(HTotalHeader)msg.getHeader(getName()); 156 157 if(hdr == null) { 158 hdr=new HTotalHeader(msg.getDest(), local_addr); 159 msg.putHeader(getName(), hdr); 160 } 161 msg.setDest(destination); 162 if(log.isTraceEnabled()) 163 log.trace("forwarding message to " + destination + ", hdr=" + hdr); 164 passDown(new Event(Event.MSG, msg)); 165 } 166 167 168 private void determineCoordinatorAndNextMember(View v) { 169 Object tmp; 170 Address retval=null; 171 172 mbrs.clear(); 173 mbrs.addAll(v.getMembers()); 174 175 coord=(Address)(mbrs != null && mbrs.size() > 0? mbrs.firstElement() : null); 176 is_coord=coord != null && local_addr != null && coord.equals(local_addr); 177 178 if(mbrs == null || mbrs.size() < 2 || local_addr == null) 179 neighbor=null; 180 else { 181 for(int i=0; i < mbrs.size(); i++) { 182 tmp=mbrs.elementAt(i); 183 if(local_addr.equals(tmp)) { 184 if(i + 1 >= mbrs.size()) 185 retval=null; else 187 retval=(Address)mbrs.elementAt(i + 1); 188 break; 189 } 190 } 191 } 192 neighbor=retval; 193 if(log.isTraceEnabled()) 194 log.trace("coord=" + coord + ", neighbor=" + neighbor); 195 } 196 197 198 } 199 | Popular Tags |