1 18 23 24 package org.apache.roller.business.referrers; 25 26 import java.util.ArrayList ; 27 import java.util.Collections ; 28 import java.util.HashMap ; 29 import java.util.Iterator ; 30 import java.util.List ; 31 import org.apache.commons.logging.Log; 32 import org.apache.commons.logging.LogFactory; 33 import org.apache.roller.RollerException; 34 import org.apache.roller.business.runnable.ContinuousWorkerThread; 35 import org.apache.roller.business.runnable.WorkerThread; 36 import org.apache.roller.config.RollerConfig; 37 import org.apache.roller.model.RollerFactory; 38 39 40 61 public class ReferrerQueueManagerImpl implements ReferrerQueueManager { 62 63 private static Log mLogger = LogFactory.getLog(ReferrerQueueManagerImpl.class); 64 65 private static ReferrerQueueManager instance = null; 66 67 private boolean asyncMode = false; 68 private int numWorkers = 1; 69 private int sleepTime = 10000; 70 private List workers = null; 71 private List referrerQueue = null; 72 73 static { 74 instance = new ReferrerQueueManagerImpl(); 75 } 76 77 78 private ReferrerQueueManagerImpl() { 80 mLogger.info("Initializing Referrer Queue Manager"); 81 82 this.asyncMode = RollerConfig.getBooleanProperty("referrers.asyncProcessing.enabled"); 84 85 mLogger.info("Asynchronous referrer processing = "+this.asyncMode); 86 87 if(this.asyncMode) { 88 89 90 String num = RollerConfig.getProperty("referrers.queue.numWorkers"); 91 String sleep = RollerConfig.getProperty("referrers.queue.sleepTime"); 92 93 try { 94 this.numWorkers = Integer.parseInt(num); 95 96 if(numWorkers < 1) 97 this.numWorkers = 1; 98 99 } catch(NumberFormatException nfe) { 100 mLogger.warn("Invalid num workers ["+num+"], using default"); 101 } 102 103 try { 104 this.sleepTime = Integer.parseInt(sleep) * 1000; 106 } catch(NumberFormatException nfe) { 107 mLogger.warn("Invalid sleep time ["+sleep+"], using default"); 108 } 109 110 this.referrerQueue = Collections.synchronizedList(new ArrayList ()); 112 113 this.workers = new ArrayList (); 115 ContinuousWorkerThread worker = null; 116 QueuedReferrerProcessingJob job = null; 117 for(int i=0; i < this.numWorkers; i++) { 118 job = new QueuedReferrerProcessingJob(); 119 worker = new ContinuousWorkerThread("ReferrerWorker"+i, job, this.sleepTime); 120 workers.add(worker); 121 worker.start(); 122 } 123 } 124 } 125 126 127 130 public static ReferrerQueueManager getInstance() { 131 return instance; 132 } 133 134 135 142 public void processReferrer(IncomingReferrer referrer) { 143 144 if(this.asyncMode) { 145 mLogger.debug("QUEUING: "+referrer.getRequestUrl()); 146 147 this.enqueue(referrer); 149 } else { 150 ReferrerProcessingJob job = new ReferrerProcessingJob(); 152 153 HashMap inputs = new HashMap (); 155 inputs.put("referrer", referrer); 156 job.input(inputs); 157 158 job.execute(); 160 161 try { 162 RollerFactory.getRoller().flush(); 164 } catch (RollerException ex) { 165 mLogger.error("ERROR commiting referrer", ex); 166 } 167 } 168 169 } 170 171 172 175 public void enqueue(IncomingReferrer referrer) { 176 this.referrerQueue.add(referrer); 177 178 if(this.referrerQueue.size() > 250) { 179 mLogger.warn("Referrer queue is rather full. queued="+this.referrerQueue.size()); 180 } 181 } 182 183 184 187 public synchronized IncomingReferrer dequeue() { 188 189 if(!this.referrerQueue.isEmpty()) { 190 return (IncomingReferrer) this.referrerQueue.remove(0); 191 } 192 193 return null; 194 } 195 196 197 200 public void shutdown() { 201 202 if(this.workers != null && this.workers.size() > 0) { 203 mLogger.info("stopping all ReferrerQueue worker threads"); 204 205 WorkerThread worker = null; 207 Iterator it = this.workers.iterator(); 208 while(it.hasNext()) { 209 worker = (WorkerThread) it.next(); 210 worker.interrupt(); 211 } 212 } 213 214 } 215 216 } 217 | Popular Tags |