1 24 25 package com.mckoi.database.jdbc; 26 27 import java.io.*; 28 import java.sql.*; 29 import java.util.Properties ; 30 import java.util.Vector ; 31 import java.util.StringTokenizer ; 32 import com.mckoi.database.global.ColumnDescription; 33 import com.mckoi.database.global.ObjectTransfer; 34 import com.mckoi.database.global.StreamableObject; 35 import com.mckoi.util.ByteArrayUtil; 36 import java.util.Hashtable ; 37 import java.util.Map ; 39 41 51 52 public class MConnection implements Connection, DatabaseCallBack { 53 54 60 private RowCache row_cache; 61 62 65 private String url; 66 67 70 private SQLWarning head_warning; 71 72 75 private boolean is_closed; 76 77 81 private boolean auto_commit; 82 83 86 private DatabaseInterface db_interface; 87 88 91 private Vector trigger_list; 92 93 97 private TriggerDispatchThread trigger_thread; 98 99 105 private boolean strict_get_object; 106 107 114 private boolean verbose_column_names; 115 116 121 private boolean case_insensitive_identifiers; 122 123 127 private Hashtable s_object_hold; 128 129 133 private long s_object_id; 134 135 136 137 private Object lock = new Object (); 139 140 141 142 145 public MConnection(String url, DatabaseInterface db_interface, 146 int cache_size, int max_size) { 147 this.url = url; 148 this.db_interface = db_interface; 149 is_closed = true; 150 auto_commit = true; 151 trigger_list = new Vector (); 152 strict_get_object = true; 153 verbose_column_names = false; 154 case_insensitive_identifiers = false; 155 row_cache = new RowCache(cache_size, max_size); 156 s_object_hold = new Hashtable (); 157 s_object_id = 0; 158 } 159 160 170 public void setStrictGetObject(boolean status) { 171 strict_get_object = status; 172 } 173 174 177 public boolean isStrictGetObject() { 178 return strict_get_object; 179 } 180 181 188 public void setVerboseColumnNames(boolean status) { 189 verbose_column_names = status; 190 } 191 192 195 public boolean verboseColumnNames() { 196 return verbose_column_names; 197 } 198 199 204 public void setCaseInsensitiveIdentifiers(boolean status) { 205 case_insensitive_identifiers = status; 206 } 207 208 211 public boolean isCaseInsensitiveIdentifiers() { 212 return case_insensitive_identifiers; 213 } 214 215 216 224 227 protected final RowCache getRowCache() { 228 return row_cache; 229 } 230 231 234 protected final void addSQLWarning(SQLWarning warning) { 235 synchronized (lock) { 236 if (head_warning == null) { 237 head_warning = warning; 238 } 239 else { 240 head_warning.setNextWarning(warning); 241 } 242 } 243 } 244 245 249 public final void internalClose() throws SQLException { 250 synchronized (lock) { 251 if (!isClosed()) { 252 try { 253 db_interface.dispose(); 254 } 255 finally { 256 is_closed = true; 257 } 258 } 259 } 260 } 261 262 265 MckoiConnection getMckoiConnection() { 266 return new MckoiConnection(this); 267 } 268 269 274 public void login(String default_schema, String username, String password) 275 throws SQLException { 276 277 synchronized (lock) { 278 if (!is_closed) { 279 throw new SQLException( 280 "Unable to login to connection because it is open."); 281 } 282 } 283 284 if (username == null || username.equals("") || 285 password == null || password.equals("")) { 286 throw new SQLException("username or password have not been set."); 287 } 288 289 if (default_schema == null) { 291 default_schema = username; 292 } 293 294 boolean li = db_interface.login(default_schema, username, password, this); 296 synchronized (lock) { 297 is_closed = !li; 298 } 299 if (!li) { 300 throw new SQLException("User authentication failed for: " + username); 301 } 302 303 setCaseInsensitiveIdentifiers(false); 305 Statement stmt = createStatement(); 306 ResultSet rs = stmt.executeQuery("SHOW CONNECTION_INFO"); 307 while (rs.next()) { 308 String key = rs.getString(1); 309 if (key.equals("case_insensitive_identifiers")) { 310 String val = rs.getString(2); 311 setCaseInsensitiveIdentifiers(val.equals("true")); 312 } 313 else if (key.equals("auto_commit")) { 314 String val = rs.getString(2); 315 auto_commit = val.equals("true"); 316 } 317 } 318 rs.close(); 319 stmt.close(); 320 321 } 322 323 325 328 String getURL() { 329 return url; 330 } 331 332 336 void login(Properties info, String default_schema) throws SQLException { 337 338 String username = info.getProperty("user", ""); 339 String password = info.getProperty("password", ""); 340 341 login(default_schema, username, password); 342 } 343 344 353 356 private void uploadStreamableObjects(SQLQuery sql) throws SQLException { 357 358 Object [] vars = sql.getVars(); 361 try { 362 for (int i = 0; i < vars.length; ++i) { 363 if (vars[i] != null && vars[i] instanceof StreamableObject) { 365 final int BUF_SIZE = 64 * 1024; 367 368 StreamableObject s_object = (StreamableObject) vars[i]; 369 long offset = 0; 370 final byte type = s_object.getType(); 371 final long total_len = s_object.getSize(); 372 final long id = s_object.getIdentifier(); 373 final byte[] buf = new byte[BUF_SIZE]; 374 375 Object sob_id = new Long (id); 377 InputStream i_stream = (InputStream) s_object_hold.get(sob_id); 378 if (i_stream == null) { 379 throw new RuntimeException ( 380 "Assertion failed: Streamable object InputStream is not available."); 381 } 382 383 while (offset < total_len) { 384 int index = 0; 386 final int block_read = 387 (int) Math.min((long) BUF_SIZE, (total_len - offset)); 388 int to_read = block_read; 389 while (to_read > 0) { 390 int count = i_stream.read(buf, index, to_read); 391 if (count == -1) { 392 throw new IOException("Premature end of stream."); 393 } 394 index += count; 395 to_read -= count; 396 } 397 398 db_interface.pushStreamableObjectPart(type, id, total_len, 400 buf, offset, block_read); 401 offset += block_read; 403 } 404 405 s_object_hold.remove(sob_id); 407 408 413 } 414 } 415 } 416 catch (IOException e) { 417 e.printStackTrace(System.err); 418 throw new SQLException("IO Error pushing large object to server: " + 419 e.getMessage()); 420 } 421 } 422 423 432 void executeQueries(SQLQuery[] queries, MResultSet[] results) 433 throws SQLException { 434 for (int i = 0; i < queries.length; ++i) { 436 executeQuery(queries[i], results[i]); 437 } 438 } 439 440 449 void executeQuery(SQLQuery sql, MResultSet result_set) throws SQLException { 450 451 uploadStreamableObjects(sql); 452 QueryResponse resp = db_interface.execQuery(sql); 454 455 ColumnDescription[] col_list = new ColumnDescription[resp.getColumnCount()]; 457 for (int i = 0; i < col_list.length; ++i) { 458 col_list[i] = resp.getColumnDescription(i); 459 } 460 result_set.connSetup(resp.getResultID(), col_list, resp.getRowCount()); 463 result_set.setQueryTime(resp.getQueryTimeMillis()); 464 465 } 466 467 471 ResultPart requestResultPart(int result_id, int start_row, int count_rows) 472 throws SQLException { 473 return db_interface.getResultPart(result_id, start_row, count_rows); 474 } 475 476 479 StreamableObjectPart requestStreamableObjectPart(int result_id, 480 long streamable_object_id, long offset, int len) throws SQLException { 481 return db_interface.getStreamableObjectPart(result_id, 482 streamable_object_id, offset, len); 483 } 484 485 491 void disposeResult(int result_id) throws SQLException { 492 if (!is_closed) { 500 db_interface.disposeResult(result_id); 501 } 502 } 503 504 508 void addTriggerListener(String trigger_name, TriggerListener listener) { 509 synchronized (trigger_list) { 510 trigger_list.addElement(trigger_name); 511 trigger_list.addElement(listener); 512 } 513 } 514 515 518 void removeTriggerListener(String trigger_name, TriggerListener listener) { 519 synchronized (trigger_list) { 520 for (int i = trigger_list.size() - 2; i >= 0; i -= 2) { 521 if (trigger_list.elementAt(i).equals(trigger_name) && 522 trigger_list.elementAt(i + 1).equals(listener)) { 523 trigger_list.removeElementAt(i); 524 trigger_list.removeElementAt(i); 525 } 526 } 527 } 528 } 529 530 531 536 StreamableObject createStreamableObject(InputStream x, 537 int length, byte type) { 538 long ob_id; 539 synchronized (s_object_hold) { 540 ob_id = s_object_id; 541 ++s_object_id; 542 s_object_hold.put(new Long (ob_id), x); 544 } 545 return new StreamableObject(type, length, ob_id); 547 } 548 549 553 void removeStreamableObject(StreamableObject s_object) { 554 s_object_hold.remove(new Long (s_object.getIdentifier())); 555 } 556 557 558 560 public void databaseEvent(int event_type, String event_message) { 565 if (event_type == 99) { 566 if (trigger_thread == null) { 567 trigger_thread = new TriggerDispatchThread(); 568 trigger_thread.start(); 569 } 570 trigger_thread.dispatchTrigger(event_message); 571 } 572 else { 573 throw new Error ("Unrecognised database event: " + event_type); 574 } 575 576 } 580 581 582 584 public Statement createStatement() throws SQLException { 585 return new MStatement(this); 586 } 587 588 public PreparedStatement prepareStatement(String sql) throws SQLException { 589 return new MPreparedStatement(this, sql); 590 } 591 592 public CallableStatement prepareCall(String sql) throws SQLException { 593 throw MSQLException.unsupported(); 594 } 595 596 public String nativeSQL(String sql) throws SQLException { 597 return sql; 599 } 600 601 public void setAutoCommit(boolean autoCommit) throws SQLException { 602 ResultSet result; 604 if (autoCommit) { 605 result = createStatement().executeQuery("SET AUTO COMMIT ON"); 606 auto_commit = true; 607 result.close(); 608 } 609 else { 610 result = createStatement().executeQuery("SET AUTO COMMIT OFF"); 611 auto_commit = false; 612 result.close(); 613 } 614 } 615 616 public boolean getAutoCommit() throws SQLException { 617 return auto_commit; 618 } 629 630 public void commit() throws SQLException { 631 ResultSet result; 632 result = createStatement().executeQuery("COMMIT"); 633 result.close(); 634 } 635 636 public void rollback() throws SQLException { 637 ResultSet result; 638 result = createStatement().executeQuery("ROLLBACK"); 639 result.close(); 640 } 641 642 public void close() throws SQLException { 643 644 if (!isClosed()) { 645 internalClose(); 646 } 647 648 657 } 669 670 public boolean isClosed() throws SQLException { 671 synchronized (lock) { 672 return is_closed; 673 } 674 } 675 676 679 public DatabaseMetaData getMetaData() throws SQLException { 680 return new MDatabaseMetaData(this); 681 } 682 683 public void setReadOnly(boolean readOnly) throws SQLException { 684 } 686 687 public boolean isReadOnly() throws SQLException { 688 return false; 690 } 691 692 public void setCatalog(String catalog) throws SQLException { 693 } 695 696 public String getCatalog() throws SQLException { 697 return null; 699 } 700 701 public void setTransactionIsolation(int level) throws SQLException { 702 if (level != TRANSACTION_SERIALIZABLE) { 703 throw new SQLException("Only 'TRANSACTION_SERIALIZABLE' supported."); 704 } 705 } 706 707 public int getTransactionIsolation() throws SQLException { 708 return TRANSACTION_SERIALIZABLE; 709 } 710 711 public SQLWarning getWarnings() throws SQLException { 712 synchronized (lock) { 713 return head_warning; 714 } 715 } 716 717 public void clearWarnings() throws SQLException { 718 synchronized (lock) { 719 head_warning = null; 720 } 721 } 722 723 725 727 public Statement createStatement(int resultSetType, 728 int resultSetConcurrency) throws SQLException { 729 Statement statement = createStatement(); 730 return statement; 733 } 734 735 public PreparedStatement prepareStatement(String sql, int resultSetType, 736 int resultSetConcurrency) throws SQLException { 737 PreparedStatement statement = prepareStatement(sql); 738 return statement; 741 } 742 743 public CallableStatement prepareCall(String sql, int resultSetType, 744 int resultSetConcurrency) throws SQLException { 745 throw MSQLException.unsupported(); 746 } 747 748 public Map getTypeMap() throws SQLException { 753 throw MSQLException.unsupported(); 754 } 755 756 public void setTypeMap(Map map) throws SQLException { 757 throw MSQLException.unsupported(); 758 } 759 760 762 764 766 public void setHoldability(int holdability) throws SQLException { 767 if (holdability == ResultSet.CLOSE_CURSORS_AT_COMMIT) { 770 throw new SQLException( 771 "CLOSE_CURSORS_AT_COMMIT holdability is not supported."); 772 } 773 } 774 775 public int getHoldability() throws SQLException { 776 return ResultSet.HOLD_CURSORS_OVER_COMMIT; 777 } 778 779 public Savepoint setSavepoint() throws SQLException { 780 throw MSQLException.unsupported(); 781 } 782 783 public Savepoint setSavepoint(String name) throws SQLException { 784 throw MSQLException.unsupported(); 785 } 786 787 public void rollback(Savepoint savepoint) throws SQLException { 788 throw MSQLException.unsupported(); 789 } 790 791 public void releaseSavepoint(Savepoint savepoint) throws SQLException { 792 throw MSQLException.unsupported(); 793 } 794 795 public Statement createStatement(int resultSetType, int resultSetConcurrency, 796 int resultSetHoldability) throws SQLException { 797 if (resultSetHoldability == ResultSet.CLOSE_CURSORS_AT_COMMIT) { 800 throw new SQLException( 801 "CLOSE_CURSORS_AT_COMMIT holdability is not supported."); 802 } 803 return createStatement(resultSetType, resultSetConcurrency); 804 } 805 806 public PreparedStatement prepareStatement( 807 String sql, int resultSetType, int resultSetConcurrency, 808 int resultSetHoldability) throws SQLException { 809 if (resultSetHoldability == ResultSet.CLOSE_CURSORS_AT_COMMIT) { 812 throw new SQLException( 813 "CLOSE_CURSORS_AT_COMMIT holdability is not supported."); 814 } 815 return prepareStatement(sql, resultSetType, resultSetConcurrency); 816 } 817 818 public CallableStatement prepareCall(String sql, int resultSetType, 819 int resultSetConcurrency, int resultSetHoldability) throws SQLException { 820 throw MSQLException.unsupported(); 821 } 822 823 public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) 824 throws SQLException { 825 throw MSQLException.unsupported(); 826 } 827 828 public PreparedStatement prepareStatement(String sql, int columnIndexes[]) 829 throws SQLException { 830 throw MSQLException.unsupported(); 831 } 832 833 public PreparedStatement prepareStatement(String sql, String columnNames[]) 834 throws SQLException { 835 throw MSQLException.unsupported(); 836 } 837 838 840 842 845 private class TriggerDispatchThread extends Thread { 846 847 private Vector trigger_messages_queue = new Vector (); 848 849 TriggerDispatchThread() { 850 setDaemon(true); 851 setName("Mckoi - Trigger Dispatcher"); 852 } 853 854 857 private void dispatchTrigger(String event_message) { 858 synchronized (trigger_messages_queue) { 859 trigger_messages_queue.addElement(event_message); 860 trigger_messages_queue.notifyAll(); 861 } 862 } 863 864 public void run() { 866 867 while (true) { 868 try { 869 String message; 870 synchronized (trigger_messages_queue) { 871 while (trigger_messages_queue.size() == 0) { 872 try { 873 trigger_messages_queue.wait(); 874 } 875 catch (InterruptedException e) { } 876 } 877 message = (String ) trigger_messages_queue.elementAt(0); 878 trigger_messages_queue.removeElementAt(0); 879 } 880 881 886 StringTokenizer tok = new StringTokenizer (message, " "); 887 String trigger_name = (String ) tok.nextElement(); 888 String trigger_source = (String ) tok.nextElement(); 889 String trigger_fire_count = (String ) tok.nextElement(); 890 891 Vector fired_triggers = new Vector (); 892 synchronized (trigger_list) { 894 for (int i = 0; i < trigger_list.size(); i += 2) { 895 String to_listen_for = (String ) trigger_list.elementAt(i); 896 if (to_listen_for.equals(trigger_name)) { 897 TriggerListener listener = 898 (TriggerListener) trigger_list.elementAt(i + 1); 899 fired_triggers.addElement(listener); 903 } 904 } 905 } 906 907 for (int i = 0; i < fired_triggers.size(); ++i) { 909 TriggerListener listener = 910 (TriggerListener) fired_triggers.elementAt(i); 911 listener.triggerFired(trigger_name); 912 } 913 914 } 915 catch (Throwable t) { 916 t.printStackTrace(System.err); 917 } 918 919 } 920 921 } 922 923 } 924 925 } 926 | Popular Tags |