KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > store > journal > JournalPersistenceAdapter


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.activemq.store.journal;
19
20 import java.io.File JavaDoc;
21 import java.io.IOException JavaDoc;
22 import java.util.ArrayList JavaDoc;
23 import java.util.HashSet JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.Set JavaDoc;
26 import java.util.concurrent.Callable JavaDoc;
27 import java.util.concurrent.ConcurrentHashMap JavaDoc;
28 import java.util.concurrent.CountDownLatch JavaDoc;
29 import java.util.concurrent.FutureTask JavaDoc;
30 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
31 import java.util.concurrent.ThreadFactory JavaDoc;
32 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
33 import java.util.concurrent.TimeUnit JavaDoc;
34 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
35
36 import org.apache.activeio.journal.InvalidRecordLocationException;
37 import org.apache.activeio.journal.Journal;
38 import org.apache.activeio.journal.JournalEventListener;
39 import org.apache.activeio.journal.RecordLocation;
40 import org.apache.activeio.packet.ByteArrayPacket;
41 import org.apache.activeio.packet.Packet;
42 import org.apache.activemq.broker.ConnectionContext;
43 import org.apache.activemq.command.ActiveMQDestination;
44 import org.apache.activemq.command.ActiveMQQueue;
45 import org.apache.activemq.command.ActiveMQTopic;
46 import org.apache.activemq.command.DataStructure;
47 import org.apache.activemq.command.JournalQueueAck;
48 import org.apache.activemq.command.JournalTopicAck;
49 import org.apache.activemq.command.JournalTrace;
50 import org.apache.activemq.command.JournalTransaction;
51 import org.apache.activemq.command.Message;
52 import org.apache.activemq.command.MessageAck;
53 import org.apache.activemq.memory.UsageListener;
54 import org.apache.activemq.memory.UsageManager;
55 import org.apache.activemq.openwire.OpenWireFormat;
56 import org.apache.activemq.store.MessageStore;
57 import org.apache.activemq.store.PersistenceAdapter;
58 import org.apache.activemq.store.TopicMessageStore;
59 import org.apache.activemq.store.TransactionStore;
60 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
61 import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
62 import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
63 import org.apache.activemq.thread.Scheduler;
64 import org.apache.activemq.thread.Task;
65 import org.apache.activemq.thread.TaskRunner;
66 import org.apache.activemq.thread.TaskRunnerFactory;
67 import org.apache.activemq.util.ByteSequence;
68 import org.apache.activemq.util.IOExceptionSupport;
69 import org.apache.activemq.wireformat.WireFormat;
70 import org.apache.commons.logging.Log;
71 import org.apache.commons.logging.LogFactory;
72
73 /**
74  * An implementation of {@link PersistenceAdapter} designed for use with a
75  * {@link Journal} and then check pointing asynchronously on a timeout with some
76  * other long term persistent storage.
77  *
78  * @org.apache.xbean.XBean
79  *
80  * @version $Revision: 1.17 $
81  */

82 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener {
83
84     private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class);
85
86     private final Journal journal;
87     private final PersistenceAdapter longTermPersistence;
88
89     private final WireFormat wireFormat = new OpenWireFormat();
90
91     private final ConcurrentHashMap JavaDoc queues = new ConcurrentHashMap JavaDoc();
92     private final ConcurrentHashMap JavaDoc topics = new ConcurrentHashMap JavaDoc();
93     
94     private UsageManager usageManager;
95     private long checkpointInterval = 1000 * 60 * 5;
96     private long lastCheckpointRequest = System.currentTimeMillis();
97     private long lastCleanup = System.currentTimeMillis();
98     private int maxCheckpointWorkers = 10;
99     private int maxCheckpointMessageAddSize = 1024*1024;
100
101     private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
102     private ThreadPoolExecutor JavaDoc checkpointExecutor;
103     
104     private TaskRunner checkpointTask;
105     private CountDownLatch JavaDoc nextCheckpointCountDownLatch = new CountDownLatch JavaDoc(1);
106     private boolean fullCheckPoint;
107     
108     private AtomicBoolean JavaDoc started = new AtomicBoolean JavaDoc(false);
109
110     private final Runnable JavaDoc periodicCheckpointTask = createPeriodicCheckpointTask();
111         
112     final Runnable JavaDoc createPeriodicCheckpointTask() {
113         return new Runnable JavaDoc() {
114             public void run() {
115                 if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
116                     checkpoint(false, true);
117                 }
118             }
119         };
120     }
121     
122     public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException JavaDoc {
123
124         this.journal = journal;
125         journal.setJournalEventListener(this);
126         
127         checkpointTask = taskRunnerFactory.createTaskRunner(new Task JavaDoc(){
128             public boolean iterate() {
129                 return doCheckpoint();
130             }
131         }, "ActiveMQ Journal Checkpoint Worker");
132
133         this.longTermPersistence = longTermPersistence;
134     }
135
136     /**
137      * @param usageManager The UsageManager that is controlling the destination's memory usage.
138      */

