KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > openwire > OpenWireFormat


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.openwire;
19
20 import java.io.DataInput JavaDoc;
21 import java.io.DataOutput JavaDoc;
22 import java.io.IOException JavaDoc;
23 import java.lang.reflect.Method JavaDoc;
24 import java.util.HashMap JavaDoc;
25
26 import org.apache.activemq.command.CommandTypes;
27 import org.apache.activemq.command.DataStructure;
28 import org.apache.activemq.command.MarshallAware;
29 import org.apache.activemq.command.WireFormatInfo;
30 import org.apache.activemq.util.ByteSequence;
31 import org.apache.activemq.util.ByteSequenceData;
32 import org.apache.activemq.util.ClassLoading;
33 import org.apache.activemq.util.DataByteArrayInputStream;
34 import org.apache.activemq.util.DataByteArrayOutputStream;
35 import org.apache.activemq.util.IdGenerator;
36 import org.apache.activemq.wireformat.WireFormat;
37
38 /**
39  *
40  * @version $Revision$
41  */

42 final public class OpenWireFormat implements WireFormat {
43
44     public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION;
45
46     static final byte NULL_TYPE = CommandTypes.NULL;
47     private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE/2;
48     private static final int MARSHAL_CACHE_FREE_SPACE = 100;
49     
50     private DataStreamMarshaller dataMarshallers[];
51     private int version;
52     private boolean stackTraceEnabled=false;
53     private boolean tcpNoDelayEnabled=false;
54     private boolean cacheEnabled=false;
55     private boolean tightEncodingEnabled=false;
56     private boolean sizePrefixDisabled=false;
57
58     // The following fields are used for value caching
59
private short nextMarshallCacheIndex=0;
60     private short nextMarshallCacheEvictionIndex=0;
61     private HashMap JavaDoc marshallCacheMap = new HashMap JavaDoc();
62     private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
63     private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
64     private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
65     private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
66     private WireFormatInfo preferedWireFormatInfo;
67             
68     public OpenWireFormat() {
69         this(DEFAULT_VERSION);
70     }
71     
72     public OpenWireFormat(int i) {
73         setVersion(i);
74     }
75
76     public int hashCode() {
77         return version
78             ^ (cacheEnabled ? 0x10000000:0x20000000)
79             ^ (stackTraceEnabled ? 0x01000000:0x02000000)
80             ^ (tightEncodingEnabled ? 0x00100000:0x00200000)
81             ^ (sizePrefixDisabled ? 0x00010000:0x00020000)
82             ;
83     }
84     
85     public OpenWireFormat copy() {
86         OpenWireFormat answer = new OpenWireFormat();
87         answer.version = version;
88         answer.stackTraceEnabled = stackTraceEnabled;
89         answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
90         answer.cacheEnabled = cacheEnabled;
91         answer.tightEncodingEnabled = tightEncodingEnabled;
92         answer.sizePrefixDisabled = sizePrefixDisabled;
93         answer.preferedWireFormatInfo = preferedWireFormatInfo;
94         return answer;
95     }
96     
97     public boolean equals(Object JavaDoc object) {
98         if( object == null )
99             return false;
100         OpenWireFormat o = (OpenWireFormat) object;
101         return o.stackTraceEnabled == stackTraceEnabled &&
102             o.cacheEnabled == cacheEnabled &&
103             o.version == version &&
104             o.tightEncodingEnabled == tightEncodingEnabled &&
105             o.sizePrefixDisabled == sizePrefixDisabled
106             ;
107     }
108     
109     static IdGenerator g = new IdGenerator();
110     String JavaDoc id = g.generateId();
111     public String JavaDoc toString() {
112         return "OpenWireFormat{version="+version+", cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+", tightEncodingEnabled="+tightEncodingEnabled+", sizePrefixDisabled="+sizePrefixDisabled+"}";
113         //return "OpenWireFormat{id="+id+", tightEncodingEnabled="+tightEncodingEnabled+"}";
114
}
115     
116     public int getVersion() {
117         return version;
118     }
119     
120     public synchronized ByteSequence marshal(Object JavaDoc command) throws IOException JavaDoc {
121         
122         if( cacheEnabled ) {
123             runMarshallCacheEvictionSweep();
124         }
125         
126         MarshallAware ma=null;
127         // If not using value caching, then the marshaled form is always the same
128
if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
129             ma = (MarshallAware) command;
130         }
131         
132         ByteSequence sequence=null;
133 // if( ma!=null ) {
134
// sequence = ma.getCachedMarshalledForm(this);
135
// }
136

137         if( sequence == null ) {
138             
139             int size=1;
140             if( command != null) {
141                 
142                 DataStructure c = (DataStructure) command;
143                 byte type = c.getDataStructureType();
144                 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
145                 if( dsm == null )
146                     throw new IOException JavaDoc("Unknown data type: "+type);
147                 
148                 if( tightEncodingEnabled ) {
149                     
150                     BooleanStream bs = new BooleanStream();
151                     size += dsm.tightMarshal1(this, c, bs);
152                     size += bs.marshalledSize();
153     
154                     bytesOut.restart(size);
155                     if( !sizePrefixDisabled ) {
156                         bytesOut.writeInt(size);
157                     }
158                     bytesOut.writeByte(type);
159                     bs.marshal(bytesOut);
160                     dsm.tightMarshal2(this, c, bytesOut, bs);
161                     sequence = bytesOut.toByteSequence();
162                     
163                 } else {
164                     bytesOut.restart();
165                     if( !sizePrefixDisabled ) {
166                         bytesOut.writeInt(0); // we don't know the final size yet but write this here for now.
167
}
168                     bytesOut.writeByte(type);
169                     dsm.looseMarshal(this, c, bytesOut);
170                     sequence = bytesOut.toByteSequence();
171                     
172                     if( !sizePrefixDisabled ) {
173                         size = sequence.getLength()-4;
174                         int pos = sequence.offset;
175                         ByteSequenceData.writeIntBig(sequence, size);
176                         sequence.offset = pos;
177                     }
178                 }
179                 
180                 
181             } else {
182                 bytesOut.restart(5);
183                 bytesOut.writeInt(size);
184                 bytesOut.writeByte(NULL_TYPE);
185                 sequence = bytesOut.toByteSequence();
186             }
187             
188 // if( ma!=null ) {
189
// ma.setCachedMarshalledForm(this, sequence);
190
// }
191
}
192         return sequence;
193     }
194     
195     public synchronized Object JavaDoc unmarshal(ByteSequence sequence) throws IOException JavaDoc {
196         bytesIn.restart(sequence);
197         //DataInputStream dis = new DataInputStream(new ByteArrayInputStream(sequence));
198

199         if( !sizePrefixDisabled ) {
200             int size = bytesIn.readInt();
201             if( sequence.getLength()-4 != size ) {
202     // throw new IOException("Packet size does not match marshaled size");
203
}
204         }
205         
206         Object JavaDoc command = doUnmarshal(bytesIn);
207 // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
208
// ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
209
// }
210
return command;
211     }
212     
213     public synchronized void marshal(Object JavaDoc o, DataOutput JavaDoc dataOut) throws IOException JavaDoc {
214         
215         if( cacheEnabled ) {
216             runMarshallCacheEvictionSweep();
217         }
218         
219         int size=1;
220         if( o != null) {
221             
222             DataStructure c = (DataStructure) o;
223             byte type = c.getDataStructureType();
224             DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
225             if( dsm == null )
226                 throw new IOException JavaDoc("Unknown data type: "+type);
227
228             if( tightEncodingEnabled ) {
229                 BooleanStream bs = new BooleanStream();
230                 size += dsm.tightMarshal1(this, c, bs);
231                 size += bs.marshalledSize();
232
233                 if( !sizePrefixDisabled ) {
234                     dataOut.writeInt(size);
235                 }
236                 
237                 dataOut.writeByte(type);
238                 bs.marshal(dataOut);
239                 dsm.tightMarshal2(this, c, dataOut, bs);
240                 
241             } else {
242                 DataOutput JavaDoc looseOut = dataOut;
243                                 
244                 if( !sizePrefixDisabled ) {
245                     bytesOut.restart();
246                     looseOut = bytesOut;
247                 }
248                 
249                 looseOut.writeByte(type);
250                 dsm.looseMarshal(this, c, looseOut);
251                 
252                 if( !sizePrefixDisabled ) {
253                     ByteSequence sequence = bytesOut.toByteSequence();
254                     dataOut.writeInt(sequence.getLength());
255                     dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
256                 }
257
258             }
259             
260         } else {
261             dataOut.writeInt(size);
262             dataOut.writeByte(NULL_TYPE);
263         }
264     }
265
266     public Object JavaDoc unmarshal(DataInput JavaDoc dis) throws IOException JavaDoc {
267         DataInput JavaDoc dataIn = dis;
268         if( !sizePrefixDisabled ) {
269             int size = dis.readInt();
270             //byte[] data = new byte[size];
271
//dis.readFully(data);
272
//bytesIn.restart(data);
273
//dataIn = bytesIn;
274
}
275         return doUnmarshal(dataIn);
276     }
277     
278     /**
279      * Used by NIO or AIO transports
280      */

