| 1 21 package com.db4o.cs; 22 23 import java.io.*; 24 25 import com.db4o.*; 26 import com.db4o.config.*; 27 import com.db4o.cs.messages.*; 28 import com.db4o.ext.*; 29 import com.db4o.foundation.*; 30 import com.db4o.foundation.network.*; 31 import com.db4o.inside.*; 32 import com.db4o.inside.convert.*; 33 import com.db4o.inside.query.*; 34 import com.db4o.reflect.*; 35 36 39 public class YapClient extends YapStream implements ExtClient, BlobTransport { 40 41 final Object blobLock = new Object (); 42 43 private YapClientBlobThread blobThread; 44 45 private YapSocket i_socket; 46 47 Queue4 messageQueue = new Queue4(); 48 49 final Lock4 messageQueueLock = new Lock4(); 50 51 private String password; 53 int[] _prefetchedIDs; 54 55 private YapClientThread _readerThread; 56 57 int remainingIDs; 58 59 private String switchedToFile; 60 61 private boolean _singleThreaded; 62 63 private String userName; 64 65 private Db4oDatabase i_db; 66 67 protected boolean _doFinalize=true; 68 69 private int _blockSize = 1; 70 71 72 private YapClient(Configuration config) { 73 super(config,null); 74 } 75 76 79 public YapClient(String fakeServerFile) { 80 this(Db4o.cloneConfiguration()); 81 synchronized (lock()) { 82 _singleThreaded = configImpl().singleThreadedClient(); 83 if (Debug.fakeServer) { 84 DebugCS.serverStream = (YapFile) Db4o.openFile(fakeServerFile); 85 DebugCS.clientStream = this; 86 DebugCS.clientMessageQueue = messageQueue; 87 DebugCS.clientMessageQueueLock = messageQueueLock; 88 readThis(); 89 } else { 90 throw new RuntimeException ( 91 "This constructor is for Debug.fakeServer use only."); 92 } 93 initialize3(); 94 Platform4.postOpen(this); 95 } 96 } 97 98 public YapClient(Configuration config,YapSocket socket, String user, String password_, boolean login) 99 throws IOException { 100 this(config); 101 synchronized (lock()) { 102 _singleThreaded = configImpl().singleThreadedClient(); 103 104 109 if (password_ == null) { 110 throw new NullPointerException (Messages.get(56)); 111 } 112 if (!login) { 113 password_ = null; 114 } 115 116 userName = user; 117 password = password_; 118 i_socket = socket; 119 try { 120 loginToServer(socket); 121 } catch (IOException e) { 122 stopSession(); 123 throw e; 124 } 125 126 if (!_singleThreaded) { 127 startReaderThread(socket, user); 128 } 129 130 logMsg(36, toString()); 131 132 readThis(); 133 134 initialize3(); 135 Platform4.postOpen(this); 136 } 137 } 138 139 private void startReaderThread(YapSocket socket, String user) { 140 _readerThread = new YapClientThread(this, socket, messageQueue, 141 messageQueueLock); 142 _readerThread.setName("db4o message client for user " + user); 143 _readerThread.start(); 144 } 145 146 public void backup(String path) throws IOException { 147 Exceptions4.throwRuntimeException(60); 148 } 149 150 public void blockSize(int blockSize){ 151 _blockSize = blockSize; 152 } 153 154 public byte blockSize() { 155 return (byte)_blockSize; 156 } 157 158 protected boolean close2() { 159 if (_readerThread == null || _readerThread.isClosed()) { 160 return super.close2(); 161 } 162 try { 163 writeMsg(Msg.COMMIT_OK); 164 expectedResponse(Msg.OK); 165 } catch (Exception e) { 166 Exceptions4.catchAllExceptDb4oException(e); 167 } 168 try { 169 writeMsg(Msg.CLOSE); 170 } catch (Exception e) { 171 Exceptions4.catchAllExceptDb4oException(e); 172 } 173 try { 174 if (!_singleThreaded) { 175 _readerThread.close(); 176 } 177 } catch (Exception e) { 178 Exceptions4.catchAllExceptDb4oException(e); 179 } 180 try { 181 i_socket.close(); 182 } catch (Exception e) { 183 Exceptions4.catchAllExceptDb4oException(e); 184 } 185 186 boolean ret = super.close2(); 187 if (Debug.fakeServer) { 188 DebugCS.serverStream.close(); 189 } 190 return ret; 191 } 192 193 public final void commit1() { 194 i_trans.commit(); 195 } 196 197 public int converterVersion() { 198 return Converter.VERSION; 199 } 200 201 YapSocket createParalellSocket() throws IOException { 202 writeMsg(Msg.GET_THREAD_ID); 203 204 int serverThreadID = expectedByteResponse(Msg.ID_LIST).readInt(); 205 206 YapSocket sock = i_socket.openParalellSocket(); 207 208 if (!(i_socket instanceof YapSocketFake)) { 209 loginToServer(sock); 210 } 211 212 if (switchedToFile != null) { 213 MsgD message = Msg.SWITCH_TO_FILE.getWriterForString(i_systemTrans, 214 switchedToFile); 215 message.write(this, sock); 216 if (!(Msg.OK.equals(Msg.readMessage(i_systemTrans, sock)))) { 217 throw new IOException(Messages.get(42)); 218 } 219 } 220 Msg.USE_TRANSACTION.getWriterForInt(i_trans, serverThreadID).write( 221 this, sock); 222 return sock; 223 } 224 225 public AbstractQueryResult newQueryResult(Transaction trans, QueryEvaluationMode mode) { 226 throw new IllegalStateException (); 227 } 228 229 final public Transaction newTransaction(Transaction parentTransaction) { 230 return new TransactionClient(this, parentTransaction); 231 } 232 233 public boolean createYapClass(YapClass a_yapClass, ReflectClass a_class, 234 YapClass a_superYapClass) { 235 writeMsg(Msg.CREATE_CLASS.getWriterForString(i_systemTrans, a_class 236 .getName())); 237 Msg resp = getResponse(); 238 if (resp == null) { 239 return false; 240 } 241 242 if (resp.equals(Msg.FAILED)) { 243 sendClassMeta(a_class); 245 resp = getResponse(); 246 } 247 248 if (resp.equals(Msg.FAILED)) { 249 if (configImpl().exceptionsOnNotStorable()) { 250 throw new ObjectNotStorableException(a_class); 251 } 252 return false; 253 } 254 if (!resp.equals(Msg.OBJECT_TO_CLIENT)) { 255 return false; 256 } 257 258 MsgObject message = (MsgObject) resp; 259 YapWriter bytes = message.unmarshall(); 260 if (bytes == null) { 261 return false; 262 } 263 bytes.setTransaction(getSystemTransaction()); 264 if (!super.createYapClass(a_yapClass, a_class, a_superYapClass)) { 265 return false; 266 } 267 a_yapClass.setID(message.getId()); 268 a_yapClass.readName1(getSystemTransaction(), bytes); 269 classCollection().addYapClass(a_yapClass); 270 classCollection().readYapClass(a_yapClass, a_class); 271 return true; 272 } 273 274 private void sendClassMeta(ReflectClass reflectClass) { 275 ClassMeta classMeta = _classMetaHelper.getClassMeta(reflectClass); 276 writeMsg(Msg.CLASS_META.getWriter(marshall(i_systemTrans, classMeta))); 277 } 278 279 public long currentVersion() { 280 writeMsg(Msg.CURRENT_VERSION); 281 return ((MsgD) expectedResponse(Msg.ID_LIST)).readLong(); 282 } 283 284 public final boolean delete4(Transaction ta, YapObject yo, int a_cascade, boolean userCall) { 285 writeMsg(Msg.DELETE.getWriterForInts(i_trans, new int[] { yo.getID(), userCall ? 1 : 0 })); 286 return true; 287 } 288 289 public boolean detectSchemaChanges() { 290 return false; 291 } 292 293 protected boolean doFinalize() { 294 return _doFinalize; 295 } 296 297 final YapReader expectedByteResponse(Msg expectedMessage) { 298 Msg msg = expectedResponse(expectedMessage); 299 if (msg == null) { 300 return null; 303 } 304 return msg.getByteLoad(); 305 } 306 307 final Msg expectedResponse(Msg expectedMessage) { 308 Msg message = getResponse(); 309 if (expectedMessage.equals(message)) { 310 return message; 311 } 312 if (Deploy.debug) { 313 new RuntimeException ().printStackTrace(); 314 if (message == null) { 315 System.out.println("Message was null"); 316 } 317 if (!expectedMessage.equals(message)) { 318 System.out.println("Unexpected Message:" + message 319 + " Expected:" + expectedMessage); 320 } 321 } 322 return null; 323 } 324 325 public AbstractQueryResult getAll(Transaction trans) { 326 int mode = config().queryEvaluationMode().asInt(); 327 writeMsg(Msg.GET_ALL.getWriterForInt(trans, mode)); 328 return readQueryResult(trans); 329 } 330 331 336 Msg getResponse() { 337 return _singleThreaded ? getResponseSingleThreaded() 338 : getResponseMultiThreaded(); 339 } 340 341 private Msg getResponseMultiThreaded() { 342 try { 343 344 return (Msg) messageQueueLock.run(new Closure4() { 345 public Object run() { 346 Msg message = retrieveMessage(); 347 if (message != null) { 348 return message; 349 } 350 351 throwOnClosed(); 352 messageQueueLock.snooze(configImpl().timeoutClientSocket()); 353 throwOnClosed(); 354 return retrieveMessage(); 355 } 356 357 private void throwOnClosed() { 358 if (_readerThread.isClosed()) { 359 _doFinalize=false; 360 throw new Db4oException(Messages.get(Messages.CLOSED_OR_OPEN_FAILED)); 361 } 362 } 363 364 private Msg retrieveMessage() { 365 Msg message = null; 366 message = (Msg) messageQueue.next(); 367 if (message != null) { 368 if (Debug.messages) { 369 System.out 370 .println(message + " processed at client"); 371 } 372 if (Msg.ERROR.equals(message)) { 373 throw new Db4oException("Client connection error"); 374 } 375 } 376 return message; 377 } 378 }); 379 } catch (Exception ex) { 380 Exceptions4.catchAllExceptDb4oException(ex); 381 return null; 382 } 383 } 384 385 private Msg getResponseSingleThreaded() { 386 while (i_socket != null) { 387 try { 388 final Msg message = Msg.readMessage(i_trans, i_socket); 389 if (Msg.PING.equals(message)) { 390 writeMsg(Msg.OK); 391 } else if (Msg.CLOSE.equals(message)) { 392 logMsg(35, toString()); 393 close(); 394 return null; 395 } else if (message != null) { 396 return message; 397 } 398 } catch (Exception e) { 399 } 400 } 401 return null; 402 } 403 404 public YapClass getYapClass(int a_id) { 405 YapClass yc = super.getYapClass(a_id); 406 if (yc != null) { 407 return yc; 408 } 409 writeMsg(Msg.CLASS_NAME_FOR_ID.getWriterForInt(i_systemTrans, a_id)); 410 MsgD message = (MsgD) expectedResponse(Msg.CLASS_NAME_FOR_ID); 411 String className = message.readString(); 412 if (className != null && className.length() > 0) { 413 ReflectClass claxx = reflector().forName(className); 414 if (claxx != null) { 415 return produceYapClass(claxx); 416 } 417 } 419 return null; 420 } 421 422 public boolean needsLockFileThread() { 423 return false; 424 } 425 426 protected boolean hasShutDownHook() { 427 return false; 428 } 429 430 public Db4oDatabase identity() { 431 if (i_db == null) { 432 writeMsg(Msg.IDENTITY); 433 YapReader reader = expectedByteResponse(Msg.ID_LIST); 434 showInternalClasses(true); 435 i_db = (Db4oDatabase) getByID(reader.readInt()); 436 activate1(i_systemTrans, i_db, 3); 437 showInternalClasses(false); 438 } 439 return i_db; 440 } 441 442 public boolean isClient() { 443 return true; 444 } 445 446 void loginToServer(YapSocket a_socket) throws IOException { 447 if (password != null) { 448 YapStringIOUnicode stringWriter = new YapStringIOUnicode(); 449 int length = stringWriter.length(userName) 450 + stringWriter.length(password); 451 452 MsgD message = Msg.LOGIN.getWriterForLength(i_systemTrans, length); 453 message.writeString(userName); 454 message.writeString(password); 455 message.write(this, a_socket); 456 Msg msg = Msg.readMessage(i_systemTrans, a_socket); 457 if (!Msg.LOGIN_OK.equals(msg)) { 458 throw new IOException(Messages.get(42)); 459 } 460 YapReader payLoad = msg.payLoad(); 461 _blockSize = payLoad.readInt(); 462 int doEncrypt = payLoad.readInt(); 463 if(doEncrypt == 0){ 464 i_handlers.oldEncryptionOff(); 465 } 466 467 } 468 } 469 470 public boolean maintainsIndices() { 471 return false; 472 } 473 474 public final int newUserObject() { 475 int prefetchIDCount = config().prefetchIDCount(); 476 ensureIDCacheAllocated(prefetchIDCount); 477 YapReader reader = null; 478 if (remainingIDs < 1) { 479 writeMsg(Msg.PREFETCH_IDS.getWriterForInt(i_trans, prefetchIDCount)); 480 reader = expectedByteResponse(Msg.ID_LIST); 481 for (int i = prefetchIDCount - 1; i >= 0; i--) { 482 _prefetchedIDs[i] = reader.readInt(); 483 } 484 remainingIDs = prefetchIDCount; 485 } 486 remainingIDs--; 487 return _prefetchedIDs[remainingIDs]; 488 } 489 490 public int prefetchObjects(IntIterator4 ids, Object [] prefetched, 491 int prefetchCount) { 492 493 int count = 0; 494 495 int toGet = 0; 496 int[] idsToGet = new int[prefetchCount]; 497 int[] position = new int[prefetchCount]; 498 499 while (count < prefetchCount) { 500 if (!ids.moveNext()) { 501 break; 502 } 503 int id = ids.currentInt(); 504 if (id > 0) { 505 Object obj = objectForIDFromCache(id); 506 if(obj != null){ 507 prefetched[count] = obj; 508 }else{ 509 idsToGet[toGet] = id; 510 position[toGet] = count; 511 toGet++; 512 } 513 count++; 514 } 515 } 516 517 if (toGet > 0) { 518 writeMsg(Msg.READ_MULTIPLE_OBJECTS.getWriterForIntArray(i_trans, 519 idsToGet, toGet)); 520 MsgD message = (MsgD) expectedResponse(Msg.READ_MULTIPLE_OBJECTS); 521 int embeddedMessageCount = message.readInt(); 522 for (int i = 0; i < embeddedMessageCount; i++) { 523 MsgObject mso = (MsgObject) Msg.OBJECT_TO_CLIENT 524 .clone(getTransaction()); 525 mso.payLoad(message.payLoad().readYapBytes()); 526 if (mso.payLoad() != null) { 527 mso.payLoad().incrementOffset(YapConst.MESSAGE_LENGTH); 528 YapWriter reader = mso.unmarshall(YapConst.MESSAGE_LENGTH); 529 Object obj = objectForIDFromCache(idsToGet[i]); 530 if(obj != null){ 531 prefetched[position[i]] = obj; 532 }else{ 533 prefetched[position[i]] = new YapObject(idsToGet[i]).readPrefetch(this, reader); 534 } 535 } 536 } 537 } 538 return count; 539 } 540 541 void processBlobMessage(MsgBlob msg) { 542 synchronized (blobLock) { 543 boolean needStart = blobThread == null || blobThread.isTerminated(); 544 if (needStart) { 545 blobThread = new YapClientBlobThread(this); 546 } 547 blobThread.add(msg); 548 if (needStart) { 549 blobThread.start(); 550 } 551 } 552 } 553 554 public void raiseVersion(long a_minimumVersion) { 555 writeMsg(Msg.RAISE_VERSION.getWriterForLong(i_trans, a_minimumVersion)); 556 } 557 558 public void readBytes(byte[] bytes, int address, int addressOffset, int length) { 559 throw Exceptions4.virtualException(); 560 } 561 562 public void readBytes(byte[] a_bytes, int a_address, int a_length) { 563 writeMsg(Msg.READ_BYTES.getWriterForInts(i_trans, new int[] { 564 a_address, a_length })); 565 YapReader reader = expectedByteResponse(Msg.READ_BYTES); 566 System.arraycopy(reader._buffer, 0, a_bytes, 0, a_length); 567 } 568 569 protected boolean rename1(Config4Impl config) { 570 logMsg(58, null); 571 return false; 572 } 573 574 public final YapWriter readWriterByID(Transaction a_ta, int a_id) { 575 try { 576 writeMsg(Msg.READ_OBJECT.getWriterForInt(a_ta, a_id)); 577 YapWriter bytes = ((MsgObject) expectedResponse(Msg.OBJECT_TO_CLIENT)) 578 .unmarshall(); 579 if (bytes == null) { 580 return null; 581 } 582 bytes.setTransaction(a_ta); 583 return bytes; 584 } catch (Exception e) { 585 return null; 586 } 587 } 588 589 public final YapReader readReaderByID(Transaction a_ta, int a_id) { 590 return readWriterByID(a_ta, a_id); 592 } 593 594 private AbstractQueryResult readQueryResult(Transaction trans) { 595 AbstractQueryResult queryResult = null; 596 YapReader reader = expectedByteResponse(Msg.QUERY_RESULT); 597 int queryResultID = reader.readInt(); 598 if(queryResultID > 0){ 599 queryResult = new LazyClientQueryResult(trans, this, queryResultID); 600 }else{ 601 queryResult = new IdListQueryResult(trans); 602 } 603 queryResult.loadFromIdReader(reader); 604 return queryResult; 605 } 606 607 void readThis() { 608 writeMsg(Msg.GET_CLASSES.getWriter(i_systemTrans)); 609 YapReader bytes = expectedByteResponse(Msg.GET_CLASSES); 610 classCollection().setID(bytes.readInt()); 611 createStringIO(bytes.readByte()); 612 classCollection().read(i_systemTrans); 613 classCollection().refreshClasses(); 614 } 615 616 public void releaseSemaphore(String name) { 617 synchronized (i_lock) { 618 checkClosed(); 619 if (name == null) { 620 throw new NullPointerException (); 621 } 622 writeMsg(Msg.RELEASE_SEMAPHORE.getWriterForString(i_trans, name)); 623 } 624 } 625 626 public void releaseSemaphores(Transaction ta) { 627 } 629 630 private void reReadAll(Configuration config) { 631 remainingIDs = 0; 632 initialize1(config); 633 initializeTransactions(); 634 readThis(); 635 } 636 637 public final void rollback1() { 638 writeMsg(Msg.ROLLBACK); 639 i_trans.rollback(); 640 } 641 642 public void send(Object obj) { 643 synchronized (i_lock) { 644 if (obj != null) { 645 writeMsg(Msg.USER_MESSAGE.getWriter(marshall(i_trans, obj))); 646 } 647 } 648 } 649 650 public final void setDirtyInSystemTransaction(YapMeta a_object) { 651 } 653 654 public boolean setSemaphore(String name, int timeout) { 655 synchronized (i_lock) { 656 checkClosed(); 657 if (name == null) { 658 throw new NullPointerException (); 659 } 660 writeMsg(Msg.SET_SEMAPHORE.getWriterForIntString(i_trans, timeout, 661 name)); 662 Msg message = getResponse(); 663 return (message.equals(Msg.SUCCESS)); 664 } 665 } 666 667 public void switchToFile(String fileName) { 668 synchronized (i_lock) { 669 commit(); 670 writeMsg(Msg.SWITCH_TO_FILE.getWriterForString(i_trans, fileName)); 671 expectedResponse(Msg.OK); 672 reReadAll(Db4o.cloneConfiguration()); 674 switchedToFile = fileName; 675 } 676 } 677 678 public void switchToMainFile() { 679 synchronized (i_lock) { 680 commit(); 681 writeMsg(Msg.SWITCH_TO_MAIN_FILE); 682 expectedResponse(Msg.OK); 683 reReadAll(Db4o.cloneConfiguration()); 685 switchedToFile = null; 686 } 687 } 688 689 public String name() { 690 return toString(); 691 } 692 693 public String toString() { 694 return "Client Connection " + userName; 698 } 699 700 public void write(boolean shuttingDown) { 701 } 703 704 public final void writeDirty() { 705 } 707 708 public final void writeEmbedded(YapWriter a_parent, YapWriter a_child) { 709 a_parent.addEmbedded(a_child); 710 } 711 712 final void writeMsg(Msg a_message) { 713 a_message.write(this, i_socket); 714 } 715 716 public final void writeNew(YapClass a_yapClass, YapWriter aWriter) { 717 writeMsg(Msg.WRITE_NEW.getWriter(a_yapClass, aWriter)); 718 } 719 720 public final void writeTransactionPointer(int a_address) { 721 } 723 724 public final void writeUpdate(YapClass a_yapClass, YapWriter a_bytes) { 725 writeMsg(Msg.WRITE_UPDATE.getWriter(a_yapClass, a_bytes)); 726 } 727 728 public boolean isAlive() { 729 try { 730 writeMsg(Msg.PING); 731 return expectedResponse(Msg.OK) != null; 732 } catch (Db4oException exc) { 733 return false; 734 } 735 } 736 737 public YapSocket socket() { 739 return i_socket; 740 } 741 742 private void ensureIDCacheAllocated(int prefetchIDCount) { 743 if(_prefetchedIDs==null) { 744 _prefetchedIDs = new int[prefetchIDCount]; 745 return; 746 } 747 if(prefetchIDCount>_prefetchedIDs.length) { 748 int[] newPrefetchedIDs=new int[prefetchIDCount]; 749 System.arraycopy(_prefetchedIDs, 0, newPrefetchedIDs, 0, _prefetchedIDs.length); 750 _prefetchedIDs=newPrefetchedIDs; 751 } 752 } 753 754 public SystemInfo systemInfo() { 755 throw new NotImplementedException("Functionality not availble on clients."); 756 } 757 758 759 public void writeBlobTo(Transaction trans, BlobImpl blob, File file) throws IOException { 760 MsgBlob msg = (MsgBlob) Msg.READ_BLOB.getWriterForInt(trans, (int) getID(blob)); 761 msg._blob = blob; 762 processBlobMessage(msg); 763 } 764 765 public void readBlobFrom(Transaction trans, BlobImpl blob, File file) throws IOException { 766 MsgBlob msg = null; 767 synchronized (lock()) { 768 set(blob); 769 int id = (int) getID(blob); 770 msg = (MsgBlob) Msg.WRITE_BLOB.getWriterForInt(trans, id); 771 msg._blob = blob; 772 blob.setStatus(Status.QUEUED); 773 } 774 processBlobMessage(msg); 775 } 776 777 public long[] getIDsForClass(Transaction trans, YapClass clazz){ 778 writeMsg(Msg.GET_INTERNAL_IDS.getWriterForInt(trans, clazz.getID())); 779 YapReader reader = expectedByteResponse(Msg.ID_LIST); 780 int size = reader.readInt(); 781 final long[] ids = new long[size]; 782 for (int i = 0; i < size; i++) { 783 ids[i] = reader.readInt(); 784 } 785 return ids; 786 } 787 788 public QueryResult classOnlyQuery(Transaction trans, YapClass clazz){ 789 long[] ids = clazz.getIDs(trans); 790 ClientQueryResult resClient = new ClientQueryResult(trans, ids.length); 791 for (int i = 0; i < ids.length; i++) { 792 resClient.add((int)ids[i]); 793 } 794 return resClient; 795 } 796 797 public QueryResult executeQuery(QQuery query){ 798 Transaction trans = query.getTransaction(); 799 query.evaluationMode(config().queryEvaluationMode()); 800 query.marshall(); 801 writeMsg(Msg.QUERY_EXECUTE.getWriter(marshall(trans,query))); 802 return readQueryResult(trans); 803 } 804 805 806 807 } 808 | Popular Tags |