139     public void setUsageManager(UsageManager usageManager) {
140         this.usageManager = usageManager;
141         longTermPersistence.setUsageManager(usageManager);
142     }
143
144     public Set JavaDoc getDestinations() {
145         Set JavaDoc destinations = new HashSet JavaDoc(longTermPersistence.getDestinations());
146         destinations.addAll(queues.keySet());
147         destinations.addAll(topics.keySet());
148         return destinations;
149     }
150
151     private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException JavaDoc {
152         if (destination.isQueue()) {
153             return createQueueMessageStore((ActiveMQQueue) destination);
154         }
155         else {
156             return createTopicMessageStore((ActiveMQTopic) destination);
157         }
158     }
159
160     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException JavaDoc {
161         JournalMessageStore store = (JournalMessageStore) queues.get(destination);
162         if (store == null) {
163             MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
164             store = new JournalMessageStore(this, checkpointStore, destination);
165             queues.put(destination, store);
166         }
167         return store;
168     }
169
170     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException JavaDoc {
171         JournalTopicMessageStore store = (JournalTopicMessageStore) topics.get(destinationName);
172         if (store == null) {
173             TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
174             store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
175             topics.put(destinationName, store);
176         }
177         return store;
178     }
179
180     public TransactionStore createTransactionStore() throws IOException JavaDoc {
181         return transactionStore;
182     }
183
184     public long getLastMessageBrokerSequenceId() throws IOException JavaDoc {
185         return longTermPersistence.getLastMessageBrokerSequenceId();
186     }
187
188     public void beginTransaction(ConnectionContext context) throws IOException JavaDoc {
189         longTermPersistence.beginTransaction(context);
190     }
191
192     public void commitTransaction(ConnectionContext context) throws IOException JavaDoc {
193         longTermPersistence.commitTransaction(context);
194     }
195
196     public void rollbackTransaction(ConnectionContext context) throws IOException JavaDoc {
197         longTermPersistence.rollbackTransaction(context);
198     }
199
200     public synchronized void start() throws Exception JavaDoc {
201         if( !started.compareAndSet(false, true) )
202             return;
203         
204         checkpointExecutor = new ThreadPoolExecutor JavaDoc(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue JavaDoc(), new ThreadFactory JavaDoc() {
205             public Thread JavaDoc newThread(Runnable JavaDoc runable) {
206                 Thread JavaDoc t = new Thread JavaDoc(runable, "Journal checkpoint worker");
207                 t.setPriority(7);
208                 return t;
209             }
210         });
211         //checkpointExecutor.allowCoreThreadTimeOut(true);
212

213         this.usageManager.addUsageListener(this);
214
215         if (longTermPersistence instanceof JDBCPersistenceAdapter) {
216             // Disabled periodic clean up as it deadlocks with the checkpoint
217
// operations.
218
((JDBCPersistenceAdapter) longTermPersistence).setCleanupPeriod(0);
219         }
220
221         longTermPersistence.start();
222         createTransactionStore();
223         recover();
224
225         // Do a checkpoint periodically.
226
Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval/10);
227
228     }
229
230     public void stop() throws Exception JavaDoc {
231         
232         this.usageManager.removeUsageListener(this);
233         if( !started.compareAndSet(true, false) )
234             return;
235         
236         Scheduler.cancel(periodicCheckpointTask);
237
238         // Take one final checkpoint and stop checkpoint processing.
239
checkpoint(true, true);
240         checkpointTask.shutdown();
241         checkpointExecutor.shutdown();
242         
243         queues.clear();
244         topics.clear();
245
246         IOException JavaDoc firstException = null;
247         try {
248             journal.close();
249         } catch (Exception JavaDoc e) {
250             firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
251         }
252         longTermPersistence.stop();
253         
254         if (firstException != null) {
255             throw firstException;
256         }
257     }
258
259     // Properties
260
// -------------------------------------------------------------------------
261
public PersistenceAdapter getLongTermPersistence() {
262         return longTermPersistence;
263     }
264
265     /**
266      * @return Returns the wireFormat.
267      */