281     public int tightMarshal1(Object JavaDoc o, BooleanStream bs) throws IOException JavaDoc {
282         int size=1;
283         if( o != null) {
284             DataStructure c = (DataStructure) o;
285             byte type = c.getDataStructureType();
286             DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
287             if( dsm == null )
288                 throw new IOException JavaDoc("Unknown data type: "+type);
289
290             size += dsm.tightMarshal1(this, c, bs);
291             size += bs.marshalledSize();
292         }
293         return size;
294     }
295     
296     /**
297      * Used by NIO or AIO transports; note that the size is not written as part of this method.
298      */

299     public void tightMarshal2(Object JavaDoc o, DataOutput JavaDoc ds, BooleanStream bs) throws IOException JavaDoc {
300         if( cacheEnabled ) {
301             runMarshallCacheEvictionSweep();
302         }
303         
304         if( o != null) {
305             DataStructure c = (DataStructure) o;
306             byte type = c.getDataStructureType();
307             DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
308             if( dsm == null )
309                 throw new IOException JavaDoc("Unknown data type: "+type);
310
311             ds.writeByte(type);
312             bs.marshal(ds);
313             dsm.tightMarshal2(this, c, ds, bs);
314         }
315     }
316
317     
318     /**
319      * Allows you to dynamically switch the version of the openwire protocol being used.
320      * @param version
321      */

