|                                                                                                              1
 24
 25  package com.mckoi.database;
 26
 27  import com.mckoi.debug.DebugLogger;
 28  import java.util.LinkedList
  ; 29
 30
 36
 37  final class WorkerPool {
 38
 39
 42    private TransactionSystem system;
 43
 44
 47    private int MAXIMUM_WORKER_THREADS = 4;
 48
 49
 53    private LinkedList
  available_worker_threads; 54
 55
 58    private int worker_thread_count;
 59
 60
 64    private LinkedList
  run_queue; 65
 66
 70    private boolean is_executing_commands;
 71
 72
 73
 76    WorkerPool(TransactionSystem system, int max_worker_threads) {
 77      this.system = system;
 78      MAXIMUM_WORKER_THREADS = max_worker_threads;
 79
 80      is_executing_commands = false;
 81
 82          run_queue = new LinkedList
  (); 84          available_worker_threads = new LinkedList
  (); 86      worker_thread_count = 0;
 87
 92    }
 93
 94
 97    public final DebugLogger Debug() {
 98      return system.Debug();
 99    }
 100
 101
 103
 107   void notifyWorkerReady(WorkerThread worker_thread) {
 108     synchronized (available_worker_threads) {
 109             available_worker_threads.add(worker_thread);
 111
 112             int q_len = run_queue.size();
 114       if (q_len > 0) {
 115                 RunCommand command = (RunCommand) run_queue.remove(0);
 117         execute(command.user, command.database, command.runnable);
 118       }
 119     }
 120   }
 121
 122
 127   private WorkerThread getFirstWaitingThread() {
 128     synchronized (available_worker_threads) {
 129             int size = available_worker_threads.size();
 131       if (size > 0) {
 132                 WorkerThread wt = (WorkerThread) available_worker_threads.remove(0);
 134         return wt;
 135       }
 136       else {
 137                 if (worker_thread_count < MAXIMUM_WORKER_THREADS) {
 139           ++worker_thread_count;
 140           WorkerThread wt = new WorkerThread(this);
 141           wt.start();
 142                                       }
 146         return null;
 147       }
 148     }
 149   }
 150
 151
 159   void execute(User user, DatabaseConnection database, Runnable
  runner) { 160     synchronized (available_worker_threads) {
 161       if (is_executing_commands) {
 162         WorkerThread worker = getFirstWaitingThread();
 163         if (worker != null) {
 164           worker.execute(user, database, runner);
 166           return;
 167         }
 168       }
 169       RunCommand command = new RunCommand(user, database, runner);
 171       run_queue.add(command);
 172     }
 173   }
 174
 175
 179   void setIsExecutingCommands(boolean status) {
 180     synchronized (available_worker_threads) {
 181       if (status == true) {
 182         is_executing_commands = true;
 183
 184                 for (int i = run_queue.size() - 1; i >= 0; --i) {
 186           RunCommand command = (RunCommand) run_queue.remove(i);
 187           execute(command.user, command.database, command.runnable);
 188         }
 189       }
 190       else {
 191         is_executing_commands = false;
 192       }
 193     }
 194   }
 195
 196
 205   void waitUntilAllWorkersQuiet() {
 206     if (Thread.currentThread() instanceof WorkerThread) {
 207       throw new Error
  ("Can't call this method from a WorkerThread!"); 208     }
 209
 210     synchronized (available_worker_threads) {
 211             while (worker_thread_count != available_worker_threads.size()) {
 213                 try {
 215           available_worker_threads.wait(500);
 216         }
 217         catch (InterruptedException
  e) {} 218                                       }
 223     }
 224   }
 225
 226
 229   void shutdown() {
 230     synchronized (available_worker_threads) {
 231       while (available_worker_threads.size() > 0) {
 232         WorkerThread wt = (WorkerThread) available_worker_threads.remove(0);
 233         --worker_thread_count;
 234         wt.shutdown();
 235       }
 236     }
 237   }
 238
 239
 241
 245   private static final class RunCommand {
 246     User     user;
 247     DatabaseConnection database;
 248     Runnable
  runnable; 249     public RunCommand(User user, DatabaseConnection database,
 250                       Runnable
  runnable) { 251       this.user = user;
 252       this.database = database;
 253       this.runnable = runnable;
 254     }
 255   }
 256
 257 }
 258
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |