KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > journal > JournalMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.store.journal;
19
20 import java.io.IOException JavaDoc;
21 import java.util.ArrayList JavaDoc;
22 import java.util.Collections JavaDoc;
23 import java.util.HashSet JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.LinkedHashMap JavaDoc;
26
27 import org.apache.activeio.journal.RecordLocation;
28 import org.apache.activemq.broker.ConnectionContext;
29 import org.apache.activemq.command.ActiveMQDestination;
30 import org.apache.activemq.command.JournalQueueAck;
31 import org.apache.activemq.command.Message;
32 import org.apache.activemq.command.MessageAck;
33 import org.apache.activemq.command.MessageId;
34 import org.apache.activemq.memory.UsageManager;
35 import org.apache.activemq.store.MessageRecoveryListener;
36 import org.apache.activemq.store.MessageStore;
37 import org.apache.activemq.store.PersistenceAdapter;
38 import org.apache.activemq.transaction.Synchronization;
39 import org.apache.activemq.util.Callback;
40 import org.apache.activemq.util.TransactionTemplate;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43
44 /**
45  * A MessageStore that uses a Journal to store it's messages.
46  *
47  * @version $Revision: 1.14 $
48  */

49 public class JournalMessageStore implements MessageStore {
50
51     private static final Log log = LogFactory.getLog(JournalMessageStore.class);
52
53     protected final JournalPersistenceAdapter peristenceAdapter;
54     protected final JournalTransactionStore transactionStore;
55     protected final MessageStore longTermStore;
56     protected final ActiveMQDestination destination;
57     protected final TransactionTemplate transactionTemplate;
58
59     private LinkedHashMap JavaDoc messages = new LinkedHashMap JavaDoc();
60     private ArrayList JavaDoc messageAcks = new ArrayList JavaDoc();
61
62     /** A MessageStore that we can use to retrieve messages quickly. */
63     private LinkedHashMap JavaDoc cpAddedMessageIds;
64     
65     protected RecordLocation lastLocation;
66     protected HashSet JavaDoc inFlightTxLocations = new HashSet JavaDoc();
67
68     private UsageManager usageManager;
69     
70     public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
71         this.peristenceAdapter = adapter;
72         this.transactionStore = adapter.getTransactionStore();
73         this.longTermStore = checkpointStore;
74         this.destination = destination;
75         this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
76     }
77     
78     public void setUsageManager(UsageManager usageManager) {
79         this.usageManager = usageManager;
80         longTermStore.setUsageManager(usageManager);
81     }
82
83
84     /**
85      * Not synchronized since the Journal has better throughput if you increase
86      * the number of concurrent writes that it is doing.
87      */

88     public void addMessage(ConnectionContext context, final Message message) throws IOException JavaDoc {
89         
90         final MessageId id = message.getMessageId();
91         
92         final boolean debug = log.isDebugEnabled();
93         message.incrementReferenceCount();
94         
95         final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
96         if( !context.isInTransaction() ) {
97             if( debug )
98                 log.debug("Journalled message add for: "+id+", at: "+location);
99             addMessage(message, location);
100         } else {
101             if( debug )
102                 log.debug("Journalled transacted message add for: "+id+", at: "+location);
103             synchronized( this ) {
104                 inFlightTxLocations.add(location);
105             }
106             transactionStore.addMessage(this, message, location);
107             context.getTransaction().addSynchronization(new Synchronization(){
108                 public void afterCommit() throws Exception JavaDoc {
109                     if( debug )
110                         log.debug("Transacted message add commit for: "+id+", at: "+location);
111                     synchronized( JournalMessageStore.this ) {
112                         inFlightTxLocations.remove(location);
113                         addMessage(message, location);
114                     }
115                 }
116                 public void afterRollback() throws Exception JavaDoc {
117                     if( debug )
118                         log.debug("Transacted message add rollback for: "+id+", at: "+location);
119                     synchronized( JournalMessageStore.this ) {
120                         inFlightTxLocations.remove(location);
121                     }
122                     message.decrementReferenceCount();
123                 }
124             });
125         }
126     }
127
128     private void addMessage(final Message message, final RecordLocation location) {
129         synchronized (this) {
130             lastLocation = location;
131             MessageId id = message.getMessageId();
132             messages.put(id, message);
133         }
134     }
135     
136     public void replayAddMessage(ConnectionContext context, Message message) {
137         try {
138             // Only add the message if it has not already been added.
139
Message t = longTermStore.getMessage(message.getMessageId());
140             if( t==null ) {
141                 longTermStore.addMessage(context, message);
142             }
143         }
144         catch (Throwable JavaDoc e) {
145             log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e);
146         }
147     }
148
149     /**
150      */

