KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > alfresco > repo > content > replication > ReplicatingContentStore


1 /*
2  * Copyright (C) 2006 Alfresco, Inc.
3  *
4  * Licensed under the Mozilla Public License version 1.1
5  * with a permitted attribution clause. You may obtain a
6  * copy of the License at
7  *
8  * http://www.alfresco.org/legal/license.txt
9  *
10  * Unless required by applicable law or agreed to in writing,
11  * software distributed under the License is distributed on an
12  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
13  * either express or implied. See the License for the specific
14  * language governing permissions and limitations under the
15  * License.
16  */

17 package org.alfresco.repo.content.replication;
18
19 import java.util.Date JavaDoc;
20 import java.util.HashSet JavaDoc;
21 import java.util.List JavaDoc;
22 import java.util.Set JavaDoc;
23 import java.util.concurrent.ThreadPoolExecutor JavaDoc;
24 import java.util.concurrent.locks.Lock JavaDoc;
25 import java.util.concurrent.locks.ReadWriteLock JavaDoc;
26 import java.util.concurrent.locks.ReentrantReadWriteLock JavaDoc;
27
28 import org.alfresco.error.AlfrescoRuntimeException;
29 import org.alfresco.repo.content.AbstractContentStore;
30 import org.alfresco.repo.content.ContentStore;
31 import org.alfresco.service.cmr.repository.ContentIOException;
32 import org.alfresco.service.cmr.repository.ContentReader;
33 import org.alfresco.service.cmr.repository.ContentStreamListener;
34 import org.alfresco.service.cmr.repository.ContentWriter;
35 import org.alfresco.service.transaction.TransactionService;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38
39 /**
40  * <h1><u>Replicating Content Store</u></h1>
41  * <p>
42  * A content store implementation that is able to replicate content between stores.
43  * Content is not persisted by this store, but rather it relies on any number of
44  * child {@link org.alfresco.repo.content.ContentStore stores} to provide access to
45  * content readers and writers.
46  * <p>
47  * The order in which the stores appear in the list of stores participating is
48  * important. The first store in the list is known as the <i>primary store</i>.
49  * When the replicator goes to fetch content, the stores are searched
50  * from first to last. The stores should therefore be arranged in order of
51  * speed.
52  * <p>
53  * It supports the notion of inbound and/or outbound replication, both of which can be
54  * operational at the same time.
55  *
56  * <h2><u>Outbound Replication</u></h2>
57  * <p>
58  * When this is enabled, then the primary store is used for writes. When the
59  * content write completes (i.e. the write channel is closed) then the content
60  * is synchronously copied to all other stores. The write is therefore slowed
61  * down, but the content replication will occur <i>in-transaction</i>.
62  * <p>
63  * Set the {@link #setOutboundThreadPoolExecutor(ThreadPoolExecutor) outboundThreadPoolExecutor}
64  * property to enable asynchronous replication.<br>
65  * With asynchronous replication, there is always a risk that a failure
66  * occurs during the replication. Depending on the configuration of the server,
67  * further action may need to be taken to rectify the problem manually.
68  *
69  * <h2><u>Inbound Replication</u></h2>
70  * <p>
71  * This can be used to lazily replicate content onto the primary store. When
72  * content can't be found in the primary store, the other stores are checked
73  * in order. If content is found, then it is copied into the local store
74  * before being returned. Subsequent accesses will use the primary store.<br>
75  * This should be used where the secondary stores are much slower, such as in
76  * the case of a store against some kind of archival mechanism.
77  *
78  * <h2><u>No Replication</u></h2>
79  * <p>
80  * Content is written to the primary store only. The other stores are
81  * only used to retrieve content and the primary store is not updated with
82  * the content.
83  *
84  * @author Derek Hulley
85  */

