KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > journal > JournalTopicMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.journal;
19
20 import java.io.IOException JavaDoc;
21 import java.util.HashMap JavaDoc;
22 import java.util.Iterator JavaDoc;
23
24 import org.apache.activeio.journal.RecordLocation;
25 import org.apache.activemq.broker.ConnectionContext;
26 import org.apache.activemq.command.ActiveMQTopic;
27 import org.apache.activemq.command.JournalTopicAck;
28 import org.apache.activemq.command.Message;
29 import org.apache.activemq.command.MessageId;
30 import org.apache.activemq.command.SubscriptionInfo;
31 import org.apache.activemq.store.MessageRecoveryListener;
32 import org.apache.activemq.store.TopicMessageStore;
33 import org.apache.activemq.transaction.Synchronization;
34 import org.apache.activemq.util.Callback;
35 import org.apache.activemq.util.SubscriptionKey;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38
39 /**
40  * A MessageStore that uses a Journal to store it's messages.
41  *
42  * @version $Revision: 1.13 $
43  */

44 public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
45     
46     private static final Log log = LogFactory.getLog(JournalTopicMessageStore.class);
47
48     private TopicMessageStore longTermStore;
49     private HashMap JavaDoc ackedLastAckLocations = new HashMap JavaDoc();
50     
51     public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, ActiveMQTopic destinationName) {
52         super(adapter, checkpointStore, destinationName);
53         this.longTermStore = checkpointStore;
54     }
55     
56     public void recoverSubscription(String JavaDoc clientId, String JavaDoc subscriptionName, MessageRecoveryListener listener) throws Exception JavaDoc {
57         this.peristenceAdapter.checkpoint(true, true);
58         longTermStore.recoverSubscription(clientId, subscriptionName, listener);
59     }
60     
61     public void recoverNextMessages(String JavaDoc clientId,String JavaDoc subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception JavaDoc{
62         this.peristenceAdapter.checkpoint(true, true);
63         longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned,listener);
64         
65     }
66
67     public SubscriptionInfo lookupSubscription(String JavaDoc clientId, String JavaDoc subscriptionName) throws IOException JavaDoc {
68         return longTermStore.lookupSubscription(clientId, subscriptionName);
69     }
70
71     public void addSubsciption(String JavaDoc clientId, String JavaDoc subscriptionName, String JavaDoc selector, boolean retroactive) throws IOException JavaDoc {
72         this.peristenceAdapter.checkpoint(true, true);
73         longTermStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
74     }
75
76     public void addMessage(ConnectionContext context, Message message) throws IOException JavaDoc {
77         super.addMessage(context, message);
78     }
79     
80     /**
81      */

82     public void acknowledge(ConnectionContext context, String JavaDoc clientId, String JavaDoc subscriptionName, final MessageId messageId) throws IOException JavaDoc {
83         final boolean debug = log.isDebugEnabled();
84         
85         JournalTopicAck ack = new JournalTopicAck();
86         ack.setDestination(destination);
87         ack.setMessageId(messageId);
88         ack.setMessageSequenceId(messageId.getBrokerSequenceId());
89         ack.setSubscritionName(subscriptionName);
90         ack.setClientId(clientId);
91         ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
92         final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
93         
94         final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
95         if( !context.isInTransaction() ) {
96             if( debug )
97                 log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
98             acknowledge(messageId, location, key);
99         } else {
100             if( debug )
101                 log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
102             synchronized (this) {
103                 inFlightTxLocations.add(location);
104             }
105             transactionStore.acknowledge(this, ack, location);
106             context.getTransaction().addSynchronization(new Synchronization(){
107                 public void afterCommit() throws Exception JavaDoc {
108                     if( debug )
109                         log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
110                     synchronized (JournalTopicMessageStore.this) {
111                         inFlightTxLocations.remove(location);
112                         acknowledge(messageId, location, key);
113                     }
114                 }
115                 public void afterRollback() throws Exception JavaDoc {
116                     if( debug )
117                         log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
118                     synchronized (JournalTopicMessageStore.this) {
119                         inFlightTxLocations.remove(location);
120                     }
121                 }
122             });
123         }
124         
125     }
126     
127     public void replayAcknowledge(ConnectionContext context, String JavaDoc clientId, String JavaDoc subscritionName, MessageId messageId) {
128         try {
129             SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName);
130             if( sub != null ) {
131                 longTermStore.acknowledge(context, clientId, subscritionName, messageId);
132             }
133         }
134         catch (Throwable JavaDoc e) {
135             log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
136         }
137     }
138         
139
140     /**
141      * @param messageId
142      * @param location
143      * @param key
144      */

145     private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
146         synchronized(this) {
147             lastLocation = location;
148             ackedLastAckLocations.put(key, messageId);
149         }
150     }
151     
152     public RecordLocation checkpoint() throws IOException JavaDoc {
153         
154         final HashMap JavaDoc cpAckedLastAckLocations;
155
156         // swap out the hash maps..
157
synchronized (this) {
158             cpAckedLastAckLocations = this.ackedLastAckLocations;
159             this.ackedLastAckLocations = new HashMap JavaDoc();
160         }
161
162         return super.checkpoint( new Callback() {
163             public void execute() throws Exception JavaDoc {
164
165                 // Checkpoint the acknowledged messages.
166
Iterator JavaDoc iterator = cpAckedLastAckLocations.keySet().iterator();
167                 while (iterator.hasNext()) {
168                     SubscriptionKey subscriptionKey = (SubscriptionKey) iterator.next();
169                     MessageId identity = (MessageId) cpAckedLastAckLocations.get(subscriptionKey);
170                     longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
171                 }
172
173             }
174         });
175
176     }
177
178     /**
179      * @return Returns the longTermStore.
180      */

181     public TopicMessageStore getLongTermTopicMessageStore() {
182         return longTermStore;
183     }
184
185     public void deleteSubscription(String JavaDoc clientId, String JavaDoc subscriptionName) throws IOException JavaDoc {
186         longTermStore.deleteSubscription(clientId, subscriptionName);
187     }
188     
189     public SubscriptionInfo[] getAllSubscriptions() throws IOException JavaDoc {
190         return longTermStore.getAllSubscriptions();
191     }
192
193     
194     public int getMessageCount(String JavaDoc clientId,String JavaDoc subscriberName) throws IOException JavaDoc{
195         this.peristenceAdapter.checkpoint(true, true);
196         return longTermStore.getMessageCount(clientId,subscriberName);
197     }
198     
199     public void resetBatching(String JavaDoc clientId,String JavaDoc subscriptionName) {
200         longTermStore.resetBatching(clientId,subscriptionName);
201     }
202
203     
204
205 }
206
Popular Tags