KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > mckoi > database > jdbcserver > SingleThreadedConnectionPoolServer


1 /**
2  * com.mckoi.database.jdbcserver.SingleThreadedConnectionPoolServer 22 Jun 2001
3  *
4  * Mckoi SQL Database ( http://www.mckoi.com/database )
5  * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * Version 2 as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License Version 2 for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * Version 2 along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19  *
20  * Change Log:
21  *
22  *
23  */

24
25 package com.mckoi.database.jdbcserver;
26
27 import com.mckoi.database.User;
28 import com.mckoi.database.Database;
29 import com.mckoi.database.DatabaseSystem;
30 import com.mckoi.debug.*;
31 import java.io.IOException JavaDoc;
32 import java.util.ArrayList JavaDoc;
33 import java.util.LinkedList JavaDoc;
34 import java.util.ResourceBundle JavaDoc;
35
36 /**
37  * A generic database server class that provides a thread that dispatches
38  * commands to the underlying database. This class only provides a framework
39  * for creating a server. It doesn't provide any implementation
40  * specifics for protocols.
41  * <p>
42  * A TCP implementation of this class would wait for connections and then
43  * create a ServerConnection implementation and feed it into the pool for
44  * processing. This object will then poll the ServerConnection until a
45  * command is pending, and then dispatch the command to a database worker
46  * thread.
47  * <p>
48  * This object will ping the clients every so often to see if they are alive.
49  *
50  * @author Tobias Downer
51  */

