1 18 package org.apache.activemq.command; 19 20 import java.io.DataInputStream ; 21 import java.io.DataOutputStream ; 22 import java.io.IOException ; 23 import java.util.Collections ; 24 import java.util.HashMap ; 25 import java.util.Map ; 26 27 import org.apache.activemq.ActiveMQConnection; 28 import org.apache.activemq.advisory.AdvisorySupport; 29 import org.apache.activemq.broker.region.MessageReference; 30 import org.apache.activemq.util.ByteArrayInputStream; 31 import org.apache.activemq.util.ByteArrayOutputStream; 32 import org.apache.activemq.util.ByteSequence; 33 import org.apache.activemq.util.MarshallingSupport; 34 import org.apache.activemq.wireformat.WireFormat; 35 36 42 abstract public class Message extends BaseCommand implements MarshallAware, MessageReference { 43 44 public static final int AVERAGE_MESSAGE_SIZE_OVERHEAD = 500; 45 46 protected MessageId messageId; 47 protected ActiveMQDestination originalDestination; 48 protected TransactionId originalTransactionId; 49 50 protected ProducerId producerId; 51 protected ActiveMQDestination destination; 52 protected TransactionId transactionId; 53 54 protected long expiration; 55 protected long timestamp; 56 protected long arrival; 57 protected String correlationId; 58 protected ActiveMQDestination replyTo; 59 protected boolean persistent; 60 protected String type; 61 protected byte priority; 62 protected String groupID; 63 protected int groupSequence; 64 protected ConsumerId targetConsumerId; 65 protected boolean compressed = false; 66 protected String userID; 67 68 protected ByteSequence content; 69 protected ByteSequence marshalledProperties; 70 protected DataStructure dataStructure; 71 protected int redeliveryCounter; 72 73 protected int size; 74 protected Map properties; 75 protected boolean readOnlyProperties = false; 76 protected boolean readOnlyBody = false; 77 protected transient boolean recievedByDFBridge = false; 78 79 private transient short referenceCount; 80 private transient ActiveMQConnection connection; 81 private transient org.apache.activemq.broker.region.Destination regionDestination; 82 83 private BrokerId [] brokerPath; 84 protected boolean droppable = false; 85 86 abstract public Message copy(); 87 88 protected void copy(Message copy) { 89 super.copy(copy); 90 copy.producerId = producerId; 91 copy.transactionId = transactionId; 92 copy.destination = destination; 93 copy.messageId = messageId != null ? messageId.copy() : null; 94 copy.originalDestination = originalDestination; 95 copy.originalTransactionId = originalTransactionId; 96 copy.expiration = expiration; 97 copy.timestamp = timestamp; 98 copy.correlationId = correlationId; 99 copy.replyTo = replyTo; 100 copy.persistent = persistent; 101 copy.redeliveryCounter = redeliveryCounter; 102 copy.type = type; 103 copy.priority = priority; 104 copy.size = size; 105 copy.groupID = groupID; 106 copy.userID = userID; 107 copy.groupSequence = groupSequence; 108 109 if( properties!=null ) 110 copy.properties = new HashMap (properties); 111 else 112 copy.properties = properties; 113 114 copy.content = content; 115 copy.marshalledProperties = marshalledProperties; 116 copy.dataStructure = dataStructure; 117 copy.readOnlyProperties = readOnlyProperties; 118 copy.readOnlyBody = readOnlyBody; 119 copy.compressed = compressed; 120 copy.recievedByDFBridge = recievedByDFBridge; 121 122 copy.arrival = arrival; 123 copy.connection = connection; 124 copy.regionDestination = regionDestination; 125 129 } 133 134 public Object getProperty(String name) throws IOException { 135 if( properties == null ) { 136 if( marshalledProperties ==null ) 137 return null; 138 properties = unmarsallProperties(marshalledProperties); 139 } 140 return properties.get(name); 141 } 142 143 public Map getProperties() throws IOException { 144 if( properties == null ) { 145 if( marshalledProperties==null ) 146 return Collections.EMPTY_MAP; 147 properties = unmarsallProperties(marshalledProperties); 148 } 149 return Collections.unmodifiableMap(properties); 150 } 151 152 public void clearProperties() { 153 marshalledProperties = null; 154 properties=null; 155 } 156 157 158 public void setProperty(String name, Object value) throws IOException { 159 lazyCreateProperties(); 160 properties.put(name, value); 161 } 162 163 protected void lazyCreateProperties() throws IOException { 164 if( properties == null ) { 165 if( marshalledProperties == null ) { 166 properties = new HashMap (); 167 } else { 168 properties = unmarsallProperties(marshalledProperties); 169 marshalledProperties = null; 170 } 171 } 172 } 173 174 private Map unmarsallProperties(ByteSequence marshalledProperties) throws IOException { 175 return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream (new ByteArrayInputStream(marshalledProperties))); 176 } 177 178 public void beforeMarshall(WireFormat wireFormat) throws IOException { 179 if( marshalledProperties==null && properties!=null ) { 181 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 182 DataOutputStream os = new DataOutputStream (baos); 183 MarshallingSupport.marshalPrimitiveMap(properties, os); 184 os.close(); 185 marshalledProperties = baos.toByteSequence(); 186 } 187 } 188 189 public void afterMarshall(WireFormat wireFormat) throws IOException { 190 } 191 192 public void beforeUnmarshall(WireFormat wireFormat) throws IOException { 193 } 194 195 public void afterUnmarshall(WireFormat wireFormat) throws IOException { 196 } 197 198 199 205 208 public ProducerId getProducerId() { 209 return producerId; 210 } 211 public void setProducerId(ProducerId producerId) { 212 this.producerId = producerId; 213 } 214 215 218 public ActiveMQDestination getDestination() { 219 return destination; 220 } 221 public void setDestination(ActiveMQDestination destination) { 222 this.destination = destination; 223 } 224 225 228 public TransactionId getTransactionId() { 229 return transactionId; 230 } 231 public void setTransactionId(TransactionId transactionId) { 232 this.transactionId = transactionId; 233 } 234 235 public boolean isInTransaction() { 236 return transactionId!=null; 237 } 238 239 240 243 public ActiveMQDestination getOriginalDestination() { 244 return originalDestination; 245 } 246 public void setOriginalDestination(ActiveMQDestination destination) { 247 this.originalDestination = destination; 248 } 249 250 253 public MessageId getMessageId() { 254 return messageId; 255 } 256 257 public void setMessageId(MessageId messageId) { 258 this.messageId = messageId; 259 } 260 261 264 public TransactionId getOriginalTransactionId() { 265 return originalTransactionId; 266 } 267 public void setOriginalTransactionId(TransactionId transactionId) { 268 this.originalTransactionId = transactionId; 269 } 270 271 274 public String getGroupID() { 275 return groupID; 276 } 277 public void setGroupID(String groupID) { 278 this.groupID = groupID; 279 } 280 281 284 public int getGroupSequence() { 285 return groupSequence; 286 } 287 public void setGroupSequence(int groupSequence) { 288 this.groupSequence = groupSequence; 289 } 290 291 294 public String getCorrelationId() { 295 return correlationId; 296 } 297 public void setCorrelationId(String correlationId) { 298 this.correlationId = correlationId; 299 } 300 301 304 public boolean isPersistent() { 305 return persistent; 306 } 307 public void setPersistent(boolean deliveryMode) { 308 this.persistent = deliveryMode; 309 } 310 311 314 public long getExpiration() { 315 return expiration; 316 } 317 public void setExpiration(long expiration) { 318 this.expiration = expiration; 319 } 320 321 324 public byte getPriority() { 325 return priority; 326 } 327 public void setPriority(byte priority) { 328 this.priority = priority; 329 } 330 331 334 public ActiveMQDestination getReplyTo() { 335 return replyTo; 336 } 337 public void setReplyTo(ActiveMQDestination replyTo) { 338 this.replyTo = replyTo; 339 } 340 341 344 public long getTimestamp() { 345 return timestamp; 346 } 347 public void setTimestamp(long timestamp) { 348 this.timestamp = timestamp; 349 } 350 351 354 public String getType() { 355 return type; 356 } 357 public void setType(String type) { 358 this.type = type; 359 } 360 361 364 public ByteSequence getContent() { 365 return content; 366 } 367 public void setContent(ByteSequence content) { 368 this.content = content; 369 } 370 371 374 public ByteSequence getMarshalledProperties() { 375 return marshalledProperties; 376 } 377 public void setMarshalledProperties(ByteSequence marshalledProperties) { 378 this.marshalledProperties = marshalledProperties; 379 } 380 381 384 public DataStructure getDataStructure() { 385 return dataStructure; 386 } 387 public void setDataStructure(DataStructure data) { 388 this.dataStructure = data; 389 } 390 391 400 public ConsumerId getTargetConsumerId() { 401 return targetConsumerId; 402 } 403 public void setTargetConsumerId(ConsumerId targetConsumerId) { 404 this.targetConsumerId = targetConsumerId; 405 } 406 407 public boolean isExpired() { 408 long expireTime = getExpiration(); 409 if (expireTime > 0 && System.currentTimeMillis() > expireTime) { 410 return true; 411 } 412 return false; 413 } 414 415 public boolean isAdvisory(){ 416 return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE); 417 } 418 419 422 public boolean isCompressed() { 423 return compressed; 424 } 425 public void setCompressed(boolean compressed) { 426 this.compressed = compressed; 427 } 428 429 public boolean isRedelivered() { 430 return redeliveryCounter>0; 431 } 432 433 public void setRedelivered(boolean redelivered) { 434 if( redelivered ) { 435 if( !isRedelivered() ) { 436 setRedeliveryCounter(1); 437 } 438 } else { 439 if( isRedelivered() ) { 440 setRedeliveryCounter(0); 441 } 442 } 443 } 444 445 public void incrementRedeliveryCounter() { 446 redeliveryCounter++; 447 } 448 449 452 public int getRedeliveryCounter() { 453 return redeliveryCounter; 454 } 455 public void setRedeliveryCounter(int deliveryCounter) { 456 this.redeliveryCounter = deliveryCounter; 457 } 458 459 464 public BrokerId[] getBrokerPath() { 465 return brokerPath; 466 } 467 public void setBrokerPath(BrokerId[] brokerPath) { 468 this.brokerPath = brokerPath; 469 } 470 471 public boolean isReadOnlyProperties() { 472 return readOnlyProperties; 473 } 474 public void setReadOnlyProperties(boolean readOnlyProperties) { 475 this.readOnlyProperties = readOnlyProperties; 476 } 477 public boolean isReadOnlyBody() { 478 return readOnlyBody; 479 } 480 public void setReadOnlyBody(boolean readOnlyBody) { 481 this.readOnlyBody = readOnlyBody; 482 } 483 484 public ActiveMQConnection getConnection() { 485 return this.connection; 486 } 487 public void setConnection(ActiveMQConnection connection) { 488 this.connection = connection; 489 } 490 491 497 public long getArrival() { 498 return arrival; 499 } 500 public void setArrival(long arrival) { 501 this.arrival = arrival; 502 } 503 504 505 512 public String getUserID() { 513 return userID; 514 } 515 516 public void setUserID(String jmsxUserID) { 517 this.userID = jmsxUserID; 518 } 519 520 public int getReferenceCount() { 521 return referenceCount; 522 } 523 524 public Message getMessageHardRef() { 525 return this; 526 } 527 528 public Message getMessage() throws IOException { 529 return this; 530 } 531 532 public org.apache.activemq.broker.region.Destination getRegionDestination() { 533 return regionDestination; 534 } 535 536 public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) { 537 this.regionDestination = destination; 538 } 539 540 public boolean isMarshallAware() { 541 return true; 542 } 543 544 public int incrementReferenceCount() { 545 int rc; 546 int size; 547 synchronized (this) { 548 rc = ++referenceCount; 549 size = getSize(); 550 } 551 552 if( rc==1 && regionDestination!=null ) 553 regionDestination.getUsageManager().increaseUsage(size); 554 555 return rc; 557 } 558 559 synchronized public int decrementReferenceCount() { 560 int rc; 561 int size; 562 synchronized (this) { 563 rc = --referenceCount; 564 size = getSize(); 565 } 566 567 if( rc==0 && regionDestination!=null ) 568 regionDestination.getUsageManager().decreaseUsage(size); 569 570 572 return rc; 573 } 574 575 public int getSize() { 576 if( size <=AVERAGE_MESSAGE_SIZE_OVERHEAD ) { 577 size = AVERAGE_MESSAGE_SIZE_OVERHEAD; 578 if( marshalledProperties!=null ) 579 size += marshalledProperties.getLength(); 580 if( content!=null ) 581 size += content.getLength(); 582 } 583 return size; 584 } 585 586 590 public boolean isRecievedByDFBridge(){ 591 return recievedByDFBridge; 592 } 593 594 597 public void setRecievedByDFBridge(boolean recievedByDFBridge){ 598 this.recievedByDFBridge=recievedByDFBridge; 599 } 600 601 public void onMessageRolledBack() { 602 incrementRedeliveryCounter(); 603 } 604 605 608 public boolean isDroppable() { 609 return droppable; 610 } 611 public void setDroppable(boolean droppable) { 612 this.droppable = droppable; 613 } 614 } 615 | Popular Tags |