1 9 package org.ozoneDB; 10 11 import org.ozoneDB.DxLib.*; 12 import org.ozoneDB.core.DbRemote.*; 13 import org.ozoneDB.core.*; 14 15 import java.io.*; 16 import java.util.Hashtable ; 17 18 19 41 public class ClientCacheDatabase extends ExternalDatabase { 42 43 47 private DxMap idTable; 48 49 53 private DxMap nameTable; 54 55 private long totalMemory; 56 57 private ExternalDatabase delegate; 58 59 private long idCount; 60 private long idBorder; 61 private long idRange = 1000; 62 63 private boolean debug; 64 65 66 77 public ClientCacheDatabase( ExternalDatabase _delegate ) { 78 this( _delegate, false ); 79 } 80 81 82 94 public ClientCacheDatabase( ExternalDatabase _delegate, boolean _debug ) { 95 delegate = _delegate; 96 delegate.setWrapper( this ); 97 98 debug = _debug; 99 idTable = new DxHashMap( 1000 ); 100 nameTable = new DxHashMap( 100 ); 101 102 calcMemory(); 103 } 104 105 106 protected void open( Hashtable _props ) throws Exception { 107 throw new RuntimeException ( "Method open() must not be called for this class." ); 108 } 109 110 111 protected synchronized ObjectID nextID() throws Exception { 112 if (idCount >= idBorder) { 113 ObjectID id = (ObjectID)delegate.sendCommand( new DbNextID( idRange ), true ); 114 idCount = id.value(); 115 idBorder = idCount + idRange; 116 } 117 return new ObjectID( ++idCount ); 118 } 119 120 121 public ExternalDatabase delegate() { 122 return delegate; 123 } 124 125 126 protected Object sendCommand( DbCommand command, boolean waitForResult, DbClient connection ) throws Exception { 127 throw new RuntimeException ( "ClientCacheDatabase must access the actual database through its delegate only." ); 128 } 129 130 131 protected DbClient newConnection() throws Exception { 132 return delegate.newConnection(); 133 } 134 135 136 public boolean isOpen() throws Exception { 137 return delegate.isOpen(); 138 } 139 140 141 public void close() throws Exception { 142 delegate.close(); 143 } 144 145 146 protected void finalize() throws Throwable { 147 close(); 148 } 149 150 151 public ExternalTransaction newTransaction() { 152 return new ExternalTransaction( this ); 153 } 154 155 156 public void beginTX( AbstractTransaction tx ) throws TransactionException, IOException { 157 if (debug) { 158 System.out.println( "[debug] beginTX()" ); 159 } 160 delegate.beginTX( tx ); 161 } 162 163 164 public void joinTX( AbstractTransaction tx ) throws TransactionException { 165 if (debug) { 166 System.out.println( "[debug] joinTX()" ); 167 } 168 delegate.joinTX( tx ); 169 } 170 171 172 public boolean leaveTX( AbstractTransaction tx ) { 173 if (debug) { 174 System.out.println( "[debug] leaveTX()" ); 175 } 176 return delegate.leaveTX( tx ); 177 } 178 179 180 public void checkpointTX( AbstractTransaction tx ) throws TransactionException, IOException { 181 if (debug) { 182 System.out.println( "[debug] checkpointTX()" ); 183 } 184 throw new RuntimeException ( "Not yet implemented." ); 185 } 186 187 188 public synchronized void prepareTX( AbstractTransaction tx ) throws TransactionException, IOException { 189 if (debug) { 190 System.out.println( "[debug] prepareTX()" ); 191 } 192 try { 193 syncCache(); 194 delegate.prepareTX( tx ); 195 updateModTimes(); 196 } catch (TransactionException e) { 197 abortCache(); 198 throw e; 199 } catch (Exception e) { 200 throw new UnexpectedException( e.toString() ); 201 } 202 } 203 204 205 public synchronized void commitTX( AbstractTransaction tx, boolean onePhase ) throws TransactionException, IOException { 206 if (debug) { 207 System.out.println( "[debug] commitTX(): onePhase:" + onePhase ); 208 } 209 210 if (onePhase) { 211 prepareTX( tx ); 212 } 213 214 try { 215 delegate.commitTX( tx, false ); 216 } catch (TransactionException e) { 217 throw e; 218 } catch (Exception e) { 219 throw new UnexpectedException( e.toString() ); 220 } 221 222 try { 224 DxIterator it = idTable.iterator(); 225 while (it.next() != null) { 226 CacheObjectContainer container = (CacheObjectContainer)it.object(); 229 container.clearState(); 230 } 231 } catch (Exception e) { 232 throw new UnexpectedException( e.toString() ); 235 } 236 } 237 238 239 public synchronized void rollbackTX( AbstractTransaction tx ) throws TransactionException, IOException { 240 if (debug) { 241 System.out.println( "[debug] rollbackTX()" ); 242 } 243 244 delegate.rollbackTX( tx ); 245 abortCache(); 246 } 247 248 249 251 252 256 public void reloadClasses() throws Exception { 257 delegate.reloadClasses(); 258 } 259 260 261 public synchronized OzoneProxy createObject( String className, int access, String name, String sig, Object [] args ) 262 throws RuntimeException { 263 try { 264 AbstractTransaction tx = delegate.txForThread( Thread.currentThread() ); 265 if (tx == null) { 266 throw new TransactionException( "Thread has not yet joined a transaction.", TransactionException.STATE ); 267 } 268 269 OzoneCompatible target = (OzoneCompatible)Env.currentEnv().classManager.classForName( className ).newInstance(); 272 273 CacheObjectContainer container = new CacheObjectContainer( target, nextID(), name, access ); 274 container.setDatabase( this ); 275 container.raiseState( ObjectContainer.STATE_CREATED ); 276 container.setDirty( true ); 277 container.tx = tx; 278 279 idTable.addForKey( container, container.id() ); 280 281 return container.ozoneProxy(); 282 } catch (Exception e) { 283 throw new RuntimeException ("Caught during createObject(): "+e); 286 } 287 } 288 289 290 public synchronized void deleteObject( OzoneRemote obj ) throws RuntimeException { 291 try { 292 OzoneProxy proxy = (OzoneProxy)obj; 293 CacheObjectContainer container = fetch0( proxy.remoteID(), Lock.LEVEL_WRITE ); 294 295 container.raiseState( ObjectContainer.STATE_DELETED ); 296 } catch (Exception e) { 297 throw new RuntimeException ("Caught during createObject(): "+e); 300 } 301 } 302 303 304 public synchronized OzoneProxy copyObject( OzoneRemote obj ) throws Exception { 305 throw new RuntimeException ( "copyObject(): Method not implemented." ); 306 } 307 308 309 public synchronized void nameObject( OzoneRemote obj, String name ) throws Exception { 310 311 ObjectID id = (ObjectID)nameTable.elementForKey( name ); 312 if (id != null) { 313 throw new PermissionDeniedException( "Root object name '" + name + "' already exists." ); 314 } 315 316 OzoneProxy proxy = (OzoneProxy)obj; 317 CacheObjectContainer container = fetch0( proxy.remoteID(), Lock.LEVEL_WRITE ); 318 container.setName( name ); 319 nameTable.addForKey( container.id(), name ); 320 } 321 322 323 public synchronized OzoneProxy objectForName( String name ) throws Exception { 324 if (debug) { 325 System.out.println( "[debug] objectForName(): name:" + name ); 326 } 327 328 ObjectID id = (ObjectID)nameTable.elementForKey( name ); 329 330 if (id != null) { 331 CacheObjectContainer container = (CacheObjectContainer)idTable.elementForKey( id ); 332 if (container != null) { 333 return container.ozoneProxy(); 334 } else { 335 container = fetch0( id, Lock.LEVEL_READ ); 336 337 if (container != null) { 338 return container.ozoneProxy(); 339 } else { 340 nameTable.removeForKey( name ); 341 return null; 342 } 343 } 344 } else { 345 OzoneProxy proxy = delegate.objectForName( name ); 346 if (proxy != null) { 347 id = proxy.remoteID(); 348 nameTable.addForKey( id, name ); 349 350 CacheObjectContainer container = fetch0( id, Lock.LEVEL_READ ); 351 return container.ozoneProxy(); 352 } else { 353 return null; 354 } 355 } 356 } 357 358 359 public Object invoke( OzoneProxy proxy, String methodName, String sig, Object [] args, int lockLevel ) 360 throws Exception { 361 throw new RuntimeException ( "invoke(): Method not implemented." ); 362 } 363 364 365 public Object invoke( OzoneProxy proxy, int methodIndex, Object [] args, int lockLevel ) throws Exception { 366 throw new RuntimeException ( "invoke(): Method not implemented." ); 367 } 368 369 370 372 373 public OzoneCompatible fetch(OzoneProxy proxy,int lockLevel) throws Exception ,org.ozoneDB.ObjectNotFoundException, java.io.IOException , java.lang.ClassNotFoundException , org.ozoneDB.TransactionException, org.ozoneDB.core.TransactionError { 374 if (debug) { 375 System.out.println( "[debug] fetch(): id:" + proxy.remoteID() ); 376 } 377 378 CacheObjectContainer container = fetch0(proxy.remoteID(),lockLevel); 379 OzoneCompatible target = container.target(); 380 if (target == null) { 381 if (debug) { 382 System.out.println( "[debug] fetch(): id:" + proxy.remoteID() ); 383 } 384 throw new ObjectNotFoundException( "Target is null." ); 385 } 386 387 return target; 388 } 389 390 391 399 protected CacheObjectContainer fetch0( ObjectID id, int lockLevel ) throws Exception ,ObjectNotFoundException,TransactionException { 400 401 AbstractTransaction tx = delegate.txForThread( Thread.currentThread() ); 402 if (tx == null) { 403 throw new TransactionException( "Thread has not yet joined a transaction.", TransactionException.STATE ); 404 } 405 406 CacheObjectContainer container = (CacheObjectContainer)idTable.elementForKey( id ); 407 if (container == null) { 408 container = fetchChunk(id,100000); 409 } 410 411 if (container == null) { 412 throw new ObjectNotFoundException( "No object for the given ID." ); 413 } 414 415 synchronized (container) { 419 420 while (container.tx != null && container.tx != tx) { 423 try { 424 container.wait(); 427 } catch (InterruptedException e) { 428 } 429 } 430 431 if (container.tx == null) { 434 container.tx = tx; 435 } 436 437 if (lockLevel == Lock.LEVEL_READ) { 438 container.raiseState( CacheObjectContainer.STATE_READ ); 439 } else { 440 container.raiseState( CacheObjectContainer.STATE_MODIFIED ); 441 container.setDirty( true ); 442 } 443 } 444 445 return container; 446 } 447 448 449 protected synchronized CacheObjectContainer fetchChunk( ObjectID rootID, int size ) throws Exception { 450 ensureSpace(size); 451 452 DxArrayBag chunk = (DxArrayBag) delegate.sendCommand(new DbCacheChunk(rootID,size),true); 453 454 for (int i = 0; i < chunk.count(); i++) { 455 CacheObjectContainer container = (CacheObjectContainer)chunk.elementAtIndex( i ); 456 if (debug) { 457 System.out.println( "[debug] fetchChunk(): container:" + container.id() ); 458 } 459 460 container.setTarget( container.target() ); 462 463 container.setDatabase( this ); 464 465 if (idTable.containsKey( container.id() )) { 466 System.out.print( "[debug] fetchChunk(): container already registered... " ); 467 468 CacheObjectContainer c = (CacheObjectContainer)idTable.elementForKey( container.id() ); 471 if (c.tx != null) { 472 System.out.println( "and locked - using old one." ); 473 } else { 474 System.out.println( "not locked - using new one." ); 475 idTable.removeForKey( container.id() ); 476 idTable.addForKey( container, container.id() ); 477 } 478 } else { 479 idTable.addForKey( container, container.id() ); 480 } 481 } 482 483 return (CacheObjectContainer)idTable.elementForKey( rootID ); 484 } 485 486 487 492 protected synchronized void syncCache() throws Exception { 493 if (debug) { 494 System.out.println( "[debug] syncCache()" ); 495 } 496 497 500 ByteArrayOutputStream bout = new ByteArrayOutputStream(); 501 ObjectOutputStream out = new ObjectOutputStream( bout ); 502 503 int count = 0; 504 DxIterator it = idTable.iterator(); 505 while (it.next() != null) { 506 CacheObjectContainer container = (CacheObjectContainer)it.object(); 507 508 if (debug) { 509 System.out.println( "[debug] id:" + container.id() + ", state:" + container.state() + ", dirty:" 510 + container.dirty() ); 511 } 512 513 if (container.dirty()) { 514 out.writeObject( container ); 515 516 container.setDirty( false ); 520 count++; 521 } 522 523 if (bout.size() > 500000) { 524 if (debug) { 525 System.out.println( "[debug] syncCache(): writing " + count + " objects" ); 526 } 527 528 out.close(); 529 delegate.sendCommand( new DbCacheChunk( bout.toByteArray() ), true ); 530 bout = new ByteArrayOutputStream(); 531 out = new ObjectOutputStream( bout ); 532 count = 0; 533 } 534 } 535 536 if (debug) { 537 System.out.println( "[debug] syncCache(): writing " + count + " objects" ); 538 } 539 out.close(); 540 delegate.sendCommand( new DbCacheChunk( bout.toByteArray() ), true ); 541 } 542 543 544 547 protected synchronized void abortCache() { 548 try { 549 DxIterator it = idTable.iterator(); 550 while (it.next() != null) { 551 CacheObjectContainer container = (CacheObjectContainer)it.object(); 552 if (container.state() >= ObjectContainer.STATE_MODIFIED) { 553 idTable.removeForKey( container.id() ); 554 if (container.name() != null) { 555 nameTable.removeForKey( container.name() ); 556 } 557 } 558 } 559 } catch (Exception e) { 560 throw new RuntimeException ( e.toString() ); 563 } 564 } 565 566 567 571 protected synchronized void updateModTimes() { 572 try { 573 if (debug) { 574 System.out.println( "[debug] updateModTimes()" ); 575 } 576 577 DbModTimes command = new DbModTimes(); 578 579 int count = 0; 580 DxIterator it = idTable.iterator(); 581 while (it.next() != null) { 582 CacheObjectContainer container = (CacheObjectContainer)it.object(); 583 if (container.state() == ObjectContainer.STATE_MODIFIED || container.state() 584 == ObjectContainer.STATE_CREATED) { 585 if (debug) { 586 System.out.println( "[debug] send: id:" + container.id() ); 587 } 588 589 command.addObjectID( container.id() ); 590 count++; 591 } 592 } 593 594 if (count > 0) { 596 DxMap map = (DxMap)delegate.sendCommand( command, true ); 597 598 it = map.iterator(); 599 Long modTime = null; 600 while ((modTime = (Long )it.next()) != null) { 601 ObjectID id = (ObjectID)it.key(); 602 603 if (debug) { 604 System.out.println( "[debug] receive: id:" + id + ", modTime:" + modTime ); 605 } 606 607 CacheObjectContainer container = (CacheObjectContainer)idTable.elementForKey( id ); 608 container.setModTime( modTime.longValue() ); 609 } 610 } 611 } catch (Exception e) { 612 throw new RuntimeException ( e.toString() ); 615 } 616 } 617 618 619 623 protected synchronized void ensureSpace( long neededSpace ) { 624 if (freeMemory() < neededSpace) { 625 626 DxMap priorityQueue = new DxTreeMap(); 628 DxIterator it = idTable.iterator(); 629 while (it.next() != null) { 630 CacheObjectContainer container = (CacheObjectContainer)it.object(); 631 priorityQueue.addForKey( container, new Long ( container.lastTouched() ) ); 632 } 633 634 it = priorityQueue.iterator(); 637 CacheObjectContainer container = (CacheObjectContainer)it.next(); 638 while (freeMemory() < neededSpace && container != null) { 639 640 for (int i = 0; i < 100 && container != null; i++) { 641 if (container.tx == null) { 642 idTable.removeForKey( container.id() ); 643 container = (CacheObjectContainer)it.next(); 644 } else { 645 System.out.println( "[debug] ensureSpace(): trying to free locked container." ); 646 } 647 } 648 System.gc(); 649 } 650 } 651 } 652 653 654 656 657 661 protected void calcMemory() { 662 Runtime rt = Runtime.getRuntime(); 663 try { 664 DxBag bag = new DxArrayBag(); 665 for (;;) { 666 bag.add( new byte[100000] ); 667 } 668 } catch (OutOfMemoryError e) { 669 totalMemory = rt.totalMemory(); 670 rt.gc(); 671 } 672 } 673 674 675 685 public long freeMemory() { 686 Runtime rt = Runtime.getRuntime(); 687 long hiddenMemory = totalMemory - rt.totalMemory(); 688 689 return Math.max( rt.freeMemory() + hiddenMemory - 2000000L, 0 ); 691 } 692 693 } 694 | Popular Tags |