52
53 final class SingleThreadedConnectionPoolServer
54                                             implements ConnectionPoolServer {
55
56   /**
57    * The number of milliseconds between client pings.
58    * NOTE: Should this be a configurable variable in the '.conf' file?
59    * (45 seconds)
60    */

61   private static final int PING_BREAK = 45 * 1000; //4 * 60 * 1000;
62

63   /**
64    * If this is set to true then the server periodically outputs statistics
65    * about the connections.
66    */

67   private static final boolean DISPLAY_STATS = false;
68
69   /**
70    * The Database context.
71    */

72   private Database database;
73
74   /**
75    * The list of ServerConnection objects that are pending to be added into the
76    * current service provider list next time it is checked.
77    */

78   private ArrayList JavaDoc pending_connections_list;
79
80   /**
81    * The ServerFarmer object that polls for information from the clients and
82    * dispatches the request to the worker threads.
83    */

84   private ServerFarmer farmer;
85
86
87   /**
88    * The Constructor. The argument is the configuration file.
89    */

90   SingleThreadedConnectionPoolServer(Database database) {
91     this.database = database;
92     pending_connections_list = new ArrayList JavaDoc();
93     // Create the farmer thread that services all the connections.
94
farmer = new ServerFarmer();
95     farmer.start();
96   }
97
98   /**
99    * Returns a DebugLogger object that we can log debug messages to.
100    */

101   public final DebugLogger Debug() {
102     return database.Debug();
103   }
104
105   /**
106    * Connects a new ServerConnection into the pool of connections to clients
107    * that this server maintains. We then cycle through these connections
108    * determining whether any commands are pending. If a command is pending
109    * we spawn off a worker thread to do the task.
110    */

111   public void addConnection(ServerConnection connection) {
112     synchronized(pending_connections_list) {
113       pending_connections_list.add(connection);
114     }
115   }
116
117   /**
118    * Closes this connection pool server down.
119    */

120   public void close() {
121     farmer.close();
122   }
123
124   // ---------- Inner classes ----------
125

126   /**
127    * This thread is a low priority thread that checks all the current service
128    * providers periodically to determine if there's any commands pending.
129    */

130   private class ServerFarmer extends Thread JavaDoc {
131
132     /**
133      * The list of ServerConnection objects that are currently being serviced
134      * by this server.
135      */

136     private ArrayList JavaDoc server_connections_list;
137
138     /**
139      * Staticial information collected.
140      */

141     private int stat_display = 0;
142     private int commands_run = 0;
143     private int commands_waited = 0;
144
145     /**
146      * If this is set to true, then the farmer run method should close off.
147      */

148     private boolean farmer_closed;
149
150     /**
151      * The number of milliseconds to wait between each poll of the 'available'
152      * method of the socket. This value is determined by the configuration
153      * file during initialization.
154      */

155     private int poll_wait_time;
156
157
158
159     /**
160      * The Constructor.
161      */

162     public ServerFarmer() {
163       super();
164 // setPriority(NORM_PRIORITY - 1);
165

166       // The time in ms between each poll of the 'available' method.
167
// Default is '3 ms'
168
poll_wait_time = 3;
169
170       server_connections_list = new ArrayList JavaDoc();
171       farmer_closed = false;
172     }
173
174     /**
175      * Establishes a connection with any current pending connections in the
176      * 'pending_connections_list'.
177      */

178     private void establishPendingConnections() throws IOException JavaDoc {
179       synchronized (pending_connections_list) {
180         int len = pending_connections_list.size();
181         // Move all pending connections into the current connections list.
182
for (int i = 0; i < len; ++i) {
183           // Get the connection and create the new connection state
184
ServerConnection connection =
185                         (ServerConnection) pending_connections_list.remove(0);
186           server_connections_list.add(new ServerConnectionState(connection));
187         }
188       }
189     }
190
191     /**
192      * Checks each connection in the 'service_connection_list' list. If there
193      * is a command pending, and any previous commands on this connection have
194      * completed, then this will spawn off a new process to deal with the
195      * command.
196      */

197     private void checkCurrentConnections() {
198       int len = server_connections_list.size();
199       for (int i = len - 1; i >= 0; --i) {
200         ServerConnectionState connection_state =
201                        (ServerConnectionState) server_connections_list.get(i);
202         try {
203           // Is this connection not currently processing a command?
204
if (!connection_state.isProcessingRequest()) {
205             ServerConnection connection = connection_state.getConnection();
206             // Does this connection have a request pending?
207
if (connection_state.hasPendingCommand() ||
208                 connection.requestPending()) {
209               // Set that we have a pending command
210
connection_state.setPendingCommand();
211               connection_state.setProcessingRequest();
212
213               final ServerConnectionState current_state = connection_state;
214
215 // // Execute this on a database worker thread.
216
// final User conn_user = connection.getUser();
217
// DatabaseSystem.execute(conn_user, connection.getDatabase(),
218
// new Runnable() {
219
database.execute(null, null, new Runnable JavaDoc() {
220                 public void run() {
221
222                   try {
223                     // Process the next request that's pending.
224
current_state.getConnection().processRequest();
225                   }
226                   catch (IOException JavaDoc e) {
227                     Debug().writeException(Lvl.INFORMATION, e);
228                   }
229                   finally {
230                     // Then clear the state
231
// This makes sure that this provider may accept new
232
// commands again.
233
current_state.clearInternal();
234                   }
235
236                 }
237               });
238
239             } // if (provider_state.hasPendingCommand() ....
240
} // if (!provider_state.isProcessRequest())
241
}
242         catch (IOException JavaDoc e) {
243           // If an IOException is generated, we must remove this provider from
244
// the list.
245
try {
246             connection_state.getConnection().close();
247           }
248           catch (IOException JavaDoc e2) { /* ignore */ }
249           server_connections_list.remove(i);
250
251           // This happens if the connection closes.
252
Debug().write(Lvl.INFORMATION, this,
253                       "IOException generated while checking connections, " +
254                       "removing provider.");
255           Debug().writeException(Lvl.INFORMATION, e);
256         }
257       }
258     }
259
260     /**
261      * Performs a ping on a single random connection. If the ping fails then
262      * the connection is closed.
263      */

264     private void doPings() {
265       int len = server_connections_list.size();
266       if (len == 0) {
267         if (DISPLAY_STATS) {
268           System.out.print("[TCPServer Stats] ");
269           System.out.println("Ping tried but no connections.");
270         }
271         return;
272       }
273       int i = (int) (Math.random() * len);
274
275       if (DISPLAY_STATS) {
276         System.out.print("[TCPServer Stats] ");
277         System.out.print("Pinging client ");
278         System.out.print(i);
279         System.out.println(".");
280       }
281
282       final ServerConnectionState connection_state =
283                        (ServerConnectionState) server_connections_list.get(i);
284
285       // Is this provider not currently processing a command?
286
if (!connection_state.isProcessingRequest()) {
287         // Don't let a command interrupt the ping.
288
connection_state.setProcessingRequest();
289
290         // ISSUE: Pings are executed under 'null' user and database...
291
database.execute(null, null, new Runnable JavaDoc() {
292           public void run() {
293             try {
294               // Ping the client? - This closes the provider if the
295
// ping fails.
296
connection_state.getConnection().ping();
297             }
298             catch (IOException JavaDoc e) {
299               // Close connection
300
try {
301                 connection_state.getConnection().close();
302               }
303               catch (IOException JavaDoc e2) { /* ignore */ }
304               Debug().write(Lvl.ALERT, ServerFarmer.this,
305                           "Closed because ping failed.");
306               Debug().writeException(Lvl.ALERT, e);
307             }
308             finally {
309               connection_state.clearProcessingRequest();
310             }
311           }
312         });
313
314       } // if (!provider_state.isProcessRequest())
315
}
316
317     /**
318      * Displays statistics about the server.
319      */

320     private void displayStatistics() {
321       if (DISPLAY_STATS) {
322         if (stat_display == 0) {
323           stat_display = 500;
324           System.out.print("[TCPServer Stats] ");
325           System.out.print(commands_run);
326           System.out.print(" run, ");
327           System.out.print(commands_waited);
328           System.out.print(" wait, ");
329           System.out.print(server_connections_list.size());
330           System.out.print(" worker count");
331           System.out.println();
332         }
333         else {
334           --stat_display;
335         }
336       }
337     }
338
339     /**
340      * Call this method to stop the farmer thread.
341      */

342     public synchronized void close() {
343       farmer_closed = true;
344     }
345
346     /**
347      * The Runnable method of the farmer thread.
348      */

349     public void run() {
350       int yield_count = 0;
351       long do_ping_time = System.currentTimeMillis() + PING_BREAK;
352       int ping_count = 200;
353
354       final int method_poll_wait_time = poll_wait_time;
355
356       Debug().write(Lvl.MESSAGE, this,
357                   "Polling frequency: " + method_poll_wait_time + "ms.");
358
359       while (true) {
360         try {
361
362           // First, determine if there are any pending service providers
363
// waiting to be established.
364
if (pending_connections_list.size() > 0) {
365             establishPendingConnections();
366           }
367           checkCurrentConnections();
368
369           // Is it time to ping the clients?
370
--ping_count;
371           if (ping_count <= 0) {
372             ping_count = 2000;
373             long current_time = System.currentTimeMillis();
374             if (current_time > do_ping_time) {
375               // Randomly ping
376
doPings();
377               do_ping_time = current_time + PING_BREAK;
378             }
379           }
380
381           if (yield_count <= 0) {
382             synchronized (this) {
383               // Wait for 3ms to give everything room to breath
384
wait(method_poll_wait_time);
385               yield_count = 3;
386             }
387           }
388           else {
389             synchronized (this) {
390               // Exit if the farmer thread has been closed...
391
if (farmer_closed == true) {
392                 return;
393               }
394             }
395             Thread.yield();
396             --yield_count;
397           }
398
399           // Print out connection statistics every so often
400
displayStatistics();
401
402         }
403         catch (Throwable JavaDoc e) {
404           Debug().write(Lvl.ERROR, this, "Connection Pool Farmer Error");
405           Debug().writeException(e);
406
407           // Wait for two seconds (so debug log isn't spammed)
408
synchronized (this) {
409             try {
410               wait(2000);
411             }
412             catch (InterruptedException JavaDoc e2) { /* ignore */ }
413           }
414
415         }
416       }
417     }
418
419   };
420
421
422   /**
423    * This contains state information about a ServerConnection that is being
424    * maintained by the server.
425    */

426   private final class ServerConnectionState {
427
428     /**
429      * The local variables.
430      */

431     private ServerConnection connection;
432 // private boolean is_establish;
433
private boolean is_processing_request;
434     private boolean is_pending_command;
435     private boolean is_ping_client;
436
437     /**
438      * The Constructor.
439      */

440     ServerConnectionState(ServerConnection connection) {
441       this.connection = connection;
442       clearInternal();
443 // is_establish = true;
444
}
445
446     /**
447      * Sets the various states to true.
448      */

449     public synchronized void setProcessingRequest() {
450       is_processing_request = true;
451     }
452     public synchronized void setPendingCommand() {
453       is_pending_command = true;
454     }
455     public synchronized void setPingClient() {
456       is_ping_client = true;
457     }
458
459
460     /**
461      * Clears the internal state.
462      */

463     public synchronized void clearInternal() {
464       is_processing_request = false;
465       is_pending_command = false;
466 // is_establish = false;
467
is_ping_client = false;
468     }
469
470     /**
471      * Clears the flag that says we are processing a request.
472      */

473     public synchronized void clearProcessingRequest() {
474       is_processing_request = false;
475     }
476
477     /**
478      * Queries the internal state.
479      */

480     public synchronized ServerConnection getConnection() {
481       return connection;
482     }
483     public synchronized boolean isProcessingRequest() {
484       return is_processing_request;
485     }
486     public synchronized boolean hasPendingCommand() {
487       return is_pending_command;
488     }
489 // public synchronized boolean isEstablishConnection() {
490
// return is_establish;
491
// }
492
public synchronized boolean isPingClient() {
493       return is_ping_client;
494     }
495
496   }
497
498 }
499
Popular Tags