1 2 package org.jgroups.protocols; 3 4 import org.jgroups.*; 5 import org.jgroups.stack.Protocol; 6 import org.jgroups.util.Streamable; 7 import org.jgroups.util.Util; 8 9 import java.io.*; 10 import java.util.*; 11 import java.util.concurrent.ConcurrentHashMap ; 12 import java.util.concurrent.ConcurrentMap ; 13 14 15 20 public class SEQUENCER extends Protocol { 21 private Address local_addr=null, coord=null; 22 static final String name="SEQUENCER"; 23 private boolean is_coord=false; 24 private long seqno=0; 25 26 27 private final Map <Long ,Message> forward_table=new TreeMap<Long ,Message>(); 28 29 30 private final ConcurrentMap <Address,Long > received_table=new ConcurrentHashMap <Address,Long >(); 31 32 private long forwarded_msgs=0; 33 private long bcast_msgs=0; 34 private long received_forwards=0; 35 private long received_bcasts=0; 36 37 public boolean isCoordinator() {return is_coord;} 38 public Address getCoordinator() {return coord;} 39 public Address getLocalAddress() {return local_addr;} 40 public String getName() {return name;} 41 public long getForwarded() {return forwarded_msgs;} 42 public long getBroadcast() {return bcast_msgs;} 43 public long getReceivedForwards() {return received_forwards;} 44 public long getReceivedBroadcasts() {return received_bcasts;} 45 46 public void resetStats() { 47 forwarded_msgs=bcast_msgs=received_forwards=received_bcasts=0L; 48 } 49 50 public Map <String ,Object > dumpStats() { 51 Map <String ,Object > m=super.dumpStats(); 52 if(m == null) 53 m=new HashMap <String ,Object >(); 54 m.put("forwarded", new Long (forwarded_msgs)); 55 m.put("broadcast", new Long (bcast_msgs)); 56 m.put("received_forwards", new Long (received_forwards)); 57 m.put("received_bcasts", new Long (received_bcasts)); 58 return m; 59 } 60 61 public String printStats() { 62 return dumpStats().toString(); 63 } 64 65 66 public boolean setProperties(Properties props) { 67 super.setProperties(props); 68 69 if(!props.isEmpty()) { 70 log.error("the following properties are not recognized: " + props); 71 return false; 72 } 73 return true; 74 } 75 76 private final long nextSeqno() { 77 synchronized(this) { 78 return seqno++; 79 } 80 } 81 82 83 public Object down(Event evt) { 84 switch(evt.getType()) { 85 case Event.MSG: 86 Message msg=(Message)evt.getArg(); 87 Address dest=msg.getDest(); 88 if(dest == null || dest.isMulticastAddress()) { long next_seqno=nextSeqno(); 90 SequencerHeader hdr=new SequencerHeader(SequencerHeader.FORWARD, local_addr, next_seqno); 91 msg.putHeader(name, hdr); 92 if(!is_coord) { 93 forwardToCoord(msg, next_seqno); 94 } 95 else { 96 broadcast(msg); 97 } 98 return null; } 100 break; 101 102 case Event.VIEW_CHANGE: 103 handleViewChange((View)evt.getArg()); 104 break; 105 } 106 return down_prot.down(evt); 107 } 108 109 110 111 112 public Object up(Event evt) { 113 Message msg; 114 SequencerHeader hdr; 115 116 switch(evt.getType()) { 117 118 case Event.SET_LOCAL_ADDRESS: 119 local_addr=(Address)evt.getArg(); 120 break; 121 122 case Event.MSG: 123 msg=(Message)evt.getArg(); 124 hdr=(SequencerHeader)msg.getHeader(name); 125 if(hdr == null) 126 break; 128 switch(hdr.type) { 129 case SequencerHeader.FORWARD: 130 if(!is_coord) { 131 if(log.isErrorEnabled()) 132 log.warn("I (" + local_addr + ") am not the coord and don't handle " + 133 "FORWARD requests, ignoring request"); 134 return null; 135 } 136 broadcast(msg); 137 received_forwards++; 138 return null; 139 case SequencerHeader.BCAST: 140 deliver(msg, hdr); received_bcasts++; 142 return null; 143 } 144 break; 145 146 case Event.VIEW_CHANGE: 147 handleViewChange((View)evt.getArg()); 148 break; 149 } 150 151 return up_prot.up(evt); 152 } 153 154 155 156 157 158 private void handleViewChange(View v) { 159 Vector members=v.getMembers(); 160 if(members.isEmpty()) return; 161 162 Address prev_coord=coord; 163 coord=(Address)members.firstElement(); 164 is_coord=local_addr != null && local_addr.equals(coord); 165 166 boolean coord_changed=prev_coord != null && !prev_coord.equals(coord); 167 if(coord_changed) { 168 resendMessagesInForwardTable(); } 170 int size=received_table.size(); 172 Set<Address> keys=received_table.keySet(); 173 keys.retainAll(members); 174 if(keys.size() != size) { 175 if(log.isTraceEnabled()) 176 log.trace("adjusted received_table, keys are " + keys); 177 } 178 } 179 180 187 private void resendMessagesInForwardTable() { 188 Map <Long ,Message> copy; 189 synchronized(forward_table) { 190 copy=new TreeMap<Long ,Message>(forward_table); 191 } 192 for(Message msg: copy.values()) { 193 msg.setDest(coord); 194 down_prot.down(new Event(Event.MSG, msg)); 195 } 196 } 197 198 199 private void forwardToCoord(Message msg, long seqno) { 200 msg.setDest(coord); synchronized(forward_table) { 202 forward_table.put(new Long (seqno), msg); 203 } 204 down_prot.down(new Event(Event.MSG, msg)); 205 forwarded_msgs++; 206 } 207 208 private void broadcast(Message msg) { 209 SequencerHeader hdr=(SequencerHeader)msg.getHeader(name); 210 hdr.type=SequencerHeader.BCAST; msg.setDest(null); msg.setSrc(local_addr); down_prot.down(new Event(Event.MSG, msg)); 214 bcast_msgs++; 215 } 216 217 223 private void deliver(Message msg, SequencerHeader hdr) { 224 Address original_sender=hdr.getOriginalSender(); 225 if(original_sender == null) { 226 if(log.isErrorEnabled()) 227 log.error("original sender is null, cannot swap sender address back to original sender"); 228 return; 229 } 230 long msg_seqno=hdr.getSeqno(); 231 232 if(original_sender.equals(local_addr)) { 234 synchronized(forward_table) { 235 forward_table.remove(new Long (msg_seqno)); 236 } 237 } 238 239 Long highest_seqno_seen=received_table.get(original_sender); 241 if(highest_seqno_seen != null) { 242 if(highest_seqno_seen.longValue() >= msg_seqno) { 243 if(log.isWarnEnabled()) 244 log.warn("message seqno (" + original_sender + "::" + msg_seqno + " has already " + 245 "been received (highest received=" + highest_seqno_seen + "); discarding duplicate message"); 246 return; 247 } 248 } 249 received_table.put(original_sender, new Long (msg_seqno)); 251 252 Message tmp=msg.copy(true); 254 tmp.setSrc(original_sender); 255 up_prot.up(new Event(Event.MSG, tmp)); 256 } 257 258 259 260 261 262 263 264 public static class SequencerHeader extends Header implements Streamable { 265 static final byte FORWARD = 1; 266 static final byte BCAST = 2; 267 268 byte type=-1; 269 270 ViewId tag=null; 271 272 273 public SequencerHeader() { 274 } 275 276 public SequencerHeader(byte type, Address original_sender, long seqno) { 277 this.type=type; 278 this.tag=new ViewId(original_sender, seqno); 279 } 280 281 public Address getOriginalSender() { 282 return tag != null? tag.getCoordAddress() : null; 283 } 284 285 public long getSeqno() { 286 return tag != null? tag.getId() : -1; 287 } 288 289 public String toString() { 290 StringBuilder sb=new StringBuilder (64); 291 sb.append(printType()); 292 if(tag != null) 293 sb.append(" (tag=").append(tag).append(")"); 294 return sb.toString(); 295 } 296 297 private final String printType() { 298 switch(type) { 299 case FORWARD: return "FORWARD"; 300 case BCAST: return "BCAST"; 301 default: return "n/a"; 302 } 303 } 304 305 public void writeExternal(ObjectOutput out) throws IOException { 306 out.writeByte(type); 307 out.writeObject(tag); 308 } 309 310 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 311 type=in.readByte(); 312 tag=(ViewId)in.readObject(); 313 } 314 315 public void writeTo(DataOutputStream out) throws IOException { 316 out.writeByte(type); 317 Util.writeStreamable(tag, out); 318 } 319 320 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 321 type=in.readByte(); 322 tag=(ViewId)Util.readStreamable(ViewId.class, in); 323 } 324 325 public int size() { 326 int size=Global.BYTE_SIZE *2; if(tag != null) 328 size+=tag.serializedSize(); 329 return size; 330 } 331 332 } 333 334 335 } | Popular Tags |