KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > Message


1 // $Id: Message.java,v 1.30 2005/04/20 14:02:08 belaban Exp $
2

3 package org.jgroups;
4
5
6 import org.apache.commons.logging.Log;
7 import org.apache.commons.logging.LogFactory;
8 import org.jgroups.conf.ClassConfigurator;
9 import org.jgroups.util.ContextObjectInputStream;
10 import org.jgroups.util.Marshaller;
11 import org.jgroups.util.Streamable;
12 import org.jgroups.util.Util;
13 import org.jgroups.stack.IpAddress;
14
15 import java.io.*;
16 import java.util.HashMap JavaDoc;
17 import java.util.HashSet JavaDoc;
18 import java.util.Iterator JavaDoc;
19 import java.util.Map JavaDoc;
20
21
22 /**
23  * A Message encapsulates data sent to members of a group. It contains among other things the
24  * address of the sender, the destination address, a payload (byte buffer) and a list of
25  * headers. Headers are added by protocols on the sender side and removed by protocols
26  * on the receiver's side.<br/>
27  * The byte buffer can point to a reference, and we can subset it using index and length. However,
28  * when the message is serialized, we only write the bytes between index and length.
29  * @author Bela Ban
30  */

31 public class Message implements Externalizable, Streamable {
32     protected Address dest_addr=null;
33     protected Address src_addr=null;
34
35     /** The payload */
36     private byte[] buf=null;
37
38     /** The index into the payload (usually 0) */
39     protected transient int offset=0;
40
41     /** The number of bytes in the buffer (usually buf.length is buf != null) */
42     protected transient int length=0;
43
44     /** HashMap<String,Header> */
45     protected HashMap JavaDoc headers=null;
46
47     protected static final Log log=LogFactory.getLog(Message.class);
48
49     static final long serialVersionUID=-1137364035832847034L;
50
51     static final byte DEST_SET=1;
52     static final byte SRC_SET=2;
53     static final byte BUF_SET=4;
54     static final byte HDRS_SET=8;
55     static final byte IPADDR_DEST=16;
56     static final byte IPADDR_SRC=32;
57     static final byte SRC_HOST_NULL=64;
58
59     static final HashSet JavaDoc nonStreamableHeaders=new HashSet JavaDoc(); // todo: remove when all headers are streamable
60

61
62
63     /** Public constructor
64      * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
65      * it is sent to the group (either to current group or to the group as given
66      * in the string). If it is a Vector, then it contains a number of addresses
67      * to which it must be sent. Otherwise, it contains a single destination.<p>
68      * Addresses are generally untyped (all are of type <em>Object</em>. A channel
69      * instance must know what types of addresses it expects and downcast
70      * accordingly.
71      * @param src Address of sender
72      * @param buf Message to be sent. Note that this buffer must not be modified (e.g. buf[0]=0 is
73      * not allowed), since we don't copy the contents on clopy() or clone().
74      */

75     public Message(Address dest, Address src, byte[] buf) {
76         dest_addr=dest;
77         src_addr=src;
78         setBuffer(buf);
79     }
80
81     /**
82      * Constructs a message. The index and length parameters allow to provide a <em>reference</em> to
83      * a byte buffer, rather than a copy, and refer to a subset of the buffer. This is important when
84      * we want to avoid copying. When the message is serialized, only the subset is serialized.
85      * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
86      * it is sent to the group (either to current group or to the group as given
87      * in the string). If it is a Vector, then it contains a number of addresses
88      * to which it must be sent. Otherwise, it contains a single destination.<p>
89      * Addresses are generally untyped (all are of type <em>Object</em>. A channel
90      * instance must know what types of addresses it expects and downcast
91      * accordingly.
92      * @param src Address of sender
93      * @param buf A reference to a byte buffer
94      * @param offset The index into the byte buffer
95      * @param length The number of bytes to be used from <tt>buf</tt>. Both index and length are checked for
96      * array index violations and an ArrayIndexOutOfBoundsException will be thrown if invalid
97      */

98     public Message(Address dest, Address src, byte[] buf, int offset, int length) {
99         dest_addr=dest;
100         src_addr=src;
101         setBuffer(buf, offset, length);
102     }
103
104
105     /** Public constructor
106      * @param dest Address of receiver. If it is <em>null</em> or a <em>string</em>, then
107      * it is sent to the group (either to current group or to the group as given
108      * in the string). If it is a Vector, then it contains a number of addresses
109      * to which it must be sent. Otherwise, it contains a single destination.<p>
110      * Addresses are generally untyped (all are of type <em>Object</em>. A channel
111      * instance must know what types of addresses it expects and downcast
112      * accordingly.
113      * @param src Address of sender
114      * @param obj The object will be serialized into the byte buffer. <em>Object
115      * has to be serializable </em>! Note that the resulting buffer must not be modified
116      * (e.g. buf[0]=0 is not allowed), since we don't copy the contents on clopy() or clone().
117      */

118     public Message(Address dest, Address src, Serializable obj) {
119         dest_addr=dest;
120         src_addr=src;
121         setObject(obj);
122     }
123
124
125     /** Only used for Externalization (creating an initial object) */
126     public Message() {
127     } // should not be called as normal constructor
128

129     public Address getDest() {
130         return dest_addr;
131     }
132
133     public void setDest(Address new_dest) {
134         dest_addr=new_dest;
135     }
136
137     public Address getSrc() {
138         return src_addr;
139     }
140
141     public void setSrc(Address new_src) {
142         src_addr=new_src;
143     }
144
145     /**
146      * Returns a <em>reference</em> to the payload (byte buffer). Note that this buffer should not be modified as
147      * we do not copy the buffer on copy() or clone(): the buffer of the copied message is simply a reference to
148      * the old buffer.<br/>
149      * Even if offset and length are used: we return the <em>entire</em> buffer, not a subset.
150      */

151     public byte[] getRawBuffer() {
152         return buf;
153     }
154
155     /**
156      * Returns a copy of the buffer if offset and length are used, otherwise a reference
157      * @return
158      */

159     public byte[] getBuffer() {
160         if(buf == null)
161             return null;
162         if(offset == 0 && length == buf.length)
163             return buf;
164         else {
165             byte[] retval=new byte[length];
166             System.arraycopy(buf, offset, retval, 0, length);
167             return retval;
168         }
169     }
170
171     public void setBuffer(byte[] b) {
172         buf=b;
173         if(buf != null) {
174             offset=0;
175             length=buf.length;
176         }
177         else {
178             offset=length=0;
179         }
180     }
181
182     /**
183      * Set the internal buffer to point to a subset of a given buffer
184      * @param b The reference to a given buffer. If null, we'll reset the buffer to null
185      * @param offset The initial position
186      * @param length The number of bytes
187      */

188     public void setBuffer(byte[] b, int offset, int length) {
189         buf=b;
190         if(buf != null) {
191             if(offset < 0 || offset > buf.length)
192                 throw new ArrayIndexOutOfBoundsException JavaDoc(offset);
193             if((offset + length) > buf.length)
194                 throw new ArrayIndexOutOfBoundsException JavaDoc((offset+length));
195             this.offset=offset;
196             this.length=length;
197         }
198         else {
199             offset=length=0;
200         }
201     }
202
203     /** Returns the offset into the buffer at which the data starts */
204     public int getOffset() {
205         return offset;
206     }
207
208     /** Returns the number of bytes in the buffer */
209     public int getLength() {
210         return length;
211     }
212
213     public Map JavaDoc getHeaders() {
214         return headers;
215     }
216
217     public void setObject(Serializable obj) {
218         if(obj == null) return;
219         try {
220             ByteArrayOutputStream out_stream=new ByteArrayOutputStream();
221             ObjectOutputStream out=new ObjectOutputStream(out_stream);
222             out.writeObject(obj);
223             setBuffer(out_stream.toByteArray());
224         }
225         catch(IOException ex) {
226             throw new IllegalArgumentException JavaDoc(ex.toString());
227         }
228     }
229
230     public Object JavaDoc getObject() {
231         if(buf == null) return null;
232         try {
233             ByteArrayInputStream in_stream=new ByteArrayInputStream(buf, offset, length);
234             // ObjectInputStream in=new ObjectInputStream(in_stream);
235
ObjectInputStream in=new ContextObjectInputStream(in_stream); // put it back on norbert's request
236
return in.readObject();
237         }
238         catch(Exception JavaDoc ex) {
239             throw new IllegalArgumentException JavaDoc(ex.toString());
240         }
241     }
242
243
244     /**
245      * Nulls all fields of this message so that the message can be reused. Removes all headers from the
246      * stack, but keeps the stack
247      */

248     public void reset() {
249         dest_addr=src_addr=null;
250         setBuffer(null);
251         if(headers != null)
252             headers.clear();
253     }
254
255     /*---------------------- Used by protocol layers ----------------------*/
256
257     /** Puts a header given a key into the hashmap. Overwrites potential existing entry. */
258     public void putHeader(String JavaDoc key, Header hdr) {
259         headers().put(key, hdr);
260     }
261
262     public Header removeHeader(String JavaDoc key) {
263         return headers != null ? (Header)headers.remove(key) : null;
264     }
265
266     public void removeHeaders() {
267         if(headers != null)
268             headers.clear();
269     }
270
271     public Header getHeader(String JavaDoc key) {
272         return headers != null ? (Header)headers.get(key) : null;
273     }
274     /*---------------------------------------------------------------------*/
275
276
277     public Message copy() {
278         return copy(true);
279     }
280
281     /**
282      * Create a copy of the message. If offset and length are used (to refer to another buffer), the copy will
283      * contain only the subset offset and length point to, copying the subset into the new copy.
284      * @param copy_buffer
285      * @return
286      */

287     public Message copy(boolean copy_buffer) {
288         Message retval=new Message();
289         retval.dest_addr=dest_addr;
290         retval.src_addr=src_addr;
291
292         if(copy_buffer && buf != null) {
293
294             // change bela Feb 26 2004: we don't resolve the reference
295
retval.setBuffer(buf, offset, length);
296
297
298             /*
299             byte[] new_buf;
300             if(offset > 0 || length != buf.length) { // resolve reference to subset by copying subset into new buffer
301                 new_buf=new byte[length];
302                 System.arraycopy(buf, offset, new_buf, 0, length);
303             }
304             else
305                 new_buf=buf;
306             retval.setBuffer(new_buf);
307             */

308         }
309
310         if(headers != null)
311             retval.headers=(HashMap JavaDoc)headers.clone();
312         return retval;
313     }
314
315
316     protected Object JavaDoc clone() throws CloneNotSupportedException JavaDoc {
317         return copy();
318     }
319
320     public Message makeReply() {
321         return new Message(src_addr, null, null);
322     }
323
324
325     public String JavaDoc toString() {
326         StringBuffer JavaDoc ret=new StringBuffer JavaDoc(64);
327         ret.append("[dst: ");
328         if(dest_addr == null)
329             ret.append("<null>");
330         else
331             ret.append(dest_addr);
332         ret.append(", src: ");
333         if(src_addr == null)
334             ret.append("<null>");
335         else
336             ret.append(src_addr);
337
338         if(headers != null && headers.size() > 0)
339             ret.append(" (" + headers.size() + " headers)");
340
341         ret.append(", size = ");
342         if(buf != null && length > 0)
343             ret.append(length);
344         else
345             ret.append('0');
346         ret.append(" bytes");
347         ret.append(']');
348         return ret.toString();
349     }
350
351
352     /** Tries to read an object from the message's buffer and prints it */
353     public String JavaDoc toStringAsObject() {
354         Object JavaDoc obj;
355
356         if(buf == null) return null;
357         try {
358             obj=getObject();
359             return obj != null ? obj.toString() : "";
360         }
361         catch(Exception JavaDoc e) { // it is not an object
362
return "";
363         }
364     }
365
366
367     /**
368      * Returns size of buffer, plus some constant overhead for src and dest, plus number of headers time
369      * some estimated size/header. The latter is needed because we don't want to marshal all headers just
370      * to find out their size requirements. If a header implements Sizeable, the we can get the correct
371      * size.<p> Size estimations don't have to be very accurate since this is mainly used by FRAG to
372      * determine whether to fragment a message or not. Fragmentation will then serialize the message,
373      * therefore getting the correct value.
374      */

375     public long size() {
376         long retval=Global.BYTE_SIZE // leading byte
377
+ length // buffer
378
+ (buf != null? Global.INT_SIZE : 0); // if buf != null 4 bytes for length
379

380         if(dest_addr != null)
381             retval+=dest_addr.size();
382         if(src_addr != null)
383             retval+=(src_addr).size();
384
385         if(headers != null) {
386             Map.Entry JavaDoc entry;
387             String JavaDoc key;
388             Header hdr;
389             retval+=Global.INT_SIZE; // size (int)
390
for(Iterator JavaDoc it=headers.entrySet().iterator(); it.hasNext();) {
391                 entry=(Map.Entry JavaDoc)it.next();
392                 key=(String JavaDoc)entry.getKey();
393                 retval+=key.length() +2; // not the same as writeUTF(), but almost
394
hdr=(Header)entry.getValue();
395                 retval+=5; // 1 for presence of magic number, 4 for magic number
396
retval+=hdr.size();
397             }
398         }
399         return retval;
400     }
401
402
403     public String JavaDoc printObjectHeaders() {
404         StringBuffer JavaDoc sb=new StringBuffer JavaDoc();
405         Map.Entry JavaDoc entry;
406
407         if(headers != null) {
408             for(Iterator JavaDoc it=headers.entrySet().iterator(); it.hasNext();) {
409                 entry=(Map.Entry JavaDoc)it.next();
410                 sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
411             }
412         }
413         return sb.toString();
414     }
415
416
417
418     /* ----------------------------------- Interface Externalizable ------------------------------- */
419
420     public void writeExternal(ObjectOutput out) throws IOException {
421         int len;
422         Externalizable hdr;
423         Map.Entry JavaDoc entry;
424
425         if(dest_addr != null) {
426             out.writeBoolean(true);
427             Marshaller.write(dest_addr, out);
428         }
429         else {
430             out.writeBoolean(false);
431         }
432
433         if(src_addr != null) {
434             out.writeBoolean(true);
435             Marshaller.write(src_addr, out);
436         }
437         else {
438             out.writeBoolean(false);
439         }
440
441         if(buf == null)
442             out.writeInt(0);
443         else {
444             out.writeInt(length);
445             out.write(buf, offset, length);
446         }
447
448         if(headers == null)
449             out.writeInt(0);
450         else {
451             len=headers.size();
452             out.writeInt(len);
453             for(Iterator JavaDoc it=headers.entrySet().iterator(); it.hasNext();) {
454                 entry=(Map.Entry JavaDoc)it.next();
455                 out.writeUTF((String JavaDoc)entry.getKey());
456                 hdr=(Externalizable)entry.getValue();
457                 Marshaller.write(hdr, out);
458             }
459         }
460     }
461
462
463     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException JavaDoc {
464         int len;
465         boolean destAddressExist=in.readBoolean();
466         boolean srcAddressExist;
467         Object JavaDoc key, value;
468
469         if(destAddressExist) {
470             dest_addr=(Address)Marshaller.read(in);
471         }
472
473         srcAddressExist=in.readBoolean();
474         if(srcAddressExist) {
475             src_addr=(Address)Marshaller.read(in);
476         }
477
478         int i=in.readInt();
479         if(i != 0) {
480             buf=new byte[i];
481             in.readFully(buf);
482             offset=0;
483             length=buf.length;
484         }
485
486         len=in.readInt();
487         if(len > 0) headers=new HashMap JavaDoc(11);
488         while(len-- > 0) {
489             key=in.readUTF();
490             value=Marshaller.read(in);
491             headers.put(key, value);
492         }
493     }
494
495     /* --------------------------------- End of Interface Externalizable ----------------------------- */
496
497
498     /* ----------------------------------- Interface Streamable ------------------------------- */
499
500     /**
501      * Streams all members (dest and src addresses, buffer and headers to the output stream
502      * @param outstream
503      * @throws IOException
504      */

505     public void writeTo(DataOutputStream out) throws IOException {
506         Map.Entry JavaDoc entry;
507
508         byte leading=0;
509         if(dest_addr != null) {
510             leading+=DEST_SET;
511             if(dest_addr instanceof IpAddress)
512                 leading+=IPADDR_DEST;
513         }
514         if(src_addr != null) {
515             leading+=SRC_SET;
516             if(src_addr instanceof IpAddress) {
517                 leading+=IPADDR_SRC;
518                 if(((IpAddress)src_addr).getIpAddress() == null) {
519                     leading+=SRC_HOST_NULL;
520                 }
521             }
522         }
523         if(buf != null)
524             leading+=BUF_SET;
525         if(headers != null && headers.size() > 0)
526             leading+=HDRS_SET;
527
528         // 1. write the leading byte first
529
out.write(leading);
530
531         // 2. dest_addr
532
if(dest_addr != null) {
533             if(dest_addr instanceof IpAddress)
534                 dest_addr.writeTo(out);
535             else
536                 Util.writeAddress(dest_addr, out);
537         }
538
539         // 3. src_addr
540
if(src_addr != null) {
541             if(src_addr instanceof IpAddress) {
542                 src_addr.writeTo(out);
543 // IpAddress tmp=(IpAddress)src_addr;
544
// if(tmp.getIpAddress() != null)
545
// src_addr.writeTo(out);
546
// else {
547
// byte[] additional_data;
548
// out.writeInt(tmp.getPort());
549
// // additional_data
550
// if((additional_data=tmp.getAdditionalData()) != null) {
551
// out.writeByte(1);
552
// out.writeInt(additional_data.length);
553
// out.write(additional_data, 0, additional_data.length);
554
// }
555
// else {
556
// out.writeByte(0); // no additional_data
557
// }
558
// }
559
}
560             else {
561                 Util.writeAddress(src_addr, out);
562             }
563         }
564
565         // 4. buf
566
if(buf != null) {
567             out.writeInt(length);
568             out.write(buf, offset, length);
569         }
570
571         // 5. headers
572
if(headers != null && headers.size() > 0) {
573             out.writeInt(headers.size());
574             for(Iterator JavaDoc it=headers.entrySet().iterator(); it.hasNext();) {
575                 entry=(Map.Entry JavaDoc)it.next();
576                 out.writeUTF((String JavaDoc)entry.getKey());
577                 writeHeader((Header)entry.getValue(), out);
578             }
579         }
580     }
581
582
583     public void readFrom(DataInputStream in) throws IOException, IllegalAccessException JavaDoc, InstantiationException JavaDoc {
584         int len, leading;
585         String JavaDoc hdr_name;
586         Header hdr;
587
588
589         // 1. read the leading byte first
590
leading=in.readByte();
591
592         // 1. dest_addr
593
if((leading & DEST_SET) == DEST_SET) {
594             if((leading & IPADDR_DEST) == IPADDR_DEST) {
595                 dest_addr=new IpAddress();
596                 dest_addr.readFrom(in);
597             }
598             else {
599                 dest_addr=Util.readAddress(in);
600             }
601         }
602
603         // 2. src_addr
604
if((leading & SRC_SET) == SRC_SET) {
605             if((leading & IPADDR_SRC) == IPADDR_SRC) {
606                 src_addr=new IpAddress();
607                 src_addr.readFrom(in);
608                 
609 // if((leading & SRC_HOST_NULL) == SRC_HOST_NULL) {
610
// int src_port=in.readInt();
611
// src_addr=new IpAddress(src_port, false); // keep a null host part
612
// if(in.readByte() == 1) {
613
// len=in.readInt();
614
// byte[] additional_data=new byte[len];
615
// in.readFully(additional_data, 0, len);
616
// ((IpAddress)src_addr).setAdditionalData(additional_data);
617
// }
618
// }
619
// else {
620
// src_addr=new IpAddress();
621
// src_addr.readFrom(in);
622
// }
623

624
625             }
626             else {
627                 src_addr=Util.readAddress(in);
628             }
629         }
630
631         // 3. buf
632
if((leading & BUF_SET) == BUF_SET) {
633             len=in.readInt();
634             buf=new byte[len];
635             in.read(buf, 0, len);
636             length=len;
637         }
638
639         // 4. headers
640
if((leading & HDRS_SET) == HDRS_SET) {
641             len=in.readInt();
642             headers(len);
643             for(int i=0; i < len; i++) {
644                 hdr_name=in.readUTF();
645                 hdr=readHeader(in);
646                 headers.put(hdr_name, hdr);
647             }
648         }
649     }
650
651
652
653     /* --------------------------------- End of Interface Streamable ----------------------------- */
654
655
656
657     /* ----------------------------------- Private methods ------------------------------- */
658
659     HashMap JavaDoc headers() {
660         return headers != null ? headers : (headers=new HashMap JavaDoc(11));
661     }
662
663
664     HashMap JavaDoc headers(int len) {
665         return headers != null ? headers : (headers=new HashMap JavaDoc(len));
666     }
667
668     private void writeHeader(Header value, DataOutputStream out) throws IOException {
669         int magic_number;
670         String JavaDoc classname;
671         ObjectOutputStream oos=null;
672         try {
673             magic_number=ClassConfigurator.getInstance(false).getMagicNumber(value.getClass());
674             // write the magic number or the class name
675
if(magic_number == -1) {
676                 out.writeBoolean(false);
677                 classname=value.getClass().getName();
678                 out.writeUTF(classname);
679             }
680             else {
681                 out.writeBoolean(true);
682                 out.writeInt(magic_number);
683             }
684
685             // write the contents
686
if(value instanceof Streamable) {
687                 ((Streamable)value).writeTo(out);
688             }
689             else {
690                 oos=new ObjectOutputStream(out);
691                 value.writeExternal(oos);
692                 if(!nonStreamableHeaders.contains(value.getClass())) {
693                     nonStreamableHeaders.add(value.getClass());
694                     if(log.isTraceEnabled())
695                         log.trace("encountered non-Streamable header: " + value.getClass());
696                 }
697             }
698         }
699         catch(ChannelException e) {
700             log.error("failed writing the header", e);
701         }
702         finally {
703             if(oos != null)
704                 oos.close();
705         }
706     }
707
708
709     private Header readHeader(DataInputStream in) throws IOException {
710         Header hdr=null;
711         boolean use_magic_number=in.readBoolean();
712         int magic_number;
713         String JavaDoc classname;
714         Class JavaDoc clazz;
715         ObjectInputStream ois=null;
716
717         try {
718             if(use_magic_number) {
719                 magic_number=in.readInt();
720                 clazz=ClassConfigurator.getInstance(false).get(magic_number);
721             }
722             else {
723                 classname=in.readUTF();
724                 clazz=ClassConfigurator.getInstance(false).get(classname);
725             }
726             hdr=(Header)clazz.newInstance();
727             if(hdr instanceof Streamable) {
728                ((Streamable)hdr).readFrom(in);
729             }
730             else {
731                 ois=new ObjectInputStream(in);
732                 hdr.readExternal(ois);
733             }
734         }
735         catch(Exception JavaDoc ex) {
736             throw new IOException("failed read header: " + ex.toString());
737         }
738         finally {
739             if(ois != null)
740                 ois.close();
741         }
742
743         return hdr;
744     }
745
746     /* ------------------------------- End of Private methods ---------------------------- */
747
748
749
750 }
751
Popular Tags