1 package org.jgroups.util; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jgroups.Address; 6 import org.jgroups.Global; 7 import org.jgroups.annotations.Immutable; 8 9 import java.io.*; 10 import static java.lang.Math.max ; 11 import java.util.*; 12 import java.util.concurrent.ConcurrentHashMap ; 13 14 15 30 public class Digest implements Externalizable, Streamable { 31 32 public static final Digest EMPTY_DIGEST = new Digest(); 33 34 protected final Map <Address,Entry> senders; 35 36 protected static final Log log=LogFactory.getLog(Digest.class); 37 38 39 40 41 public Digest() { 42 senders=createSenders(7); 43 } 44 45 public Digest(int size) { 46 senders=createSenders(size); 47 } 48 49 50 public Digest(Map <Address, Entry> map) { 51 senders=createSenders(map); 52 } 53 54 55 public Digest(Digest d) { 56 this(d.senders); 57 } 58 59 60 public Digest(Address sender, long low, long highest_delivered, long highest_received) { 61 senders=createSenders(1); 62 senders.put(sender, new Entry(low, highest_delivered, highest_received)); 63 } 64 65 public Digest(Address sender, long low, long highest_delivered) { 66 senders=createSenders(1); 67 senders.put(sender, new Entry(low, highest_delivered)); 68 } 69 70 71 public Map <Address, Entry> getSenders() { 72 return Collections.unmodifiableMap(senders); 73 } 74 75 public boolean equals(Object obj) { 76 if(!(obj instanceof Digest)) 77 return false; 78 Digest other=(Digest)obj; 79 return senders.equals(other.senders); 80 } 81 82 83 public boolean contains(Address sender) { 84 return senders.containsKey(sender); 85 } 86 87 88 public Entry get(Address sender) { 89 return senders.get(sender); 90 } 91 92 93 98 public boolean sameSenders(Digest other) { 99 if(other == null) return false; 100 if(this.senders.size() != other.senders.size()) return false; 101 102 Set<Address> my_senders=senders.keySet(), other_senders=other.senders.keySet(); 103 return my_senders.equals(other_senders); 104 } 105 106 public Digest difference(Digest other) { 107 if(other == null) return copy(); 108 109 Digest result=EMPTY_DIGEST; 110 if(this.equals(other)) { 111 return result; 112 } 113 else { 114 Map <Address, Entry> resultMap=new ConcurrentHashMap <Address, Entry>(7); 116 Set<Address> intersection=new TreeSet<Address>(this.senders.keySet()); 117 intersection.retainAll(other.senders.keySet()); 118 119 for(Address address : intersection) { 120 Entry e1=this.get(address); 121 Entry e2=other.get(address); 122 if(e1.getHighestDeliveredSeqno() != e2.getHighestDeliveredSeqno()) { 123 long low=Math.min(e1.highest_delivered_seqno, e2.highest_delivered_seqno); 124 long high=max(e1.highest_delivered_seqno, e2.highest_delivered_seqno); 125 Entry r=new Entry(low, high); 126 resultMap.put(address, r); 127 } 128 } 129 130 if(intersection.size() != this.senders.keySet().size()) { 133 Set<Address> thisMinusInteresection=new TreeSet<Address>(this.senders.keySet()); 134 thisMinusInteresection.removeAll(intersection); 135 for(Address address : thisMinusInteresection) { 136 resultMap.put(address, new Entry(this.get(address))); 137 } 138 } 139 140 if(intersection.size() != other.senders.keySet().size()) { 143 Set<Address> otherMinusInteresection=new TreeSet<Address>(other.senders.keySet()); 144 otherMinusInteresection.removeAll(intersection); 145 for(Address address : otherMinusInteresection) { 146 resultMap.put(address, new Entry(other.get(address))); 147 } 148 } 149 result=new Digest(resultMap); 150 } 151 return result; 152 } 153 154 public Digest highestSequence(Digest other) { 155 if(other == null) return copy(); 156 157 Digest result=EMPTY_DIGEST; 158 if(this.equals(other)) { 159 return this; 160 } 161 else { 162 Map <Address, Entry> resultMap=new ConcurrentHashMap <Address, Entry>(7); 164 Set<Address> intersection=new TreeSet<Address>(this.senders.keySet()); 165 intersection.retainAll(other.senders.keySet()); 166 167 for(Address address : intersection) { 168 Entry e1=this.get(address); 169 Entry e2=other.get(address); 170 171 long high=max(e1.highest_delivered_seqno, e2.highest_delivered_seqno); 172 Entry r=new Entry(0, high); 173 resultMap.put(address, r); 174 } 175 176 if(intersection.size() != this.senders.keySet().size()) { 179 Set<Address> thisMinusInteresection=new TreeSet<Address>(this.senders.keySet()); 180 thisMinusInteresection.removeAll(intersection); 181 for(Address address : thisMinusInteresection) { 182 resultMap.put(address, new Entry(this.get(address))); 183 } 184 } 185 186 if(intersection.size() != other.senders.keySet().size()) { 189 Set<Address> otherMinusInteresection=new TreeSet<Address>(other.senders.keySet()); 190 otherMinusInteresection.removeAll(intersection); 191 for(Address address : otherMinusInteresection) { 192 resultMap.put(address, new Entry(other.get(address))); 193 } 194 } 195 result=new Digest(resultMap); 196 } 197 return result; 198 } 199 200 201 public int size() { 202 return senders.size(); 203 } 204 205 206 public long lowSeqnoAt(Address sender) { 207 Entry entry=senders.get(sender); 208 if(entry == null) 209 return -1; 210 else 211 return entry.low_seqno; 212 } 213 214 215 public long highestDeliveredSeqnoAt(Address sender) { 216 Entry entry=senders.get(sender); 217 if(entry == null) 218 return -1; 219 else 220 return entry.highest_delivered_seqno; 221 } 222 223 224 public long highestReceivedSeqnoAt(Address sender) { 225 Entry entry=senders.get(sender); 226 if(entry == null) 227 return -1; 228 else 229 return entry.highest_received_seqno; 230 } 231 232 233 238 public boolean isGreaterThanOrEqual(Digest other) { 239 if(other == null) 240 return true; 241 Map <Address,Entry> our_map=getSenders(); 242 Address sender; 243 Entry my_entry, their_entry; 244 long my_highest, their_highest; 245 for(Map.Entry <Address,Entry> entry: our_map.entrySet()) { 246 sender=entry.getKey(); 247 my_entry=entry.getValue(); 248 their_entry=other.get(sender); 249 if(their_entry == null) 250 continue; 251 my_highest=my_entry.getHighest(); 252 their_highest=their_entry.getHighest(); 253 if(my_highest < their_highest) 254 return false; 255 } 256 return true; 257 } 258 259 260 public Digest copy() { 261 return new Digest(senders); 262 } 263 264 265 public String toString() { 266 StringBuilder sb=new StringBuilder (); 267 boolean first=true; 268 if(senders.isEmpty()) return "[]"; 269 Map.Entry <Address,Entry> entry; 270 Address key; 271 Entry val; 272 273 for(Iterator<Map.Entry <Address,Entry>> it=senders.entrySet().iterator(); it.hasNext();) { 274 entry=it.next(); 275 key=entry.getKey(); 276 val=entry.getValue(); 277 if(!first) { 278 sb.append(", "); 279 } 280 else { 281 first=false; 282 } 283 sb.append(key).append(": ").append('[').append(val.low_seqno).append(" : "); 284 sb.append(val.highest_delivered_seqno); 285 if(val.highest_received_seqno >= 0) 286 sb.append(" (").append(val.highest_received_seqno).append(")"); 287 sb.append("]"); 288 } 289 return sb.toString(); 290 } 291 292 293 public String printHighestDeliveredSeqnos() { 294 StringBuilder sb=new StringBuilder ("["); 295 boolean first=true; 296 Map.Entry <Address,Entry> entry; 297 Address key; 298 Entry val; 299 300 TreeMap<Address,Entry> copy=new TreeMap<Address,Entry>(senders); 301 for(Iterator<Map.Entry <Address, Entry>> it=copy.entrySet().iterator(); it.hasNext();) { 302 entry=it.next(); 303 key=entry.getKey(); 304 val=entry.getValue(); 305 if(!first) { 306 sb.append(", "); 307 } 308 else { 309 first=false; 310 } 311 sb.append(key).append("#").append(val.highest_delivered_seqno); 312 } 313 sb.append(']'); 314 return sb.toString(); 315 } 316 317 318 public String printHighestReceivedSeqnos() { 319 StringBuilder sb=new StringBuilder (); 320 boolean first=true; 321 Map.Entry <Address,Entry> entry; 322 Address key; 323 Entry val; 324 325 for(Iterator<Map.Entry <Address, Entry>> it=senders.entrySet().iterator(); it.hasNext();) { 326 entry=it.next(); 327 key=entry.getKey(); 328 val=entry.getValue(); 329 if(!first) { 330 sb.append(", "); 331 } 332 else { 333 sb.append('['); 334 first=false; 335 } 336 sb.append(key).append("#").append(val.highest_received_seqno); 337 } 338 sb.append(']'); 339 return sb.toString(); 340 } 341 342 343 public void writeExternal(ObjectOutput out) throws IOException { 344 out.writeObject(senders); 345 } 346 347 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 348 Map <Address, Entry> tmp=(Map <Address, Entry>)in.readObject(); 349 senders.clear(); 350 senders.putAll(tmp); 351 } 352 353 public void writeTo(DataOutputStream out) throws IOException { 354 out.writeShort(senders.size()); 355 Map.Entry <Address,Entry> entry; 356 Address key; 357 Entry val; 358 for(Iterator<Map.Entry <Address, Entry>> it=senders.entrySet().iterator(); it.hasNext();) { 359 entry=it.next(); 360 key=entry.getKey(); 361 val=entry.getValue(); 362 Util.writeAddress(key, out); 363 out.writeLong(val.low_seqno); 364 out.writeLong(val.highest_delivered_seqno); 365 out.writeLong(val.highest_received_seqno); 366 } 367 } 368 369 370 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 371 short size=in.readShort(); 372 Map <Address,Entry> tmp=new HashMap <Address, Entry>(size); 373 Address key; 374 for(int i=0; i < size; i++) { 375 key=Util.readAddress(in); 376 tmp.put(key, new Entry(in.readLong(), in.readLong(), in.readLong())); 377 } 378 senders.clear(); 379 senders.putAll(tmp); 380 } 381 382 383 public long serializedSize() { 384 long retval=Global.SHORT_SIZE; if(!senders.isEmpty()) { 386 Address addr=senders.keySet().iterator().next(); 387 int len=addr.size() + 388 2 * Global.BYTE_SIZE; len+=Entry.SIZE; retval+=len * senders.size(); 391 } 392 return retval; 393 } 394 395 private static Map <Address, Entry> createSenders(int size) { 396 return new ConcurrentHashMap <Address,Entry>(size); 397 } 398 399 private static Map <Address, Entry> createSenders(Map <Address, Entry> map) { 400 return new ConcurrentHashMap <Address,Entry>(map); 401 } 402 403 404 405 409 @Immutable 410 public static class Entry implements Externalizable, Streamable { 411 private long low_seqno=0; 412 private long highest_delivered_seqno=0; private long highest_received_seqno=0; final static int SIZE=Global.LONG_SIZE * 3; 415 416 public Entry() { 417 } 418 419 public Entry(long low_seqno, long highest_delivered_seqno, long highest_received_seqno) { 420 this.low_seqno=low_seqno; 421 this.highest_delivered_seqno=highest_delivered_seqno; 422 this.highest_received_seqno=highest_received_seqno; 423 check(); 424 } 425 426 427 428 public Entry(long low_seqno, long highest_delivered_seqno) { 429 this.low_seqno=low_seqno; 430 this.highest_delivered_seqno=highest_delivered_seqno; 431 check(); 432 } 433 434 public Entry(Entry other) { 435 if(other != null) { 436 low_seqno=other.low_seqno; 437 highest_delivered_seqno=other.highest_delivered_seqno; 438 highest_received_seqno=other.highest_received_seqno; 439 check(); 440 } 441 } 442 443 public final long getLow() {return low_seqno;} 444 public final long getHighestDeliveredSeqno() {return highest_delivered_seqno;} 445 public final long getHighestReceivedSeqno() {return highest_received_seqno;} 446 447 448 public final long getHighest() {return max(highest_delivered_seqno, highest_received_seqno);} 449 450 public boolean equals(Object obj) { 451 if(!(obj instanceof Entry)) 452 return false; 453 Entry other=(Entry)obj; 454 return low_seqno == other.low_seqno && highest_delivered_seqno == other.highest_delivered_seqno && highest_received_seqno == other.highest_received_seqno; 455 } 456 457 public String toString() { 458 return new StringBuffer ("low=").append(low_seqno).append(", highest delivered=").append(highest_delivered_seqno). 459 append(", highest received=").append(highest_received_seqno).toString(); 460 } 461 462 public void writeExternal(ObjectOutput out) throws IOException { 463 out.writeLong(low_seqno); 464 out.writeLong(highest_delivered_seqno); 465 out.writeLong(highest_received_seqno); 466 } 467 468 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 469 low_seqno=in.readLong(); 470 highest_delivered_seqno=in.readLong(); 471 highest_received_seqno=in.readLong(); 472 } 473 474 public int size() { 475 return SIZE; 476 } 477 478 public void writeTo(DataOutputStream out) throws IOException { 479 out.writeLong(low_seqno); 480 out.writeLong(highest_delivered_seqno); 481 out.writeLong(highest_received_seqno); 482 } 483 484 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 485 low_seqno=in.readLong(); 486 highest_delivered_seqno=in.readLong(); 487 highest_received_seqno=in.readLong(); 488 } 489 490 491 private void check() { 492 if(low_seqno > highest_delivered_seqno) 493 throw new IllegalArgumentException ("low_seqno (" + low_seqno + ") is greater than highest_delivered_seqno (" + highest_delivered_seqno + ")"); 494 } 495 496 497 } 498 } 499 | Popular Tags |