KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > amq > AMQPersistenceAdapter


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
4  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
5  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
6  * License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
11  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
12  * specific language governing permissions and limitations under the License.
13  */

14
15 package org.apache.activemq.store.amq;
16
17 import java.io.File JavaDoc;
18 import java.io.IOException JavaDoc;
19 import java.util.Date JavaDoc;
20 import java.util.HashSet JavaDoc;
21 import java.util.Iterator JavaDoc;
22 import java.util.Set JavaDoc;
23 import java.util.concurrent.ConcurrentHashMap JavaDoc;
24 import java.util.concurrent.CountDownLatch JavaDoc;
25 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
26 import org.apache.activeio.journal.Journal;
27 import org.apache.activemq.broker.ConnectionContext;
28 import org.apache.activemq.broker.BrokerServiceAware;
29 import org.apache.activemq.broker.BrokerService;
30 import org.apache.activemq.command.ActiveMQDestination;
31 import org.apache.activemq.command.ActiveMQQueue;
32 import org.apache.activemq.command.ActiveMQTopic;
33 import org.apache.activemq.command.DataStructure;
34 import org.apache.activemq.command.JournalQueueAck;
35 import org.apache.activemq.command.JournalTopicAck;
36 import org.apache.activemq.command.JournalTrace;
37 import org.apache.activemq.command.JournalTransaction;
38 import org.apache.activemq.command.Message;
39 import org.apache.activemq.command.MessageAck;
40 import org.apache.activemq.kaha.impl.async.AsyncDataManager;
41 import org.apache.activemq.kaha.impl.async.Location;
42 import org.apache.activemq.memory.UsageListener;
43 import org.apache.activemq.memory.UsageManager;
44 import org.apache.activemq.openwire.OpenWireFormat;
45 import org.apache.activemq.store.MessageStore;
46 import org.apache.activemq.store.PersistenceAdapter;
47 import org.apache.activemq.store.ReferenceStore;
48 import org.apache.activemq.store.ReferenceStoreAdapter;
49 import org.apache.activemq.store.TopicMessageStore;
50 import org.apache.activemq.store.TopicReferenceStore;
51 import org.apache.activemq.store.TransactionStore;
52 import org.apache.activemq.store.amq.AMQTransactionStore.Tx;
53 import org.apache.activemq.store.amq.AMQTransactionStore.TxOperation;
54 import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
55 import org.apache.activemq.thread.DefaultThreadPools;
56 import org.apache.activemq.thread.Scheduler;
57 import org.apache.activemq.thread.Task;
58 import org.apache.activemq.thread.TaskRunner;
59 import org.apache.activemq.thread.TaskRunnerFactory;
60 import org.apache.activemq.util.ByteSequence;
61 import org.apache.activemq.util.IOExceptionSupport;
62 import org.apache.activemq.util.IOHelper;
63 import org.apache.activemq.wireformat.WireFormat;
64 import org.apache.commons.logging.Log;
65 import org.apache.commons.logging.LogFactory;
66
67 /**
68  * An implementation of {@link PersistenceAdapter} designed for use with a {@link Journal} and then check pointing
69  * asynchronously on a timeout with some other long term persistent storage.
70  *
71  * @org.apache.xbean.XBean element="amqPersistenceAdapter"
72  *
73  * @version $Revision: 1.17 $
74  */

