1 24 25 package com.mckoi.database; 26 27 import com.mckoi.debug.DebugLogger; 28 import java.util.LinkedList ; 29 30 36 37 final class WorkerPool { 38 39 42 private TransactionSystem system; 43 44 47 private int MAXIMUM_WORKER_THREADS = 4; 48 49 53 private LinkedList available_worker_threads; 54 55 58 private int worker_thread_count; 59 60 64 private LinkedList run_queue; 65 66 70 private boolean is_executing_commands; 71 72 73 76 WorkerPool(TransactionSystem system, int max_worker_threads) { 77 this.system = system; 78 MAXIMUM_WORKER_THREADS = max_worker_threads; 79 80 is_executing_commands = false; 81 82 run_queue = new LinkedList (); 84 available_worker_threads = new LinkedList (); 86 worker_thread_count = 0; 87 92 } 93 94 97 public final DebugLogger Debug() { 98 return system.Debug(); 99 } 100 101 103 107 void notifyWorkerReady(WorkerThread worker_thread) { 108 synchronized (available_worker_threads) { 109 available_worker_threads.add(worker_thread); 111 112 int q_len = run_queue.size(); 114 if (q_len > 0) { 115 RunCommand command = (RunCommand) run_queue.remove(0); 117 execute(command.user, command.database, command.runnable); 118 } 119 } 120 } 121 122 127 private WorkerThread getFirstWaitingThread() { 128 synchronized (available_worker_threads) { 129 int size = available_worker_threads.size(); 131 if (size > 0) { 132 WorkerThread wt = (WorkerThread) available_worker_threads.remove(0); 134 return wt; 135 } 136 else { 137 if (worker_thread_count < MAXIMUM_WORKER_THREADS) { 139 ++worker_thread_count; 140 WorkerThread wt = new WorkerThread(this); 141 wt.start(); 142 } 146 return null; 147 } 148 } 149 } 150 151 159 void execute(User user, DatabaseConnection database, Runnable runner) { 160 synchronized (available_worker_threads) { 161 if (is_executing_commands) { 162 WorkerThread worker = getFirstWaitingThread(); 163 if (worker != null) { 164 worker.execute(user, database, runner); 166 return; 167 } 168 } 169 RunCommand command = new RunCommand(user, database, runner); 171 run_queue.add(command); 172 } 173 } 174 175 179 void setIsExecutingCommands(boolean status) { 180 synchronized (available_worker_threads) { 181 if (status == true) { 182 is_executing_commands = true; 183 184 for (int i = run_queue.size() - 1; i >= 0; --i) { 186 RunCommand command = (RunCommand) run_queue.remove(i); 187 execute(command.user, command.database, command.runnable); 188 } 189 } 190 else { 191 is_executing_commands = false; 192 } 193 } 194 } 195 196 205 void waitUntilAllWorkersQuiet() { 206 if (Thread.currentThread() instanceof WorkerThread) { 207 throw new Error ("Can't call this method from a WorkerThread!"); 208 } 209 210 synchronized (available_worker_threads) { 211 while (worker_thread_count != available_worker_threads.size()) { 213 try { 215 available_worker_threads.wait(500); 216 } 217 catch (InterruptedException e) {} 218 } 223 } 224 } 225 226 229 void shutdown() { 230 synchronized (available_worker_threads) { 231 while (available_worker_threads.size() > 0) { 232 WorkerThread wt = (WorkerThread) available_worker_threads.remove(0); 233 --worker_thread_count; 234 wt.shutdown(); 235 } 236 } 237 } 238 239 241 245 private static final class RunCommand { 246 User user; 247 DatabaseConnection database; 248 Runnable runnable; 249 public RunCommand(User user, DatabaseConnection database, 250 Runnable runnable) { 251 this.user = user; 252 this.database = database; 253 this.runnable = runnable; 254 } 255 } 256 257 } 258 | Popular Tags |