KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > amq > AMQMessageStore


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.amq;
16
17 import java.io.IOException JavaDoc;
18 import java.io.InterruptedIOException JavaDoc;
19 import java.util.ArrayList JavaDoc;
20 import java.util.Collections JavaDoc;
21 import java.util.HashSet JavaDoc;
22 import java.util.Iterator JavaDoc;
23 import java.util.LinkedHashMap JavaDoc;
24 import java.util.Map.Entry;
25 import java.util.concurrent.CountDownLatch JavaDoc;
26 import java.util.concurrent.atomic.AtomicReference JavaDoc;
27 import org.apache.activemq.broker.ConnectionContext;
28 import org.apache.activemq.command.ActiveMQDestination;
29 import org.apache.activemq.command.DataStructure;
30 import org.apache.activemq.command.JournalQueueAck;
31 import org.apache.activemq.command.Message;
32 import org.apache.activemq.command.MessageAck;
33 import org.apache.activemq.command.MessageId;
34 import org.apache.activemq.kaha.impl.async.Location;
35 import org.apache.activemq.memory.UsageManager;
36 import org.apache.activemq.store.MessageRecoveryListener;
37 import org.apache.activemq.store.MessageStore;
38 import org.apache.activemq.store.PersistenceAdapter;
39 import org.apache.activemq.store.ReferenceStore;
40 import org.apache.activemq.store.ReferenceStore.ReferenceData;
41 import org.apache.activemq.thread.Task;
42 import org.apache.activemq.thread.TaskRunner;
43 import org.apache.activemq.transaction.Synchronization;
44 import org.apache.activemq.util.Callback;
45 import org.apache.activemq.util.TransactionTemplate;
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48
49 /**
50  * A MessageStore that uses a Journal to store it's messages.
51  *
52  * @version $Revision: 1.14 $
53  */

54 public class AMQMessageStore implements MessageStore{
55
56     private static final Log log=LogFactory.getLog(AMQMessageStore.class);
57     protected final AMQPersistenceAdapter peristenceAdapter;
58     protected final AMQTransactionStore transactionStore;
59     protected final ReferenceStore referenceStore;
60     protected final ActiveMQDestination destination;
61     protected final TransactionTemplate transactionTemplate;
62     private LinkedHashMap JavaDoc<MessageId,ReferenceData> messages=new LinkedHashMap JavaDoc<MessageId,ReferenceData>();
63     private ArrayList JavaDoc<MessageAck> messageAcks=new ArrayList JavaDoc<MessageAck>();
64     /** A MessageStore that we can use to retrieve messages quickly. */
65     private LinkedHashMap JavaDoc<MessageId,ReferenceData> cpAddedMessageIds;
66     protected Location lastLocation;
67     protected Location lastWrittenLocation;
68     protected HashSet JavaDoc<Location> inFlightTxLocations=new HashSet JavaDoc<Location>();
69     protected final TaskRunner asyncWriteTask;
70     protected CountDownLatch JavaDoc flushLatch;
71     private final boolean debug=log.isDebugEnabled();
72     private final AtomicReference JavaDoc<Location> mark=new AtomicReference JavaDoc<Location>();
73     
74     public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore,ActiveMQDestination destination){
75         this.peristenceAdapter=adapter;
76         this.transactionStore=adapter.getTransactionStore();
77         this.referenceStore=referenceStore;
78         this.destination=destination;
79         this.transactionTemplate=new TransactionTemplate(adapter,new ConnectionContext());
80         asyncWriteTask=adapter.getTaskRunnerFactory().createTaskRunner(new Task(){
81
82             public boolean iterate(){
83                 asyncWrite();
84                 return false;
85             }
86         },"Checkpoint: "+destination);
87     }
88
89     public void setUsageManager(UsageManager usageManager){
90         referenceStore.setUsageManager(usageManager);
91     }
92
93     /**
94      * Not synchronized since the Journal has better throughput if you increase the number of concurrent writes that it
95      * is doing.
96      */