268     public WireFormat getWireFormat() {
269         return wireFormat;
270     }
271
272     // Implementation methods
273
// -------------------------------------------------------------------------
274

275     /**
276      * The Journal give us a call back so that we can move old data out of the
277      * journal. Taking a checkpoint does this for us.
278      *
279      * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
280      */

281     public void overflowNotification(RecordLocation safeLocation) {
282         checkpoint(false, true);
283     }
284
285     /**
286      * When we checkpoint we move all the journalled data to long term storage.
287      * @param stopping
288      *
289      * @param b
290      */

291     public void checkpoint(boolean sync, boolean fullCheckpoint) {
292         try {
293             if (journal == null )
294                 throw new IllegalStateException JavaDoc("Journal is closed.");
295             
296             long now = System.currentTimeMillis();
297             CountDownLatch JavaDoc latch = null;
298             synchronized(this) {
299                 latch = nextCheckpointCountDownLatch;
300                 lastCheckpointRequest = now;
301                 if( fullCheckpoint ) {
302                     this.fullCheckPoint = true;
303                 }
304             }
305             
306             checkpointTask.wakeup();
307             
308             if (sync) {
309                 log.debug("Waking for checkpoint to complete.");
310                 latch.await();
311             }
312         }
313         catch (InterruptedException JavaDoc e) {
314             Thread.currentThread().interrupt();
315             log.warn("Request to start checkpoint failed: " + e, e);
316         }
317     }
318     
319     public void checkpoint(boolean sync) {
320         checkpoint(sync,sync);
321     }
322         
323     /**
324      * This does the actual checkpoint.
325      * @return
326      */

327     public boolean doCheckpoint() {
328         CountDownLatch JavaDoc latch = null;
329         boolean fullCheckpoint;
330         synchronized(this) {
331             latch = nextCheckpointCountDownLatch;
332             nextCheckpointCountDownLatch = new CountDownLatch JavaDoc(1);
333             fullCheckpoint = this.fullCheckPoint;
334             this.fullCheckPoint=false;
335         }
336         try {
337
338             log.debug("Checkpoint started.");
339             RecordLocation newMark = null;
340
341             ArrayList JavaDoc futureTasks = new ArrayList JavaDoc(queues.size()+topics.size());
342             
343             //
344
// We do many partial checkpoints (fullCheckpoint==false) to move topic messages
345
// to long term store as soon as possible.
346
//
347
// We want to avoid doing that for queue messages since removes the come in the same
348
// checkpoint cycle will nullify the previous message add. Therefore, we only
349
// checkpoint queues on the fullCheckpoint cycles.
350
//
351
if( fullCheckpoint ) {
352                 Iterator JavaDoc iterator = queues.values().iterator();
353                 while (iterator.hasNext()) {
354                     try {
355                         final JournalMessageStore ms = (JournalMessageStore) iterator.next();
356                         FutureTask JavaDoc task = new FutureTask JavaDoc(new Callable JavaDoc() {
357                             public Object JavaDoc call() throws Exception JavaDoc {
358                                 return ms.checkpoint();
359                             }});
360                         futureTasks.add(task);
361                         checkpointExecutor.execute(task);
362                     }
363                     catch (Exception JavaDoc e) {
364                         log.error("Failed to checkpoint a message store: " + e, e);
365                     }
366                 }
367             }
368
369             Iterator JavaDoc iterator = topics.values().iterator();
370             while (iterator.hasNext()) {
371                 try {
372                     final JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next();
373                     FutureTask JavaDoc task = new FutureTask JavaDoc(new Callable JavaDoc() {
374                         public Object JavaDoc call() throws Exception JavaDoc {
375                             return ms.checkpoint();
376                         }});
377                     futureTasks.add(task);
378                     checkpointExecutor.execute(task);
379                 }
380                 catch (Exception JavaDoc e) {
381                     log.error("Failed to checkpoint a message store: " + e, e);
382                 }
383             }
384
385             try {
386                 for (Iterator JavaDoc iter = futureTasks.iterator(); iter.hasNext();) {
387                     FutureTask JavaDoc ft = (FutureTask JavaDoc) iter.next();
388                     RecordLocation mark = (RecordLocation) ft.get();
389                     // We only set a newMark on full checkpoints.
390
if( fullCheckpoint ) {
391                         if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
392                             newMark = mark;
393                         }
394                     }
395                 }
396             } catch (Throwable JavaDoc e) {
397                 log.error("Failed to checkpoint a message store: " + e, e);
398             }
399             
400
401             if( fullCheckpoint ) {
402                 try {
403                     if (newMark != null) {
404                         log.debug("Marking journal at: " + newMark);
405                         journal.setMark(newMark, true);
406                     }
407                 }
408                 catch (Exception JavaDoc e) {
409                     log.error("Failed to mark the Journal: " + e, e);
410                 }
411     
412                 if (longTermPersistence instanceof JDBCPersistenceAdapter) {
413                     // We may be check pointing more often than the checkpointInterval if under high use
414
// But we don't want to clean up the db that often.
415
long now = System.currentTimeMillis();
416                     if( now > lastCleanup+checkpointInterval ) {
417                         lastCleanup = now;
418                         ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
419                     }
420                 }
421             }
422
423             log.debug("Checkpoint done.");
424         }
425         finally {
426             latch.countDown();
427         }
428         synchronized(this) {
429             return this.fullCheckPoint;
430         }
431
432     }
433
434     /**
435      * @param location
436      * @return
437      * @throws IOException
438      */

