KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > mckoi > database > jdbcserver > JDBCProcessor


1 /**
2  * com.mckoi.database.jdbcserver.JDBCProcessor 22 Jul 2000
3  *
4  * Mckoi SQL Database ( http://www.mckoi.com/database )
5  * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * Version 2 as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License Version 2 for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * Version 2 along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19  *
20  * Change Log:
21  *
22  *
23  */

24
25 package com.mckoi.database.jdbcserver;
26
27 import com.mckoi.database.global.ObjectTransfer;
28 import com.mckoi.database.jdbc.StreamableObjectPart;
29 import com.mckoi.database.jdbc.ProtocolConstants;
30 import com.mckoi.database.jdbc.MSQLException;
31 import com.mckoi.database.jdbc.DatabaseCallBack;
32 import com.mckoi.database.jdbc.DatabaseInterface;
33 import com.mckoi.database.jdbc.QueryResponse;
34 import com.mckoi.database.jdbc.ResultPart;
35 import com.mckoi.database.jdbc.SQLQuery;
36 import com.mckoi.debug.*;
37 import com.mckoi.util.ByteArrayUtil;
38
39 import java.sql.SQLException JavaDoc;
40 import java.io.*;
41
42 /**
43  * This processes JDBC commands from a JDBC client and dispatches the commands
44  * to the database. This is a state based class. There is a single processor
45  * for each JDBC client connected. This class is designed to be flexible
46  * enough to handle packet based protocols as well as stream based
47  * protocols.
48  *
49  * @author Tobias Downer
50  */