86 public class ReplicatingContentStore extends AbstractContentStore
87 {
88     /*
89      * The replication process uses thread synchronization as it can
90      * decide to write content to specific URLs during requests for
91      * a reader.
92      * While this won't help the underlying stores if there are
93      * multiple replications on top of them, it will prevent repeated
94      * work from multiple threads entering an instance of this component
95      * looking for the same content at the same time.
96      */

97     
98     private static Log logger = LogFactory.getLog(ReplicatingContentStore.class);
99     
100     private TransactionService transactionService;
101     private ContentStore primaryStore;
102     private List JavaDoc<ContentStore> secondaryStores;
103     private boolean inbound;
104     private boolean outbound;
105     private ThreadPoolExecutor JavaDoc outboundThreadPoolExecutor;
106     
107     private Lock JavaDoc readLock;
108     private Lock JavaDoc writeLock;
109
110     /**
111      * Default constructor set <code>inbound = false</code> and <code>outbound = true</code>;
112      */

113     public ReplicatingContentStore()
114     {
115         inbound = false;
116         outbound = true;
117         
118         ReadWriteLock JavaDoc storeLock = new ReentrantReadWriteLock JavaDoc();
119         readLock = storeLock.readLock();
120         writeLock = storeLock.writeLock();
121     }
122     
123     /**
124      * Required to ensure that content listeners are executed in a transaction
125      *
126      * @param transactionService
127      */

128     public void setTransactionService(TransactionService transactionService)
129     {
130         this.transactionService = transactionService;
131     }
132
133     /**
134      * Set the primary store that content will be replicated to or from
135      *
136      * @param primaryStore the primary content store
137      */

138     public void setPrimaryStore(ContentStore primaryStore)
139     {
140         this.primaryStore = primaryStore;
141     }
142
143     /**
144      * Set the secondary stores that this component will replicate to or from
145      *
146      * @param stores a list of stores to replicate to or from
147      */

148     public void setSecondaryStores(List JavaDoc<ContentStore> secondaryStores)
149     {
150         this.secondaryStores = secondaryStores;
151     }
152     
153     /**
154      * Set whether or not this component should replicate content to the
155      * primary store if not found.
156      *
157      * @param inbound true to pull content onto the primary store when found
158      * on one of the other stores
159      */

160     public void setInbound(boolean inbound)
161     {
162         this.inbound = inbound;
163     }
164     
165     /**
166      * Set whether or not this component should replicate content to all stores
167      * as it is written.
168      *
169      * @param outbound true to enable synchronous replication to all stores
170      */

171     public void setOutbound(boolean outbound)
172     {
173         this.outbound = outbound;
174     }
175
176     /**
177      * Set the thread pool executer
178      *
179      * @param outboundThreadPoolExecutor set this to have the synchronization occur in a separate
180      * thread
181      */

182     public void setOutboundThreadPoolExecutor(ThreadPoolExecutor JavaDoc outboundThreadPoolExecutor)
183     {
184         this.outboundThreadPoolExecutor = outboundThreadPoolExecutor;
185     }
186     
187     /**
188      * Forwards the call directly to the first store in the list of stores.
189      */

190     public ContentReader getReader(String JavaDoc contentUrl) throws ContentIOException
191     {
192         if (primaryStore == null)
193         {
194             throw new AlfrescoRuntimeException("ReplicatingContentStore not initialised");
195         }
196         
197         // get a read lock so that we are sure that no replication is underway
198
ContentReader existingContentReader = null;
199         readLock.lock();
200         try
201         {
202             // get a reader from the primary store
203
ContentReader primaryReader = primaryStore.getReader(contentUrl);
204             
205             // give it straight back if the content is there
206
if (primaryReader.exists())
207             {
208                 return primaryReader;
209             }
210
211             // the content is not in the primary reader so we have to go looking for it
212
ContentReader secondaryContentReader = null;
213             for (ContentStore store : secondaryStores)
214             {
215                 ContentReader reader = store.getReader(contentUrl);
216                 if (reader.exists())
217                 {
218                     // found the content in a secondary store
219
secondaryContentReader = reader;
220                     break;
221                 }
222             }
223             // we already know that the primary has nothing
224
// drop out if no content was found
225
if (secondaryContentReader == null)
226             {
227                 return primaryReader;
228             }
229             // secondary content was found
230
// return it if we are not doing inbound
231
if (!inbound)
232             {
233                 return secondaryContentReader;
234             }
235             
236             // we have to replicate inbound
237
existingContentReader = secondaryContentReader;
238         }
239         finally
240         {
241             readLock.unlock();
242         }
243         
244         // -- a small gap for concurrent threads to get through --
245

246         // do inbound replication
247
writeLock.lock();
248         try
249         {
250             // double check the primary
251
ContentReader primaryContentReader = primaryStore.getReader(contentUrl);
252             if (primaryContentReader.exists())
253             {
254                 // we were beaten to it
255
return primaryContentReader;
256             }
257             // get a writer
258
ContentWriter primaryContentWriter = primaryStore.getWriter(existingContentReader, contentUrl);
259             // copy it over
260
primaryContentWriter.putContent(existingContentReader);
261             // get a writer to the new content
262
primaryContentReader = primaryContentWriter.getReader();
263             // done
264
return primaryContentReader;
265         }
266         finally
267         {
268             writeLock.unlock();
269         }
270     }
271
272     /**
273      *
274      */

275     public ContentWriter getWriter(ContentReader existingContentReader, String JavaDoc newContentUrl) throws ContentIOException
276     {
277         // get the writer
278
ContentWriter writer = primaryStore.getWriter(existingContentReader, newContentUrl);
279         
280         // attach a replicating listener if outbound replication is on
281
if (outbound)
282         {
283             if (logger.isDebugEnabled())
284             {
285                 logger.debug(
286                         "Attaching " + (outboundThreadPoolExecutor == null ? "" : "a") + "synchronous " +
287                                 "replicating listener to local writer: \n" +
288                         " primary store: " + primaryStore + "\n" +
289                         " writer: " + writer);
290             }
291             // attach the listener
292
ContentStreamListener listener = new ReplicatingWriteListener(secondaryStores, writer, outboundThreadPoolExecutor);
293             writer.addListener(listener);
294             writer.setTransactionService(transactionService); // mandatory when listeners are added
295
}
296         
297         // done
298
return writer;
299     }
300
301     /**
302      * Performs a delete on the local store and if outbound replication is on, propogates
303      * the delete to the other stores too.
304      *
305      * @return Returns the value returned by the delete on the primary store.
306      */

307     public boolean delete(String JavaDoc contentUrl) throws ContentIOException
308     {
309         // delete on the primary store
310
boolean deleted = primaryStore.delete(contentUrl);
311         
312         // propogate outbound deletions
313
if (outbound)
314         {
315             for (ContentStore store : secondaryStores)
316             {
317                 store.delete(contentUrl);
318             }
319             // log
320
if (logger.isDebugEnabled())
321             {
322                 logger.debug("Propagated content delete to " + secondaryStores.size() + " stores:" + contentUrl);
323             }
324         }
325         // done
326
if (logger.isDebugEnabled())
327         {
328             logger.debug("Deleted content for URL: " + contentUrl);
329         }
330         return deleted;
331     }
332
333     /**
334      * @return Returns the results as given by the primary store, and if inbound
335      * replication is active, merges the URLs from the secondary stores.
336      */

337     public Set JavaDoc<String JavaDoc> getUrls(Date JavaDoc createdAfter, Date JavaDoc createdBefore) throws ContentIOException
338     {
339         Set JavaDoc<String JavaDoc> urls = new HashSet JavaDoc<String JavaDoc>(1024);
340         
341         // add in URLs from primary store
342
Set JavaDoc<String JavaDoc> primaryUrls = primaryStore.getUrls(createdAfter, createdBefore);
343         urls.addAll(primaryUrls);
344         
345         // add in URLs from secondary stores (they are visible for reads)
346
for (ContentStore secondaryStore : secondaryStores)
347         {
348             Set JavaDoc<String JavaDoc> secondaryUrls = secondaryStore.getUrls(createdAfter, createdBefore);
349             // merge them
350
urls.addAll(secondaryUrls);
351         }
352         // done
353
if (logger.isDebugEnabled())
354         {
355             logger.debug("Found " + urls.size() + " URLs, of which " + primaryUrls.size() + " are primary: \n" +
356                     " created after: " + createdAfter + "\n" +
357                     " created before: " + createdBefore);
358         }
359         return urls;
360     }
361
362     /**
363      * Replicates the content upon stream closure. If the thread pool is available,
364      * then the process will be asynchronous.
365      * <p>
366      * No transaction boundaries have been declared as the
367      * {@link ContentWriter#addListener(ContentStreamListener)} method indicates that
368      * all listeners will be called within a transaction.
369      *
370      * @author Derek Hulley
371      */

372     public static class ReplicatingWriteListener implements ContentStreamListener
373     {
374         private List JavaDoc<ContentStore> stores;
375         private ContentWriter writer;
376         private ThreadPoolExecutor JavaDoc threadPoolExecutor;
377         
378         public ReplicatingWriteListener(
379                 List JavaDoc<ContentStore> stores,
380                 ContentWriter writer,
381                 ThreadPoolExecutor JavaDoc threadPoolExecutor)
382         {
383             this.stores = stores;
384             this.writer = writer;
385             this.threadPoolExecutor = threadPoolExecutor;
386         }
387         
388         public void contentStreamClosed() throws ContentIOException
389         {
390             Runnable JavaDoc runnable = new ReplicateOnCloseRunnable();
391             if (threadPoolExecutor == null)
392             {
393                 // execute direct
394
runnable.run();
395             }
396             else
397             {
398                 threadPoolExecutor.execute(runnable);
399             }
400         }
401         
402         /**
403          * Performs the actual replication work.
404          *
405          * @author Derek Hulley
406          */

407         private class ReplicateOnCloseRunnable implements Runnable JavaDoc
408         {
409             public void run()
410             {
411                 for (ContentStore store : stores)
412                 {
413                     try
414                     {
415                         // replicate the content to the store - we know the URL that we want to write to
416
ContentReader reader = writer.getReader();
417                         String JavaDoc contentUrl = reader.getContentUrl();
418                         // in order to replicate, we have to specify the URL that we are going to write to
419
ContentWriter replicatedWriter = store.getWriter(null, contentUrl);
420                         // write it
421
replicatedWriter.putContent(reader);
422                         
423                         if (logger.isDebugEnabled())
424                         {
425                             logger.debug("Replicated content to store: \n" +
426                                     " url: " + contentUrl + "\n" +
427                                     " to store: " + store);
428                         }
429                     }
430                     catch (Throwable JavaDoc e)
431                     {
432                         throw new ContentIOException("Content replication failed: \n" +
433                                 " url: " + writer.getContentUrl() + "\n" +
434                                 " to store: " + store);
435                     }
436                 }
437             }
438         }
439     }
440 }
441
Popular Tags