75 public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
76
77     private static final Log log=LogFactory.getLog(AMQPersistenceAdapter.class);
78     private final ConcurrentHashMap JavaDoc<ActiveMQQueue,AMQMessageStore> queues=new ConcurrentHashMap JavaDoc<ActiveMQQueue,AMQMessageStore>();
79     private final ConcurrentHashMap JavaDoc<ActiveMQTopic,AMQMessageStore> topics=new ConcurrentHashMap JavaDoc<ActiveMQTopic,AMQMessageStore>();
80     private AsyncDataManager asyncDataManager;
81     private ReferenceStoreAdapter referenceStoreAdapter;
82     private TaskRunnerFactory taskRunnerFactory;
83     private WireFormat wireFormat=new OpenWireFormat();
84     private UsageManager usageManager;
85     private long cleanupInterval=1000*60;
86     private long checkpointInterval=1000*10;
87     private int maxCheckpointWorkers=1;
88     private int maxCheckpointMessageAddSize=1024*4;
89     private AMQTransactionStore transactionStore=new AMQTransactionStore(this);
90     private TaskRunner checkpointTask;
91     private CountDownLatch JavaDoc nextCheckpointCountDownLatch=new CountDownLatch JavaDoc(1);
92     private final AtomicBoolean JavaDoc started=new AtomicBoolean JavaDoc(false);
93     private Runnable JavaDoc periodicCheckpointTask;
94     private Runnable JavaDoc periodicCleanupTask;
95     private boolean deleteAllMessages;
96     private boolean syncOnWrite;
97     private String JavaDoc brokerName="";
98     private File JavaDoc directory;
99     private BrokerService brokerService;
100
101     public String JavaDoc getBrokerName(){
102         return this.brokerName;
103     }
104
105     public void setBrokerName(String JavaDoc brokerName){
106         this.brokerName=brokerName;
107         if(this.referenceStoreAdapter!=null){
108             this.referenceStoreAdapter.setBrokerName(brokerName);
109         }
110     }
111
112     public BrokerService getBrokerService() {
113         return brokerService;
114     }
115
116     public void setBrokerService(BrokerService brokerService) {
117         this.brokerService = brokerService;
118     }
119
120     public synchronized void start() throws Exception JavaDoc{
121         if(!started.compareAndSet(false,true))
122             return;
123         if(this.directory==null) {
124             if (brokerService != null) {
125                 this.directory = brokerService.getBrokerDataDirectory();
126             }
127             else {
128                 this.directory=new File JavaDoc(IOHelper.getDefaultDataDirectory(),brokerName);
129                 this.directory=new File JavaDoc(directory,"amqstore");
130             }
131         }
132         log.info("AMQStore starting using directory: " + directory);
133         this.directory.mkdirs();
134
135         if(this.usageManager!=null){
136             this.usageManager.addUsageListener(this);
137         }
138         if(asyncDataManager==null){
139             asyncDataManager=createAsyncDataManager();
140         }
141         if(referenceStoreAdapter==null){
142             referenceStoreAdapter=createReferenceStoreAdapter();
143         }
144         referenceStoreAdapter.setDirectory(new File JavaDoc(directory,"kr-store"));
145         referenceStoreAdapter.setBrokerName(getBrokerName());
146         referenceStoreAdapter.setUsageManager(usageManager);
147         if(taskRunnerFactory==null){
148             taskRunnerFactory=createTaskRunnerFactory();
149         }
150         asyncDataManager.start();
151         if(deleteAllMessages){
152             asyncDataManager.delete();
153             try{
154                 JournalTrace trace=new JournalTrace();
155                 trace.setMessage("DELETED "+new Date JavaDoc());
156                 Location location=asyncDataManager.write(wireFormat.marshal(trace),false);
157                 asyncDataManager.setMark(location,true);
158                 log.info("Journal deleted: ");
159                 deleteAllMessages=false;
160             }catch(IOException JavaDoc e){
161                 throw e;
162             }catch(Throwable JavaDoc e){
163                 throw IOExceptionSupport.create(e);
164             }
165             referenceStoreAdapter.deleteAllMessages();
166         }
167         referenceStoreAdapter.start();
168         Set JavaDoc<Integer JavaDoc> files=referenceStoreAdapter.getReferenceFileIdsInUse();
169         log.info("Active data files: "+files);
170         checkpointTask=taskRunnerFactory.createTaskRunner(new Task(){
171
172             public boolean iterate(){
173                 doCheckpoint();
174                 return false;
175             }
176         },"ActiveMQ Journal Checkpoint Worker");
177         createTransactionStore();
178         if(referenceStoreAdapter.isStoreValid()==false){
179             log.warn("The ReferenceStore is not valid - recovering ...");
180             recover();
181             log.info("Finished recovering the ReferenceStore");
182         }else {
183             Location location=writeTraceMessage("RECOVERED "+new Date JavaDoc(),true);
184             asyncDataManager.setMark(location,true);
185         }
186         // Do a checkpoint periodically.
187
periodicCheckpointTask=new Runnable JavaDoc(){
188
189             public void run(){
190                 checkpoint(false);
191             }
192         };
193         Scheduler.executePeriodically(periodicCheckpointTask,checkpointInterval);
194         periodicCleanupTask=new Runnable JavaDoc(){
195
196             public void run(){
197                 cleanup();
198             }
199         };
200         Scheduler.executePeriodically(periodicCleanupTask,cleanupInterval);
201     }
202
203     public void stop() throws Exception JavaDoc{
204       
205         if(!started.compareAndSet(true,false))
206             return;
207         this.usageManager.removeUsageListener(this);
208         Scheduler.cancel(periodicCheckpointTask);
209         Scheduler.cancel(periodicCleanupTask);
210         Iterator JavaDoc<AMQMessageStore> iterator=queues.values().iterator();
211         while(iterator.hasNext()){
212             AMQMessageStore ms=iterator.next();
213             ms.stop();
214         }
215         iterator=topics.values().iterator();
216         while(iterator.hasNext()){
217             final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next();
218             ms.stop();
219         }
220         // Take one final checkpoint and stop checkpoint processing.
221
checkpoint(true);
222         checkpointTask.shutdown();
223         queues.clear();
224         topics.clear();
225         IOException JavaDoc firstException=null;
226         referenceStoreAdapter.stop();
227         try{
228             log.debug("Journal close");
229             asyncDataManager.close();
230         }catch(Exception JavaDoc e){
231             firstException=IOExceptionSupport.create("Failed to close journals: "+e,e);
232         }
233         if(firstException!=null){
234             throw firstException;
235         }
236     }
237
238     /**
239      * When we checkpoint we move all the journalled data to long term storage.
240      * @param sync
241      */