51
52 abstract class JDBCProcessor implements ProtocolConstants {
53
54   /**
55    * The version of the server protocol.
56    */

57   private static final int SERVER_VERSION = 1;
58
59
60   /**
61    * The current state we are in. 0 indicates we haven't logged in yet. 100
62    * indicates we are logged in.
63    */

64   private int state;
65
66   /**
67    * Number of authentications tried.
68    */

69   private int authentication_tries;
70
71   /**
72    * The interface to the database.
73    */

74   private DatabaseInterface db_interface;
75
76   /**
77    * An object the debug information can be logged to.
78    */

79   private DebugLogger debug;
80
81   /**
82    * Sets up the processor.
83    */

84   JDBCProcessor(DatabaseInterface db_interface, DebugLogger logger) {
85     this.debug = logger;
86     this.db_interface = db_interface;
87     state = 0;
88     authentication_tries = 0;
89   }
90
91   /**
92    * The database call back method that sends database events back to the
93    * client.
94    */

95   private DatabaseCallBack db_call_back = new DatabaseCallBack() {
96     public void databaseEvent(int event_type, String JavaDoc event_message) {
97       try {
98         // Format the call back and send the event.
99
ByteArrayOutputStream bout = new ByteArrayOutputStream();
100         DataOutputStream dout = new DataOutputStream(bout);
101         dout.writeInt(event_type);
102         dout.writeUTF(event_message);
103         sendEvent(bout.toByteArray());
104       }
105       catch (IOException e) {
106         debug.write(Lvl.ERROR, this, "IO Error: " + e.getMessage());
107         debug.writeException(e);
108       }
109     }
110   };
111
112   protected static void printByteArray(byte[] array) {
113     System.out.println("Length: " + array.length);
114     for (int i = 0; i < array.length; ++i) {
115       System.out.print(array[i]);
116       System.out.print(", ");
117     }
118   }
119
120   /**
121    * Processes a single JDBCCommand from the client. The command comes in as
122    * a byte[] array and the response is written out as a byte[] array. If
123    * it returns 'null' then it means the connection has been closed.
124    */

125   byte[] processJDBCCommand(byte[] command) throws IOException {
126
127 // printByteArray(command);
128

129     if (state == 0) {
130       // State 0 means we looking for the header...
131
int magic = ByteArrayUtil.getInt(command, 0);
132       // The driver version number
133
int maj_ver = ByteArrayUtil.getInt(command, 4);
134       int min_ver = ByteArrayUtil.getInt(command, 8);
135
136       byte[] ack_command = new byte[4 + 1 + 4 + 1];
137       // Send back an acknowledgement and the version number of the server
138
ByteArrayUtil.setInt(ACKNOWLEDGEMENT, ack_command, 0);
139       ack_command[4] = 1;
140       ByteArrayUtil.setInt(SERVER_VERSION, ack_command, 5);
141       ack_command[9] = 0;
142
143       // Set to the next state.
144
state = 4;
145
146       // Return the acknowledgement
147
return ack_command;
148
149 // // We accept drivers equal or less than 1.00 currently.
150
// if ((maj_ver == 1 && min_ver == 0) || maj_ver == 0) {
151
// // Go to next state.
152
// state = 4;
153
// return single(ACKNOWLEDGEMENT);
154
// }
155
// else {
156
// // Close the connection if driver invalid.
157
// close();
158
// }
159
//
160
// return null;
161
}
162
163     else if (state == 4) {
164       // State 4 means we looking for username and password...
165
ByteArrayInputStream bin = new ByteArrayInputStream(command);
166       DataInputStream din = new DataInputStream(bin);
167       String JavaDoc default_schema = din.readUTF();
168       String JavaDoc username = din.readUTF();
169       String JavaDoc password = din.readUTF();
170
171       try {
172         boolean good = db_interface.login(default_schema, username, password,
173                                           db_call_back);
174         if (good == false) {
175           // Close after 12 tries.
176
if (authentication_tries >= 12) {
177             close();
178           }
179           else {
180             ++authentication_tries;
181             return single(USER_AUTHENTICATION_FAILED);
182           }
183         }
184         else {
185           state = 100;
186           return single(USER_AUTHENTICATION_PASSED);
187         }
188       }
189       catch (SQLException e) { }
190       return null;
191
192     }
193
194     else if (state == 100) {
195       // Process the query
196
return processQuery(command);
197     }
198
199     else {
200       throw new Error JavaDoc("Illegal state: " + state);
201     }
202
203   }
204
205   /**
206    * Returns the state of the connection. 0 = not logged in yet. 1 = logged
207    * in.
208    */

209   int getState() {
210     return state;
211   }
212
213   /**
214    * Convenience, returns a single 4 byte array with the given int encoded
215    * into it.
216    */

217   private byte[] single(int val) {
218     byte[] buf = new byte[4];
219     ByteArrayUtil.setInt(val, buf, 0);
220     return buf;
221   }
222
223   /**
224    * Creates a response that represents an SQL exception failure.
225    */

226   private byte[] exception(int dispatch_id, SQLException e)
227                                                           throws IOException {
228
229     int code = e.getErrorCode();
230     String JavaDoc msg = e.getMessage();
231     if (msg == null) {
232       msg = "NULL exception message";
233     }
234     String JavaDoc server_msg = "";
235     String JavaDoc stack_trace = "";
236
237     if (e instanceof MSQLException) {
238       MSQLException me = (MSQLException) e;
239       server_msg = me.getServerErrorMsg();
240       stack_trace = me.getServerErrorStackTrace();
241     }
242     else {
243       StringWriter writer = new StringWriter();
244       e.printStackTrace(new PrintWriter(writer));
245       stack_trace = writer.toString();
246     }
247
248     ByteArrayOutputStream bout = new ByteArrayOutputStream();
249     DataOutputStream dout = new DataOutputStream(bout);
250     dout.writeInt(dispatch_id);
251     dout.writeInt(EXCEPTION);
252     dout.writeInt(code);
253     dout.writeUTF(msg);
254     dout.writeUTF(stack_trace);
255
256     return bout.toByteArray();
257
258   }
259
260   /**
261    * Creates a response that indicates a simple success of an operation with
262    * the given dispatch id.
263    */

264   private byte[] simpleSuccess(int dispatch_id) throws IOException {
265     byte[] buf = new byte[8];
266     ByteArrayUtil.setInt(dispatch_id, buf, 0);
267     ByteArrayUtil.setInt(SUCCESS, buf, 4);
268     return buf;
269   }
270
271   /**
272    * Processes a query on the byte[] array and returns the result.
273    */

274   private byte[] processQuery(byte[] command) throws IOException {
275
276     byte[] result;
277
278     // The first int is the command.
279
int ins = ByteArrayUtil.getInt(command, 0);
280     
281     // Otherwise must be a dispatch type request.
282
// The second is the dispatch id.
283
int dispatch_id = ByteArrayUtil.getInt(command, 4);
284
285     if (dispatch_id == -1) {
286       throw new Error JavaDoc("Special case dispatch id of -1 in query");
287     }
288
289     if (ins == RESULT_SECTION) {
290       result = resultSection(dispatch_id, command);
291     }
292     else if (ins == QUERY) {
293       result = queryCommand(dispatch_id, command);
294     }
295     else if (ins == PUSH_STREAMABLE_OBJECT_PART) {
296       result = pushStreamableObjectPart(dispatch_id, command);
297     }
298     else if (ins == DISPOSE_RESULT) {
299       result = disposeResult(dispatch_id, command);
300     }
301     else if (ins == STREAMABLE_OBJECT_SECTION) {
302       result = streamableObjectSection(dispatch_id, command);
303     }
304     else if (ins == DISPOSE_STREAMABLE_OBJECT) {
305       result = disposeStreamableObject(dispatch_id, command);
306     }
307     else if (ins == CLOSE) {
308       close();
309       result = null;
310     }
311     else {
312       throw new Error JavaDoc("Command (" + ins + ") not understood.");
313     }
314
315     return result;
316
317   }
318
319   /**
320    * Disposes of this processor.
321    */

322   void dispose() {
323     try {
324       db_interface.dispose();
325     }
326     catch (Throwable JavaDoc e) {
327       debug.writeException(Lvl.ERROR, e);
328     }
329   }
330
331
332   // ---------- JDBC primitive commands ----------
333

334   /**
335    * Executes a query and returns the header for the result in the response.
336    * This keeps track of all result sets because sections of the result are
337    * later queries via the 'RESULT_SECTION' command.
338    * <p>
339    * 'dispatch_id' is the number we need to respond with.
340    */

341   private byte[] queryCommand(int dispatch_id,
342                               byte[] command) throws IOException {
343
344     // Read the query from the command.
345
ByteArrayInputStream bin =
346                      new ByteArrayInputStream(command, 8, command.length - 8);
347     DataInputStream din = new DataInputStream(bin);
348     SQLQuery query = SQLQuery.readFrom(din);
349
350     try {
351       // Do the query
352
QueryResponse response = db_interface.execQuery(query);
353
354       // Prepare the stream to output the response to,
355
ByteArrayOutputStream bout = new ByteArrayOutputStream();
356       DataOutputStream dout = new DataOutputStream(bout);
357
358       dout.writeInt(dispatch_id);
359       dout.writeInt(SUCCESS);
360
361       // The response sends the result id, the time the query took, the
362
// total row count, and description of each column in the result.
363
dout.writeInt(response.getResultID());
364       dout.writeInt(response.getQueryTimeMillis());
365       dout.writeInt(response.getRowCount());
366       int col_count = response.getColumnCount();
367       dout.writeInt(col_count);
368       for (int i = 0; i < col_count; ++i) {
369         response.getColumnDescription(i).writeTo(dout);
370       }
371
372       return bout.toByteArray();
373
374     }
375     catch (SQLException e) {
376 // debug.writeException(e);
377
return exception(dispatch_id, e);
378     }
379
380   }
381
382   /**
383    * Pushes a part of a streamable object onto the server.
384    * <p>
385    * 'dispatch_id' is the number we need to respond with.
386    */

387   private byte[] pushStreamableObjectPart(int dispatch_id,
388                                           byte[] command) throws IOException {
389     byte type = command[8];
390     long object_id = ByteArrayUtil.getLong(command, 9);
391     long object_length = ByteArrayUtil.getLong(command, 17);
392     int length = ByteArrayUtil.getInt(command, 25);
393     byte[] ob_buf = new byte[length];
394     System.arraycopy(command, 29, ob_buf, 0, length);
395     long offset = ByteArrayUtil.getLong(command, 29 + length);
396     
397     try {
398       // Pass this through to the underlying database interface.
399
db_interface.pushStreamableObjectPart(type, object_id, object_length,
400                                             ob_buf, offset, length);
401
402       // Return operation success.
403
return simpleSuccess(dispatch_id);
404
405     }
406     catch (SQLException e) {
407       return exception(dispatch_id, e);
408     }
409     
410   }
411   
412   /**
413    * Responds with a part of the result set of a query made via the 'QUERY'
414    * command.
415    * <p>
416    * 'dispatch_id' is the number we need to respond with.
417    */

418   private byte[] resultSection(int dispatch_id,
419                                byte[] command) throws IOException {
420
421     int result_id = ByteArrayUtil.getInt(command, 8);
422     int row_number = ByteArrayUtil.getInt(command, 12);
423     int row_count = ByteArrayUtil.getInt(command, 16);
424
425     try {
426       // Get the result part...
427
ResultPart block =
428                  db_interface.getResultPart(result_id, row_number, row_count);
429
430       ByteArrayOutputStream bout = new ByteArrayOutputStream();
431       DataOutputStream dout = new DataOutputStream(bout);
432
433       dout.writeInt(dispatch_id);
434       dout.writeInt(SUCCESS);
435
436       // Send the contents of the result set.
437
// HACK - Work out column count by dividing number of entries in block
438
// by number of rows.
439
int col_count = block.size() / row_count;
440       dout.writeInt(col_count);
441       int bsize = block.size();
442       for (int index = 0; index < bsize; ++index) {
443         ObjectTransfer.writeTo(dout, block.elementAt(index));
444       }
445
446       return bout.toByteArray();
447     }
448     catch (SQLException e) {
449       return exception(dispatch_id, e);
450     }
451   }
452
453   /**
454    * Returns a section of a streamable object.
455    * <p>
456    * 'dispatch_id' is the number we need to respond with.
457    */

458   private byte[] streamableObjectSection(int dispatch_id, byte[] command)
459                                                           throws IOException {
460     int result_id = ByteArrayUtil.getInt(command, 8);
461     long streamable_object_id = ByteArrayUtil.getLong(command, 12);
462     long offset = ByteArrayUtil.getLong(command, 20);
463     int length = ByteArrayUtil.getInt(command, 28);
464
465     try {
466       StreamableObjectPart ob_part =
467           db_interface.getStreamableObjectPart(result_id, streamable_object_id,
468                                                offset, length);
469
470       ByteArrayOutputStream bout = new ByteArrayOutputStream();
471       DataOutputStream dout = new DataOutputStream(bout);
472
473       dout.writeInt(dispatch_id);
474       dout.writeInt(SUCCESS);
475
476       byte[] buf = ob_part.getContents();
477       dout.writeInt(buf.length);
478       dout.write(buf, 0, buf.length);
479
480       return bout.toByteArray();
481     }
482     catch (SQLException e) {
483       return exception(dispatch_id, e);
484     }
485     
486   }
487
488   /**
489    * Disposes of a streamable object.
490    * <p>
491    * 'dispatch_id' is the number we need to respond with.
492    */

493   private byte[] disposeStreamableObject(int dispatch_id, byte[] command)
494                                                           throws IOException {
495     int result_id = ByteArrayUtil.getInt(command, 8);
496     long streamable_object_id = ByteArrayUtil.getLong(command, 12);
497     
498     try {
499       // Pass this through to the underlying database interface.
500
db_interface.disposeStreamableObject(result_id, streamable_object_id);
501
502       // Return operation success.
503
return simpleSuccess(dispatch_id);
504
505     }
506     catch (SQLException e) {
507       return exception(dispatch_id, e);
508     }
509   }
510   
511   /**
512    * Disposes of a result set we queries via the 'QUERY' command.
513    * <p>
514    * 'dispatch_id' is the number we need to respond with.
515    */

516   private byte[] disposeResult(int dispatch_id,
517                                byte[] command) throws IOException {
518
519     // Get the result id.
520
int result_id = ByteArrayUtil.getInt(command, 8);
521
522     try {
523       // Dispose the table.
524
db_interface.disposeResult(result_id);
525       // Return operation success.
526
return simpleSuccess(dispatch_id);
527     }
528     catch (SQLException e) {
529       return exception(dispatch_id, e);
530     }
531   }
532
533   
534
535   
536   
537
538   // ---------- Abstract methods ----------
539

540   /**
541    * Sends an event to the client. This is used to notify the client of
542    * trigger events, etc.
543    * <p>
544    * SECURITY ISSUE: This is always invoked by the DatabaseDispatcher. We
545    * have to be careful that this method isn't allowed to block. Otherwise
546    * the DatabaseDispatcher thread will be out of operation. Unfortunately
547    * assuring this may not be possible until Java has non-blocking IO, or we
548    * use datagrams for transmission. I know for sure that the TCP
549    * implementation is vunrable. If the client doesn't 'read' what we are
550    * sending then this'll block when the buffers become full.
551    */

552   public abstract void sendEvent(byte[] event_msg) throws IOException;
553
554   /**
555    * Closes the connection with the client.
556    */

557   public abstract void close() throws IOException;
558
559   /**
560    * Returns true if the connection to the client is closed.
561    */

562   public abstract boolean isClosed() throws IOException;
563
564   // ---------- Finalize ----------
565

566   public final void finalize() throws Throwable JavaDoc {
567     super.finalize();
568     try {
569       dispose();
570     }
571     catch (Throwable JavaDoc e) { /* ignore */ }
572   }
573
574 }
575
Popular Tags