package org.apache.activemq.broker.region;

import java.io.IOException;

import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;

/**
 * A {@link QueueMessageReference} that holds a message on behalf of a queue
 * together with a snapshot of selected message attributes (id, persistence,
 * group id/sequence, target consumer, expiration and size) taken when the
 * reference is created.
 *
 * <p>The snapshot lets callers query those attributes without going through
 * the {@link Message} object itself. The reference also tracks its own
 * reference count, a lock owner, and "dropped"/"acked" flags.
 *
 * <p>Mutable state is accessed through methods synchronized on this instance,
 * with two exceptions that read fields without taking the monitor:
 * {@link #toString()} and {@link #getSize()}.
 */
public class IndirectMessageReference implements QueueMessageReference {

    /** The destination this reference belongs to. */
    private final Destination regionDestination;

    /** Store used to reload the message body in {@link #incrementReferenceCount()}. */
    private final MessageStore destinationStore;

    /** Id of the referenced message, captured at construction. */
    private final MessageId messageId;

    /** True when the message is persistent AND the destination has a message store. */
    private final boolean persistent;
    private final String groupID;
    private final int groupSequence;
    private final ConsumerId targetConsumerId;

    /** Number of times redelivery has been recorded for this reference. */
    private short redeliveryCounter = 0;

    /** Current holder of the lock on this reference, or null when unlocked. */
    private LockOwner lockOwner;

    /** Set once the reference has been dropped (or its message could not be reloaded). */
    private boolean dropped;

    private boolean acked;

    /** Hard reference to the message; may be null after {@link #drop()}. */
    private Message message;

    /** Reference count of this reference object (distinct from the message's own count). */
    private int referenceCount;

    /** Size reported by the message at construction; returned when no message is held. */
    private final int cachedSize;

    /** Expiration timestamp copied from the message at construction. */
    private final long expiration;

    /**
     * Creates a reference to {@code message} for the given queue.
     *
     * <p>The reference starts with a reference count of 1 and increments the
     * message's own reference count once.
     *
     * @param destination      the queue owning this reference; also consulted
     *                         to decide whether the reference is persistent
     * @param destinationStore store used to reload the message when needed
     * @param message          the message to reference; must not be null
     * @throws NullPointerException if {@code message} is null
     */
    public IndirectMessageReference(Queue destination, MessageStore destinationStore, Message message) {
        // Fail fast with a clear message. The original code dereferenced the
        // message unconditionally and only afterwards performed a (dead) null
        // check, so a null argument already resulted in a NullPointerException.
        if (message == null) {
            throw new NullPointerException("message must not be null");
        }

        this.regionDestination = destination;
        this.destinationStore = destinationStore;
        this.message = message;
        this.messageId = message.getMessageId();
        this.persistent = message.isPersistent() && destination.getMessageStore() != null;
        this.groupID = message.getGroupID();
        this.groupSequence = message.getGroupSequence();
        this.targetConsumerId = message.getTargetConsumerId();
        this.expiration = message.getExpiration();

        this.referenceCount = 1;
        message.incrementReferenceCount();
        this.cachedSize = message.getSize();
    }

    /**
     * @return the message currently held by this reference, or null if none is held
     */
    public synchronized Message getMessageHardRef() {
        return message;
    }

    /**
     * @return the current reference count of this reference object
     */
    public synchronized int getReferenceCount() {
        return referenceCount;
    }