439     public DataStructure readCommand(RecordLocation location) throws IOException JavaDoc {
440         try {
441             Packet packet = journal.read(location);
442             return (DataStructure) wireFormat.unmarshal(toByteSequence(packet));
443         }
444         catch (InvalidRecordLocationException e) {
445             throw createReadException(location, e);
446         }
447         catch (IOException JavaDoc e) {
448             throw createReadException(location, e);
449         }
450     }
451
452     /**
453      * Move all the messages that were in the journal into long term storage. We
454      * just replay and do a checkpoint.
455      *
456      * @throws IOException
457      * @throws IOException
458      * @throws InvalidRecordLocationException
459      * @throws IllegalStateException
460      */

461     private void recover() throws IllegalStateException JavaDoc, InvalidRecordLocationException, IOException JavaDoc, IOException JavaDoc {
462
463         RecordLocation pos = null;
464         int transactionCounter = 0;
465
466         log.info("Journal Recovery Started from: " + journal);
467         ConnectionContext context = new ConnectionContext();
468
469         // While we have records in the journal.
470
while ((pos = journal.getNextRecordLocation(pos)) != null) {
471             Packet data = journal.read(pos);
472             DataStructure c = (DataStructure) wireFormat.unmarshal(toByteSequence(data));
473
474             if (c instanceof Message ) {
475                 Message message = (Message) c;
476                 JournalMessageStore store = (JournalMessageStore) createMessageStore(message.getDestination());
477                 if ( message.isInTransaction()) {
478                     transactionStore.addMessage(store, message, pos);
479                 }
480                 else {
481                     store.replayAddMessage(context, message);
482                     transactionCounter++;
483                 }
484             } else {
485                 switch (c.getDataStructureType()) {
486                 case JournalQueueAck.DATA_STRUCTURE_TYPE:
487                 {
488                     JournalQueueAck command = (JournalQueueAck) c;
489                     JournalMessageStore store = (JournalMessageStore) createMessageStore(command.getDestination());
490                     if (command.getMessageAck().isInTransaction()) {
491                         transactionStore.removeMessage(store, command.getMessageAck(), pos);
492                     }
493                     else {
494                         store.replayRemoveMessage(context, command.getMessageAck());
495                         transactionCounter++;
496                     }
497                 }
498                 break;
499                 case JournalTopicAck.DATA_STRUCTURE_TYPE:
500                 {
501                     JournalTopicAck command = (JournalTopicAck) c;
502                     JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(command.getDestination());
503                     if (command.getTransactionId() != null) {
504                         transactionStore.acknowledge(store, command, pos);
505                     }
506                     else {
507                         store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
508                         transactionCounter++;
509                     }
510                 }
511                 break;
512                 case JournalTransaction.DATA_STRUCTURE_TYPE:
513                 {
514                     JournalTransaction command = (JournalTransaction) c;
515                     try {
516                         // Try to replay the packet.
517
switch (command.getType()) {
518                         case JournalTransaction.XA_PREPARE:
519                             transactionStore.replayPrepare(command.getTransactionId());
520                             break;
521                         case JournalTransaction.XA_COMMIT:
522                         case JournalTransaction.LOCAL_COMMIT:
523                             Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
524                             if (tx == null)
525                                 break; // We may be trying to replay a commit that
526
// was already committed.
527

528                             // Replay the committed operations.
529
tx.getOperations();
530                             for (Iterator JavaDoc iter = tx.getOperations().iterator(); iter.hasNext();) {
531                                 TxOperation op = (TxOperation) iter.next();
532                                 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
533                                     op.store.replayAddMessage(context, (Message) op.data);
534                                 }
535                                 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
536                                     op.store.replayRemoveMessage(context, (MessageAck) op.data);
537                                 }
538                                 if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
539                                     JournalTopicAck ack = (JournalTopicAck) op.data;
540                                     ((JournalTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
541                                             .getMessageId());
542                                 }
543                             }
544                             transactionCounter++;
545                             break;
546                         case JournalTransaction.LOCAL_ROLLBACK:
547                         case JournalTransaction.XA_ROLLBACK:
548                             transactionStore.replayRollback(command.getTransactionId());
549                             break;
550                         }
551                     }
552                     catch (IOException JavaDoc e) {
553                         log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
554                     }
555                 }
556                 break;
557                 case JournalTrace.DATA_STRUCTURE_TYPE:
558                     JournalTrace trace = (JournalTrace) c;
559                     log.debug("TRACE Entry: " + trace.getMessage());
560                     break;
561                 default:
562                     log.error("Unknown type of record in transaction log which will be discarded: " + c);
563                 }
564             }
565         }
566
567         RecordLocation location = writeTraceMessage("RECOVERED", true);
568         journal.setMark(location, true);
569
570         log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
571     }
572
573     private IOException JavaDoc createReadException(RecordLocation location, Exception JavaDoc e) {
574         return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
575     }
576
577     protected IOException JavaDoc createWriteException(DataStructure packet, Exception JavaDoc e) {
578         return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
579     }
580
581     protected IOException JavaDoc createWriteException(String JavaDoc command, Exception JavaDoc e) {
582         return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
583     }
584
585     protected IOException JavaDoc createRecoveryFailedException(Exception JavaDoc e) {
586         return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
587     }
588
589     /**
590      *
591      * @param command
592      * @param sync
593      * @return
594      * @throws IOException
595      */

