1 24 25 package com.mckoi.database.jdbc; 26 27 import java.io.*; 28 import java.sql.*; 29 import java.util.Properties ; 30 import java.util.Vector ; 31 32 import com.mckoi.database.global.ColumnDescription; 33 import com.mckoi.database.global.ObjectTransfer; 34 import com.mckoi.util.ByteArrayUtil; 35 36 37 44 45 abstract class RemoteDatabaseInterface 46 implements DatabaseInterface, ProtocolConstants { 47 48 53 private ConnectionThread connection_thread; 54 55 59 private DatabaseCallBack database_call_back; 60 61 62 65 private static void logException(Throwable e) { 66 PrintWriter out = null; 67 out = DriverManager.getLogWriter(); 69 if (out != null) { 71 e.printStackTrace(out); 72 } 73 } 77 78 79 81 85 abstract void writeCommandToServer(byte[] command, int offset, int length) 86 throws IOException; 87 88 92 abstract byte[] nextCommandFromServer(int timeout) throws IOException; 93 94 97 abstract void closeConnection() throws IOException; 98 99 100 102 public boolean login(String default_schema, String user, String password, 103 DatabaseCallBack call_back) throws SQLException { 104 105 try { 106 107 ByteArrayOutputStream bout = new ByteArrayOutputStream(); 109 DataOutputStream out = new DataOutputStream(bout); 110 111 out.writeInt(0x0ced007); 113 out.writeInt(MDriver.DRIVER_MAJOR_VERSION); 115 out.writeInt(MDriver.DRIVER_MINOR_VERSION); 116 byte[] arr = bout.toByteArray(); 117 writeCommandToServer(arr, 0, arr.length); 118 119 byte[] response = nextCommandFromServer(0); 120 121 123 int ack = ByteArrayUtil.getInt(response, 0); 124 if (ack == ACKNOWLEDGEMENT) { 125 126 133 int server_version = 0; 136 if (response.length > 4 && response[4] == 1) { 138 server_version = ByteArrayUtil.getInt(response, 5); 140 } 141 142 149 bout.reset(); 150 out.writeUTF(default_schema); 151 out.writeUTF(user); 152 out.writeUTF(password); 153 arr = bout.toByteArray(); 154 writeCommandToServer(arr, 0, arr.length); 155 156 response = nextCommandFromServer(0); 157 int result = ByteArrayUtil.getInt(response, 0); 158 if (result == USER_AUTHENTICATION_PASSED) { 159 160 this.database_call_back = call_back; 162 163 connection_thread = new ConnectionThread(); 165 connection_thread.start(); 166 return true; 167 168 } 169 else if (result == USER_AUTHENTICATION_FAILED) { 170 throw new SQLLoginException("User Authentication failed."); 171 } 172 else { 173 throw new SQLException("Unexpected response."); 174 } 175 176 } 177 else { 178 throw new SQLException("No acknowledgement received from server."); 179 } 180 181 } 182 catch (IOException e) { 183 logException(e); 184 throw new SQLException("IOException: " + e.getMessage()); 185 } 186 187 } 188 189 190 public void pushStreamableObjectPart(byte type, long object_id, 191 long object_length, byte[] buf, long offset, int length) 192 throws SQLException { 193 try { 194 int dispatch_id = connection_thread.pushStreamableObjectPart( 196 type, object_id, object_length, buf, offset, length); 197 ServerCommand command = 199 connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id); 200 if (command == null) { 202 throw new SQLException("Query timed out after " + 203 MDriver.QUERY_TIMEOUT + " seconds."); 204 } 205 206 DataInputStream din = new DataInputStream(command.getInputStream()); 207 int status = din.readInt(); 208 209 if (status == FAILED) { 211 throw new SQLException("Push object failed: " + din.readUTF()); 212 } 213 214 } 215 catch (IOException e) { 216 logException(e); 217 throw new SQLException("IO Error: " + e.getMessage()); 218 } 219 220 } 221 222 223 public QueryResponse execQuery(SQLQuery sql) throws SQLException { 224 225 try { 226 int dispatch_id = connection_thread.executeQuery(sql); 228 ServerCommand command = 230 connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id); 231 if (command == null) { 233 throw new SQLException("Query timed out after " + 234 MDriver.QUERY_TIMEOUT + " seconds."); 235 } 236 237 DataInputStream in = new DataInputStream(command.getInputStream()); 238 239 int status = in.readInt(); 241 if (status == SUCCESS) { 242 final int result_id = in.readInt(); 243 final int query_time = in.readInt(); 244 final int row_count = in.readInt(); 245 final int col_count = in.readInt(); 246 final ColumnDescription[] col_list = new ColumnDescription[col_count]; 247 for (int i = 0; i < col_count; ++i) { 248 col_list[i] = ColumnDescription.readFrom(in); 249 } 250 251 return new QueryResponse() { 252 public int getResultID() { 253 return result_id; 254 } 255 public int getQueryTimeMillis() { 256 return query_time; 257 } 258 public int getRowCount() { 259 return row_count; 260 } 261 public int getColumnCount() { 262 return col_count; 263 } 264 public ColumnDescription getColumnDescription(int n) { 265 return col_list[n]; 266 } 267 public String getWarnings() { 268 return ""; 269 } 270 }; 271 272 } 273 else if (status == EXCEPTION) { 274 int db_code = in.readInt(); 275 String message = in.readUTF(); 276 String stack_trace = in.readUTF(); 277 throw new MSQLException(message, null, db_code, stack_trace); 281 } 282 else if (status == AUTHENTICATION_ERROR) { 283 String access_type = in.readUTF(); 286 String table_name = in.readUTF(); 287 throw new SQLException("User doesn't have enough privs to " + 288 access_type + " table " + table_name); 289 } 290 else { 291 throw new SQLException("Illegal response code from server."); 297 } 298 299 } 300 catch (IOException e) { 301 logException(e); 302 throw new SQLException("IO Error: " + e.getMessage()); 303 } 304 305 } 306 307 public ResultPart getResultPart(int result_id, int start_row, int count_rows) 308 throws SQLException { 309 310 try { 311 312 int dispatch_id = connection_thread.getResultPart(result_id, 314 start_row, count_rows); 315 316 ServerCommand command = 318 connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id); 319 if (command == null) { 321 throw new SQLException("Downloading result part timed out after " + 322 MDriver.QUERY_TIMEOUT + " seconds."); 323 } 324 325 DataInputStream din = new DataInputStream(command.getInputStream()); 327 int status = din.readInt(); 328 329 if (status == SUCCESS) { 330 int col_count = din.readInt(); 332 int size = count_rows * col_count; 333 ResultPart list = new ResultPart(size); 334 for (int i = 0; i < size; ++i) { 335 list.addElement(ObjectTransfer.readFrom(din)); 336 } 337 return list; 338 } 339 else if (status == EXCEPTION) { 340 int db_code = din.readInt(); 341 String message = din.readUTF(); 342 String stack_trace = din.readUTF(); 343 throw new SQLException(message, null, db_code); 347 } 348 else { 349 throw new SQLException("Illegal response code from server."); 350 } 351 352 } 353 catch (IOException e) { 354 logException(e); 355 throw new SQLException("IO Error: " + e.getMessage()); 356 } 357 358 } 359 360 361 public void disposeResult(int result_id) throws SQLException { 362 try { 363 int dispatch_id = connection_thread.disposeResult(result_id); 364 ServerCommand command = 366 connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id); 367 if (command == null) { 369 throw new SQLException("Dispose result timed out after " + 370 MDriver.QUERY_TIMEOUT + " seconds."); 371 } 372 373 DataInputStream din = new DataInputStream(command.getInputStream()); 375 int status = din.readInt(); 376 377 if (status == FAILED) { 379 throw new SQLException("Dispose failed: " + din.readUTF()); 380 } 381 382 } 383 catch (IOException e) { 384 logException(e); 385 throw new SQLException("IO Error: " + e.getMessage()); 386 } 387 } 388 389 390 public StreamableObjectPart getStreamableObjectPart(int result_id, 391 long streamable_object_id, long offset, int len) throws SQLException { 392 try { 393 int dispatch_id = connection_thread.getStreamableObjectPart(result_id, 394 streamable_object_id, offset, len); 395 ServerCommand command = 396 connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id); 397 if (command == null) { 399 throw new SQLException("getStreamableObjectPart timed out after " + 400 MDriver.QUERY_TIMEOUT + " seconds."); 401 } 402 403 DataInputStream din = new DataInputStream(command.getInputStream()); 404 int status = din.readInt(); 405 406 if (status == SUCCESS) { 407 int contents_size = din.readInt(); 409 byte[] buf = new byte[contents_size]; 410 din.readFully(buf, 0, contents_size); 411 return new StreamableObjectPart(buf); 412 } 413 else if (status == EXCEPTION) { 414 int db_code = din.readInt(); 415 String message = din.readUTF(); 416 String stack_trace = din.readUTF(); 417 throw new SQLException(message, null, db_code); 418 } 419 else { 420 throw new SQLException("Illegal response code from server."); 421 } 422 423 } 424 catch (IOException e) { 425 logException(e); 426 throw new SQLException("IO Error: " + e.getMessage()); 427 } 428 } 429 430 431 public void disposeStreamableObject(int result_id, long streamable_object_id) 432 throws SQLException { 433 try { 434 int dispatch_id = connection_thread.disposeStreamableObject( 435 result_id, streamable_object_id); 436 ServerCommand command = 437 connection_thread.getCommand(MDriver.QUERY_TIMEOUT, dispatch_id); 438 if (command == null) { 440 throw new SQLException("disposeStreamableObject timed out after " + 441 MDriver.QUERY_TIMEOUT + " seconds."); 442 } 443 444 DataInputStream din = new DataInputStream(command.getInputStream()); 445 int status = din.readInt(); 446 447 if (status == FAILED) { 449 throw new SQLException("Dispose failed: " + din.readUTF()); 450 } 451 452 } 453 catch (IOException e) { 454 logException(e); 455 throw new SQLException("IO Error: " + e.getMessage()); 456 } 457 } 458 459 460 public void dispose() throws SQLException { 461 try { 462 int dispatch_id = connection_thread.sendCloseCommand(); 463 closeConnection(); 467 } 468 catch (IOException e) { 469 logException(e); 470 throw new SQLException("IO Error: " + e.getMessage()); 471 } 472 } 473 474 476 480 private class ConnectionThread extends Thread { 481 482 485 private MByteArrayOutputStream com_bytes; 486 private DataOutputStream com_data; 487 488 491 private int running_dispatch_id = 1; 492 493 496 private boolean thread_closed; 497 498 502 private Vector commands_list; 503 504 505 508 ConnectionThread() throws IOException { 509 setDaemon(true); 510 setName("Mckoi - Connection Thread"); 511 com_bytes = new MByteArrayOutputStream(); 512 com_data = new DataOutputStream(com_bytes); 513 514 commands_list = new Vector (); 515 thread_closed = false; 516 } 517 518 520 523 private int nextDispatchID() { 524 return running_dispatch_id++; 525 } 526 527 532 ServerCommand getCommand(int timeout, int dispatch_id) 533 throws SQLException { 534 final long time_in = System.currentTimeMillis(); 535 final long time_out_high = time_in + ((long) timeout * 1000); 536 537 synchronized (commands_list) { 538 539 if (commands_list == null) { 540 throw new SQLException("Connection to server closed"); 541 } 542 543 while (true) { 544 545 for (int i = 0; i < commands_list.size(); ++i) { 546 ServerCommand command = (ServerCommand) commands_list.elementAt(i); 547 if (command.dispatchID() == dispatch_id) { 548 commands_list.removeElementAt(i); 549 return command; 550 } 551 } 552 553 if (timeout != 0 && 556 System.currentTimeMillis() > time_out_high) { 557 return null; 558 } 559 560 try { 562 commands_list.wait(1000); 563 } 564 catch (InterruptedException e) { } 565 566 } 568 } 570 } 571 572 573 575 578 private synchronized void flushCommand() throws IOException { 579 writeCommandToServer(com_bytes.getBuffer(), 0, com_bytes.size()); 583 com_bytes.reset(); 584 } 585 586 590 synchronized int pushStreamableObjectPart(byte type, long object_id, 591 long object_length, byte[] buf, long offset, int length) 592 throws IOException { 593 int dispatch_id = nextDispatchID(); 594 com_data.writeInt(PUSH_STREAMABLE_OBJECT_PART); 595 com_data.writeInt(dispatch_id); 596 com_data.writeByte(type); 597 com_data.writeLong(object_id); 598 com_data.writeLong(object_length); 599 com_data.writeInt(length); 600 com_data.write(buf, 0, length); 601 com_data.writeLong(offset); 602 flushCommand(); 603 604 return dispatch_id; 605 } 606 607 615 synchronized int executeQuery(SQLQuery sql) throws IOException { 616 int dispatch_id = nextDispatchID(); 617 com_data.writeInt(QUERY); 618 com_data.writeInt(dispatch_id); 619 sql.writeTo(com_data); 620 flushCommand(); 621 622 return dispatch_id; 623 } 624 625 634 synchronized int disposeResult(int result_id) throws IOException { 635 int dispatch_id = nextDispatchID(); 636 com_data.writeInt(DISPOSE_RESULT); 637 com_data.writeInt(dispatch_id); 638 com_data.writeInt(result_id); 639 flushCommand(); 640 641 return dispatch_id; 642 } 643 644 654 synchronized int getResultPart(int result_id, int row_number, 655 int row_count) throws IOException { 656 int dispatch_id = nextDispatchID(); 657 com_data.writeInt(RESULT_SECTION); 658 com_data.writeInt(dispatch_id); 659 com_data.writeInt(result_id); 660 com_data.writeInt(row_number); 661 com_data.writeInt(row_count); 662 flushCommand(); 663 664 return dispatch_id; 665 } 666 667 675 synchronized int getStreamableObjectPart(int result_id, 676 long streamable_object_id, 677 long offset, int length) throws IOException { 678 int dispatch_id = nextDispatchID(); 679 com_data.writeInt(STREAMABLE_OBJECT_SECTION); 680 com_data.writeInt(dispatch_id); 681 com_data.writeInt(result_id); 682 com_data.writeLong(streamable_object_id); 683 com_data.writeLong(offset); 684 com_data.writeInt(length); 685 flushCommand(); 686 687 return dispatch_id; 688 } 689 690 701 synchronized int disposeStreamableObject(int result_id, 702 long streamable_object_id) throws IOException { 703 int dispatch_id = nextDispatchID(); 704 com_data.writeInt(DISPOSE_STREAMABLE_OBJECT); 705 com_data.writeInt(dispatch_id); 706 com_data.writeInt(result_id); 707 com_data.writeLong(streamable_object_id); 708 flushCommand(); 709 710 return dispatch_id; 711 } 712 713 716 synchronized int sendCloseCommand() throws IOException { 717 int dispatch_id = nextDispatchID(); 718 com_data.writeInt(CLOSE); 719 com_data.writeInt(dispatch_id); 720 flushCommand(); 721 722 return dispatch_id; 723 } 724 725 726 727 729 730 734 public void run() { 735 736 try { 737 while (!thread_closed) { 738 739 byte[] buf = nextCommandFromServer(0); 741 int dispatch_id = ByteArrayUtil.getInt(buf, 0); 742 743 if (dispatch_id == -1) { 744 processEvent(buf); 746 } 747 748 synchronized (commands_list) { 749 commands_list.addElement(new ServerCommand(dispatch_id, buf)); 751 commands_list.notifyAll(); 753 } 754 755 } 757 } 758 catch (IOException e) { 759 } 762 763 finally { 764 Object old_commands_list = commands_list; 766 synchronized (old_commands_list) { 767 commands_list = null; 768 old_commands_list.notifyAll(); 769 } 770 } 771 772 } 773 774 777 private void processEvent(byte[] buf) throws IOException { 778 int event = ByteArrayUtil.getInt(buf, 4); 779 if (event == PING) { 780 } 783 else if (event == DATABASE_EVENT) { 784 ByteArrayInputStream bin = 786 new ByteArrayInputStream(buf, 8, buf.length - 8); 787 DataInputStream din = new DataInputStream(bin); 788 789 int event_type = din.readInt(); 790 String event_msg = din.readUTF(); 791 database_call_back.databaseEvent(event_type, event_msg); 792 } 793 else { 805 System.err.println("[RemoteDatabaseInterface] " + 806 "Received unrecognised server side event: " + event); 807 } 808 } 809 810 } 811 812 816 static class MByteArrayOutputStream extends ByteArrayOutputStream { 817 MByteArrayOutputStream() { 818 super(256); 819 } 820 public byte[] getBuffer() { 821 return buf; 822 } 823 public int size() { 824 return count; 825 } 826 } 827 828 831 static class ServerCommand { 832 833 private int dispatch_id; 834 private byte[] buf; 835 836 ServerCommand(int dispatch_id, byte[] buf) { 837 this.dispatch_id = dispatch_id; 838 this.buf = buf; 839 } 840 841 public int dispatchID() { 842 return dispatch_id; 843 } 844 845 public byte[] getBuf() { 846 return buf; 847 } 848 849 public ByteArrayInputStream getInputStream() { 850 return new ByteArrayInputStream(buf, 4, buf.length - 4); 851 } 852 853 } 854 855 } 856 | Popular Tags |