1 3 package org.jgroups; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.conf.ClassConfigurator; 9 import org.jgroups.util.ContextObjectInputStream; 10 import org.jgroups.util.Marshaller; 11 import org.jgroups.util.Streamable; 12 import org.jgroups.util.Util; 13 import org.jgroups.stack.IpAddress; 14 15 import java.io.*; 16 import java.util.HashMap ; 17 import java.util.HashSet ; 18 import java.util.Iterator ; 19 import java.util.Map ; 20 21 22 31 public class Message implements Externalizable, Streamable { 32 protected Address dest_addr=null; 33 protected Address src_addr=null; 34 35 36 private byte[] buf=null; 37 38 39 protected transient int offset=0; 40 41 42 protected transient int length=0; 43 44 45 protected HashMap headers=null; 46 47 protected static final Log log=LogFactory.getLog(Message.class); 48 49 static final long serialVersionUID=-1137364035832847034L; 50 51 static final byte DEST_SET=1; 52 static final byte SRC_SET=2; 53 static final byte BUF_SET=4; 54 static final byte HDRS_SET=8; 55 static final byte IPADDR_DEST=16; 56 static final byte IPADDR_SRC=32; 57 static final byte SRC_HOST_NULL=64; 58 59 static final HashSet nonStreamableHeaders=new HashSet (); 61 62 63 75 public Message(Address dest, Address src, byte[] buf) { 76 dest_addr=dest; 77 src_addr=src; 78 setBuffer(buf); 79 } 80 81 98 public Message(Address dest, Address src, byte[] buf, int offset, int length) { 99 dest_addr=dest; 100 src_addr=src; 101 setBuffer(buf, offset, length); 102 } 103 104 105 118 public Message(Address dest, Address src, Serializable obj) { 119 dest_addr=dest; 120 src_addr=src; 121 setObject(obj); 122 } 123 124 125 126 public Message() { 127 } 129 public Address getDest() { 130 return dest_addr; 131 } 132 133 public void setDest(Address new_dest) { 134 dest_addr=new_dest; 135 } 136 137 public Address getSrc() { 138 return src_addr; 139 } 140 141 public void setSrc(Address new_src) { 142 src_addr=new_src; 143 } 144 145 151 public byte[] getRawBuffer() { 152 return buf; 153 } 154 155 159 public byte[] getBuffer() { 160 if(buf == null) 161 return null; 162 if(offset == 0 && length == buf.length) 163 return buf; 164 else { 165 byte[] retval=new byte[length]; 166 System.arraycopy(buf, offset, retval, 0, length); 167 return retval; 168 } 169 } 170 171 public void setBuffer(byte[] b) { 172 buf=b; 173 if(buf != null) { 174 offset=0; 175 length=buf.length; 176 } 177 else { 178 offset=length=0; 179 } 180 } 181 182 188 public void setBuffer(byte[] b, int offset, int length) { 189 buf=b; 190 if(buf != null) { 191 if(offset < 0 || offset > buf.length) 192 throw new ArrayIndexOutOfBoundsException (offset); 193 if((offset + length) > buf.length) 194 throw new ArrayIndexOutOfBoundsException ((offset+length)); 195 this.offset=offset; 196 this.length=length; 197 } 198 else { 199 offset=length=0; 200 } 201 } 202 203 204 public int getOffset() { 205 return offset; 206 } 207 208 209 public int getLength() { 210 return length; 211 } 212 213 public Map getHeaders() { 214 return headers; 215 } 216 217 public void setObject(Serializable obj) { 218 if(obj == null) return; 219 try { 220 ByteArrayOutputStream out_stream=new ByteArrayOutputStream(); 221 ObjectOutputStream out=new ObjectOutputStream(out_stream); 222 out.writeObject(obj); 223 setBuffer(out_stream.toByteArray()); 224 } 225 catch(IOException ex) { 226 throw new IllegalArgumentException (ex.toString()); 227 } 228 } 229 230 public Object getObject() { 231 if(buf == null) return null; 232 try { 233 ByteArrayInputStream in_stream=new ByteArrayInputStream(buf, offset, length); 234 ObjectInputStream in=new ContextObjectInputStream(in_stream); return in.readObject(); 237 } 238 catch(Exception ex) { 239 throw new IllegalArgumentException (ex.toString()); 240 } 241 } 242 243 244 248 public void reset() { 249 dest_addr=src_addr=null; 250 setBuffer(null); 251 if(headers != null) 252 headers.clear(); 253 } 254 255 256 257 258 public void putHeader(String key, Header hdr) { 259 headers().put(key, hdr); 260 } 261 262 public Header removeHeader(String key) { 263 return headers != null ? (Header)headers.remove(key) : null; 264 } 265 266 public void removeHeaders() { 267 if(headers != null) 268 headers.clear(); 269 } 270 271 public Header getHeader(String key) { 272 return headers != null ? (Header)headers.get(key) : null; 273 } 274 275 276 277 public Message copy() { 278 return copy(true); 279 } 280 281 287 public Message copy(boolean copy_buffer) { 288 Message retval=new Message(); 289 retval.dest_addr=dest_addr; 290 retval.src_addr=src_addr; 291 292 if(copy_buffer && buf != null) { 293 294 retval.setBuffer(buf, offset, length); 296 297 298 308 } 309 310 if(headers != null) 311 retval.headers=(HashMap )headers.clone(); 312 return retval; 313 } 314 315 316 protected Object clone() throws CloneNotSupportedException { 317 return copy(); 318 } 319 320 public Message makeReply() { 321 return new Message(src_addr, null, null); 322 } 323 324 325 public String toString() { 326 StringBuffer ret=new StringBuffer (64); 327 ret.append("[dst: "); 328 if(dest_addr == null) 329 ret.append("<null>"); 330 else 331 ret.append(dest_addr); 332 ret.append(", src: "); 333 if(src_addr == null) 334 ret.append("<null>"); 335 else 336 ret.append(src_addr); 337 338 if(headers != null && headers.size() > 0) 339 ret.append(" (" + headers.size() + " headers)"); 340 341 ret.append(", size = "); 342 if(buf != null && length > 0) 343 ret.append(length); 344 else 345 ret.append('0'); 346 ret.append(" bytes"); 347 ret.append(']'); 348 return ret.toString(); 349 } 350 351 352 353 public String toStringAsObject() { 354 Object obj; 355 356 if(buf == null) return null; 357 try { 358 obj=getObject(); 359 return obj != null ? obj.toString() : ""; 360 } 361 catch(Exception e) { return ""; 363 } 364 } 365 366 367 375 public long size() { 376 long retval=Global.BYTE_SIZE + length + (buf != null? Global.INT_SIZE : 0); 380 if(dest_addr != null) 381 retval+=dest_addr.size(); 382 if(src_addr != null) 383 retval+=(src_addr).size(); 384 385 if(headers != null) { 386 Map.Entry entry; 387 String key; 388 Header hdr; 389 retval+=Global.INT_SIZE; for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { 391 entry=(Map.Entry )it.next(); 392 key=(String )entry.getKey(); 393 retval+=key.length() +2; hdr=(Header)entry.getValue(); 395 retval+=5; retval+=hdr.size(); 397 } 398 } 399 return retval; 400 } 401 402 403 public String printObjectHeaders() { 404 StringBuffer sb=new StringBuffer (); 405 Map.Entry entry; 406 407 if(headers != null) { 408 for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { 409 entry=(Map.Entry )it.next(); 410 sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n'); 411 } 412 } 413 return sb.toString(); 414 } 415 416 417 418 419 420 public void writeExternal(ObjectOutput out) throws IOException { 421 int len; 422 Externalizable hdr; 423 Map.Entry entry; 424 425 if(dest_addr != null) { 426 out.writeBoolean(true); 427 Marshaller.write(dest_addr, out); 428 } 429 else { 430 out.writeBoolean(false); 431 } 432 433 if(src_addr != null) { 434 out.writeBoolean(true); 435 Marshaller.write(src_addr, out); 436 } 437 else { 438 out.writeBoolean(false); 439 } 440 441 if(buf == null) 442 out.writeInt(0); 443 else { 444 out.writeInt(length); 445 out.write(buf, offset, length); 446 } 447 448 if(headers == null) 449 out.writeInt(0); 450 else { 451 len=headers.size(); 452 out.writeInt(len); 453 for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { 454 entry=(Map.Entry )it.next(); 455 out.writeUTF((String )entry.getKey()); 456 hdr=(Externalizable)entry.getValue(); 457 Marshaller.write(hdr, out); 458 } 459 } 460 } 461 462 463 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { 464 int len; 465 boolean destAddressExist=in.readBoolean(); 466 boolean srcAddressExist; 467 Object key, value; 468 469 if(destAddressExist) { 470 dest_addr=(Address)Marshaller.read(in); 471 } 472 473 srcAddressExist=in.readBoolean(); 474 if(srcAddressExist) { 475 src_addr=(Address)Marshaller.read(in); 476 } 477 478 int i=in.readInt(); 479 if(i != 0) { 480 buf=new byte[i]; 481 in.readFully(buf); 482 offset=0; 483 length=buf.length; 484 } 485 486 len=in.readInt(); 487 if(len > 0) headers=new HashMap (11); 488 while(len-- > 0) { 489 key=in.readUTF(); 490 value=Marshaller.read(in); 491 headers.put(key, value); 492 } 493 } 494 495 496 497 498 499 500 505 public void writeTo(DataOutputStream out) throws IOException { 506 Map.Entry entry; 507 508 byte leading=0; 509 if(dest_addr != null) { 510 leading+=DEST_SET; 511 if(dest_addr instanceof IpAddress) 512 leading+=IPADDR_DEST; 513 } 514 if(src_addr != null) { 515 leading+=SRC_SET; 516 if(src_addr instanceof IpAddress) { 517 leading+=IPADDR_SRC; 518 if(((IpAddress)src_addr).getIpAddress() == null) { 519 leading+=SRC_HOST_NULL; 520 } 521 } 522 } 523 if(buf != null) 524 leading+=BUF_SET; 525 if(headers != null && headers.size() > 0) 526 leading+=HDRS_SET; 527 528 out.write(leading); 530 531 if(dest_addr != null) { 533 if(dest_addr instanceof IpAddress) 534 dest_addr.writeTo(out); 535 else 536 Util.writeAddress(dest_addr, out); 537 } 538 539 if(src_addr != null) { 541 if(src_addr instanceof IpAddress) { 542 src_addr.writeTo(out); 543 } 560 else { 561 Util.writeAddress(src_addr, out); 562 } 563 } 564 565 if(buf != null) { 567 out.writeInt(length); 568 out.write(buf, offset, length); 569 } 570 571 if(headers != null && headers.size() > 0) { 573 out.writeInt(headers.size()); 574 for(Iterator it=headers.entrySet().iterator(); it.hasNext();) { 575 entry=(Map.Entry )it.next(); 576 out.writeUTF((String )entry.getKey()); 577 writeHeader((Header)entry.getValue(), out); 578 } 579 } 580 } 581 582 583 public void readFrom(DataInputStream in) throws IOException, IllegalAccessException , InstantiationException { 584 int len, leading; 585 String hdr_name; 586 Header hdr; 587 588 589 leading=in.readByte(); 591 592 if((leading & DEST_SET) == DEST_SET) { 594 if((leading & IPADDR_DEST) == IPADDR_DEST) { 595 dest_addr=new IpAddress(); 596 dest_addr.readFrom(in); 597 } 598 else { 599 dest_addr=Util.readAddress(in); 600 } 601 } 602 603 if((leading & SRC_SET) == SRC_SET) { 605 if((leading & IPADDR_SRC) == IPADDR_SRC) { 606 src_addr=new IpAddress(); 607 src_addr.readFrom(in); 608 609 624 625 } 626 else { 627 src_addr=Util.readAddress(in); 628 } 629 } 630 631 if((leading & BUF_SET) == BUF_SET) { 633 len=in.readInt(); 634 buf=new byte[len]; 635 in.read(buf, 0, len); 636 length=len; 637 } 638 639 if((leading & HDRS_SET) == HDRS_SET) { 641 len=in.readInt(); 642 headers(len); 643 for(int i=0; i < len; i++) { 644 hdr_name=in.readUTF(); 645 hdr=readHeader(in); 646 headers.put(hdr_name, hdr); 647 } 648 } 649 } 650 651 652 653 654 655 656 657 658 659 HashMap headers() { 660 return headers != null ? headers : (headers=new HashMap (11)); 661 } 662 663 664 HashMap headers(int len) { 665 return headers != null ? headers : (headers=new HashMap (len)); 666 } 667 668 private void writeHeader(Header value, DataOutputStream out) throws IOException { 669 int magic_number; 670 String classname; 671 ObjectOutputStream oos=null; 672 try { 673 magic_number=ClassConfigurator.getInstance(false).getMagicNumber(value.getClass()); 674 if(magic_number == -1) { 676 out.writeBoolean(false); 677 classname=value.getClass().getName(); 678 out.writeUTF(classname); 679 } 680 else { 681 out.writeBoolean(true); 682 out.writeInt(magic_number); 683 } 684 685 if(value instanceof Streamable) { 687 ((Streamable)value).writeTo(out); 688 } 689 else { 690 oos=new ObjectOutputStream(out); 691 value.writeExternal(oos); 692 if(!nonStreamableHeaders.contains(value.getClass())) { 693 nonStreamableHeaders.add(value.getClass()); 694 if(log.isTraceEnabled()) 695 log.trace("encountered non-Streamable header: " + value.getClass()); 696 } 697 } 698 } 699 catch(ChannelException e) { 700 log.error("failed writing the header", e); 701 } 702 finally { 703 if(oos != null) 704 oos.close(); 705 } 706 } 707 708 709 private Header readHeader(DataInputStream in) throws IOException { 710 Header hdr=null; 711 boolean use_magic_number=in.readBoolean(); 712 int magic_number; 713 String classname; 714 Class clazz; 715 ObjectInputStream ois=null; 716 717 try { 718 if(use_magic_number) { 719 magic_number=in.readInt(); 720 clazz=ClassConfigurator.getInstance(false).get(magic_number); 721 } 722 else { 723 classname=in.readUTF(); 724 clazz=ClassConfigurator.getInstance(false).get(classname); 725 } 726 hdr=(Header)clazz.newInstance(); 727 if(hdr instanceof Streamable) { 728 ((Streamable)hdr).readFrom(in); 729 } 730 else { 731 ois=new ObjectInputStream(in); 732 hdr.readExternal(ois); 733 } 734 } 735 catch(Exception ex) { 736 throw new IOException("failed read header: " + ex.toString()); 737 } 738 finally { 739 if(ois != null) 740 ois.close(); 741 } 742 743 return hdr; 744 } 745 746 747 748 749 750 } 751 | Popular Tags |