242     public void checkpoint(boolean sync){
243         try{
244             if(asyncDataManager==null)
245                 throw new IllegalStateException JavaDoc("Journal is closed.");
246             CountDownLatch JavaDoc latch=null;
247             synchronized(this){
248                 latch=nextCheckpointCountDownLatch;
249             }
250             checkpointTask.wakeup();
251             if(sync){
252                 if(log.isDebugEnabled()){
253                     log.debug("Waitng for checkpoint to complete.");
254                 }
255                 latch.await();
256             }
257             referenceStoreAdapter.checkpoint(sync);
258         }catch(InterruptedException JavaDoc e){
259             Thread.currentThread().interrupt();
260             log.warn("Request to start checkpoint failed: "+e,e);
261         }catch(IOException JavaDoc e){
262             log.error("checkpoint failed: "+e,e);
263         }
264     }
265
266     /**
267      * This does the actual checkpoint.
268      *
269      * @return true if successful
270      */

271     public boolean doCheckpoint(){
272         CountDownLatch JavaDoc latch=null;
273         synchronized(this){
274             latch=nextCheckpointCountDownLatch;
275             nextCheckpointCountDownLatch=new CountDownLatch JavaDoc(1);
276         }
277         try{
278             if(log.isDebugEnabled()){
279                 log.debug("Checkpoint started.");
280             }
281            
282             Location newMark=null;
283             Iterator JavaDoc<AMQMessageStore> iterator=queues.values().iterator();
284             while(iterator.hasNext()){
285                 final AMQMessageStore ms=iterator.next();
286                 Location mark=(Location)ms.getMark();
287                 if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
288                     newMark=mark;
289                 }
290             }
291             iterator=topics.values().iterator();
292             while(iterator.hasNext()){
293                 final AMQTopicMessageStore ms=(AMQTopicMessageStore)iterator.next();
294                 Location mark=(Location)ms.getMark();
295                 if(mark!=null&&(newMark==null||newMark.compareTo(mark)<0)){
296                     newMark=mark;
297                 }
298             }
299             try{
300                 if(newMark!=null){
301                     if(log.isDebugEnabled()){
302                         log.debug("Marking journal at: "+newMark);
303                     }
304                     asyncDataManager.setMark(newMark,false);
305                     writeTraceMessage("CHECKPOINT "+new Date JavaDoc(),true);
306                 }
307             }catch(Exception JavaDoc e){
308                 log.error("Failed to mark the Journal: "+e,e);
309             }
310             if(log.isDebugEnabled()){
311                 log.debug("Checkpoint done.");
312             }
313         }finally{
314             latch.countDown();
315         }
316         return true;
317     }
318
319     /**
320      * Cleans up the data files
321      *
322      * @return
323      * @throws IOException
324      */

