KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > region > QueueSubscription


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.region;
19
20 import java.io.IOException JavaDoc;
21 import javax.jms.InvalidSelectorException JavaDoc;
22 import javax.jms.JMSException JavaDoc;
23 import org.apache.activemq.broker.Broker;
24 import org.apache.activemq.broker.ConnectionContext;
25 import org.apache.activemq.broker.region.group.MessageGroupMap;
26 import org.apache.activemq.command.ActiveMQMessage;
27 import org.apache.activemq.command.ConsumerId;
28 import org.apache.activemq.command.ConsumerInfo;
29 import org.apache.activemq.command.Message;
30 import org.apache.activemq.command.MessageAck;
31 import org.apache.activemq.transaction.Synchronization;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34
35 public class QueueSubscription extends PrefetchSubscription implements LockOwner {
36     
37     private static final Log log = LogFactory.getLog(QueueSubscription.class);
38     
39     public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException JavaDoc {
40         super(broker,context, info);
41     }
42         
43     /**
44      * In the queue case, mark the node as dropped and then a gc cycle will remove it from
45      * the queue.
46      * @throws IOException
47      */

48     protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException JavaDoc {
49         
50
51
52         final Destination q = n.getRegionDestination();
53         q.acknowledge(context, this, ack, n);
54
55         final QueueMessageReference node = (QueueMessageReference) n;
56         final Queue queue = (Queue)q;
57         if( !ack.isInTransaction() ) {
58             node.drop();
59             queue.dropEvent();
60         } else {
61             node.setAcked(true);
62             context.getTransaction().addSynchronization(new Synchronization(){
63                 public void afterCommit() throws Exception JavaDoc {
64                     node.drop();
65                     queue.dropEvent();
66                 }
67                 public void afterRollback() throws Exception JavaDoc {
68                     node.setAcked(false);
69                 }
70             });
71         }
72     }
73     
74     protected boolean canDispatch(MessageReference n) throws IOException JavaDoc {
75         QueueMessageReference node = (QueueMessageReference) n;
76         if( node.isAcked())
77             return false;
78         // Keep message groups together.
79
String JavaDoc groupId = node.getGroupID();
80         int sequence = node.getGroupSequence();
81         if( groupId!=null ) {
82             MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();
83             
84             // If we can own the first, then no-one else should own the rest.
85
if( sequence == 1 ) {
86                 if( node.lock(this) ) {
87                     assignGroupToMe(messageGroupOwners, n, groupId);
88                     return true;
89                 } else {
90                     return false;
91                 }
92             }
93             
94             // Make sure that the previous owner is still valid, we may
95
// need to become the new owner.
96
ConsumerId groupOwner;
97             synchronized(node) {
98                 groupOwner = messageGroupOwners.get(groupId);
99                 if( groupOwner==null ) {
100                     if( node.lock(this) ) {
101                         assignGroupToMe(messageGroupOwners, n, groupId);
102                         return true;
103                     } else {
104                         return false;
105                     }
106                 }
107             }
108             
109             if( groupOwner.equals(info.getConsumerId()) ) {
110                 // A group sequence < 1 is an end of group signal.
111
if ( sequence < 0 ) {
112                     messageGroupOwners.removeGroup(groupId);
113                 }
114                 return true;
115             }
116             
117             return false;
118             
119         } else {
120             return node.lock(this);
121         }
122     }
123
124     /**
125      * Assigns the message group to this subscription and set the flag on the message that it is the first message
126      * to be dispatched.
127      */

128     protected void assignGroupToMe(MessageGroupMap messageGroupOwners, MessageReference n, String JavaDoc groupId) throws IOException JavaDoc {
129         messageGroupOwners.put(groupId, info.getConsumerId());
130         Message message = n.getMessage();
131         if (message instanceof ActiveMQMessage) {
132             ActiveMQMessage activeMessage = (ActiveMQMessage) message;
133             try {
134                 activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
135             }
136             catch (JMSException JavaDoc e) {
137                 log.warn("Failed to set boolean header: " + e, e);
138             }
139         }
140     }
141     
142     public String JavaDoc toString() {
143         return
144             "QueueSubscription:" +
145             " consumer="+info.getConsumerId()+
146             ", destinations="+destinations.size()+
147             ", dispatched="+dispatched.size()+
148             ", delivered="+this.prefetchExtension+
149             ", pending="+getPendingQueueSize();
150     }
151
152     public int getLockPriority() {
153         return info.getPriority();
154     }
155
156     public boolean isLockExclusive() {
157         return info.isExclusive();
158     }
159
160     /**
161      * Override so that the message ref count is > 0 only when the message is being dispatched
162      * to a client. Keeping it at 0 when it is in the pending list allows the message to be swapped out
163      * to disk.
164      *
165      * @return true if the message was dispatched.
166      */

167     protected boolean dispatch(MessageReference node) throws IOException JavaDoc {
168         boolean rc = false;
169         // This brings the message into memory if it was swapped out.
170
node.incrementReferenceCount();
171         try {
172             rc = super.dispatch(node);
173         } finally {
174             // If the message was dispatched, it could be getting dispatched async, so we
175
// can only drop the reference count when that completes @see onDispatch
176
if( !rc ) {
177                 node.decrementReferenceCount();
178             }
179         }
180         return rc;
181     }
182
183     /**
184      * OK Message was transmitted, we can now drop the reference count.
185      *
186      * @see org.apache.activemq.broker.region.PrefetchSubscription#onDispatch(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.command.Message)
187      */

188     protected void onDispatch(MessageReference node, Message message) {
189         // Now that the message has been sent over the wire to the client,
190
// we can let it get swapped out.
191
node.decrementReferenceCount();
192         super.onDispatch(node, message);
193     }
194     
195     /**
196      * Sending a message to the DQL will require us to increment the ref count so we can get at the content.
197      */

198     protected void sendToDLQ(ConnectionContext context, MessageReference node) throws IOException JavaDoc, Exception JavaDoc {
199         // This brings the message into memory if it was swapped out.
200
node.incrementReferenceCount();
201         try{
202             super.sendToDLQ(context, node);
203         } finally {
204             // This let's the message be swapped out of needed.
205
node.decrementReferenceCount();
206         }
207     }
208     
209     /**
210      */

211     public void destroy() {
212     }
213
214 }
215
Popular Tags