KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > mckoi > database > jdbc > RemoteDatabaseInterface


1 /**
2  * com.mckoi.database.jdbc.RemoteDatabaseInterface 16 Aug 2000
3  *
4  * Mckoi SQL Database ( http://www.mckoi.com/database )
5  * Copyright (C) 2000, 2001, 2002 Diehl and Associates, Inc.
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License
9  * Version 2 as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License Version 2 for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * Version 2 along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19  *
20  * Change Log:
21  *
22  *
23  */

24
25 package com.mckoi.database.jdbc;
26
27 import java.io.*;
28 import java.sql.*;
29 import java.util.Properties JavaDoc;
30 import java.util.Vector JavaDoc;
31
32 import com.mckoi.database.global.ColumnDescription;
33 import com.mckoi.database.global.ObjectTransfer;
34 import com.mckoi.util.ByteArrayUtil;
35
36
37 /**
38  * An abstract implementation of DatabaseInterface that retrieves information
39  * from a remote server host. The actual implementation of the communication
40  * protocol is left to the derived classes.
41  *
42  * @author Tobias Downer
43  */

44
45 abstract class RemoteDatabaseInterface
46                              implements DatabaseInterface, ProtocolConstants {
47
48   /**
49    * The thread that dispatches commands to the server. This is created and
50    * started after the 'login' method is called. This can handle concurrent
51    * queries through the protocol pipe.
52    */

53   private ConnectionThread connection_thread;
54
55   /**
56    * A DatabaseCallBack implementation that is notified of all events that
57    * are received from the database.
58    */

59   private DatabaseCallBack database_call_back;
60
61
62   /**
63    * Writes the exception to the JDBC log stream.
64    */

65   private static void logException(Throwable JavaDoc e) {
66     PrintWriter out = null;
67 //#IFDEF(NO_1.1)
68
out = DriverManager.getLogWriter();
69 //#ENDIF
70
if (out != null) {
71       e.printStackTrace(out);
72     }
73 // else {
74
// e.printStackTrace(System.err);
75
// }
76
}
77
78
79   // ---------- Abstract methods ----------
80

81   /**
82    * Writes the given command to the server. The way the command is written
83    * is totally network layer dependent.
84    */

85   abstract void writeCommandToServer(byte[] command, int offset, int length)
86                                                            throws IOException;
87
88   /**
89    * Blocks until the next command is received from the server. The way this
90    * is implemented is network layer dependant.
91    */

92   abstract byte[] nextCommandFromServer(int timeout) throws IOException;
93
94   /**
95    * Closes the connection.
96    */

97   abstract void closeConnection() throws IOException;
98
99
100   // ---------- Implemented from DatabaseInterface ----------
101

102   public boolean login(String JavaDoc default_schema, String JavaDoc user, String JavaDoc password,
103                        DatabaseCallBack call_back) throws SQLException {
104
105     try {
106
107       // Do some handshaking,
108
ByteArrayOutputStream bout = new ByteArrayOutputStream();
109       DataOutputStream out = new DataOutputStream(bout);
110
111       // Write out the magic number
112
out.writeInt(0x0ced007);
113       // Write out the JDBC driver version
114
out.writeInt(MDriver.DRIVER_MAJOR_VERSION);
115       out.writeInt(MDriver.DRIVER_MINOR_VERSION);
116       byte[] arr = bout.toByteArray();
117       writeCommandToServer(arr, 0, arr.length);
118
119       byte[] response = nextCommandFromServer(0);
120
121 // printByteArray(response);
122

123       int ack = ByteArrayUtil.getInt(response, 0);
124       if (ack == ACKNOWLEDGEMENT) {
125
126         // History of server versions (inclusive)
127
// Engine version | server_version
128
// -----------------|-------------------
129
// 0.00 - 0.91 | 0
130
// 0.92 - | 1
131
// -----------------|-------------------
132

133         // Server version defaults to 0
134
// Server version 0 is for all versions of the engine previous to 0.92
135
int server_version = 0;
136         // Is there anything more to read?
137
if (response.length > 4 && response[4] == 1) {
138           // Yes so read the server version
139
server_version = ByteArrayUtil.getInt(response, 5);
140         }
141
142         // Send the username and password to the server
143
// SECURITY: username/password sent as plain text. This is okay
144
// if we are connecting to localhost, but not good if we connecting
145
// over the internet. We could encrypt this, but it would probably
146
// be better if we put the entire stream through an encyption
147
// protocol.
148

149         bout.reset();
150         out.writeUTF(default_schema);
151         out.writeUTF(user);
152         out.writeUTF(password);
153         arr = bout.toByteArray();
154         writeCommandToServer(arr, 0, arr.length);
155
156         response = nextCommandFromServer(0);
157         int result = ByteArrayUtil.getInt(response, 0);
158         if (result == USER_AUTHENTICATION_PASSED) {
159
160           // Set the call_back,
161
this.database_call_back = call_back;
162
163           // User authentication passed so we successfully logged in now.
164
connection_thread = new ConnectionThread();
165           connection_thread.start();
166           return true;
167
168         }
169         else if (result == USER_AUTHENTICATION_FAILED) {
170           throw new SQLLoginException("User Authentication failed.");
171         }
172         else {
173           throw new SQLException("Unexpected response.");
174         }
175
176       }
177       else {
178         throw new SQLException("No acknowledgement received from server.");
179       }
180
181     }
182     catch (IOException e) {
183       logException(e);
184       throw new SQLException("IOException: " + e.getMessage());
185     }
186
187   }
188
189   
190   public void pushStreamableObjectPart(byte type, long object_id,
191                 long object_length, byte[] buf, long offset, int length)
192                                                          throws SQLException {
193     try {
194       // Push the object part
195
int dispatch_id = connection_thread.pushStreamableObjectPart(
196                         type, object_id, object_length, buf, offset, length);
197       // Get the response
198
ServerCommand command =
199               connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id);
200       // If command == null then we timed out
201
if (command == null) {
202         throw new SQLException("Query timed out after " +
203                                MDriver.QUERY_TIMEOUT + " seconds.");
204       }
205
206       DataInputStream din = new DataInputStream(command.getInputStream());
207       int status = din.readInt();
208
209       // If failed report the error.
210
if (status == FAILED) {
211         throw new SQLException("Push object failed: " + din.readUTF());
212       }
213
214     }
215     catch (IOException e) {
216       logException(e);
217       throw new SQLException("IO Error: " + e.getMessage());
218     }
219
220   }
221   
222
223   public QueryResponse execQuery(SQLQuery sql) throws SQLException {
224
225     try {
226       // Execute the query
227
int dispatch_id = connection_thread.executeQuery(sql);
228       // Get the response
229
ServerCommand command =
230               connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id);
231       // If command == null then we timed out
232
if (command == null) {
233         throw new SQLException("Query timed out after " +
234                                MDriver.QUERY_TIMEOUT + " seconds.");
235       }
236
237       DataInputStream in = new DataInputStream(command.getInputStream());
238
239       // Query response protocol...
240
int status = in.readInt();
241       if (status == SUCCESS) {
242         final int result_id = in.readInt();
243         final int query_time = in.readInt();
244         final int row_count = in.readInt();
245         final int col_count = in.readInt();
246         final ColumnDescription[] col_list = new ColumnDescription[col_count];
247         for (int i = 0; i < col_count; ++i) {
248           col_list[i] = ColumnDescription.readFrom(in);
249         }
250
251         return new QueryResponse() {
252           public int getResultID() {
253             return result_id;
254           }
255           public int getQueryTimeMillis() {
256             return query_time;
257           }
258           public int getRowCount() {
259             return row_count;
260           }
261           public int getColumnCount() {
262             return col_count;
263           }
264           public ColumnDescription getColumnDescription(int n) {
265             return col_list[n];
266           }
267           public String JavaDoc getWarnings() {
268             return "";
269           }
270         };
271
272       }
273       else if (status == EXCEPTION) {
274         int db_code = in.readInt();
275         String JavaDoc message = in.readUTF();
276         String JavaDoc stack_trace = in.readUTF();
277 // System.out.println("**** DUMP OF SERVER STACK TRACE OF ERROR:");
278
// System.out.println(stack_trace);
279
// System.out.println("**** ----------");
280
throw new MSQLException(message, null, db_code, stack_trace);
281       }
282       else if (status == AUTHENTICATION_ERROR) {
283         // Means we could perform the query because user doesn't have enough
284
// rights.
285
String JavaDoc access_type = in.readUTF();
286         String JavaDoc table_name = in.readUTF();
287         throw new SQLException("User doesn't have enough privs to " +
288                                access_type + " table " + table_name);
289       }
290       else {
291 // System.err.println(status);
292
// int count = in.available();
293
// for (int i = 0; i < count; ++i) {
294
// System.err.print(in.read() + ", ");
295
// }
296
throw new SQLException("Illegal response code from server.");
297       }
298
299     }
300     catch (IOException e) {
301       logException(e);
302       throw new SQLException("IO Error: " + e.getMessage());
303     }
304
305   }
306
307   public ResultPart getResultPart(int result_id, int start_row, int count_rows)
308                                                         throws SQLException {
309
310     try {
311
312       // Get the first few rows of the result..
313
int dispatch_id = connection_thread.getResultPart(result_id,
314                                                         start_row, count_rows);
315
316       // Get the response
317
ServerCommand command =
318               connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id);
319       // If command == null then we timed out
320
if (command == null) {
321         throw new SQLException("Downloading result part timed out after " +
322                                MDriver.QUERY_TIMEOUT + " seconds.");
323       }
324
325       // Wrap around a DataInputStream
326
DataInputStream din = new DataInputStream(command.getInputStream());
327       int status = din.readInt();
328
329       if (status == SUCCESS) {
330         // Return the contents of the response.
331
int col_count = din.readInt();
332         int size = count_rows * col_count;
333         ResultPart list = new ResultPart(size);
334         for (int i = 0; i < size; ++i) {
335           list.addElement(ObjectTransfer.readFrom(din));
336         }
337         return list;
338       }
339       else if (status == EXCEPTION) {
340         int db_code = din.readInt();
341         String JavaDoc message = din.readUTF();
342         String JavaDoc stack_trace = din.readUTF();
343 // System.out.println("**** DUMP OF SERVER STACK TRACE OF ERROR:");
344
// System.out.println(stack_trace);
345
// System.out.println("**** ----------");
346
throw new SQLException(message, null, db_code);
347       }
348       else {
349         throw new SQLException("Illegal response code from server.");
350       }
351
352     }
353     catch (IOException e) {
354       logException(e);
355       throw new SQLException("IO Error: " + e.getMessage());
356     }
357
358   }
359
360
361   public void disposeResult(int result_id) throws SQLException {
362     try {
363       int dispatch_id = connection_thread.disposeResult(result_id);
364       // Get the response
365
ServerCommand command =
366               connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id);
367       // If command == null then we timed out
368
if (command == null) {
369         throw new SQLException("Dispose result timed out after " +
370                                MDriver.QUERY_TIMEOUT + " seconds.");
371       }
372
373       // Check the dispose was successful.
374
DataInputStream din = new DataInputStream(command.getInputStream());
375       int status = din.readInt();
376
377       // If failed report the error.
378
if (status == FAILED) {
379         throw new SQLException("Dispose failed: " + din.readUTF());
380       }
381
382     }
383     catch (IOException e) {
384       logException(e);
385       throw new SQLException("IO Error: " + e.getMessage());
386     }
387   }
388
389
390   public StreamableObjectPart getStreamableObjectPart(int result_id,
391         long streamable_object_id, long offset, int len) throws SQLException {
392     try {
393       int dispatch_id = connection_thread.getStreamableObjectPart(result_id,
394                                            streamable_object_id, offset, len);
395       ServerCommand command =
396               connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id);
397       // If command == null then we timed out
398
if (command == null) {
399         throw new SQLException("getStreamableObjectPart timed out after " +
400                                MDriver.QUERY_TIMEOUT + " seconds.");
401       }
402       
403       DataInputStream din = new DataInputStream(command.getInputStream());
404       int status = din.readInt();
405
406       if (status == SUCCESS) {
407         // Return the contents of the response.
408
int contents_size = din.readInt();
409         byte[] buf = new byte[contents_size];
410         din.readFully(buf, 0, contents_size);
411         return new StreamableObjectPart(buf);
412       }
413       else if (status == EXCEPTION) {
414         int db_code = din.readInt();
415         String JavaDoc message = din.readUTF();
416         String JavaDoc stack_trace = din.readUTF();
417         throw new SQLException(message, null, db_code);
418       }
419       else {
420         throw new SQLException("Illegal response code from server.");
421       }
422
423     }
424     catch (IOException e) {
425       logException(e);
426       throw new SQLException("IO Error: " + e.getMessage());
427     }
428   }
429
430
431   public void disposeStreamableObject(int result_id, long streamable_object_id)
432                                                          throws SQLException {
433     try {
434       int dispatch_id = connection_thread.disposeStreamableObject(
435                                               result_id, streamable_object_id);
436       ServerCommand command =
437               connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id);
438       // If command == null then we timed out
439
if (command == null) {
440         throw new SQLException("disposeStreamableObject timed out after " +
441                                MDriver.QUERY_TIMEOUT + " seconds.");
442       }
443       
444       DataInputStream din = new DataInputStream(command.getInputStream());
445       int status = din.readInt();
446
447       // If failed report the error.
448
if (status == FAILED) {
449         throw new SQLException("Dispose failed: " + din.readUTF());
450       }
451
452     }
453     catch (IOException e) {
454       logException(e);
455       throw new SQLException("IO Error: " + e.getMessage());
456     }
457   }
458
459   
460   public void dispose() throws SQLException {
461     try {
462       int dispatch_id = connection_thread.sendCloseCommand();
463 // // Get the response
464
// ServerCommand command =
465
// connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id);
466
closeConnection();
467     }
468     catch (IOException e) {
469       logException(e);
470       throw new SQLException("IO Error: " + e.getMessage());
471     }
472   }
473
474   // ---------- Inner classes ----------
475