97     public void addMessage(ConnectionContext context,final Message message) throws IOException JavaDoc{
98         final MessageId id=message.getMessageId();
99         final Location location=peristenceAdapter.writeCommand(message,message.isResponseRequired());
100         if(!context.isInTransaction()){
101             if(debug)
102                 log.debug("Journalled message add for: "+id+", at: "+location);
103             addMessage(message,location);
104         }else{
105             if(debug)
106                 log.debug("Journalled transacted message add for: "+id+", at: "+location);
107             synchronized(this){
108                 inFlightTxLocations.add(location);
109             }
110             transactionStore.addMessage(this,message,location);
111             context.getTransaction().addSynchronization(new Synchronization(){
112
113                 public void afterCommit() throws Exception JavaDoc{
114                     if(debug)
115                         log.debug("Transacted message add commit for: "+id+", at: "+location);
116                     synchronized(AMQMessageStore.this){
117                         inFlightTxLocations.remove(location);
118                         addMessage(message,location);
119                     }
120                 }
121
122                 public void afterRollback() throws Exception JavaDoc{
123                     if(debug)
124                         log.debug("Transacted message add rollback for: "+id+", at: "+location);
125                     synchronized(AMQMessageStore.this){
126                         inFlightTxLocations.remove(location);
127                     }
128                 }
129             });
130         }
131     }
132
133     private void addMessage(final Message message,final Location location) throws InterruptedIOException JavaDoc{
134         ReferenceData data=new ReferenceData();
135         data.setExpiration(message.getExpiration());
136         data.setFileId(location.getDataFileId());
137         data.setOffset(location.getOffset());
138         synchronized(this){
139             lastLocation=location;
140             messages.put(message.getMessageId(),data);
141         }
142         try{
143             asyncWriteTask.wakeup();
144         }catch(InterruptedException JavaDoc e){
145             throw new InterruptedIOException JavaDoc();
146         }
147     }
148
149     public boolean replayAddMessage(ConnectionContext context,Message message,Location location){
150         MessageId id=message.getMessageId();
151         try{
152             // Only add the message if it has not already been added.
153
ReferenceData data=referenceStore.getMessageReference(id);
154             if(data==null){
155                 data=new ReferenceData();
156                 data.setExpiration(message.getExpiration());
157                 data.setFileId(location.getDataFileId());
158                 data.setOffset(location.getOffset());
159                 referenceStore.addMessageReference(context,id,data);
160                 return true;
161             }
162         }catch(Throwable JavaDoc e){
163             log.warn("Could not replay add for message '"+id+"'. Message may have already been added. reason: "+e,e);
164         }
165         return false;
166     }
167
168     /**
169      */

170     public void removeMessage(ConnectionContext context,final MessageAck ack) throws IOException JavaDoc{
171         JournalQueueAck remove=new JournalQueueAck();
172         remove.setDestination(destination);
173         remove.setMessageAck(ack);
174         final Location location=peristenceAdapter.writeCommand(remove,ack.isResponseRequired());
175         if(!context.isInTransaction()){
176             if(debug)
177                 log.debug("Journalled message remove for: "+ack.getLastMessageId()+", at: "+location);
178             removeMessage(ack,location);
179         }else{
180             if(debug)
181                 log.debug("Journalled transacted message remove for: "+ack.getLastMessageId()+", at: "+location);
182             synchronized(this){
183                 inFlightTxLocations.add(location);
184             }
185             transactionStore.removeMessage(this,ack,location);
186             context.getTransaction().addSynchronization(new Synchronization(){
187
188                 public void afterCommit() throws Exception JavaDoc{
189                     if(debug)
190                         log.debug("Transacted message remove commit for: "+ack.getLastMessageId()+", at: "+location);
191                     synchronized(AMQMessageStore.this){
192                         inFlightTxLocations.remove(location);
193                         removeMessage(ack,location);
194                     }
195                 }
196
197                 public void afterRollback() throws Exception JavaDoc{
198                     if(debug)
199                         log.debug("Transacted message remove rollback for: "+ack.getLastMessageId()+", at: "+location);
200                     synchronized(AMQMessageStore.this){
201                         inFlightTxLocations.remove(location);
202                     }
203                 }
204             });
205         }
206     }
207
208     private void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException JavaDoc{
209         ReferenceData data;
210         synchronized(this){
211             lastLocation=location;
212             MessageId id=ack.getLastMessageId();
213             data=messages.remove(id);
214             if(data==null){
215                 messageAcks.add(ack);
216             }
217         }
218         if(data==null){
219             try{
220                 asyncWriteTask.wakeup();
221             }catch(InterruptedException JavaDoc e){
222                 throw new InterruptedIOException JavaDoc();
223             }
224         }
225     }
226
227     public boolean replayRemoveMessage(ConnectionContext context,MessageAck messageAck){
228         try{
229             // Only remove the message if it has not already been removed.
230
ReferenceData t=referenceStore.getMessageReference(messageAck.getLastMessageId());
231             if(t!=null){
232                 referenceStore.removeMessage(context,messageAck);
233                 return true;
234             }
235         }catch(Throwable JavaDoc e){
236             log.warn("Could not replay acknowledge for message '"+messageAck.getLastMessageId()
237                     +"'. Message may have already been acknowledged. reason: "+e);
238         }
239         return false;
240     }
241
242     /**
243      * Waits till the lastest data has landed on the referenceStore
244      *
245      * @throws InterruptedIOException
246      */

