KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > mckoi > database > WorkerPool


1 /**
2  * com.mckoi.database.WorkerPool 12 Aug 2000
3  *
4  * Mckoi SQL Database ( http://www.mckoi.com/database )
5  * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * Version 2 as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License Version 2 for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * Version 2 along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19  *
20  * Change Log:
21  *
22  *
23  */

24
25 package com.mckoi.database;
26
27 import com.mckoi.debug.DebugLogger;
28 import java.util.LinkedList JavaDoc;
29
30 /**
31  * Maintains a pool of worker threads that are used to dispatch commands to
32  * a Database sub-system.
33  *
34  * @author Tobias Downer
35  */

36
37 final class WorkerPool {
38
39   /**
40    * The TransactionSystem that this pool is part of.
41    */

42   private TransactionSystem system;
43
44   /**
45    * This is the maximum number of worker threads that will be created.
46    */

47   private int MAXIMUM_WORKER_THREADS = 4;
48
49   /**
50    * This is a queue of 'WorkerThread' objects that are currently available
51    * to process commands from the service providers.
52    */

53   private LinkedList JavaDoc available_worker_threads;
54
55   /**
56    * The number of worker threads that have been created in total.
57    */

58   private int worker_thread_count;
59
60   /**
61    * A list of pending Runnable objects that are due to be executed. This is
62    * a queue of events to be run.
63    */

64   private LinkedList JavaDoc run_queue;
65
66   /**
67    * If this is set to false, then no commands will be executed by the
68    * 'execute' method.
69    */

70   private boolean is_executing_commands;
71
72
73   /**
74    * Constructs the worker thread pool.
75    */

76   WorkerPool(TransactionSystem system, int max_worker_threads) {
77     this.system = system;
78     MAXIMUM_WORKER_THREADS = max_worker_threads;
79
80     is_executing_commands = false;
81
82     // Set up the run queue
83
run_queue = new LinkedList JavaDoc();
84     // Set up the worker threads
85
available_worker_threads = new LinkedList JavaDoc();
86     worker_thread_count = 0;
87 // // Create a single worker thread and start it.
88
// ++worker_thread_count;
89
// WorkerThread wt = new WorkerThread(this);
90
// wt.start();
91

92   }
93
94   /**
95    * Returns a DebugLogger object that we can use to log debug messages.
96    */

97   public final DebugLogger Debug() {
98     return system.Debug();
99   }
100
101   // ---------- Thread Pooling methods ----------
102

103   /**
104    * This is called by a WorkerThread when it is decided that it is ready
105    * to service a new command.
106    */

107   void notifyWorkerReady(WorkerThread worker_thread) {
108     synchronized (available_worker_threads) {
109       // Add it to the queue of worker threads that are available.
110
available_worker_threads.add(worker_thread);
111
112       // Are there any commands pending?
113
int q_len = run_queue.size();
114       if (q_len > 0) {
115         // Execute the bottom element on the queue
116
RunCommand command = (RunCommand) run_queue.remove(0);
117         execute(command.user, command.database, command.runnable);
118       }
119     }
120   }
121
122   /**
123    * This returns the first available WorkerThread object from the thread
124    * pool. If there are no available worker threads available then it returns
125    * null. This method must execute fast and must not block.
126    */

127   private WorkerThread getFirstWaitingThread() {
128     synchronized (available_worker_threads) {
129       // Is there a worker thread available?
130
int size = available_worker_threads.size();
131       if (size > 0) {
132         // Yes so remove the first element and return it.
133
WorkerThread wt = (WorkerThread) available_worker_threads.remove(0);
134         return wt;
135       }
136       else {
137         // Otherwise create a new worker thread if we can.
138
if (worker_thread_count < MAXIMUM_WORKER_THREADS) {
139           ++worker_thread_count;
140           WorkerThread wt = new WorkerThread(this);
141           wt.start();
142           // NOTE: We must _not_ return the worker thread we have just created.
143
// We must wait until the worker thread has made it self known by
144
// it calling the 'notifyWorkerReady' method.
145
}
146         return null;
147       }
148     }
149   }
150
151   /**
152    * Executes database functions from the 'run' method of the given runnable
153    * instance on a worker thread. All database functions should go through
154    * a worker thread. If we ensure this, we can easily stop all database
155    * functions from executing. Also, we only need to have a certain number
156    * of threads active at any one time rather than a unique thread for each
157    * connection.
158    */

159   void execute(User user, DatabaseConnection database, Runnable JavaDoc runner) {
160     synchronized (available_worker_threads) {
161       if (is_executing_commands) {
162         WorkerThread worker = getFirstWaitingThread();
163         if (worker != null) {
164 // System.out.println("[Database] executing runner");
165
worker.execute(user, database, runner);
166           return;
167         }
168       }
169 // System.out.println("[Database] adding to run queue");
170
RunCommand command = new RunCommand(user, database, runner);
171       run_queue.add(command);
172     }
173   }
174
175   /**
176    * Controls whether the database is allowed to execute commands or not. If
177    * this is set to true, then calls to 'execute' will make commands execute.
178    */

179   void setIsExecutingCommands(boolean status) {
180     synchronized (available_worker_threads) {
181       if (status == true) {
182         is_executing_commands = true;
183
184         // Execute everything on the queue
185
for (int i = run_queue.size() - 1; i >= 0; --i) {
186           RunCommand command = (RunCommand) run_queue.remove(i);
187           execute(command.user, command.database, command.runnable);
188         }
189       }
190       else {
191         is_executing_commands = false;
192       }
193     }
194   }
195
196   /**
197    * Waits until all executing commands have stopped. This is best called
198    * right after a call to 'setIsExecutingCommands(false)'. If these two
199    * commands are run, the database is in a known state where no commands
200    * can be executed.
201    * <p>
202    * NOTE: This can't be called from the WorkerThread. Deadlock will
203    * result if we were allowed to do this.
204    */

205   void waitUntilAllWorkersQuiet() {
206     if (Thread.currentThread() instanceof WorkerThread) {
207       throw new Error JavaDoc("Can't call this method from a WorkerThread!");
208     }
209
210     synchronized (available_worker_threads) {
211       // loop until available works = total worker thread count.
212
while (worker_thread_count != available_worker_threads.size()) {
213         // Wait half a second
214
try {
215           available_worker_threads.wait(500);
216         }
217         catch (InterruptedException JavaDoc e) {}
218         // ISSUE: If this lasts for more than 10 minutes, one of the worker
219
// threads is likely in a state of deadlock. If this happens, we
220
// should probably find all the open worker threads and clean them
221
// up nicely.
222
}
223     }
224   }
225
226   /**
227    * Shuts down the WorkerPool object stopping all worker threads.
228    */

229   void shutdown() {
230     synchronized (available_worker_threads) {
231       while (available_worker_threads.size() > 0) {
232         WorkerThread wt = (WorkerThread) available_worker_threads.remove(0);
233         --worker_thread_count;
234         wt.shutdown();
235       }
236     }
237   }
238
239   // ---------- Inner classes ----------
240

241   /**
242    * Structures within the run_queue list. This stores the Runnable to
243    * run and the User that's executing the command.
244    */

245   private static final class RunCommand {
246     User user;
247     DatabaseConnection database;
248     Runnable JavaDoc runnable;
249     public RunCommand(User user, DatabaseConnection database,
250                       Runnable JavaDoc runnable) {
251       this.user = user;
252       this.database = database;
253       this.runnable = runnable;
254     }
255   }
256
257 }
258
Popular Tags