KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > amq > AMQTopicMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.amq;
16
17 import java.io.IOException JavaDoc;
18 import java.io.InterruptedIOException JavaDoc;
19 import java.util.HashMap JavaDoc;
20 import java.util.Iterator JavaDoc;
21 import org.apache.activemq.broker.ConnectionContext;
22 import org.apache.activemq.command.ActiveMQTopic;
23 import org.apache.activemq.command.JournalTopicAck;
24 import org.apache.activemq.command.MessageId;
25 import org.apache.activemq.command.SubscriptionInfo;
26 import org.apache.activemq.kaha.impl.async.Location;
27 import org.apache.activemq.store.MessageRecoveryListener;
28 import org.apache.activemq.store.TopicMessageStore;
29 import org.apache.activemq.store.TopicReferenceStore;
30 import org.apache.activemq.transaction.Synchronization;
31 import org.apache.activemq.util.Callback;
32 import org.apache.activemq.util.SubscriptionKey;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35
36 /**
37  * A MessageStore that uses a Journal to store it's messages.
38  *
39  * @version $Revision: 1.13 $
40  */

41 public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore{
42
43     private static final Log log=LogFactory.getLog(AMQTopicMessageStore.class);
44     private TopicReferenceStore topicReferenceStore;
45     private HashMap JavaDoc<SubscriptionKey,MessageId> ackedLastAckLocations=new HashMap JavaDoc<SubscriptionKey,MessageId>();
46
47     public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore,
48             ActiveMQTopic destinationName){
49         super(adapter,topicReferenceStore,destinationName);
50         this.topicReferenceStore=topicReferenceStore;
51     }
52
53     public void recoverSubscription(String JavaDoc clientId,String JavaDoc subscriptionName,MessageRecoveryListener listener)
54             throws Exception JavaDoc{
55         flush();
56         topicReferenceStore.recoverSubscription(clientId,subscriptionName,new RecoveryListenerAdapter(this,listener));
57     }
58
59     public void recoverNextMessages(String JavaDoc clientId,String JavaDoc subscriptionName,int maxReturned,
60             final MessageRecoveryListener listener) throws Exception JavaDoc{
61         RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
62         topicReferenceStore.recoverNextMessages(clientId,subscriptionName,maxReturned,recoveryListener);
63         if(recoveryListener.size()==0){
64             flush();
65             topicReferenceStore.recoverNextMessages(clientId,subscriptionName,maxReturned,recoveryListener);
66         }
67     }
68
69     public SubscriptionInfo lookupSubscription(String JavaDoc clientId,String JavaDoc subscriptionName) throws IOException JavaDoc{
70         return topicReferenceStore.lookupSubscription(clientId,subscriptionName);
71     }
72
73     public void addSubsciption(String JavaDoc clientId,String JavaDoc subscriptionName,String JavaDoc selector,boolean retroactive)
74             throws IOException JavaDoc{
75         topicReferenceStore.addSubsciption(clientId,subscriptionName,selector,retroactive);
76     }
77
78     /**
79      */

80     public void acknowledge(ConnectionContext context,String JavaDoc clientId,String JavaDoc subscriptionName,final MessageId messageId)
81             throws IOException JavaDoc{
82         final boolean debug=log.isDebugEnabled();
83         JournalTopicAck ack=new JournalTopicAck();
84         ack.setDestination(destination);
85         ack.setMessageId(messageId);
86         ack.setMessageSequenceId(messageId.getBrokerSequenceId());
87         ack.setSubscritionName(subscriptionName);
88         ack.setClientId(clientId);
89         ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
90         final Location location=peristenceAdapter.writeCommand(ack,false);
91         final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
92         if(!context.isInTransaction()){
93             if(debug)
94                 log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
95             acknowledge(messageId,location,key);
96         }else{
97             if(debug)
98                 log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
99             synchronized(this){
100                 inFlightTxLocations.add(location);
101             }
102             transactionStore.acknowledge(this,ack,location);
103             context.getTransaction().addSynchronization(new Synchronization(){
104
105                 public void afterCommit() throws Exception JavaDoc{
106                     if(debug)
107                         log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
108                     synchronized(AMQTopicMessageStore.this){
109                         inFlightTxLocations.remove(location);
110                         acknowledge(messageId,location,key);
111                     }
112                 }
113
114                 public void afterRollback() throws Exception JavaDoc{
115                     if(debug)
116                         log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
117                     synchronized(AMQTopicMessageStore.this){
118                         inFlightTxLocations.remove(location);
119                     }
120                 }
121             });
122         }
123     }
124
125     public boolean replayAcknowledge(ConnectionContext context,String JavaDoc clientId,String JavaDoc subscritionName,
126             MessageId messageId){
127         try{
128             SubscriptionInfo sub=topicReferenceStore.lookupSubscription(clientId,subscritionName);
129             if(sub!=null){
130                 topicReferenceStore.acknowledge(context,clientId,subscritionName,messageId);
131                 return true;
132             }
133         }catch(Throwable JavaDoc e){
134             log.debug("Could not replay acknowledge for message '"+messageId
135                     +"'. Message may have already been acknowledged. reason: "+e);
136         }
137         return false;
138     }
139
140     /**
141      * @param messageId
142      * @param location
143      * @param key
144      * @throws InterruptedIOException
145      */

146     private void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException JavaDoc{
147         synchronized(this){
148             lastLocation=location;
149             ackedLastAckLocations.put(key,messageId);
150         }
151         try{
152             asyncWriteTask.wakeup();
153         }catch(InterruptedException JavaDoc e){
154             throw new InterruptedIOException JavaDoc();
155         }
156     }
157
158     @Override JavaDoc protected Location doAsyncWrite() throws IOException JavaDoc{
159         final HashMap JavaDoc<SubscriptionKey,MessageId> cpAckedLastAckLocations;
160         // swap out the hash maps..
161
synchronized(this){
162             cpAckedLastAckLocations=this.ackedLastAckLocations;
163             this.ackedLastAckLocations=new HashMap JavaDoc<SubscriptionKey,MessageId>();
164         }
165         Location location=super.doAsyncWrite();
166
167         if (cpAckedLastAckLocations != null) {
168             transactionTemplate.run(new Callback() {
169                 public void execute() throws Exception JavaDoc {
170                     // Checkpoint the acknowledged messages.
171
Iterator JavaDoc<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
172                     while (iterator.hasNext()) {
173                         SubscriptionKey subscriptionKey = iterator.next();
174                         MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
175                         topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
176                                 subscriptionKey.subscriptionName, identity);
177                     }
178                 }
179             });
180         }
181         return location;
182     }
183
184     /**
185      * @return Returns the longTermStore.
186      */

187     public TopicReferenceStore getTopicReferenceStore(){
188         return topicReferenceStore;
189     }
190
191     public void deleteSubscription(String JavaDoc clientId,String JavaDoc subscriptionName) throws IOException JavaDoc{
192         topicReferenceStore.deleteSubscription(clientId,subscriptionName);
193     }
194
195     public SubscriptionInfo[] getAllSubscriptions() throws IOException JavaDoc{
196         return topicReferenceStore.getAllSubscriptions();
197     }
198
199     public int getMessageCount(String JavaDoc clientId,String JavaDoc subscriberName) throws IOException JavaDoc{
200         flush();
201         return topicReferenceStore.getMessageCount(clientId,subscriberName);
202     }
203
204     public void resetBatching(String JavaDoc clientId,String JavaDoc subscriptionName){
205         topicReferenceStore.resetBatching(clientId,subscriptionName);
206     }
207 }
208
Popular Tags