247     public void flush() throws InterruptedIOException JavaDoc{
248         if(log.isDebugEnabled()){
249             log.debug("flush starting ...");
250         }
251         CountDownLatch JavaDoc countDown;
252         synchronized(this){
253             if(lastWrittenLocation==lastLocation){
254                 return;
255             }
256             if(flushLatch==null){
257                 flushLatch=new CountDownLatch JavaDoc(1);
258             }
259             countDown=flushLatch;
260         }
261         try{
262             asyncWriteTask.wakeup();
263             countDown.await();
264         }catch(InterruptedException JavaDoc e){
265             throw new InterruptedIOException JavaDoc();
266         }
267         if(log.isDebugEnabled()){
268             log.debug("flush finished");
269         }
270     }
271
272     /**
273      * @return
274      * @throws IOException
275      */

276     private void asyncWrite(){
277         try{
278             CountDownLatch JavaDoc countDown;
279             synchronized(this){
280                 countDown=flushLatch;
281                 flushLatch=null;
282             }
283             mark.set(doAsyncWrite());
284             if(countDown!=null){
285                 countDown.countDown();
286             }
287         }catch(IOException JavaDoc e){
288             log.error("Checkpoint failed: "+e,e);
289         }
290     }
291
292     /**
293      * @return
294      * @throws IOException
295      */

296     protected Location doAsyncWrite() throws IOException JavaDoc{
297         final ArrayList JavaDoc<MessageAck> cpRemovedMessageLocations;
298         final ArrayList JavaDoc<Location> cpActiveJournalLocations;
299         final int maxCheckpointMessageAddSize=peristenceAdapter.getMaxCheckpointMessageAddSize();
300         final Location lastLocation;
301         // swap out the message hash maps..
302
synchronized(this){
303             cpAddedMessageIds=this.messages;
304             cpRemovedMessageLocations=this.messageAcks;
305             cpActiveJournalLocations=new ArrayList JavaDoc<Location>(inFlightTxLocations);
306             this.messages=new LinkedHashMap JavaDoc<MessageId,ReferenceData>();
307             this.messageAcks=new ArrayList JavaDoc<MessageAck>();
308             lastLocation=this.lastLocation;
309         }
310         if(log.isDebugEnabled())
311             log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: "
312                     +cpRemovedMessageLocations.size()+" ");
313         transactionTemplate.run(new Callback(){
314
315             public void execute() throws Exception JavaDoc{
316                 int size=0;
317                 PersistenceAdapter persitanceAdapter=transactionTemplate.getPersistenceAdapter();
318                 ConnectionContext context=transactionTemplate.getContext();
319                 // Checkpoint the added messages.
320
Iterator JavaDoc<Entry<MessageId,ReferenceData>> iterator=cpAddedMessageIds.entrySet().iterator();
321                 while(iterator.hasNext()){
322                     Entry<MessageId,ReferenceData> entry=iterator.next();
323                     try{
324                         referenceStore.addMessageReference(context,entry.getKey(),entry.getValue());
325                     }catch(Throwable JavaDoc e){
326                         log.warn("Message could not be added to long term store: "+e.getMessage(),e);
327                     }
328                     size++;
329                     // Commit the batch if it's getting too big
330
if(size>=maxCheckpointMessageAddSize){
331                         persitanceAdapter.commitTransaction(context);
332                         persitanceAdapter.beginTransaction(context);
333                         size=0;
334                     }
335                 }
336                 persitanceAdapter.commitTransaction(context);
337                 persitanceAdapter.beginTransaction(context);
338                 // Checkpoint the removed messages.
339
for(MessageAck ack:cpRemovedMessageLocations){
340                     try{
341                         referenceStore.removeMessage(transactionTemplate.getContext(),ack);
342                     }catch(Throwable JavaDoc e){
343                         log.warn("Message could not be removed from long term store: "+e.getMessage(),e);
344                     }
345                 }
346             }
347         });
348         log.debug("Batch update done.");
349         synchronized(this){
350             cpAddedMessageIds=null;
351             lastWrittenLocation=lastLocation;
352         }
353         if(cpActiveJournalLocations.size()>0){
354             Collections.sort(cpActiveJournalLocations);
355             return cpActiveJournalLocations.get(0);
356         }else{
357             return lastLocation;
358         }
359     }
360
361     /**
362      *
363      */