322     public void setVersion(int version) {
323         String JavaDoc mfName = "org.apache.activemq.openwire.v"+version+".MarshallerFactory";
324         Class JavaDoc mfClass;
325         try {
326             mfClass = ClassLoading.loadClass(mfName, getClass().getClassLoader());
327         } catch (ClassNotFoundException JavaDoc e) {
328             throw (IllegalArgumentException JavaDoc)new IllegalArgumentException JavaDoc("Invalid version: "+version+", could not load "+mfName).initCause(e);
329         }
330         try {
331             Method JavaDoc method = mfClass.getMethod("createMarshallerMap", new Class JavaDoc[]{OpenWireFormat.class});
332             dataMarshallers = (DataStreamMarshaller[]) method.invoke(null, new Object JavaDoc[]{this});
333         } catch (Throwable JavaDoc e) {
334             throw (IllegalArgumentException JavaDoc)new IllegalArgumentException JavaDoc("Invalid version: "+version+", "+mfName+" does not properly implement the createMarshallerMap method.").initCause(e);
335         }
336         this.version = version;
337     }
338         
339     public Object JavaDoc doUnmarshal(DataInput JavaDoc dis) throws IOException JavaDoc {
340         byte dataType = dis.readByte();
341         if( dataType!=NULL_TYPE ) {
342             DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
343             if( dsm == null )
344                 throw new IOException JavaDoc("Unknown data type: "+dataType);
345             Object JavaDoc data = dsm.createObject();
346             if( this.tightEncodingEnabled ) {
347                 BooleanStream bs = new BooleanStream();
348                 bs.unmarshal(dis);
349                 dsm.tightUnmarshal(this, data, dis, bs);
350             } else {
351                 dsm.looseUnmarshal(this, data, dis);
352             }
353             return data;
354         } else {
355             return null;
356         }
357     }
358
359 // public void debug(String msg) {
360
// String t = (Thread.currentThread().getName()+" ").substring(0, 40);
361
// System.out.println(t+": "+msg);
362
// }
363
public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException JavaDoc {
364         bs.writeBoolean(o != null);
365         if( o == null )
366             return 0;
367
368         if( o.isMarshallAware() ) {
369             MarshallAware ma = (MarshallAware) o;
370             ByteSequence sequence=null;
371 // sequence=ma.getCachedMarshalledForm(this);
372
bs.writeBoolean(sequence!=null);
373             if( sequence!=null ) {
374                 return 1 + sequence.getLength();
375             }
376         }
377         
378         byte type = o.getDataStructureType();
379         DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
380         if( dsm == null )
381             throw new IOException JavaDoc("Unknown data type: "+type);
382         return 1 + dsm.tightMarshal1(this, o, bs);
383     }
384     
385     public void tightMarshalNestedObject2(DataStructure o, DataOutput JavaDoc ds, BooleanStream bs) throws IOException JavaDoc {
386         if( !bs.readBoolean() )
387             return;
388             
389         byte type = o.getDataStructureType();
390         ds.writeByte(type);
391
392         if( o.isMarshallAware() && bs.readBoolean() ) {
393
394             // We should not be doing any caching
395
throw new IOException JavaDoc("Corrupted stream");
396 // MarshallAware ma = (MarshallAware) o;
397
// ByteSequence sequence=ma.getCachedMarshalledForm(this);
398
// ds.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
399

400         } else {
401             
402             DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
403             if( dsm == null )
404                 throw new IOException JavaDoc("Unknown data type: "+type);
405             dsm.tightMarshal2(this, o, ds, bs);
406             
407         }
408     }
409     
410     public DataStructure tightUnmarshalNestedObject(DataInput JavaDoc dis, BooleanStream bs) throws IOException JavaDoc {
411         if( bs.readBoolean() ) {
412             
413             byte dataType = dis.readByte();
414             DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
415             if( dsm == null )
416                 throw new IOException JavaDoc("Unknown data type: "+dataType);
417             DataStructure data = dsm.createObject();
418
419             if( data.isMarshallAware() && bs.readBoolean() ) {
420                 
421                 dis.readInt();
422                 dis.readByte();
423                 
424                 BooleanStream bs2 = new BooleanStream();
425                 bs2.unmarshal(dis);
426                 dsm.tightUnmarshal(this, data, dis, bs2);
427
428                 // TODO: extract the sequence from the dis and associate it.
429
// MarshallAware ma = (MarshallAware)data
430
// ma.setCachedMarshalledForm(this, sequence);
431

432             } else {
433                 dsm.tightUnmarshal(this, data, dis, bs);
434             }
435             
436             return data;
437         } else {
438             return null;
439         }
440     }
441     
442     public DataStructure looseUnmarshalNestedObject(DataInput JavaDoc dis) throws IOException JavaDoc {
443         if( dis.readBoolean() ) {
444             
445             byte dataType = dis.readByte();
446             DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF];
447             if( dsm == null )
448                 throw new IOException JavaDoc("Unknown data type: "+dataType);
449             DataStructure data = dsm.createObject();
450             dsm.looseUnmarshal(this, data, dis);
451             return data;
452             
453         } else {
454             return null;
455         }
456     }
457
458     public void looseMarshalNestedObject(DataStructure o, DataOutput JavaDoc dataOut) throws IOException JavaDoc {
459         dataOut.writeBoolean(o!=null);
460         if( o!=null ) {
461             byte type = o.getDataStructureType();
462             dataOut.writeByte(type);
463             DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF];
464             if( dsm == null )
465                 throw new IOException JavaDoc("Unknown data type: "+type);
466             dsm.looseMarshal(this, o, dataOut);
467         }
468     }
469
470     public void runMarshallCacheEvictionSweep() {
471         // Do we need to start evicting??
472
while( marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE ) {
473             
474             marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
475             marshallCache[nextMarshallCacheEvictionIndex]=null;
476
477             nextMarshallCacheEvictionIndex++;
478             if( nextMarshallCacheEvictionIndex >= marshallCache.length ) {
479                 nextMarshallCacheEvictionIndex=0;
480             }
481             
482         }
483     }
484     
485     public Short JavaDoc getMarshallCacheIndex(DataStructure o) {
486         return (Short JavaDoc) marshallCacheMap.get(o);
487     }
488     
489     public Short JavaDoc addToMarshallCache(DataStructure o) {
490         short i = nextMarshallCacheIndex++;
491         if( nextMarshallCacheIndex >= marshallCache.length ) {
492             nextMarshallCacheIndex=0;
493         }
494         
495         // We can only cache that item if there is space left.
496
if( marshallCacheMap.size() < marshallCache.length ) {
497             marshallCache[i] = o;
498             Short JavaDoc index = new Short JavaDoc(i);
499             marshallCacheMap.put(o, index);
500             return index;
501         } else {
502             // Use -1 to indicate that the value was not cached due to cache being full.
503
return new Short JavaDoc((short)-1);
504         }
505     }
506     
507     public void setInUnmarshallCache(short index, DataStructure o) {
508         
509         // There was no space left in the cache, so we can't
510
// put this in the cache.
511
if( index == -1 )
512             return;
513         
514         unmarshallCache[index]=o;
515     }
516     
517     public DataStructure getFromUnmarshallCache(short index) {
518         return unmarshallCache[index];
519     }
520
521
522     public void setStackTraceEnabled(boolean b) {
523         stackTraceEnabled = b;
524     }
525     public boolean isStackTraceEnabled() {
526         return stackTraceEnabled;
527     }
528
529     public boolean isTcpNoDelayEnabled() {
530         return tcpNoDelayEnabled;
531     }
532     public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
533         this.tcpNoDelayEnabled = tcpNoDelayEnabled;
534     }
535
536     public boolean isCacheEnabled() {
537         return cacheEnabled;
538     }
539     public void setCacheEnabled(boolean cacheEnabled) {
540         this.cacheEnabled = cacheEnabled;
541     }
542
543     public boolean isTightEncodingEnabled() {
544         return tightEncodingEnabled;
545     }
546
547     public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
548         this.tightEncodingEnabled = tightEncodingEnabled;
549     }
550
551     public boolean isSizePrefixDisabled() {
552         return sizePrefixDisabled;
553     }
554
555     public void setSizePrefixDisabled(boolean prefixPacketSize) {
556         this.sizePrefixDisabled = prefixPacketSize;
557     }
558
559     public void setPreferedWireFormatInfo(WireFormatInfo info) {
560         this.preferedWireFormatInfo = info;
561     }
562     public WireFormatInfo getPreferedWireFormatInfo() {
563         return preferedWireFormatInfo;
564     }
565
566     public void renegotiateWireFormat(WireFormatInfo info) throws IOException JavaDoc {
567         
568         if( preferedWireFormatInfo==null )
569             throw new IllegalStateException JavaDoc("Wireformat cannot not be renegotiated.");
570         
571         this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()) );
572         info.setVersion(this.getVersion());
573         
574         this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
575         info.setStackTraceEnabled(this.stackTraceEnabled);
576         
577         this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
578         info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
579         
580         this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
581         info.setCacheEnabled(this.cacheEnabled);
582         
583         this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled();
584         info.setTightEncodingEnabled(this.tightEncodingEnabled);
585         
586         this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled();
587         info.setSizePrefixDisabled(this.sizePrefixDisabled);
588         
589         if( cacheEnabled ) {
590             
591             int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
592             info.setCacheSize(size);
593             
594             if( size == 0 ) {
595                 size = MARSHAL_CACHE_SIZE;
596             }
597             
598             marshallCache = new DataStructure[size];
599             unmarshallCache = new DataStructure[size];
600             nextMarshallCacheIndex=0;
601             nextMarshallCacheEvictionIndex =0;
602             marshallCacheMap = new HashMap JavaDoc();
603         } else {
604             marshallCache=null;
605             unmarshallCache=null;
606             nextMarshallCacheIndex=0;
607             nextMarshallCacheEvictionIndex=0;
608             marshallCacheMap = null;
609         }
610         
611     }
612
613     protected int min(int version1, int version2) {
614         if (version1 < version2 && version1 > 0 || version2 <= 0) {
615             return version1;
616         }
617         return version2;
618     }
619 }
620
Popular Tags