1 21 22 package com.rift.coad.lib.bean; 24 25 import java.util.ArrayList ; 27 import java.util.Date ; 28 import java.util.Iterator ; 29 import java.util.Map ; 30 import java.util.HashMap ; 31 import java.util.List ; 32 import java.util.LinkedHashMap ; 33 import java.util.LinkedHashSet ; 34 import java.util.Set ; 35 import java.util.concurrent.ConcurrentHashMap ; 36 import javax.rmi.PortableRemoteObject ; 37 import javax.transaction.xa.XAException ; 38 import javax.transaction.xa.XAResource ; 39 import javax.transaction.xa.Xid ; 40 41 import org.apache.log4j.Logger; 43 44 import com.rift.coad.lib.cache.Cache; 46 import com.rift.coad.lib.cache.CacheEntry; 47 import com.rift.coad.lib.configuration.ConfigurationFactory; 48 import com.rift.coad.lib.configuration.Configuration; 49 import com.rift.coad.lib.thread.ThreadStateMonitor; 50 import com.rift.coad.util.lock.LockRef; 51 import com.rift.coad.util.lock.ObjectLockFactory; 52 import com.rift.coad.util.transaction.TransactionManager; 53 54 59 public class TransactionBeanCache implements Cache,XAResource { 60 61 64 public class ChangeEntry { 65 private Object key = null; 67 private Object value = null; 68 private byte changeType = 0; 69 70 71 78 public ChangeEntry(Object key, Object value, byte changeType) { 79 this.key = key; 80 this.value = value; 81 this.changeType = changeType; 82 } 83 84 85 90 public Object getKey() { 91 return key; 92 } 93 94 95 100 public Object getValue() { 101 return value; 102 } 103 104 105 110 public byte getChangeType() { 111 return changeType; 112 } 113 } 114 115 118 public class Changes { 119 private Xid transactionId = null; 121 private List locks = new ArrayList (); 122 private List changesEntries = new ArrayList (); 123 124 125 130 public Changes(Xid transactionId) { 131 this.transactionId = transactionId; 132 } 133 134 135 140 public void addLock(LockRef lock) throws BeanException { 141 locks.add(lock); 142 } 143 144 145 152 public void addEntry(Object key, Object value) throws 153 BeanException { 154 changesEntries.add(new ChangeEntry(key,value,ADD)); 155 } 156 157 158 165 public void addRemoveEntry(Object key, Object value) throws 166 BeanException { 167 changesEntries.add(new ChangeEntry(key,value,REMOVE)); 168 } 169 170 171 176 public List getChangeEntries() { 177 return changesEntries; 178 } 179 180 181 186 public List getLocks() { 187 return locks; 188 } 189 } 190 191 private final static String CACHE_EXPIRY_TIME = "bean_cache_expiry"; 193 private final static long CACHE_EXPIRY_TIME_DEFAULT = 30 * 60 * 1000; 194 private final static byte ADD = 1; 195 private final static byte UPDATE = 2; 196 private final static byte REMOVE = 3; 197 198 protected Logger log = 200 Logger.getLogger(TransactionBeanCache.class.getName()); 201 202 private long defaultCacheExpiryTime = 0; 204 private ThreadStateMonitor status = new ThreadStateMonitor(); 205 private ThreadLocal currentTransaction = new ThreadLocal (); 206 private Map keyLockMap = new HashMap (); 207 private Map baseCacheEntries = new ConcurrentHashMap (); 208 private Map transactionChanges = new ConcurrentHashMap (); 209 210 215 public TransactionBeanCache() throws BeanException { 216 try { 217 Configuration config = ConfigurationFactory.getInstance(). 218 getConfig(BeanCache.class); 219 defaultCacheExpiryTime = config.getLong(CACHE_EXPIRY_TIME, 220 CACHE_EXPIRY_TIME_DEFAULT); 221 } catch (Exception ex) { 222 log.error("Failed to start the TransactionBeanCache object " + 223 "because : " + ex.getMessage(),ex); 224 throw new BeanException( 225 "Failed to start the TransactionBeanCache object " + 226 "because : " + ex.getMessage(),ex); 227 } 228 } 229 230 231 234 public void garbageCollect() { 235 Map entries = new HashMap (); 237 entries.putAll(this.baseCacheEntries); 238 239 Date expiryDate = new Date (); 241 for (Iterator iter = entries.keySet().iterator();iter.hasNext();) { 242 Object cacheKey = iter.next(); 243 LockRef lockRef = null; 244 try { 245 lockRef = getLockRef(cacheKey); 246 } catch (Exception ex) { 247 log.error("Failed to aquire lock on [" + cacheKey 248 + "] because :" + ex.getMessage(),ex); 249 continue; 250 } 251 try { 252 BeanCacheEntry beanCacheEntry = 253 (BeanCacheEntry)entries.get(cacheKey); 254 if (beanCacheEntry.isExpired(expiryDate)) { 255 if (beanCacheEntry.getCacheEntry() != null) { 256 try { 257 PortableRemoteObject.unexportObject( 258 (java.rmi.Remote ) 259 beanCacheEntry.getCacheEntry()); 260 this.baseCacheEntries.remove(cacheKey); 261 beanCacheEntry.cacheRelease(); 262 } catch (java.rmi.NoSuchObjectException ex) { 263 log.warn("The object was never exported : " + 264 ex.getMessage(),ex); 265 synchronized(entries) { 267 this.baseCacheEntries.remove(cacheKey); 268 } 269 beanCacheEntry.cacheRelease(); 270 } catch (Exception ex) { 271 log.error("Failed to un-export this object : " + 272 ex.getMessage(),ex); 273 } 274 } else { 275 this.baseCacheEntries.remove(cacheKey); 278 beanCacheEntry.cacheRelease(); 279 } 280 } 281 } finally { 282 try { 283 lockRef.release(); 284 } catch (Exception ex) { 285 log.error("Failed to release the lock on [" + cacheKey 286 + "] because :" + ex.getMessage(),ex); 287 } 288 } 289 } 290 } 291 292 293 296 public void clear() { 297 LockRef lockRef = null; 298 try { 299 lockRef = ObjectLockFactory.getInstance().acquireReadLock(this); 300 status.terminate(false); 302 Map entries = new HashMap (); 303 entries.putAll(this.baseCacheEntries); 304 this.baseCacheEntries.clear(); 305 306 for (Iterator iter = entries.keySet().iterator();iter.hasNext();) { 308 Object cacheKey = iter.next(); 309 BeanCacheEntry beanCacheEntry = 310 (BeanCacheEntry)entries.get(cacheKey); 311 if (beanCacheEntry.getCacheEntry() != null) 312 { 313 try { 314 PortableRemoteObject.unexportObject( 315 (java.rmi.Remote ) 316 beanCacheEntry.getCacheEntry()); 317 } catch (java.rmi.NoSuchObjectException ex) { 318 log.warn("The cache object was not bound : " + 319 ex.getMessage(),ex); 320 } catch (Exception ex) { 321 log.error("Failed to un-export the cached object : " + 322 ex.getMessage(),ex); 323 } 324 } 325 beanCacheEntry.cacheRelease(); 326 } 327 328 } catch (Exception ex) { 329 log.error("Failed to clear the bean cache : " + ex.getMessage(),ex); 330 } finally { 331 try { 332 lockRef.release(); 333 } catch (Exception ex) { 334 log.error("Failed to release the write lock : " + 335 ex.getMessage(),ex); 336 } 337 } 338 } 339 340 341 347 public boolean contains(Object cacheKey) { 348 try { 349 TransactionManager.getInstance().bindResource(this,false); 350 getLock(cacheKey); 351 return baseCacheEntries.containsKey(cacheKey); 352 } catch (Exception ex) { 353 log.error( 354 "Failed to retrieve the cache entries : " + ex.getMessage(), 355 ex); 356 return false; 357 } 358 } 359 360 361 368 public BeanCacheEntry getCacheEntry(Object cacheKey) throws BeanException { 369 try { 370 TransactionManager.getInstance().bindResource(this,false); 371 getLock(cacheKey); 372 BeanCacheEntry beanCacheEntry = 373 (BeanCacheEntry)baseCacheEntries.get(cacheKey); 374 if (beanCacheEntry != null) { 375 beanCacheEntry.touch(); 376 } 377 return beanCacheEntry; 378 } catch (Exception ex) { 379 throw new BeanException("Failed to retrieve the cache entries : " 380 + ex.getMessage(),ex); 381 } 382 } 383 384 385 392 public void addCacheEntry(long timeout,Object cacheKey, Object wrappedObject, 393 CacheEntry entry) throws BeanException { 394 try { 395 TransactionManager.getInstance().bindResource(this,false); 396 getLock(cacheKey); 397 if (baseCacheEntries.containsKey(cacheKey)) { 398 throw new BeanException("Entry is already in the cache."); 399 } 400 long cacheTimeout = timeout; 401 if (timeout == -1) { 402 cacheTimeout = defaultCacheExpiryTime; 403 } 404 BeanCacheEntry beanCacheEntry = new BeanCacheEntry(cacheTimeout, 405 cacheKey, wrappedObject, entry); 406 baseCacheEntries.put(cacheKey, beanCacheEntry); 407 Changes changes = (Changes)transactionChanges.get( 408 currentTransaction.get()); 409 changes.addEntry(cacheKey,beanCacheEntry); 410 } catch (BeanException ex) { 411 throw ex; 412 } catch (Exception ex) { 413 throw new BeanException("Failed to add a cache entrie : " 414 + ex.getMessage(),ex); 415 } 416 } 417 418 419 427 public void addCacheEntry(long timeout, Object cacheKey, Object wrappedObject, 428 Object proxy, CacheEntry handle) throws BeanException { 429 try { 430 TransactionManager.getInstance().bindResource(this,false); 431 getLock(cacheKey); 432 if (baseCacheEntries.containsKey(cacheKey)) { 433 throw new BeanException("Entry is already in the cache."); 434 } 435 long cacheTimeout = timeout; 436 if (timeout == -1) { 437 cacheTimeout = defaultCacheExpiryTime; 438 } 439 BeanCacheEntry beanCacheEntry = new BeanCacheEntry(cacheTimeout, 440 cacheKey, wrappedObject, proxy, handle); 441 442 baseCacheEntries.put(cacheKey, beanCacheEntry); 443 Changes changes = (Changes)transactionChanges.get( 444 currentTransaction.get()); 445 changes.addEntry(cacheKey,beanCacheEntry); 446 } catch (BeanException ex) { 447 throw ex; 448 } catch (Exception ex) { 449 throw new BeanException("Failed to add a cache entrie : " 450 + ex.getMessage(),ex); 451 } 452 } 453 454 455 460 public void removeCacheEntry(Object cacheKey) throws BeanException { 461 try { 462 TransactionManager.getInstance().bindResource(this,false); 463 getLock(cacheKey); 464 Object entry = baseCacheEntries.get(cacheKey); 465 baseCacheEntries.remove(cacheKey); 466 Changes changes = (Changes)transactionChanges.get( 467 currentTransaction.get()); 468 changes.addRemoveEntry(cacheKey,entry); 469 } catch (Exception ex) { 470 throw new BeanException("Failed to remove a cache entrie : " 471 + ex.getMessage(),ex); 472 } 473 } 474 475 476 483 public synchronized void commit(Xid xid, boolean b) throws XAException { 484 try { 485 if (this.status.isTerminated()) { 486 log.error("Commit called on terminated cache, ignoring."); 487 return; 488 } 489 Changes changes = (Changes)transactionChanges.get(xid); 490 for (Iterator iter = changes.getLocks().iterator(); 491 iter.hasNext();) { 492 LockRef lockRef = (LockRef)iter.next(); 493 lockRef.release(); 494 } 495 transactionChanges.remove(xid); 496 } catch (Exception ex) { 497 log.error("Failed to commit the changes : " + 498 ex.getMessage(),ex); 499 throw new XAException ("Failed to commit the changes : " + 500 ex.getMessage()); 501 } 502 } 503 504 505 512 public void end(Xid xid, int i) throws XAException { 513 } 514 515 516 522 public void forget(Xid xid) throws XAException { 523 try { 524 if (this.status.isTerminated()) { 525 log.error("Commit called on terminated cache, ignoring."); 526 return; 527 } 528 Changes changes = (Changes)transactionChanges.get(xid); 529 for (Iterator iter = changes.getLocks().iterator(); 530 iter.hasNext();) { 531 LockRef lockRef = (LockRef)iter.next(); 532 lockRef.release(); 533 } 534 transactionChanges.remove(xid); 535 } catch (Exception ex) { 536 log.error("Failed to forget the changes : " + 537 ex.getMessage(),ex); 538 throw new XAException ("Failed to forget the changes : " + 539 ex.getMessage()); 540 } 541 } 542 543 544 550 public int getTransactionTimeout() throws XAException { 551 return -1; 552 } 553 554 555 563 public boolean isSameRM(XAResource xAResource) throws XAException { 564 return this == xAResource; 565 } 566 567 568 575 public int prepare(Xid xid) throws XAException { 576 return XAResource.XA_OK; 577 } 578 579 580 588 public Xid [] recover(int i) throws XAException { 589 return null; 590 } 591 592 593 599 public void rollback(Xid xid) throws XAException { 600 try { 601 if (this.status.isTerminated()) { 602 log.error("Commit called on terminated cache, ignoring."); 603 return; 604 } 605 Changes changes = (Changes)transactionChanges.get(xid); 606 List changeEntries = changes.getChangeEntries(); 607 for (int index = 0; index < changeEntries.size(); index++) { 608 ChangeEntry changeEntry = (ChangeEntry)changeEntries.get( 609 changeEntries.size() - (index + 1)); 610 if (changeEntry.getChangeType() == ADD) { 611 BeanCacheEntry beanCacheEntry = 612 (BeanCacheEntry)changeEntry.getValue(); 613 if (beanCacheEntry.getCacheEntry() != null) { 614 try { 615 PortableRemoteObject.unexportObject( 616 (java.rmi.Remote ) 617 beanCacheEntry.getCacheEntry()); 618 beanCacheEntry.cacheRelease(); 619 } catch (java.rmi.NoSuchObjectException ex) { 620 log.warn("The object was never exported : " + 621 ex.getMessage(),ex); 622 beanCacheEntry.cacheRelease(); 623 } catch (Exception ex) { 624 log.error("Failed to un-export this object : " + 625 ex.getMessage(),ex); 626 beanCacheEntry.cacheRelease(); 627 } 628 } else { 629 beanCacheEntry.cacheRelease(); 632 } 633 baseCacheEntries.remove(changeEntry.getKey()); 634 635 } else if (changeEntry.getChangeType() == REMOVE) { 636 baseCacheEntries.put(changeEntry.getKey(), 637 changeEntry.getValue()); 638 } 639 } 640 641 for (Iterator iter = changes.getLocks().iterator(); 642 iter.hasNext();) { 643 LockRef lockRef = (LockRef)iter.next(); 644 lockRef.release(); 645 } 646 transactionChanges.remove(xid); 647 } catch (Exception ex) { 648 log.error("Failed to rollback the changes : " + 649 ex.getMessage(),ex); 650 throw new XAException ("Failed to rollback the changes : " + 651 ex.getMessage()); 652 } 653 } 654 655 656 663 public boolean setTransactionTimeout(int i) throws XAException { 664 return true; 665 } 666 667 668 675 public void start(Xid xid, int i) throws XAException { 676 try { 677 checkStatus(); 678 if (!transactionChanges.containsKey(xid)) { 679 transactionChanges.put(xid,new Changes(xid)); 680 } 681 currentTransaction.set(xid); 682 } catch (Exception ex) { 683 log.error("Cannot start a transaction because : " + 684 ex.getMessage(),ex); 685 throw new XAException ("Cannot start a transaction because : " + 686 ex.getMessage()); 687 } 688 } 689 690 691 694 private void checkStatus() throws BeanException { 695 if (status.isTerminated()) { 696 throw new BeanException("Bean cache has been terminated."); 697 } 698 } 699 700 701 708 private void getLock(Object name) throws BeanException { 709 try { 710 Object key = null; 711 synchronized(keyLockMap) { 712 if (keyLockMap.containsKey(name)) { 713 key = keyLockMap.get(name); 714 } else { 715 key = name.toString(); 716 keyLockMap.put(name,key); 717 } 718 } 719 LockRef lockRef = 720 ObjectLockFactory.getInstance().acquireWriteLock(key, 721 currentTransaction.get()); 722 Changes changes = (Changes)transactionChanges.get( 723 currentTransaction.get()); 724 changes.addLock(lockRef); 725 } catch (Exception ex) { 726 log.error("Failed to retrieve a lock on the bean cache entry : " + 727 ex.getMessage(),ex); 728 throw new BeanException 729 ("Failed to retrieve a lock on the bean cache entry : " + 730 ex.getMessage(),ex); 731 } 732 } 733 734 735 742 private LockRef getLockRef(Object name) throws BeanException { 743 try { 744 Object key = null; 745 synchronized(keyLockMap) { 746 if (keyLockMap.containsKey(name)) { 747 key = keyLockMap.get(name); 748 } else { 749 key = name.toString(); 750 keyLockMap.put(name,key); 751 } 752 } 753 return ObjectLockFactory.getInstance().acquireWriteLock(key); 754 } catch (Exception ex) { 755 log.error("Failed to retrieve a lock on the bean cache entry : " + 756 ex.getMessage(),ex); 757 throw new BeanException 758 ("Failed to retrieve a lock on the bean cache entry : " + 759 ex.getMessage(),ex); 760 } 761 } 762 } 763 | Popular Tags |