KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > IndirectMessageReference


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region;
19
20 import java.io.IOException JavaDoc;
21
22 import org.apache.activemq.command.ConsumerId;
23 import org.apache.activemq.command.Message;
24 import org.apache.activemq.command.MessageId;
25 import org.apache.activemq.store.MessageStore;
26
27 /**
28  * Keeps track of a message that is flowing through the Broker. This
29  * object may hold a hard reference to the message or only hold the
30  * id of the message if the message has been persisted on in a MessageStore.
31  *
32  * @version $Revision: 1.15 $
33  */

34 public class IndirectMessageReference implements QueueMessageReference {
35
36     /** The destination that is managing the message */
37     private final Destination regionDestination;
38     
39     private final MessageStore destinationStore;
40     
41     /** The id of the message is always valid */
42     private final MessageId messageId;
43     /** Is the message persistent? */
44     private final boolean persistent;
45     private final String JavaDoc groupID;
46     private final int groupSequence;
47     private final ConsumerId targetConsumerId;
48     
49     /** The number of times the message has been delivered.*/
50     private short redeliveryCounter = 0;
51     /** The subscription that has locked the message */
52     private LockOwner lockOwner;
53     /** Has the message been dropped? */
54     private boolean dropped;
55     /** Has the message been acked? */
56     private boolean acked;
57     /** Direct reference to the message */
58     private Message message;
59     /** The number of times the message has requested being hardened */
60     private int referenceCount;
61     /** the size of the message **/
62     private int cachedSize = 0;
63     /** the expiration time of the message */
64     private long expiration;
65     
66     public IndirectMessageReference(Queue destination, MessageStore destinationStore, Message message) {
67         this.regionDestination=destination;
68         this.destinationStore = destinationStore;
69         this.message = message;
70         this.messageId=message.getMessageId();
71         this.persistent=message.isPersistent() && destination.getMessageStore()!=null;
72         this.groupID = message.getGroupID();
73         this.groupSequence = message.getGroupSequence();
74         this.targetConsumerId=message.getTargetConsumerId();
75         this.expiration = message.getExpiration();
76         
77         this.referenceCount=1;
78         message.incrementReferenceCount();
79         this.cachedSize = message != null ? message.getSize() : 0;
80     }
81     
82     synchronized public Message getMessageHardRef() {
83         return message;
84     }
85     
86     synchronized public int getReferenceCount() {
87         return referenceCount;
88     }
89     
90     synchronized public int incrementReferenceCount() {
91         int rc = ++referenceCount;
92         if( persistent && rc==1 && message == null) {
93                  
94             try {
95                 message = destinationStore.getMessage(messageId);
96                 if( message == null ) {
97                     dropped = true;
98                 } else {
99                     message.setRegionDestination(regionDestination);
100                     message.incrementReferenceCount();
101                 }
102             } catch (IOException JavaDoc e) {
103                 throw new RuntimeException JavaDoc(e);
104             }
105         }
106         return rc;
107     }
108     
109     synchronized public int decrementReferenceCount() {
110         int rc = --referenceCount;
111         if( persistent && rc == 0 && message!=null) {
112             message.decrementReferenceCount();
113             //message=null;
114
}
115         return rc;
116     }
117
118
119     synchronized public Message getMessage() {
120         return message;
121     }
122
123     public String JavaDoc toString() {
124         return "Message "+messageId+" dropped="+dropped+" locked="+(lockOwner!=null);
125     }
126     
127     synchronized public void incrementRedeliveryCounter() {
128         this.redeliveryCounter++;
129     }
130
131     synchronized public boolean isDropped() {
132         return dropped;
133     }
134     
135     synchronized public void drop() {
136         dropped=true;
137         lockOwner = null;
138         if( !persistent && message!=null ) {
139             message.decrementReferenceCount();
140             message=null;
141         }
142     }
143
144     public boolean lock(LockOwner subscription) {
145         if( !regionDestination.lock(this, subscription) )
146             return false;
147         synchronized (this) {
148             if( dropped || (lockOwner!=null && lockOwner!=subscription) )
149                 return false;
150             lockOwner = subscription;
151             return true;
152         }
153     }
154     
155     synchronized public void unlock() {
156         lockOwner = null;
157     }
158     
159     synchronized public LockOwner getLockOwner() {
160         return lockOwner;
161     }
162
163     synchronized public int getRedeliveryCounter() {
164         return redeliveryCounter;
165     }
166
167     public MessageId getMessageId() {
168         return messageId;
169     }
170
171     public Destination getRegionDestination() {
172         return regionDestination;
173     }
174
175     public boolean isPersistent() {
176         return persistent;
177     }
178     
179     synchronized public boolean isLocked() {
180         return lockOwner!=null;
181     }
182     
183     synchronized public boolean isAcked() {
184         return acked;
185     }
186
187     synchronized public void setAcked(boolean b) {
188         acked=b;
189     }
190
191     public String JavaDoc getGroupID() {
192         return groupID;
193     }
194
195     public int getGroupSequence() {
196         return groupSequence;
197     }
198
199     public ConsumerId getTargetConsumerId() {
200         return targetConsumerId;
201     }
202
203     public long getExpiration() {
204         return expiration;
205     }
206
207     public boolean isExpired() {
208         long expireTime = getExpiration();
209         if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
210             return true;
211         }
212         return false;
213     }
214
215     public int getSize(){
216        Message msg = message;
217        if (msg != null){
218            return msg.getSize();
219        }
220        return cachedSize;
221     }
222 }
223
Popular Tags