1 17 package org.alfresco.repo.content.replication; 18 19 import java.util.Set ; 20 21 import org.alfresco.error.AlfrescoRuntimeException; 22 import org.alfresco.repo.content.ContentStore; 23 import org.alfresco.repo.node.index.IndexRecovery; 24 import org.alfresco.service.cmr.repository.ContentReader; 25 import org.alfresco.service.cmr.repository.ContentWriter; 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 import org.quartz.Job; 29 import org.quartz.JobExecutionContext; 30 import org.quartz.JobExecutionException; 31 32 44 public class ContentStoreReplicator 45 { 46 private static Log logger = LogFactory.getLog(ContentStoreReplicator.class); 47 48 private ContentStore sourceStore; 49 private ContentStore targetStore; 50 51 52 private boolean started; 53 54 private boolean runContinuously; 55 56 private long waitTime; 57 58 public ContentStoreReplicator() 59 { 60 this.started = false; 61 this.runContinuously = true; 62 this.waitTime = 60000L; 63 } 64 65 70 public void setSourceStore(ContentStore sourceStore) 71 { 72 this.sourceStore = sourceStore; 73 } 74 75 80 public void setTargetStore(ContentStore targetStore) 81 { 82 this.targetStore = targetStore; 83 } 84 85 91 public void setRunContinuously(boolean runContinuously) 92 { 93 this.runContinuously = runContinuously; 94 } 95 96 101 public void setWaitTime(long waitTime) 102 { 103 this.waitTime = waitTime * 1000L; 105 } 106 107 110 public synchronized void start() 111 { 112 if (started) 113 { 114 throw new AlfrescoRuntimeException("This ContentStoreReplicator has already been started"); 115 } 116 Runnable runnable = new ReplicationRunner(); 118 Thread thread = new Thread (runnable); 119 thread.setPriority(Thread.MIN_PRIORITY); 120 thread.setDaemon(true); 121 thread.start(); 123 } 124 125 130 private class ReplicationRunner implements Runnable 131 { 132 public void run() 133 { 134 while (true) 136 { 137 try 138 { 139 ContentStoreReplicator.this.replicate(); 140 if (!runContinuously) 142 { 143 if (logger.isDebugEnabled()) 146 { 147 logger.debug("Thread quitting - first pass of replication complete:"); 148 } 149 break; 150 } 151 synchronized(ContentStoreReplicator.this) 153 { 154 ContentStoreReplicator.this.wait(waitTime); 155 } 156 } 157 catch (InterruptedException e) 158 { 159 } 161 catch (Throwable e) 162 { 163 logger.error("Replication failure", e); 165 } 166 } 167 } 168 } 169 170 173 private void replicate() 174 { 175 Set <String > sourceUrls = sourceStore.getUrls(); 177 Set <String > targetUrls = targetStore.getUrls(); 179 sourceUrls.removeAll(targetUrls); 181 182 for (String contentUrl : sourceUrls) 184 { 185 replicate(contentUrl); 186 } 187 } 188 189 197 private void replicate(String contentUrl) 198 { 199 try 200 { 201 if (targetStore.exists(contentUrl)) 203 { 204 if (logger.isDebugEnabled()) 206 { 207 logger.debug("No replication required - URL exists in target store: \n" + 208 " source store: " + sourceStore + "\n" + 209 " target store: " + targetStore + "\n" + 210 " content URL: " + contentUrl); 211 } 212 return; 213 } 214 ContentWriter writer = targetStore.getWriter(null, contentUrl); 216 ContentReader reader = sourceStore.getReader(contentUrl); 218 if (!reader.exists()) 219 { 220 if (logger.isDebugEnabled()) 222 { 223 logger.debug("Source store no longer has URL - no replication possible: \n" + 224 " source store: " + sourceStore + "\n" + 225 " target store: " + targetStore + "\n" + 226 " content URL: " + contentUrl); 227 } 228 return; 229 } 230 writer.putContent(reader); 232 } 233 catch (Throwable e) 234 { 235 logger.error("Failed to replicate URL - removing target content: \n" + 236 " source store: " + sourceStore + "\n" + 237 " target store: " + targetStore + "\n" + 238 " content URL: " + contentUrl, 239 e); 240 targetStore.delete(contentUrl); 241 } 242 } 243 244 249 public class ContentStoreReplicatorJob implements Job 250 { 251 252 public static final String KEY_CONTENT_STORE_REPLICATOR = "contentStoreReplicator"; 253 254 258 public void execute(JobExecutionContext context) throws JobExecutionException 259 { 260 ContentStoreReplicator contentStoreReplicator = (ContentStoreReplicator) context.getJobDetail() 261 .getJobDataMap().get(KEY_CONTENT_STORE_REPLICATOR); 262 if (contentStoreReplicator == null) 263 { 264 throw new JobExecutionException("Missing job data: " + KEY_CONTENT_STORE_REPLICATOR); 265 } 266 contentStoreReplicator.start(); 268 } 269 } 270 } 271 | Popular Tags |