476   /**
477    * The connection thread that can dispatch commands concurrently through the
478    * in/out pipe.
479    */

480   private class ConnectionThread extends Thread JavaDoc {
481
482     /**
483      * The command to write out to the server.
484      */

485     private MByteArrayOutputStream com_bytes;
486     private DataOutputStream com_data;
487
488     /**
489      * Running dispatch id values which we use as a unique key.
490      */

491     private int running_dispatch_id = 1;
492
493     /**
494      * Set to true when the thread is closed.
495      */

496     private boolean thread_closed;
497
498     /**
499      * The list of commands received from the server that are pending to be
500      * processed (ServerCommand).
501      */

502     private Vector JavaDoc commands_list;
503
504
/**
 * Constructs the connection thread as a daemon thread named
 * "Mckoi - Connection Thread", with an empty command buffer and an empty
 * list of pending server commands. The thread is not started here.
 */
ConnectionThread() throws IOException {
  setName("Mckoi - Connection Thread");
  setDaemon(true);

  thread_closed = false;
  commands_list = new Vector();

  // 'com_data' writes into 'com_bytes', which collects the bytes of the
  // command that is being built.
  com_bytes = new MByteArrayOutputStream();
  com_data = new DataOutputStream(com_bytes);
}
517
518     // ---------- Utility ----------
519

520     /**
521      * Returns a unique dispatch id number for a command.
522      */

523     private int nextDispatchID() {
524       return running_dispatch_id++;
525     }
526
527     /**
528      * Blocks until a response from the server has been received with the
529      * given dispatch id. It waits for 'timeout' seconds and if the response
530      * hasn't been received by then returns null.
531      */

532     ServerCommand getCommand(int timeout, int dispatch_id)
533                                                          throws SQLException {
534       final long time_in = System.currentTimeMillis();
535       final long time_out_high = time_in + ((long) timeout * 1000);
536
537       synchronized (commands_list) {
538
539         if (commands_list == null) {
540           throw new SQLException("Connection to server closed");
541         }
542
543         while (true) {
544
545           for (int i = 0; i < commands_list.size(); ++i) {
546             ServerCommand command = (ServerCommand) commands_list.elementAt(i);
547             if (command.dispatchID() == dispatch_id) {
548               commands_list.removeElementAt(i);
549               return command;
550             }
551           }
552
553           // Return null if we haven't received a response in the timeout
554
// period.
555
if (timeout != 0 &&
556               System.currentTimeMillis() > time_out_high) {
557             return null;
558           }
559
560           // Wait a second.
561
try {
562             commands_list.wait(1000);
563           }
564           catch (InterruptedException JavaDoc e) { /* ignore */ }
565
566         } // while (true)
567

568       } // synchronized
569

570     }
571
572
573     // ---------- Server request methods ----------
574

575     /**
576      * Flushes the command in 'com_bytes' to the server.
577      */

578     private synchronized void flushCommand() throws IOException {
579       // We flush the size of the command string followed by the command
580
// itself to the server. This format allows us to implement a simple
581
// non-blocking command parser on the server.
582
writeCommandToServer(com_bytes.getBuffer(), 0, com_bytes.size());
583       com_bytes.reset();
584     }
585
586     /**
587      * Pushes a part of a streamable object onto the server. Used in
588      * preparation to executing queries containing large objects.
589      */

590     synchronized int pushStreamableObjectPart(byte type, long object_id,
591                long object_length, byte[] buf, long offset, int length)
592                                                           throws IOException {
593       int dispatch_id = nextDispatchID();
594       com_data.writeInt(PUSH_STREAMABLE_OBJECT_PART);
595       com_data.writeInt(dispatch_id);
596       com_data.writeByte(type);
597       com_data.writeLong(object_id);
598       com_data.writeLong(object_length);
599       com_data.writeInt(length);
600       com_data.write(buf, 0, length);
601       com_data.writeLong(offset);
602       flushCommand();
603
604       return dispatch_id;
605     }
606     
607     /**
608      * Sends a command to the server to process a query. The response from
609      * the server will contain a 'result_id' that is a unique number for
610      * refering to the result. It also contains information about the columns
611      * in the table, and the total number of rows in the result.
612      * <p>
613      * Returns the dispatch id key for the response from the server.
614      */

615     synchronized int executeQuery(SQLQuery sql) throws IOException {
616       int dispatch_id = nextDispatchID();
617       com_data.writeInt(QUERY);
618       com_data.writeInt(dispatch_id);
619       sql.writeTo(com_data);
620       flushCommand();
621
622       return dispatch_id;
623     }
624
625     /**
626      * Releases the server side resources associated with a given query key
627      * returned by the server. This should be called when the ResultSet is
628      * closed, or if we cancel in the middle of downloading a result.
629      * <p>
630      * It's very important that the server resources for a query is released.
631      * <p>
632      * Returns the dispatch id key for the response from the server.
633      */

634     synchronized int disposeResult(int result_id) throws IOException {
635       int dispatch_id = nextDispatchID();
636       com_data.writeInt(DISPOSE_RESULT);
637       com_data.writeInt(dispatch_id);
638       com_data.writeInt(result_id);
639       flushCommand();
640
641       return dispatch_id;
642     }
643
644     /**
645      * Requests a part of a result of a query. This is used to download a
646      * part of a result set from the server. The 'result_id' is generated
647      * by the 'query' command. Please note that this will generate an error
648      * if the result_id is invalid or has previously been disposed. The
649      * 'row_number' refers to the row to download from. The 'row_count'
650      * refers to the number of rows to download.
651      * <p>
652      * Returns the dispatch id key for the response from the server.
653      */

654     synchronized int getResultPart(int result_id, int row_number,
655                                    int row_count) throws IOException {
656       int dispatch_id = nextDispatchID();
657       com_data.writeInt(RESULT_SECTION);
658       com_data.writeInt(dispatch_id);
659       com_data.writeInt(result_id);
660       com_data.writeInt(row_number);
661       com_data.writeInt(row_count);
662       flushCommand();
663
664       return dispatch_id;
665     }
666
667     /**
668      * Requests a part of an open StreamableObject channel. This is used to
669      * download a section of a large object, such as a Blob or a Clob. The
670      * 'streamable_object_id' is returned by the 'getIdentifier' method of the
671      * StreamableObject in a ResultPart.
672      * <p>
673      * Returns the dispatch id key for the response from the server.
674      */

675     synchronized int getStreamableObjectPart(int result_id,
676                                  long streamable_object_id,
677                                  long offset, int length) throws IOException {
678       int dispatch_id = nextDispatchID();
679       com_data.writeInt(STREAMABLE_OBJECT_SECTION);
680       com_data.writeInt(dispatch_id);
681       com_data.writeInt(result_id);
682       com_data.writeLong(streamable_object_id);
683       com_data.writeLong(offset);
684       com_data.writeInt(length);
685       flushCommand();
686       
687       return dispatch_id;
688     }
689
690     /**
691      * Disposes the resources associated with a streamable object on the server.
692      * This would typically be called when either of the following situations
693      * occured - the Blob is closed/disposed/finalized, the InputStream is
694      * closes/finalized.
695      * <p>
696      * It's very important that the server resources for a streamable object is
697      * released.
698      * <p>
699      * Returns the dispatch id key for the response from the server.
700      */

701     synchronized int disposeStreamableObject(int result_id,
702                                long streamable_object_id) throws IOException {
703       int dispatch_id = nextDispatchID();
704       com_data.writeInt(DISPOSE_STREAMABLE_OBJECT);
705       com_data.writeInt(dispatch_id);
706       com_data.writeInt(result_id);
707       com_data.writeLong(streamable_object_id);
708       flushCommand();
709       
710       return dispatch_id;
711     }
712
713     /**
714      * Sends close command to server.
715      */

716     synchronized int sendCloseCommand() throws IOException {
717       int dispatch_id = nextDispatchID();
718       com_data.writeInt(CLOSE);
719       com_data.writeInt(dispatch_id);
720       flushCommand();
721
722       return dispatch_id;
723     }
724
725
726
727     // ---------- Server read methods ----------
728

729
730     /**
731      * Listens for commands from the server. When received puts the command
732      * on the dispatch list.
733      */

734     public void run() {
735
736       try {
737         while (!thread_closed) {
738
739           // Block until next command received from server.
740
byte[] buf = nextCommandFromServer(0);
741           int dispatch_id = ByteArrayUtil.getInt(buf, 0);
742
743           if (dispatch_id == -1) {
744             // This means a trigger or a ping or some other server side event.
745
processEvent(buf);
746           }
747
748           synchronized (commands_list) {
749             // Add this command to the commands list
750
commands_list.addElement(new ServerCommand(dispatch_id, buf));
751             // Notify any threads waiting on it.
752
commands_list.notifyAll();
753           }
754
755         } // while(true)
756

757       }
758       catch (IOException e) {
759 // System.err.println("Connection Thread closed because of IOException");
760
// e.printStackTrace();
761
}
762
763       finally {
764         // Invalidate this object when the thread finishes.
765
Object JavaDoc old_commands_list = commands_list;
766         synchronized (old_commands_list) {
767           commands_list = null;
768           old_commands_list.notifyAll();
769         }
770       }
771
772     }
773
774     /**
775      * Processes a server side event.
776      */

777     private void processEvent(byte[] buf) throws IOException {
778       int event = ByteArrayUtil.getInt(buf, 4);
779       if (event == PING) {
780         // Ignore ping events, they only sent by server to see if we are
781
// alive. Ping back?
782
}
783       else if (event == DATABASE_EVENT) {
784         // A database event that is passed to the DatabaseCallBack...
785
ByteArrayInputStream bin =
786                               new ByteArrayInputStream(buf, 8, buf.length - 8);
787         DataInputStream din = new DataInputStream(bin);
788
789         int event_type = din.readInt();
790         String JavaDoc event_msg = din.readUTF();
791         database_call_back.databaseEvent(event_type, event_msg);
792       }
793 // else if (event == SERVER_REQUEST) {
794
// // A server request that is passed to the DatabaseCallBack...
795
// ByteArrayInputStream bin =
796
// new ByteArrayInputStream(buf, 8, buf.length - 8);
797
// DataInputStream din = new DataInputStream(bin);
798
//
799
// int command = din.readInt(); // Currently ignored
800
// long stream_id = din.readLong();
801
// int length = din.readInt();
802
// database_call_back.streamableObjectRequest(stream_id, length);
803
// }
804
else {
805         System.err.println("[RemoteDatabaseInterface] " +
806                          "Received unrecognised server side event: " + event);
807       }
808     }
809
810   }
811
812   /**
813    * A ByteArrayOutputStream that allows us access to the underlying byte[]
814    * array.
815    */

816   static class MByteArrayOutputStream extends ByteArrayOutputStream {
817     MByteArrayOutputStream() {
818       super(256);
819     }
820     public byte[] getBuffer() {
821       return buf;
822     }
823     public int size() {
824       return count;
825     }
826   }
827
828   /**
829    * Represents the data in a command from the server.
830    */

831   static class ServerCommand {
832
833     private int dispatch_id;
834     private byte[] buf;
835
836     ServerCommand(int dispatch_id, byte[] buf) {
837       this.dispatch_id = dispatch_id;
838       this.buf = buf;
839     }
840
841     public int dispatchID() {
842       return dispatch_id;
843     }
844
845     public byte[] getBuf() {
846       return buf;
847     }
848
849     public ByteArrayInputStream getInputStream() {
850       return new ByteArrayInputStream(buf, 4, buf.length - 4);
851     }
852
853   }
854
855 }
856
Popular Tags