151     public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException JavaDoc {
152         final boolean debug = log.isDebugEnabled();
153         JournalQueueAck remove = new JournalQueueAck();
154         remove.setDestination(destination);
155         remove.setMessageAck(ack);
156         
157         final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
158         if( !context.isInTransaction() ) {
159             if( debug )
160                 log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
161             removeMessage(ack, location);
162         } else {
163             if( debug )
164                 log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
165             synchronized( this ) {
166                 inFlightTxLocations.add(location);
167             }
168             transactionStore.removeMessage(this, ack, location);
169             context.getTransaction().addSynchronization(new Synchronization(){
170                 public void afterCommit() throws Exception JavaDoc {
171                     if( debug )
172                         log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
173                     synchronized( JournalMessageStore.this ) {
174                         inFlightTxLocations.remove(location);
175                         removeMessage(ack, location);
176                     }
177                 }
178                 public void afterRollback() throws Exception JavaDoc {
179                     if( debug )
180                         log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
181                     synchronized( JournalMessageStore.this ) {
182                         inFlightTxLocations.remove(location);
183                     }
184                 }
185             });
186
187         }
188     }
189     
190     private void removeMessage(final MessageAck ack, final RecordLocation location) {
191         synchronized (this) {
192             lastLocation = location;
193             MessageId id = ack.getLastMessageId();
194             Message message = (Message) messages.remove(id);
195             if (message == null) {
196                 messageAcks.add(ack);
197             } else {
198                 message.decrementReferenceCount();
199             }
200         }
201     }
202     
203     public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
204         try {
205             // Only remove the message if it has not already been removed.
206
Message t = longTermStore.getMessage(messageAck.getLastMessageId());
207             if( t!=null ) {
208                 longTermStore.removeMessage(context, messageAck);
209             }
210         }
211         catch (Throwable JavaDoc e) {
212             log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
213         }
214     }
215
216     /**
217      * @return
218      * @throws IOException
219      */

220     public RecordLocation checkpoint() throws IOException JavaDoc {
221         return checkpoint(null);
222     }
223     
224     /**
225      * @return
226      * @throws IOException
227      */

228     public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException JavaDoc {
229
230         
231         RecordLocation rc;
232         final ArrayList JavaDoc cpRemovedMessageLocations;
233         final ArrayList JavaDoc cpActiveJournalLocations;
234         final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
235
236         // swap out the message hash maps..
237
synchronized (this) {
238             cpAddedMessageIds = this.messages;
239             cpRemovedMessageLocations = this.messageAcks;
240
241             cpActiveJournalLocations=new ArrayList JavaDoc(inFlightTxLocations);
242             
243             this.messages = new LinkedHashMap JavaDoc();
244             this.messageAcks = new ArrayList JavaDoc();
245         }
246
247         transactionTemplate.run(new Callback() {
248             public void execute() throws Exception JavaDoc {
249
250                 int size = 0;
251                 
252                 PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
253                 ConnectionContext context = transactionTemplate.getContext();
254                 
255                 // Checkpoint the added messages.
256
Iterator JavaDoc iterator = cpAddedMessageIds.values().iterator();
257                 while (iterator.hasNext()) {
258                     Message message = (Message) iterator.next();
259                     try {
260                         longTermStore.addMessage(context, message);
261                     } catch (Throwable JavaDoc e) {
262                         log.warn("Message could not be added to long term store: " + e.getMessage(), e);
263                     }
264                     
265                     size += message.getSize();
266                     
267                     message.decrementReferenceCount();
268                     
269                     // Commit the batch if it's getting too big
270
if( size >= maxCheckpointMessageAddSize ) {
271                         persitanceAdapter.commitTransaction(context);
272                         persitanceAdapter.beginTransaction(context);
273                         size=0;
274                     }
275                     
276                 }
277
278                 persitanceAdapter.commitTransaction(context);
279                 persitanceAdapter.beginTransaction(context);
280
281                 // Checkpoint the removed messages.
282
iterator = cpRemovedMessageLocations.iterator();
283                 while (iterator.hasNext()) {
284                     try {
285                         MessageAck ack = (MessageAck) iterator.next();
286                         longTermStore.removeMessage(transactionTemplate.getContext(), ack);
287                     } catch (Throwable JavaDoc e) {
288                         log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
289                     }
290                 }
291                 
292                 if( postCheckpointTest!= null ) {
293                     postCheckpointTest.execute();
294                 }
295             }
296
297         });
298
299         synchronized (this) {
300             cpAddedMessageIds = null;
301         }
302         
303         if( cpActiveJournalLocations.size() > 0 ) {
304             Collections.sort(cpActiveJournalLocations);
305             return (RecordLocation) cpActiveJournalLocations.get(0);
306         } else {
307             return lastLocation;
308         }
309     }
310
311     /**
312      *
313      */