364     public Message getMessage(MessageId identity) throws IOException JavaDoc{
365         ReferenceData data=null;
366         synchronized(this){
367             // Is it still in flight???
368
data=messages.get(identity);
369             if(data==null&&cpAddedMessageIds!=null){
370                 data=cpAddedMessageIds.get(identity);
371             }
372         }
373         if(data==null){
374             data=referenceStore.getMessageReference(identity);
375             if(data==null){
376                 return null;
377             }
378         }
379         Location location=new Location();
380         location.setDataFileId(data.getFileId());
381         location.setOffset(data.getOffset());
382         DataStructure rc=peristenceAdapter.readCommand(location);
383         try{
384             return (Message)rc;
385         }catch(ClassCastException JavaDoc e){
386             throw new IOException JavaDoc("Could not read message "+identity+" at location "+location
387                     +", expected a message, but got: "+rc);
388         }
389     }
390
391     /**
392      * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the
393      * transaction log and then the cache is updated.
394      *
395      * @param listener
396      * @throws Exception
397      */

398     public void recover(final MessageRecoveryListener listener) throws Exception JavaDoc{
399         flush();
400         referenceStore.recover(new RecoveryListenerAdapter(this,listener));
401     }
402
403     public void start() throws Exception JavaDoc{
404         referenceStore.start();
405     }
406
407     public void stop() throws Exception JavaDoc{
408         flush();
409         asyncWriteTask.shutdown();
410         referenceStore.stop();
411     }
412
413     /**
414      * @return Returns the longTermStore.
415      */

416     public ReferenceStore getReferenceStore(){
417         return referenceStore;
418     }
419
420     /**
421      * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
422      */

423     public void removeAllMessages(ConnectionContext context) throws IOException JavaDoc{
424         flush();
425         referenceStore.removeAllMessages(context);
426     }
427
428     public ActiveMQDestination getDestination(){
429         return destination;
430     }
431
432     public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String JavaDoc messageRef)
433             throws IOException JavaDoc{
434         throw new IOException JavaDoc("The journal does not support message references.");
435     }
436
437     public String JavaDoc getMessageReference(MessageId identity) throws IOException JavaDoc{
438         throw new IOException JavaDoc("The journal does not support message references.");
439     }
440
441     /**
442      * @return
443      * @throws IOException
444      * @see org.apache.activemq.store.MessageStore#getMessageCount()
445      */

446     public int getMessageCount() throws IOException JavaDoc{
447         flush();
448         return referenceStore.getMessageCount();
449     }
450
451     public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception JavaDoc{
452         /*
453         RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
454         if(referenceStore.supportsExternalBatchControl()){
455             synchronized(this){
456                 referenceStore.recoverNextMessages(maxReturned,recoveryListener);
457                 if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
458                     // check for inflight messages
459                     int count=0;
460                     Iterator<Entry<MessageId,ReferenceData>> iterator=messages.entrySet().iterator();
461                     while(iterator.hasNext()&&count<maxReturned&&recoveryListener.hasSpace()){
462                         Entry<MessageId,ReferenceData> entry=iterator.next();
463                         ReferenceData data=entry.getValue();
464                         Message message=getMessage(data);
465                         recoveryListener.recoverMessage(message);
466                         count++;
467                     }
468                     referenceStore.setBatch(recoveryListener.getLastRecoveredMessageId());
469                 }
470             }
471         }else{
472             flush();
473             referenceStore.recoverNextMessages(maxReturned,recoveryListener);
474         }
475         */

476         RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
477         referenceStore.recoverNextMessages(maxReturned,recoveryListener);
478         if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
479             flush();
480             referenceStore.recoverNextMessages(maxReturned,recoveryListener);
481         }
482     }
483
484     Message getMessage(ReferenceData data) throws IOException JavaDoc{
485         Location location=new Location();
486         location.setDataFileId(data.getFileId());
487         location.setOffset(data.getOffset());
488         DataStructure rc=peristenceAdapter.readCommand(location);
489         try{
490             return (Message)rc;
491         }catch(ClassCastException JavaDoc e){
492             throw new IOException JavaDoc("Could not read message at location "+location+", expected a message, but got: "+rc);
493         }
494     }
495
496     public void resetBatching(){
497         referenceStore.resetBatching();
498     }
499
500     public Location getMark(){
501         return mark.get();
502     }
503 }
504
Popular Tags