1 28 package de.nava.informa.utils.toolkit; 29 30 import de.nava.informa.utils.poller.PriorityComparator; 31 import org.apache.commons.collections.BinaryHeap; 32 import org.apache.commons.collections.PriorityQueue; 33 import org.apache.commons.collections.SynchronizedPriorityQueue; 34 35 import java.util.ArrayList ; 36 import java.util.Iterator ; 37 import java.util.List ; 38 39 59 public class WorkersManager { 60 private static final int DEFAULT_WORKER_THREADS = 5; 61 private static final int DEFAULT_QUEUE_LIMIT = 25; 62 63 private List workers = new ArrayList (); 64 65 private JobSource jobSource = new JobSource(); 66 67 private PriorityQueue queue = 75 new SynchronizedPriorityQueue(new BinaryHeap(DEFAULT_QUEUE_LIMIT, new PriorityComparator())); 76 77 private WorkerThreadFactoryIF workerThreadsFactory; 78 79 84 public WorkersManager(WorkerThreadFactoryIF factory) { 85 this(factory, DEFAULT_WORKER_THREADS); 86 } 87 88 94 public WorkersManager(WorkerThreadFactoryIF factory, int workerThreads) { 95 this.workerThreadsFactory = factory; 96 97 if (workerThreads <= 0) { 99 workerThreads = DEFAULT_WORKER_THREADS; 100 } 101 102 setWorkerThreads(workerThreads); 103 } 104 105 110 public final void setWorkerThreads(int count) { 111 synchronized (workers) { 112 int curWorkerThreads = workers.size(); 114 for (int i = curWorkerThreads - 1; i >= count; i--) { 115 final WorkerThread worker = (WorkerThread) workers.get(i); 116 worker.terminate(); 117 workers.remove(worker); 118 } 119 120 curWorkerThreads = workers.size(); 122 for (int i = curWorkerThreads; i < count; i++) { 123 final WorkerThread worker = workerThreadsFactory.create(); 125 worker.setJobSource(jobSource); 126 127 workers.add(worker); 129 worker.start(); 130 } 131 } 132 } 133 134 137 public final void terminateAll() { 138 synchronized (workers) { 139 int count = workers.size(); 140 for (int i = count - 1; i >= 0; i--) { 141 ((WorkerThread) workers.get(i)).terminate(); 142 workers.remove(i); 143 } 144 } 145 } 146 147 152 public final void process(ChannelRecord record) { 153 if (!isInProcess(record)) { 154 if (!loadFreeWorker(record)) { 155 putRecordInQueue(record); 157 } 158 } 159 } 160 161 167 private boolean isInProcess(ChannelRecord record) { 168 boolean found = false; 169 170 synchronized (workers) { 171 Iterator i = workers.iterator(); 172 while (!found && i.hasNext()) { 173 WorkerThread worker = (WorkerThread) i.next(); 174 found = worker.getChannelInProcess() == record; 175 } 176 } 177 178 return found; 179 } 180 181 187 private boolean loadFreeWorker(ChannelRecord record) { 188 boolean loaded = false; 189 190 synchronized (workers) { 191 Iterator i = workers.iterator(); 192 while (!loaded && i.hasNext()) { 193 WorkerThread worker = (WorkerThread) i.next(); 194 if (!worker.isBusy()) { 195 loaded = worker.startJob(record); 196 } 197 } 198 } 199 200 return loaded; 201 } 202 203 209 private void putRecordInQueue(ChannelRecord record) { 210 queue.insert(record); 211 } 212 213 218 private synchronized ChannelRecord getRecordFromQueue() { 219 ChannelRecord record = null; 220 while (record == null && !queue.isEmpty()) { 221 record = (ChannelRecord) queue.pop(); 222 if (record.isCanceled()) record = null; 223 } 224 225 return record; 226 } 227 228 231 private class JobSource implements JobSourceIF { 232 237 public ChannelRecord getNextJob() { 238 return getRecordFromQueue(); 239 } 240 } 241 } 242 | Popular Tags |