314     public Message getMessage(MessageId identity) throws IOException JavaDoc {
315         Message answer = null;
316
317         synchronized (this) {
318             // Do we have a still have it in the journal?
319
answer = (Message) messages.get(identity);
320             if( answer==null && cpAddedMessageIds!=null )
321                 answer = (Message) cpAddedMessageIds.get(identity);
322         }
323         
324         if (answer != null ) {
325             return answer;
326         }
327         
328         // If all else fails try the long term message store.
329
return longTermStore.getMessage(identity);
330     }
331
332     /**
333      * Replays the checkpointStore first as those messages are the oldest ones,
334      * then messages are replayed from the transaction log and then the cache is
335      * updated.
336      *
337      * @param listener
338      * @throws Exception
339      */

340     public void recover(final MessageRecoveryListener listener) throws Exception JavaDoc {
341         peristenceAdapter.checkpoint(true, true);
342         longTermStore.recover(listener);
343     }
344
345     public void start() throws Exception JavaDoc {
346         if( this.usageManager != null )
347             this.usageManager.addUsageListener(peristenceAdapter);
348         longTermStore.start();
349     }
350
351     public void stop() throws Exception JavaDoc {
352         longTermStore.stop();
353         if( this.usageManager != null )
354             this.usageManager.removeUsageListener(peristenceAdapter);
355     }
356
357     /**
358      * @return Returns the longTermStore.
359      */

360     public MessageStore getLongTermMessageStore() {
361         return longTermStore;
362     }
363
364     /**
365      * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
366      */

367     public void removeAllMessages(ConnectionContext context) throws IOException JavaDoc {
368         peristenceAdapter.checkpoint(true, true);
369         longTermStore.removeAllMessages(context);
370     }
371     
372     public ActiveMQDestination getDestination() {
373         return destination;
374     }
375
376     public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String JavaDoc messageRef) throws IOException JavaDoc {
377         throw new IOException JavaDoc("The journal does not support message references.");
378     }
379
380     public String JavaDoc getMessageReference(MessageId identity) throws IOException JavaDoc {
381         throw new IOException JavaDoc("The journal does not support message references.");
382     }
383
384     /**
385      * @return
386      * @throws IOException
387      * @see org.apache.activemq.store.MessageStore#getMessageCount()
388      */

389     public int getMessageCount() throws IOException JavaDoc{
390         peristenceAdapter.checkpoint(true, true);
391         return longTermStore.getMessageCount();
392     }
393
394    
395     public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception JavaDoc{
396         peristenceAdapter.checkpoint(true, true);
397         longTermStore.recoverNextMessages(maxReturned,listener);
398         
399     }
400
401     
402     public void resetBatching(){
403         longTermStore.resetBatching();
404         
405     }
406
407 }
408
Popular Tags