1 21 22 package com.rift.coad.lib.deployment.rmi; 24 25 import java.util.ArrayList ; 27 import java.util.Date ; 28 import java.util.HashMap ; 29 import java.util.Iterator ; 30 import java.util.List ; 31 import java.util.Map ; 32 import java.util.concurrent.ConcurrentHashMap ; 33 import javax.rmi.PortableRemoteObject ; 34 import javax.transaction.xa.XAException ; 35 import javax.transaction.xa.XAResource ; 36 import javax.transaction.xa.Xid ; 37 38 import org.apache.log4j.Logger; 40 41 import com.rift.coad.lib.cache.Cache; 43 import com.rift.coad.lib.cache.CacheEntry; 44 import com.rift.coad.lib.common.RandomGuid; 45 import com.rift.coad.lib.configuration.ConfigurationFactory; 46 import com.rift.coad.lib.configuration.Configuration; 47 import com.rift.coad.lib.thread.ThreadStateMonitor; 48 import com.rift.coad.util.lock.LockRef; 49 import com.rift.coad.util.lock.ObjectLockFactory; 50 import com.rift.coad.util.transaction.TransactionManager; 51 52 58 public class TransactionRMICache implements Cache,XAResource { 59 60 63 public class ChangeEntry { 64 private Xid transactionId = null; 66 private List newEntries = new ArrayList (); 67 68 69 74 public ChangeEntry(Xid transactionId) { 75 this.transactionId = transactionId; 76 } 77 78 79 84 public Xid getTransactionId() { 85 return transactionId; 86 } 87 88 89 94 public void addEntry(RMICacheEntry entry) { 95 newEntries.add(entry); 96 } 97 98 99 102 public List getEntries() { 103 return newEntries; 104 } 105 } 106 107 108 private final static String CACHE_EXPIRY_TIME = "rmi_cache_expiry"; 110 private final static long CACHE_EXPIRY_TIME_DEFAULT = 60 * 1000; 111 112 protected static Logger log = 114 Logger.getLogger(TransactionRMICache.class.getName()); 115 116 private ThreadLocal currentTransaction = new ThreadLocal (); 118 private Map cacheEntries = new ConcurrentHashMap (); 119 private Map transactionChanges = new ConcurrentHashMap (); 120 private long defaultCacheExpiryTime = 0; 121 private ThreadStateMonitor status = new ThreadStateMonitor(); 122 123 124 127 public TransactionRMICache() { 128 } 129 130 131 134 public void garbageCollect() { 135 Map currentEntryList = new HashMap (); 136 synchronized (cacheEntries) { 137 currentEntryList.putAll(cacheEntries); 138 } 139 Date expiryDate = new Date (); 140 for (Iterator iter = currentEntryList.keySet().iterator(); iter.hasNext();) { 141 Object key = iter.next(); 142 RMICacheEntry cacheEntry = (RMICacheEntry)currentEntryList.get(key); 143 if (cacheEntry.isExpired(expiryDate)) { 144 try { 145 PortableRemoteObject.unexportObject( 146 cacheEntry.getRemoteInterface()); 147 synchronized(cacheEntries) { 148 cacheEntries.remove(key); 149 } 150 cacheEntry.cacheRelease(); 151 } catch (java.rmi.NoSuchObjectException ex) { 152 log.warn("The object was never exported : " + 153 ex.getMessage(),ex); 154 synchronized(cacheEntries) { 156 cacheEntries.remove(key); 157 } 158 cacheEntry.cacheRelease(); 159 } catch (Exception ex) { 160 log.error("Failed to remove a cache entry because : " + 161 ex.getMessage(),ex); 162 } 163 } 164 } 165 } 166 167 168 171 public void clear() { 172 LockRef lockRef = null; 173 try { 174 lockRef = ObjectLockFactory.getInstance().acquireReadLock(this); 175 status.terminate(false); 176 Map currentEntryList = new HashMap (); 177 synchronized (cacheEntries) { 178 currentEntryList.putAll(cacheEntries); 179 cacheEntries.clear(); 180 } 181 for (Iterator iter = currentEntryList.keySet().iterator(); 182 iter.hasNext();) { 183 Object key = iter.next(); 184 RMICacheEntry cacheEntry = 185 (RMICacheEntry)currentEntryList.get(key); 186 try { 187 PortableRemoteObject.unexportObject( 188 cacheEntry.getRemoteInterface()); 189 } catch (java.rmi.NoSuchObjectException ex) { 190 log.warn("The object was never exported : " + 191 ex.getMessage(),ex); 192 } catch (Exception ex) { 193 log.error("Failed to remove a cache entry because : " + 194 ex.getMessage(),ex); 195 } 196 try { 197 cacheEntry.cacheRelease(); 198 } catch (Exception ex) { 199 log.error("Failed to release cache info : " + 200 ex.getMessage(),ex); 201 } 202 } 203 } catch (Exception ex) { 204 log.error("Failed clear the cache : " + ex.getMessage(),ex); 205 } finally { 206 try { 207 if (lockRef != null) { 208 lockRef.release(); 209 } 210 } catch (Exception ex) { 211 log.error("Failed to release the lock : " + 212 ex.getMessage(),ex); 213 } 214 } 215 } 216 217 218 223 public void addCacheEntry(long timeout, CacheEntry entry) throws 224 RMIException { 225 try { 226 checkStatus(); 227 TransactionManager.getInstance().bindResource(this,false); 228 long cacheTimeout = timeout; 229 if (timeout == -1) { 230 cacheTimeout = defaultCacheExpiryTime; 231 } 232 RMICacheEntry newEntry = new RMICacheEntry(cacheTimeout,entry); 233 ChangeEntry changeEntry = (ChangeEntry)transactionChanges.get( 234 currentTransaction.get()); 235 changeEntry.addEntry(newEntry); 236 } catch (Exception ex) { 237 log.error("Failed to add the cache entry : " + 238 ex.getMessage(),ex); 239 throw new RMIException("Failed to add the cache entry : " + 240 ex.getMessage(),ex); 241 } 242 } 243 244 245 250 private void checkStatus() throws RMIException { 251 if (status.isTerminated()) { 252 throw new RMIException("The RMI cache has been shut down."); 253 } 254 } 255 256 257 263 public boolean contains(Object cacheEntry) { 264 if (!status.isTerminated()) { 265 return cacheEntries.containsKey(cacheEntry); 266 } 267 return false; 268 } 269 270 271 278 public void commit(Xid xid, boolean b) throws XAException { 279 try { 280 ChangeEntry changeEntry = (ChangeEntry)transactionChanges.get(xid); 281 List entries = changeEntry.getEntries(); 282 for (Iterator iter = entries.iterator(); iter.hasNext();) { 283 RMICacheEntry rmiCacheEntry = (RMICacheEntry)iter.next(); 284 cacheEntries.put(rmiCacheEntry.getCacheEntry(),rmiCacheEntry); 285 } 286 } catch (Exception ex) { 287 log.error("Failed to commit the changes : " + ex.getMessage(),ex); 288 throw new XAException ("Failed to commit the changes : " + 289 ex.getMessage()); 290 } 291 } 292 293 294 301 public void end(Xid xid, int i) throws XAException { 302 } 303 304 305 311 public void forget(Xid xid) throws XAException { 312 try { 313 transactionChanges.remove(xid); 314 } catch (Exception ex) { 315 log.error("Failed to forget the changes : " + ex.getMessage(),ex); 316 throw new XAException ("Failed to forget the changes : " + 317 ex.getMessage()); 318 } 319 } 320 321 322 328 public int getTransactionTimeout() throws XAException { 329 return -1; 330 } 331 332 333 341 public boolean isSameRM(XAResource xAResource) throws XAException { 342 return this == xAResource; 343 } 344 345 346 353 public int prepare(Xid xid) throws XAException { 354 return XAResource.XA_OK; 355 } 356 357 358 366 public Xid [] recover(int i) throws XAException { 367 return null; 368 } 369 370 371 377 public void rollback(Xid xid) throws XAException { 378 try { 379 ChangeEntry changeEntry = (ChangeEntry)transactionChanges.get(xid); 380 List entries = changeEntry.getEntries(); 381 for (Iterator iter = entries.iterator(); iter.hasNext();) { 382 RMICacheEntry rmiCacheEntry = (RMICacheEntry)iter.next(); 383 try { 384 PortableRemoteObject.unexportObject( 385 rmiCacheEntry.getRemoteInterface()); 386 } catch (java.rmi.NoSuchObjectException ex) { 387 log.warn("The object was never exported : " + 388 ex.getMessage(),ex); 389 } catch (Exception ex) { 390 log.error("Failed to rollback a cache entry because : " + 391 ex.getMessage(),ex); 392 } 393 try { 394 rmiCacheEntry.cacheRelease(); 395 } catch (Exception ex) { 396 log.error("Failed to release the entry"); 397 } 398 } 399 transactionChanges.remove(xid); 400 } catch (Exception ex) { 401 log.error("Failed to rollback the changes : " + ex.getMessage(),ex); 402 throw new XAException ("Failed to rollback the changes : " + 403 ex.getMessage()); 404 } 405 } 406 407 408 415 public boolean setTransactionTimeout(int i) throws XAException { 416 return true; 417 } 418 419 420 427 public void start(Xid xid, int i) throws XAException { 428 if (!transactionChanges.containsKey(xid)) { 429 transactionChanges.put(xid,new ChangeEntry(xid)); 430 } 431 currentTransaction.set(xid); 432 } 433 434 } 435 | Popular Tags |