    /**
     * Increments the reference count.
     *
     * <p>If this is a persistent reference, the count has just become 1 and no
     * message is currently held, the message is loaded from the destination
     * store. When the store returns null the reference is marked as dropped;
     * otherwise the loaded message is bound to the region destination and its
     * own reference count is incremented.
     *
     * @return the reference count after the increment
     * @throws RuntimeException wrapping the {@link IOException} if the store
     *                          fails while loading the message
     */
    public synchronized int incrementReferenceCount() {
        int rc = ++referenceCount;
        if (persistent && rc == 1 && message == null) {
            try {
                message = destinationStore.getMessage(messageId);
                if (message == null) {
                    // The store no longer has the message.
                    dropped = true;
                } else {
                    message.setRegionDestination(regionDestination);
                    message.incrementReferenceCount();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return rc;
    }

    /**
     * Decrements the reference count.
     *
     * <p>If this is a persistent reference whose count has reached 0 while a
     * message is held, the message's own reference count is decremented. The
     * hard reference to the message is kept.
     *
     * @return the reference count after the decrement
     */
    public synchronized int decrementReferenceCount() {
        int rc = --referenceCount;
        if (persistent && rc == 0 && message != null) {
            message.decrementReferenceCount();
        }
        return rc;
    }

    /**
     * @return the message currently held by this reference, or null if none is held
     */
    public synchronized Message getMessage() {
        return message;
    }

    /**
     * Returns a short diagnostic description. The fields are read without
     * synchronization.
     */
    public String toString() {
        return "Message " + messageId + " dropped=" + dropped + " locked=" + (lockOwner != null);
    }

    /** Records one more redelivery for this reference. */
    public synchronized void incrementRedeliveryCounter() {
        this.redeliveryCounter++;
    }

    /**
     * @return true once the reference has been dropped
     */
    public synchronized boolean isDropped() {
        return dropped;
    }

    /**
     * Marks the reference as dropped and clears the lock owner.
     *
     * <p>For a non-persistent reference that still holds a message, the
     * message's reference count is decremented and the hard reference is
     * released.
     */
    public synchronized void drop() {
        dropped = true;
        lockOwner = null;
        if (!persistent && message != null) {
            message.decrementReferenceCount();
            message = null;
        }
    }

    /**
     * Attempts to lock this reference for the given owner.
     *
     * <p>The region destination is asked first, outside this instance's
     * monitor; only if it agrees is the local lock state examined.
     *
     * @param subscription the prospective lock owner
     * @return true if the lock is now held by {@code subscription}; false if
     *         the destination refused, the reference is dropped, or another
     *         owner already holds the lock
     */
    public boolean lock(LockOwner subscription) {
        if (!regionDestination.lock(this, subscription)) {
            return false;
        }
        synchronized (this) {
            if (dropped || (lockOwner != null && lockOwner != subscription)) {
                return false;
            }
            lockOwner = subscription;
            return true;
        }
    }

    /** Releases the lock, whoever holds it. */
    public synchronized void unlock() {
        lockOwner = null;
    }

    /**
     * @return the current lock owner, or null when unlocked
     */
    public synchronized LockOwner getLockOwner() {
        return lockOwner;
    }

    /**
     * @return the number of redeliveries recorded so far
     */
    public synchronized int getRedeliveryCounter() {
        return redeliveryCounter;
    }

    public MessageId getMessageId() {
        return messageId;
    }

    public Destination getRegionDestination() {
        return regionDestination;
    }

    /**
     * @return true when the message was persistent and the destination had a
     *         message store at construction time
     */
    public boolean isPersistent() {
        return persistent;
    }

    /**
     * @return true when a lock owner is set
     */
    public synchronized boolean isLocked() {
        return lockOwner != null;
    }

    public synchronized boolean isAcked() {
        return acked;
    }

    public synchronized void setAcked(boolean b) {
        acked = b;
    }

    public String getGroupID() {
        return groupID;
    }

    public int getGroupSequence() {
        return groupSequence;
    }

    public ConsumerId getTargetConsumerId() {
        return targetConsumerId;
    }

    /**
     * @return the expiration timestamp copied from the message at construction
     */
    public long getExpiration() {
        return expiration;
    }

    /**
     * @return true when the expiration is positive and earlier than the
     *         current system time; a non-positive expiration never expires
     */
    public boolean isExpired() {
        long expireTime = getExpiration();
        return expireTime > 0 && System.currentTimeMillis() > expireTime;
    }

    /**
     * Returns the size of the held message, or the size cached at
     * construction when no message is currently held.
     *
     * <p>The message field is read once into a local, without taking this
     * instance's monitor, so a concurrent {@link #drop()} cannot cause a
     * NullPointerException between the null check and the call.
     *
     * @return the message size
     */
    public int getSize() {
        Message msg = message;
        if (msg != null) {
            return msg.getSize();
        }
        return cachedSize;
    }
}