1 17 package org.alfresco.repo.content.replication; 18 19 import java.util.Date ; 20 import java.util.HashSet ; 21 import java.util.List ; 22 import java.util.Set ; 23 import java.util.concurrent.ThreadPoolExecutor ; 24 import java.util.concurrent.locks.Lock ; 25 import java.util.concurrent.locks.ReadWriteLock ; 26 import java.util.concurrent.locks.ReentrantReadWriteLock ; 27 28 import org.alfresco.error.AlfrescoRuntimeException; 29 import org.alfresco.repo.content.AbstractContentStore; 30 import org.alfresco.repo.content.ContentStore; 31 import org.alfresco.service.cmr.repository.ContentIOException; 32 import org.alfresco.service.cmr.repository.ContentReader; 33 import org.alfresco.service.cmr.repository.ContentStreamListener; 34 import org.alfresco.service.cmr.repository.ContentWriter; 35 import org.alfresco.service.transaction.TransactionService; 36 import org.apache.commons.logging.Log; 37 import org.apache.commons.logging.LogFactory; 38 39 86 public class ReplicatingContentStore extends AbstractContentStore 87 { 88 97 98 private static Log logger = LogFactory.getLog(ReplicatingContentStore.class); 99 100 private TransactionService transactionService; 101 private ContentStore primaryStore; 102 private List <ContentStore> secondaryStores; 103 private boolean inbound; 104 private boolean outbound; 105 private ThreadPoolExecutor outboundThreadPoolExecutor; 106 107 private Lock readLock; 108 private Lock writeLock; 109 110 113 public ReplicatingContentStore() 114 { 115 inbound = false; 116 outbound = true; 117 118 ReadWriteLock storeLock = new ReentrantReadWriteLock (); 119 readLock = storeLock.readLock(); 120 writeLock = storeLock.writeLock(); 121 } 122 123 128 public void setTransactionService(TransactionService transactionService) 129 { 130 this.transactionService = transactionService; 131 } 132 133 138 public void setPrimaryStore(ContentStore primaryStore) 139 { 140 this.primaryStore = primaryStore; 141 } 142 143 148 public void setSecondaryStores(List <ContentStore> secondaryStores) 149 { 150 this.secondaryStores = secondaryStores; 151 } 152 153 160 public void setInbound(boolean inbound) 161 { 162 this.inbound = inbound; 163 } 164 165 171 public void setOutbound(boolean outbound) 172 { 173 this.outbound = outbound; 174 } 175 176 182 public void setOutboundThreadPoolExecutor(ThreadPoolExecutor outboundThreadPoolExecutor) 183 { 184 this.outboundThreadPoolExecutor = outboundThreadPoolExecutor; 185 } 186 187 190 public ContentReader getReader(String contentUrl) throws ContentIOException 191 { 192 if (primaryStore == null) 193 { 194 throw new AlfrescoRuntimeException("ReplicatingContentStore not initialised"); 195 } 196 197 ContentReader existingContentReader = null; 199 readLock.lock(); 200 try 201 { 202 ContentReader primaryReader = primaryStore.getReader(contentUrl); 204 205 if (primaryReader.exists()) 207 { 208 return primaryReader; 209 } 210 211 ContentReader secondaryContentReader = null; 213 for (ContentStore store : secondaryStores) 214 { 215 ContentReader reader = store.getReader(contentUrl); 216 if (reader.exists()) 217 { 218 secondaryContentReader = reader; 220 break; 221 } 222 } 223 if (secondaryContentReader == null) 226 { 227 return primaryReader; 228 } 229 if (!inbound) 232 { 233 return secondaryContentReader; 234 } 235 236 existingContentReader = secondaryContentReader; 238 } 239 finally 240 { 241 readLock.unlock(); 242 } 243 244 246 writeLock.lock(); 248 try 249 { 250 ContentReader primaryContentReader = primaryStore.getReader(contentUrl); 252 if (primaryContentReader.exists()) 253 { 254 return primaryContentReader; 256 } 257 ContentWriter primaryContentWriter = primaryStore.getWriter(existingContentReader, contentUrl); 259 primaryContentWriter.putContent(existingContentReader); 261 primaryContentReader = primaryContentWriter.getReader(); 263 return primaryContentReader; 265 } 266 finally 267 { 268 writeLock.unlock(); 269 } 270 } 271 272 275 public ContentWriter getWriter(ContentReader existingContentReader, String newContentUrl) throws ContentIOException 276 { 277 ContentWriter writer = primaryStore.getWriter(existingContentReader, newContentUrl); 279 280 if (outbound) 282 { 283 if (logger.isDebugEnabled()) 284 { 285 logger.debug( 286 "Attaching " + (outboundThreadPoolExecutor == null ? "" : "a") + "synchronous " + 287 "replicating listener to local writer: \n" + 288 " primary store: " + primaryStore + "\n" + 289 " writer: " + writer); 290 } 291 ContentStreamListener listener = new ReplicatingWriteListener(secondaryStores, writer, outboundThreadPoolExecutor); 293 writer.addListener(listener); 294 writer.setTransactionService(transactionService); } 296 297 return writer; 299 } 300 301 307 public boolean delete(String contentUrl) throws ContentIOException 308 { 309 boolean deleted = primaryStore.delete(contentUrl); 311 312 if (outbound) 314 { 315 for (ContentStore store : secondaryStores) 316 { 317 store.delete(contentUrl); 318 } 319 if (logger.isDebugEnabled()) 321 { 322 logger.debug("Propagated content delete to " + secondaryStores.size() + " stores:" + contentUrl); 323 } 324 } 325 if (logger.isDebugEnabled()) 327 { 328 logger.debug("Deleted content for URL: " + contentUrl); 329 } 330 return deleted; 331 } 332 333 337 public Set <String > getUrls(Date createdAfter, Date createdBefore) throws ContentIOException 338 { 339 Set <String > urls = new HashSet <String >(1024); 340 341 Set <String > primaryUrls = primaryStore.getUrls(createdAfter, createdBefore); 343 urls.addAll(primaryUrls); 344 345 for (ContentStore secondaryStore : secondaryStores) 347 { 348 Set <String > secondaryUrls = secondaryStore.getUrls(createdAfter, createdBefore); 349 urls.addAll(secondaryUrls); 351 } 352 if (logger.isDebugEnabled()) 354 { 355 logger.debug("Found " + urls.size() + " URLs, of which " + primaryUrls.size() + " are primary: \n" + 356 " created after: " + createdAfter + "\n" + 357 " created before: " + createdBefore); 358 } 359 return urls; 360 } 361 362 372 public static class ReplicatingWriteListener implements ContentStreamListener 373 { 374 private List <ContentStore> stores; 375 private ContentWriter writer; 376 private ThreadPoolExecutor threadPoolExecutor; 377 378 public ReplicatingWriteListener( 379 List <ContentStore> stores, 380 ContentWriter writer, 381 ThreadPoolExecutor threadPoolExecutor) 382 { 383 this.stores = stores; 384 this.writer = writer; 385 this.threadPoolExecutor = threadPoolExecutor; 386 } 387 388 public void contentStreamClosed() throws ContentIOException 389 { 390 Runnable runnable = new ReplicateOnCloseRunnable(); 391 if (threadPoolExecutor == null) 392 { 393 runnable.run(); 395 } 396 else 397 { 398 threadPoolExecutor.execute(runnable); 399 } 400 } 401 402 407 private class ReplicateOnCloseRunnable implements Runnable 408 { 409 public void run() 410 { 411 for (ContentStore store : stores) 412 { 413 try 414 { 415 ContentReader reader = writer.getReader(); 417 String contentUrl = reader.getContentUrl(); 418 ContentWriter replicatedWriter = store.getWriter(null, contentUrl); 420 replicatedWriter.putContent(reader); 422 423 if (logger.isDebugEnabled()) 424 { 425 logger.debug("Replicated content to store: \n" + 426 " url: " + contentUrl + "\n" + 427 " to store: " + store); 428 } 429 } 430 catch (Throwable e) 431 { 432 throw new ContentIOException("Content replication failed: \n" + 433 " url: " + writer.getContentUrl() + "\n" + 434 " to store: " + store); 435 } 436 } 437 } 438 } 439 } 440 } 441 | Popular Tags |