325     public void cleanup(){
326         try{
327             Set JavaDoc<Integer JavaDoc> inUse=referenceStoreAdapter.getReferenceFileIdsInUse();
328             asyncDataManager.consolidateDataFilesNotIn(inUse);
329         }catch(IOException JavaDoc e){
330             log.error("Could not cleanup data files: "+e,e);
331         }
332     }
333
334     public Set JavaDoc<ActiveMQDestination> getDestinations(){
335         Set JavaDoc<ActiveMQDestination> destinations=new HashSet JavaDoc<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
336         destinations.addAll(queues.keySet());
337         destinations.addAll(topics.keySet());
338         return destinations;
339     }
340
341     private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException JavaDoc{
342         if(destination.isQueue()){
343             return createQueueMessageStore((ActiveMQQueue)destination);
344         }else{
345             return createTopicMessageStore((ActiveMQTopic)destination);
346         }
347     }
348
349     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException JavaDoc{
350         AMQMessageStore store=queues.get(destination);
351         if(store==null){
352             ReferenceStore checkpointStore=referenceStoreAdapter.createQueueReferenceStore(destination);
353             store=new AMQMessageStore(this,checkpointStore,destination);
354             try{
355                 store.start();
356             }catch(Exception JavaDoc e){
357                 throw IOExceptionSupport.create(e);
358             }
359             queues.put(destination,store);
360         }
361         return store;
362     }
363
364     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException JavaDoc{
365         AMQTopicMessageStore store=(AMQTopicMessageStore)topics.get(destinationName);
366         if(store==null){
367             TopicReferenceStore checkpointStore=referenceStoreAdapter.createTopicReferenceStore(destinationName);
368             store=new AMQTopicMessageStore(this,checkpointStore,destinationName);
369             try{
370                 store.start();
371             }catch(Exception JavaDoc e){
372                 throw IOExceptionSupport.create(e);
373             }
374             topics.put(destinationName,store);
375         }
376         return store;
377     }
378
379     public TransactionStore createTransactionStore() throws IOException JavaDoc{
380         return transactionStore;
381     }
382
383     public long getLastMessageBrokerSequenceId() throws IOException JavaDoc{
384         return referenceStoreAdapter.getLastMessageBrokerSequenceId();
385     }
386
387     public void beginTransaction(ConnectionContext context) throws IOException JavaDoc{
388         referenceStoreAdapter.beginTransaction(context);
389     }
390
391     public void commitTransaction(ConnectionContext context) throws IOException JavaDoc{
392         referenceStoreAdapter.commitTransaction(context);
393     }
394
395     public void rollbackTransaction(ConnectionContext context) throws IOException JavaDoc{
396         referenceStoreAdapter.rollbackTransaction(context);
397     }
398
399     /**
400      * @param location
401      * @return
402      * @throws IOException
403      */

404     public DataStructure readCommand(Location location) throws IOException JavaDoc{
405         try{
406             ByteSequence packet=asyncDataManager.read(location);
407             return (DataStructure)wireFormat.unmarshal(packet);
408         }catch(IOException JavaDoc e){
409             throw createReadException(location,e);
410         }
411     }
412
413     /**
414      * Move all the messages that were in the journal into long term storage. We just replay and do a checkpoint.
415      *
416      * @throws IOException
417      * @throws IOException
418      * @throws InvalidLocationException
419      * @throws IllegalStateException
420      */

