1 18 package org.apache.activemq.openwire; 19 20 import java.io.DataInput ; 21 import java.io.DataOutput ; 22 import java.io.IOException ; 23 import java.lang.reflect.Method ; 24 import java.util.HashMap ; 25 26 import org.apache.activemq.command.CommandTypes; 27 import org.apache.activemq.command.DataStructure; 28 import org.apache.activemq.command.MarshallAware; 29 import org.apache.activemq.command.WireFormatInfo; 30 import org.apache.activemq.util.ByteSequence; 31 import org.apache.activemq.util.ByteSequenceData; 32 import org.apache.activemq.util.ClassLoading; 33 import org.apache.activemq.util.DataByteArrayInputStream; 34 import org.apache.activemq.util.DataByteArrayOutputStream; 35 import org.apache.activemq.util.IdGenerator; 36 import org.apache.activemq.wireformat.WireFormat; 37 38 42 final public class OpenWireFormat implements WireFormat { 43 44 public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION; 45 46 static final byte NULL_TYPE = CommandTypes.NULL; 47 private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE/2; 48 private static final int MARSHAL_CACHE_FREE_SPACE = 100; 49 50 private DataStreamMarshaller dataMarshallers[]; 51 private int version; 52 private boolean stackTraceEnabled=false; 53 private boolean tcpNoDelayEnabled=false; 54 private boolean cacheEnabled=false; 55 private boolean tightEncodingEnabled=false; 56 private boolean sizePrefixDisabled=false; 57 58 private short nextMarshallCacheIndex=0; 60 private short nextMarshallCacheEvictionIndex=0; 61 private HashMap marshallCacheMap = new HashMap (); 62 private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE]; 63 private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE]; 64 private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); 65 private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); 66 private WireFormatInfo preferedWireFormatInfo; 67 68 public OpenWireFormat() { 69 this(DEFAULT_VERSION); 70 } 71 72 public OpenWireFormat(int i) { 73 setVersion(i); 74 } 75 76 public int hashCode() { 77 return version 78 ^ (cacheEnabled ? 0x10000000:0x20000000) 79 ^ (stackTraceEnabled ? 0x01000000:0x02000000) 80 ^ (tightEncodingEnabled ? 0x00100000:0x00200000) 81 ^ (sizePrefixDisabled ? 0x00010000:0x00020000) 82 ; 83 } 84 85 public OpenWireFormat copy() { 86 OpenWireFormat answer = new OpenWireFormat(); 87 answer.version = version; 88 answer.stackTraceEnabled = stackTraceEnabled; 89 answer.tcpNoDelayEnabled = tcpNoDelayEnabled; 90 answer.cacheEnabled = cacheEnabled; 91 answer.tightEncodingEnabled = tightEncodingEnabled; 92 answer.sizePrefixDisabled = sizePrefixDisabled; 93 answer.preferedWireFormatInfo = preferedWireFormatInfo; 94 return answer; 95 } 96 97 public boolean equals(Object object) { 98 if( object == null ) 99 return false; 100 OpenWireFormat o = (OpenWireFormat) object; 101 return o.stackTraceEnabled == stackTraceEnabled && 102 o.cacheEnabled == cacheEnabled && 103 o.version == version && 104 o.tightEncodingEnabled == tightEncodingEnabled && 105 o.sizePrefixDisabled == sizePrefixDisabled 106 ; 107 } 108 109 static IdGenerator g = new IdGenerator(); 110 String id = g.generateId(); 111 public String toString() { 112 return "OpenWireFormat{version="+version+", cacheEnabled="+cacheEnabled+", stackTraceEnabled="+stackTraceEnabled+", tightEncodingEnabled="+tightEncodingEnabled+", sizePrefixDisabled="+sizePrefixDisabled+"}"; 113 } 115 116 public int getVersion() { 117 return version; 118 } 119 120 public synchronized ByteSequence marshal(Object command) throws IOException { 121 122 if( cacheEnabled ) { 123 runMarshallCacheEvictionSweep(); 124 } 125 126 MarshallAware ma=null; 127 if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) { 129 ma = (MarshallAware) command; 130 } 131 132 ByteSequence sequence=null; 133 137 if( sequence == null ) { 138 139 int size=1; 140 if( command != null) { 141 142 DataStructure c = (DataStructure) command; 143 byte type = c.getDataStructureType(); 144 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; 145 if( dsm == null ) 146 throw new IOException ("Unknown data type: "+type); 147 148 if( tightEncodingEnabled ) { 149 150 BooleanStream bs = new BooleanStream(); 151 size += dsm.tightMarshal1(this, c, bs); 152 size += bs.marshalledSize(); 153 154 bytesOut.restart(size); 155 if( !sizePrefixDisabled ) { 156 bytesOut.writeInt(size); 157 } 158 bytesOut.writeByte(type); 159 bs.marshal(bytesOut); 160 dsm.tightMarshal2(this, c, bytesOut, bs); 161 sequence = bytesOut.toByteSequence(); 162 163 } else { 164 bytesOut.restart(); 165 if( !sizePrefixDisabled ) { 166 bytesOut.writeInt(0); } 168 bytesOut.writeByte(type); 169 dsm.looseMarshal(this, c, bytesOut); 170 sequence = bytesOut.toByteSequence(); 171 172 if( !sizePrefixDisabled ) { 173 size = sequence.getLength()-4; 174 int pos = sequence.offset; 175 ByteSequenceData.writeIntBig(sequence, size); 176 sequence.offset = pos; 177 } 178 } 179 180 181 } else { 182 bytesOut.restart(5); 183 bytesOut.writeInt(size); 184 bytesOut.writeByte(NULL_TYPE); 185 sequence = bytesOut.toByteSequence(); 186 } 187 188 } 192 return sequence; 193 } 194 195 public synchronized Object unmarshal(ByteSequence sequence) throws IOException { 196 bytesIn.restart(sequence); 197 199 if( !sizePrefixDisabled ) { 200 int size = bytesIn.readInt(); 201 if( sequence.getLength()-4 != size ) { 202 } 204 } 205 206 Object command = doUnmarshal(bytesIn); 207 return command; 211 } 212 213 public synchronized void marshal(Object o, DataOutput dataOut) throws IOException { 214 215 if( cacheEnabled ) { 216 runMarshallCacheEvictionSweep(); 217 } 218 219 int size=1; 220 if( o != null) { 221 222 DataStructure c = (DataStructure) o; 223 byte type = c.getDataStructureType(); 224 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; 225 if( dsm == null ) 226 throw new IOException ("Unknown data type: "+type); 227 228 if( tightEncodingEnabled ) { 229 BooleanStream bs = new BooleanStream(); 230 size += dsm.tightMarshal1(this, c, bs); 231 size += bs.marshalledSize(); 232 233 if( !sizePrefixDisabled ) { 234 dataOut.writeInt(size); 235 } 236 237 dataOut.writeByte(type); 238 bs.marshal(dataOut); 239 dsm.tightMarshal2(this, c, dataOut, bs); 240 241 } else { 242 DataOutput looseOut = dataOut; 243 244 if( !sizePrefixDisabled ) { 245 bytesOut.restart(); 246 looseOut = bytesOut; 247 } 248 249 looseOut.writeByte(type); 250 dsm.looseMarshal(this, c, looseOut); 251 252 if( !sizePrefixDisabled ) { 253 ByteSequence sequence = bytesOut.toByteSequence(); 254 dataOut.writeInt(sequence.getLength()); 255 dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength()); 256 } 257 258 } 259 260 } else { 261 dataOut.writeInt(size); 262 dataOut.writeByte(NULL_TYPE); 263 } 264 } 265 266 public Object unmarshal(DataInput dis) throws IOException { 267 DataInput dataIn = dis; 268 if( !sizePrefixDisabled ) { 269 int size = dis.readInt(); 270 } 275 return doUnmarshal(dataIn); 276 } 277 278 281 public int tightMarshal1(Object o, BooleanStream bs) throws IOException { 282 int size=1; 283 if( o != null) { 284 DataStructure c = (DataStructure) o; 285 byte type = c.getDataStructureType(); 286 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; 287 if( dsm == null ) 288 throw new IOException ("Unknown data type: "+type); 289 290 size += dsm.tightMarshal1(this, c, bs); 291 size += bs.marshalledSize(); 292 } 293 return size; 294 } 295 296 299 public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException { 300 if( cacheEnabled ) { 301 runMarshallCacheEvictionSweep(); 302 } 303 304 if( o != null) { 305 DataStructure c = (DataStructure) o; 306 byte type = c.getDataStructureType(); 307 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; 308 if( dsm == null ) 309 throw new IOException ("Unknown data type: "+type); 310 311 ds.writeByte(type); 312 bs.marshal(ds); 313 dsm.tightMarshal2(this, c, ds, bs); 314 } 315 } 316 317 318 322 public void setVersion(int version) { 323 String mfName = "org.apache.activemq.openwire.v"+version+".MarshallerFactory"; 324 Class mfClass; 325 try { 326 mfClass = ClassLoading.loadClass(mfName, getClass().getClassLoader()); 327 } catch (ClassNotFoundException e) { 328 throw (IllegalArgumentException )new IllegalArgumentException ("Invalid version: "+version+", could not load "+mfName).initCause(e); 329 } 330 try { 331 Method method = mfClass.getMethod("createMarshallerMap", new Class []{OpenWireFormat.class}); 332 dataMarshallers = (DataStreamMarshaller[]) method.invoke(null, new Object []{this}); 333 } catch (Throwable e) { 334 throw (IllegalArgumentException )new IllegalArgumentException ("Invalid version: "+version+", "+mfName+" does not properly implement the createMarshallerMap method.").initCause(e); 335 } 336 this.version = version; 337 } 338 339 public Object doUnmarshal(DataInput dis) throws IOException { 340 byte dataType = dis.readByte(); 341 if( dataType!=NULL_TYPE ) { 342 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF]; 343 if( dsm == null ) 344 throw new IOException ("Unknown data type: "+dataType); 345 Object data = dsm.createObject(); 346 if( this.tightEncodingEnabled ) { 347 BooleanStream bs = new BooleanStream(); 348 bs.unmarshal(dis); 349 dsm.tightUnmarshal(this, data, dis, bs); 350 } else { 351 dsm.looseUnmarshal(this, data, dis); 352 } 353 return data; 354 } else { 355 return null; 356 } 357 } 358 359 public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException { 364 bs.writeBoolean(o != null); 365 if( o == null ) 366 return 0; 367 368 if( o.isMarshallAware() ) { 369 MarshallAware ma = (MarshallAware) o; 370 ByteSequence sequence=null; 371 bs.writeBoolean(sequence!=null); 373 if( sequence!=null ) { 374 return 1 + sequence.getLength(); 375 } 376 } 377 378 byte type = o.getDataStructureType(); 379 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; 380 if( dsm == null ) 381 throw new IOException ("Unknown data type: "+type); 382 return 1 + dsm.tightMarshal1(this, o, bs); 383 } 384 385 public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs) throws IOException { 386 if( !bs.readBoolean() ) 387 return; 388 389 byte type = o.getDataStructureType(); 390 ds.writeByte(type); 391 392 if( o.isMarshallAware() && bs.readBoolean() ) { 393 394 throw new IOException ("Corrupted stream"); 396 400 } else { 401 402 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; 403 if( dsm == null ) 404 throw new IOException ("Unknown data type: "+type); 405 dsm.tightMarshal2(this, o, ds, bs); 406 407 } 408 } 409 410 public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException { 411 if( bs.readBoolean() ) { 412 413 byte dataType = dis.readByte(); 414 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF]; 415 if( dsm == null ) 416 throw new IOException ("Unknown data type: "+dataType); 417 DataStructure data = dsm.createObject(); 418 419 if( data.isMarshallAware() && bs.readBoolean() ) { 420 421 dis.readInt(); 422 dis.readByte(); 423 424 BooleanStream bs2 = new BooleanStream(); 425 bs2.unmarshal(dis); 426 dsm.tightUnmarshal(this, data, dis, bs2); 427 428 432 } else { 433 dsm.tightUnmarshal(this, data, dis, bs); 434 } 435 436 return data; 437 } else { 438 return null; 439 } 440 } 441 442 public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException { 443 if( dis.readBoolean() ) { 444 445 byte dataType = dis.readByte(); 446 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[dataType & 0xFF]; 447 if( dsm == null ) 448 throw new IOException ("Unknown data type: "+dataType); 449 DataStructure data = dsm.createObject(); 450 dsm.looseUnmarshal(this, data, dis); 451 return data; 452 453 } else { 454 return null; 455 } 456 } 457 458 public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException { 459 dataOut.writeBoolean(o!=null); 460 if( o!=null ) { 461 byte type = o.getDataStructureType(); 462 dataOut.writeByte(type); 463 DataStreamMarshaller dsm = (DataStreamMarshaller) dataMarshallers[type & 0xFF]; 464 if( dsm == null ) 465 throw new IOException ("Unknown data type: "+type); 466 dsm.looseMarshal(this, o, dataOut); 467 } 468 } 469 470 public void runMarshallCacheEvictionSweep() { 471 while( marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE ) { 473 474 marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]); 475 marshallCache[nextMarshallCacheEvictionIndex]=null; 476 477 nextMarshallCacheEvictionIndex++; 478 if( nextMarshallCacheEvictionIndex >= marshallCache.length ) { 479 nextMarshallCacheEvictionIndex=0; 480 } 481 482 } 483 } 484 485 public Short getMarshallCacheIndex(DataStructure o) { 486 return (Short ) marshallCacheMap.get(o); 487 } 488 489 public Short addToMarshallCache(DataStructure o) { 490 short i = nextMarshallCacheIndex++; 491 if( nextMarshallCacheIndex >= marshallCache.length ) { 492 nextMarshallCacheIndex=0; 493 } 494 495 if( marshallCacheMap.size() < marshallCache.length ) { 497 marshallCache[i] = o; 498 Short index = new Short (i); 499 marshallCacheMap.put(o, index); 500 return index; 501 } else { 502 return new Short ((short)-1); 504 } 505 } 506 507 public void setInUnmarshallCache(short index, DataStructure o) { 508 509 if( index == -1 ) 512 return; 513 514 unmarshallCache[index]=o; 515 } 516 517 public DataStructure getFromUnmarshallCache(short index) { 518 return unmarshallCache[index]; 519 } 520 521 522 public void setStackTraceEnabled(boolean b) { 523 stackTraceEnabled = b; 524 } 525 public boolean isStackTraceEnabled() { 526 return stackTraceEnabled; 527 } 528 529 public boolean isTcpNoDelayEnabled() { 530 return tcpNoDelayEnabled; 531 } 532 public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) { 533 this.tcpNoDelayEnabled = tcpNoDelayEnabled; 534 } 535 536 public boolean isCacheEnabled() { 537 return cacheEnabled; 538 } 539 public void setCacheEnabled(boolean cacheEnabled) { 540 this.cacheEnabled = cacheEnabled; 541 } 542 543 public boolean isTightEncodingEnabled() { 544 return tightEncodingEnabled; 545 } 546 547 public void setTightEncodingEnabled(boolean tightEncodingEnabled) { 548 this.tightEncodingEnabled = tightEncodingEnabled; 549 } 550 551 public boolean isSizePrefixDisabled() { 552 return sizePrefixDisabled; 553 } 554 555 public void setSizePrefixDisabled(boolean prefixPacketSize) { 556 this.sizePrefixDisabled = prefixPacketSize; 557 } 558 559 public void setPreferedWireFormatInfo(WireFormatInfo info) { 560 this.preferedWireFormatInfo = info; 561 } 562 public WireFormatInfo getPreferedWireFormatInfo() { 563 return preferedWireFormatInfo; 564 } 565 566 public void renegotiateWireFormat(WireFormatInfo info) throws IOException { 567 568 if( preferedWireFormatInfo==null ) 569 throw new IllegalStateException ("Wireformat cannot not be renegotiated."); 570 571 this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()) ); 572 info.setVersion(this.getVersion()); 573 574 this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled(); 575 info.setStackTraceEnabled(this.stackTraceEnabled); 576 577 this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled(); 578 info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled); 579 580 this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled(); 581 info.setCacheEnabled(this.cacheEnabled); 582 583 this.tightEncodingEnabled = info.isTightEncodingEnabled() && preferedWireFormatInfo.isTightEncodingEnabled(); 584 info.setTightEncodingEnabled(this.tightEncodingEnabled); 585 586 this.sizePrefixDisabled = info.isSizePrefixDisabled() && preferedWireFormatInfo.isSizePrefixDisabled(); 587 info.setSizePrefixDisabled(this.sizePrefixDisabled); 588 589 if( cacheEnabled ) { 590 591 int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize()); 592 info.setCacheSize(size); 593 594 if( size == 0 ) { 595 size = MARSHAL_CACHE_SIZE; 596 } 597 598 marshallCache = new DataStructure[size]; 599 unmarshallCache = new DataStructure[size]; 600 nextMarshallCacheIndex=0; 601 nextMarshallCacheEvictionIndex =0; 602 marshallCacheMap = new HashMap (); 603 } else { 604 marshallCache=null; 605 unmarshallCache=null; 606 nextMarshallCacheIndex=0; 607 nextMarshallCacheEvictionIndex=0; 608 marshallCacheMap = null; 609 } 610 611 } 612 613 protected int min(int version1, int version2) { 614 if (version1 < version2 && version1 > 0 || version2 <= 0) { 615 return version1; 616 } 617 return version2; 618 } 619 } 620 | Popular Tags |