596     public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException JavaDoc {
597         if( started.get() )
598             return journal.write(toPacket(wireFormat.marshal(command)), sync);
599         throw new IOException JavaDoc("closed");
600     }
601
602     private RecordLocation writeTraceMessage(String JavaDoc message, boolean sync) throws IOException JavaDoc {
603         JournalTrace trace = new JournalTrace();
604         trace.setMessage(message);
605         return writeCommand(trace, sync);
606     }
607
608     public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
609         newPercentUsage = ((newPercentUsage)/10)*10;
610         oldPercentUsage = ((oldPercentUsage)/10)*10;
611         if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
612             boolean sync = newPercentUsage >= 90;
613             checkpoint(sync, true);
614         }
615     }
616     
617     public JournalTransactionStore getTransactionStore() {
618         return transactionStore;
619     }
620
621     public void deleteAllMessages() throws IOException JavaDoc {
622         try {
623             JournalTrace trace = new JournalTrace();
624             trace.setMessage("DELETED");
625             RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
626             journal.setMark(location, true);
627             log.info("Journal deleted: ");
628         } catch (IOException JavaDoc e) {
629             throw e;
630         } catch (Throwable JavaDoc e) {
631             throw IOExceptionSupport.create(e);
632         }
633         longTermPersistence.deleteAllMessages();
634     }
635
636     public UsageManager getUsageManager() {
637         return usageManager;
638     }
639
640     public int getMaxCheckpointMessageAddSize() {
641         return maxCheckpointMessageAddSize;
642     }
643
644     public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
645         this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
646     }
647
648     public int getMaxCheckpointWorkers() {
649         return maxCheckpointWorkers;
650     }
651
652     public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
653         this.maxCheckpointWorkers = maxCheckpointWorkers;
654     }
655
656     public boolean isUseExternalMessageReferences() {
657         return false;
658     }
659
660     public void setUseExternalMessageReferences(boolean enable) {
661         if( enable )
662             throw new IllegalArgumentException JavaDoc("The journal does not support message references.");
663     }
664     
665     public Packet toPacket(ByteSequence sequence) {
666         return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
667     }
668     
669     public ByteSequence toByteSequence(Packet packet) {
670         org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
671         return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
672     }
673     
674     public void setBrokerName(String JavaDoc brokerName){
675         longTermPersistence.setBrokerName(brokerName);
676     }
677     
678     public String JavaDoc toString(){
679         return "JournalPersistenceAdapator(" + longTermPersistence + ")";
680     }
681
682     public void setDirectory(File JavaDoc dir){
683     }
684
685 }
686
Popular Tags