421     private void recover() throws IllegalStateException JavaDoc,IOException JavaDoc{
422         Location pos=null;
423         int redoCounter=0;
424         log.info("Journal Recovery Started from: "+asyncDataManager);
425         long start=System.currentTimeMillis();
426         ConnectionContext context=new ConnectionContext();
427         // While we have records in the journal.
428
while((pos=asyncDataManager.getNextLocation(pos))!=null){
429             ByteSequence data=asyncDataManager.read(pos);
430             DataStructure c=(DataStructure)wireFormat.unmarshal(data);
431             if(c instanceof Message){
432                 Message message=(Message)c;
433                 AMQMessageStore store=(AMQMessageStore)createMessageStore(message.getDestination());
434                 if(message.isInTransaction()){
435                     transactionStore.addMessage(store,message,pos);
436                 }else{
437                     if(store.replayAddMessage(context,message,pos)){
438                         redoCounter++;
439                     }
440                 }
441             }else{
442                 switch(c.getDataStructureType()){
443                 case JournalQueueAck.DATA_STRUCTURE_TYPE: {
444                     JournalQueueAck command=(JournalQueueAck)c;
445                     AMQMessageStore store=(AMQMessageStore)createMessageStore(command.getDestination());
446                     if(command.getMessageAck().isInTransaction()){
447                         transactionStore.removeMessage(store,command.getMessageAck(),pos);
448                     }else{
449                         if(store.replayRemoveMessage(context,command.getMessageAck())){
450                             redoCounter++;
451                         }
452                     }
453                 }
454                     break;
455                 case JournalTopicAck.DATA_STRUCTURE_TYPE: {
456                     JournalTopicAck command=(JournalTopicAck)c;
457                     AMQTopicMessageStore store=(AMQTopicMessageStore)createMessageStore(command.getDestination());
458                     if(command.getTransactionId()!=null){
459                         transactionStore.acknowledge(store,command,pos);
460                     }else{
461                         if(store.replayAcknowledge(context,command.getClientId(),command.getSubscritionName(),command
462                                 .getMessageId())){
463                             redoCounter++;
464                         }
465                     }
466                 }
467                     break;
468                 case JournalTransaction.DATA_STRUCTURE_TYPE: {
469                     JournalTransaction command=(JournalTransaction)c;
470                     try{
471                         // Try to replay the packet.
472
switch(command.getType()){
473                         case JournalTransaction.XA_PREPARE:
474                             transactionStore.replayPrepare(command.getTransactionId());
475                             break;
476                         case JournalTransaction.XA_COMMIT:
477                         case JournalTransaction.LOCAL_COMMIT:
478                             Tx tx=transactionStore.replayCommit(command.getTransactionId(),command.getWasPrepared());
479                             if(tx==null)
480                                 break; // We may be trying to replay a commit that
481
// was already committed.
482
// Replay the committed operations.
483
tx.getOperations();
484                             for(Iterator JavaDoc iter=tx.getOperations().iterator();iter.hasNext();){
485                                 TxOperation op=(TxOperation)iter.next();
486                                 if(op.operationType==TxOperation.ADD_OPERATION_TYPE){
487                                     if(op.store.replayAddMessage(context,(Message)op.data,op.location))
488                                         redoCounter++;
489                                 }
490                                 if(op.operationType==TxOperation.REMOVE_OPERATION_TYPE){
491                                     if(op.store.replayRemoveMessage(context,(MessageAck)op.data))
492                                         redoCounter++;
493                                 }
494                                 if(op.operationType==TxOperation.ACK_OPERATION_TYPE){
495                                     JournalTopicAck ack=(JournalTopicAck)op.data;
496                                     if(((AMQTopicMessageStore)op.store).replayAcknowledge(context,ack.getClientId(),ack
497                                             .getSubscritionName(),ack.getMessageId())){
498                                         redoCounter++;
499                                     }
500                                 }
501                             }
502                             break;
503                         case JournalTransaction.LOCAL_ROLLBACK:
504                         case JournalTransaction.XA_ROLLBACK:
505                             transactionStore.replayRollback(command.getTransactionId());
506                             break;
507                         }
508                     }catch(IOException JavaDoc e){
509                         log.error("Recovery Failure: Could not replay: "+c+", reason: "+e,e);
510                     }
511                 }
512                     break;
513                 case JournalTrace.DATA_STRUCTURE_TYPE:
514                     JournalTrace trace=(JournalTrace)c;
515                     log.debug("TRACE Entry: "+trace.getMessage());
516                     break;
517                 default:
518                     log.error("Unknown type of record in transaction log which will be discarded: "+c);
519                 }
520             }
521         }
522         Location location=writeTraceMessage("RECOVERED "+new Date JavaDoc(),true);
523         asyncDataManager.setMark(location,true);
524         long end=System.currentTimeMillis();
525         log.info("Recovered "+redoCounter+" operations from redo log in "+((end-start)/1000.0f)+" seconds.");
526     }
527
528     private IOException JavaDoc createReadException(Location location,Exception JavaDoc e){
529         return IOExceptionSupport.create("Failed to read to journal for: "+location+". Reason: "+e,e);
530     }
531
532     protected IOException JavaDoc createWriteException(DataStructure packet,Exception JavaDoc e){
533         return IOExceptionSupport.create("Failed to write to journal for: "+packet+". Reason: "+e,e);
534     }
535
536     protected IOException JavaDoc createWriteException(String JavaDoc command,Exception JavaDoc e){
537         return IOExceptionSupport.create("Failed to write to journal for command: "+command+". Reason: "+e,e);
538     }
539
540     protected IOException JavaDoc createRecoveryFailedException(Exception JavaDoc e){
541         return IOExceptionSupport.create("Failed to recover from journal. Reason: "+e,e);
542     }
543
544     /**
545      *
546      * @param command
547      * @param syncHint
548      * @return
549      * @throws IOException
550      */

551     public Location writeCommand(DataStructure command,boolean syncHint) throws IOException JavaDoc{
552         return asyncDataManager.write(wireFormat.marshal(command),(syncHint&&syncOnWrite));
553     }
554
555     private Location writeTraceMessage(String JavaDoc message,boolean sync) throws IOException JavaDoc{
556         JournalTrace trace=new JournalTrace();
557         trace.setMessage(message);
558         return writeCommand(trace,sync);
559     }
560
561     public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
562         newPercentUsage=((newPercentUsage)/10)*10;
563         oldPercentUsage=((oldPercentUsage)/10)*10;
564         if(newPercentUsage>=70&&oldPercentUsage<newPercentUsage){
565             checkpoint(false);
566         }
567     }
568
569     public AMQTransactionStore getTransactionStore(){
570         return transactionStore;
571     }
572
573     public void deleteAllMessages() throws IOException JavaDoc{
574         deleteAllMessages=true;
575     }
576
577     public String JavaDoc toString(){
578         return "AMQPersistenceAdapter("+directory+")";
579     }
580
581     // /////////////////////////////////////////////////////////////////
582
// Subclass overridables
583
// /////////////////////////////////////////////////////////////////
584
protected AsyncDataManager createAsyncDataManager(){
585         AsyncDataManager manager=new AsyncDataManager();
586         manager.setDirectory(new File JavaDoc(directory,"journal"));
587         return manager;
588     }
589
590     protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException JavaDoc{
591         KahaReferenceStoreAdapter adaptor=new KahaReferenceStoreAdapter();
592         return adaptor;
593     }
594
595     protected TaskRunnerFactory createTaskRunnerFactory(){
596         return DefaultThreadPools.getDefaultTaskRunnerFactory();
597     }
598
599     // /////////////////////////////////////////////////////////////////
600
// Property Accessors
601
// /////////////////////////////////////////////////////////////////
602
public AsyncDataManager getAsyncDataManager(){
603         return asyncDataManager;
604     }
605
606     public void setAsyncDataManager(AsyncDataManager asyncDataManager){
607         this.asyncDataManager=asyncDataManager;
608     }
609
610     public ReferenceStoreAdapter getReferenceStoreAdapter(){
611         return referenceStoreAdapter;
612     }
613
614     public TaskRunnerFactory getTaskRunnerFactory(){
615         return taskRunnerFactory;
616     }
617
618     public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory){
619         this.taskRunnerFactory=taskRunnerFactory;
620     }
621
622     /**
623      * @return Returns the wireFormat.
624      */

625     public WireFormat getWireFormat(){
626         return wireFormat;
627     }
628
629     public void setWireFormat(WireFormat wireFormat){
630         this.wireFormat=wireFormat;
631     }
632
633     public UsageManager getUsageManager(){
634         return usageManager;
635     }
636
637     public void setUsageManager(UsageManager usageManager){
638         this.usageManager=usageManager;
639     }
640
641     public int getMaxCheckpointMessageAddSize(){
642         return maxCheckpointMessageAddSize;
643     }
644
645     public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize){
646         this.maxCheckpointMessageAddSize=maxCheckpointMessageAddSize;
647     }
648
649     public int getMaxCheckpointWorkers(){
650         return maxCheckpointWorkers;
651     }
652
653     public void setMaxCheckpointWorkers(int maxCheckpointWorkers){
654         this.maxCheckpointWorkers=maxCheckpointWorkers;
655     }
656
657     public File JavaDoc getDirectory(){
658         return directory;
659     }
660
661     public void setDirectory(File JavaDoc directory){
662         this.directory=directory;
663     }
664
665     public boolean isSyncOnWrite(){
666         return this.syncOnWrite;
667     }
668
669     public void setSyncOnWrite(boolean syncOnWrite){
670         this.syncOnWrite=syncOnWrite;
671     }
672
673     
674     /**
675      * @param referenceStoreAdapter the referenceStoreAdapter to set
676      */

677     public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter){
678         this.referenceStoreAdapter=referenceStoreAdapter;
679     }
680 }
681
Popular Tags