1 23 24 package org.apache.commons.transaction.memory; 25 26 import java.io.PrintWriter ; 27 import java.util.HashSet ; 28 import java.util.Iterator ; 29 import java.util.Map ; 30 import java.util.Set ; 31 import java.util.Collections ; 32 33 import org.apache.commons.transaction.locking.ReadWriteLock; 34 import org.apache.commons.transaction.util.LoggerFacade; 35 import org.apache.commons.transaction.util.PrintWriterLogger; 36 37 58 public class OptimisticMapWrapper extends TransactionalMapWrapper { 59 60 protected static final int COMMIT_TIMEOUT = 1000 * 60; protected static final int ACCESS_TIMEOUT = 1000 * 30; 63 protected Set activeTransactions; 64 65 protected LoggerFacade logger; 66 67 protected ReadWriteLock commitLock; 68 69 75 public OptimisticMapWrapper(Map wrapped) { 76 this(wrapped, new HashMapFactory(), new HashSetFactory()); 77 } 78 79 87 public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory) { 88 this(wrapped, mapFactory, setFactory, new PrintWriterLogger(new PrintWriter (System.out), 89 OptimisticMapWrapper.class.getName(), false)); 90 } 91 92 102 public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory, LoggerFacade logger) { 103 super(wrapped, mapFactory, setFactory); 104 activeTransactions = Collections.synchronizedSet(new HashSet ()); 105 this.logger = logger; 106 commitLock = new ReadWriteLock("COMMIT", logger); 107 } 108 109 110 public void startTransaction() { 111 if (getActiveTx() != null) { 112 throw new IllegalStateException ( 113 "Active thread " + Thread.currentThread() + " already associated with a transaction!"); 114 } 115 CopyingTxContext context = new CopyingTxContext(); 116 activeTransactions.add(context); 117 setActiveTx(context); 118 } 119 120 public void rollbackTransaction() { 121 TxContext txContext = getActiveTx(); 122 super.rollbackTransaction(); 123 activeTransactions.remove(txContext); 124 } 125 126 public void commitTransaction() throws ConflictException { 127 commitTransaction(false); 128 } 129 130 public void commitTransaction(boolean force) throws ConflictException { 131 TxContext txContext = getActiveTx(); 132 133 if (txContext == null) { 134 throw new IllegalStateException ( 135 "Active thread " + Thread.currentThread() + " not associated with a transaction!"); 136 } 137 138 if (txContext.status == STATUS_MARKED_ROLLBACK) { 139 throw new IllegalStateException ("Active thread " + Thread.currentThread() + " is marked for rollback!"); 140 } 141 142 try { 143 commitLock.acquireWrite(txContext, COMMIT_TIMEOUT); 146 147 if (!force) { 148 Object conflictKey = checkForConflicts(); 149 if (conflictKey != null) { 150 throw new ConflictException(conflictKey); 151 } 152 } 153 154 activeTransactions.remove(txContext); 155 copyChangesToConcurrentTransactions(); 156 super.commitTransaction(); 157 158 } catch (InterruptedException e) { 159 throw new ConflictException(e); 161 } finally { 162 commitLock.release(txContext); 163 } 164 } 165 166 public Object checkForConflicts() { 168 CopyingTxContext txContext = (CopyingTxContext) getActiveTx(); 169 170 Set keys = txContext.changedKeys(); 171 Set externalKeys = txContext.externalChangedKeys(); 172 173 for (Iterator it2 = keys.iterator(); it2.hasNext();) { 174 Object key = it2.next(); 175 if (externalKeys.contains(key)) { 176 return key; 177 } 178 } 179 return null; 180 } 181 182 protected void copyChangesToConcurrentTransactions() { 183 CopyingTxContext thisTxContext = (CopyingTxContext) getActiveTx(); 184 185 for (Iterator it = activeTransactions.iterator(); it.hasNext();) { 186 CopyingTxContext otherTxContext = (CopyingTxContext) it.next(); 187 188 if (otherTxContext.cleared) 190 continue; 191 192 if (thisTxContext.cleared) { 193 otherTxContext.externalChanges.putAll(wrapped); 195 } else { 197 198 for (Iterator it2 = thisTxContext.changes.entrySet().iterator(); it2.hasNext();) { 199 Map.Entry entry = (Map.Entry ) it2.next(); 200 Object value = wrapped.get(entry.getKey()); 201 if (value != null) { 202 otherTxContext.externalChanges.put(entry.getKey(), value); 204 } else { 205 otherTxContext.externalDeletes.add(entry.getKey()); 207 } 208 } 209 210 for (Iterator it2 = thisTxContext.deletes.iterator(); it2.hasNext();) { 211 Object key = it2.next(); 213 Object value = wrapped.get(key); 214 otherTxContext.externalChanges.put(key, value); 215 } 216 } 217 } 218 219 } 220 221 public class CopyingTxContext extends TxContext { 222 protected Map externalChanges; 223 protected Map externalAdds; 224 protected Set externalDeletes; 225 226 protected CopyingTxContext() { 227 super(); 228 externalChanges = mapFactory.createMap(); 229 externalDeletes = setFactory.createSet(); 230 externalAdds = mapFactory.createMap(); 231 } 232 233 protected Set externalChangedKeys() { 234 Set keySet = new HashSet (); 235 keySet.addAll(externalDeletes); 236 keySet.addAll(externalChanges.keySet()); 237 keySet.addAll(externalAdds.keySet()); 238 return keySet; 239 } 240 241 protected Set changedKeys() { 242 Set keySet = new HashSet (); 243 keySet.addAll(deletes); 244 keySet.addAll(changes.keySet()); 245 keySet.addAll(adds.keySet()); 246 return keySet; 247 } 248 249 protected Set keys() { 250 try { 251 commitLock.acquireRead(this, ACCESS_TIMEOUT); 252 Set keySet = super.keys(); 253 keySet.removeAll(externalDeletes); 254 keySet.addAll(externalAdds.keySet()); 255 return keySet; 256 } catch (InterruptedException e) { 257 return null; 258 } finally { 259 commitLock.release(this); 260 } 261 } 262 263 protected Object get(Object key) { 264 try { 265 commitLock.acquireRead(this, ACCESS_TIMEOUT); 266 267 if (deletes.contains(key)) { 268 return null; 270 } 271 272 Object changed = changes.get(key); 273 if (changed != null) { 274 return changed; 275 } 276 277 Object added = adds.get(key); 278 if (added != null) { 279 return added; 280 } 281 282 if (cleared) { 283 return null; 284 } else { 285 if (externalDeletes.contains(key)) { 286 return null; 288 } 289 290 changed = externalChanges.get(key); 291 if (changed != null) { 292 return changed; 293 } 294 295 added = externalAdds.get(key); 296 if (added != null) { 297 return added; 298 } 299 300 return wrapped.get(key); 302 } 303 } catch (InterruptedException e) { 304 return null; 305 } finally { 306 commitLock.release(this); 307 } 308 } 309 310 protected void put(Object key, Object value) { 311 try { 312 commitLock.acquireRead(this, ACCESS_TIMEOUT); 313 super.put(key, value); 314 } catch (InterruptedException e) { 315 } finally { 316 commitLock.release(this); 317 } 318 } 319 320 protected void remove(Object key) { 321 try { 322 commitLock.acquireRead(this, ACCESS_TIMEOUT); 323 super.remove(key); 324 } catch (InterruptedException e) { 325 } finally { 326 commitLock.release(this); 327 } 328 } 329 330 protected int size() { 331 try { 332 commitLock.acquireRead(this, ACCESS_TIMEOUT); 333 int size = super.size(); 334 335 size -= externalDeletes.size(); 336 size += externalAdds.size(); 337 338 return size; 339 } catch (InterruptedException e) { 340 return -1; 341 } finally { 342 commitLock.release(this); 343 } 344 } 345 346 protected void clear() { 347 try { 348 commitLock.acquireRead(this, ACCESS_TIMEOUT); 349 super.clear(); 350 externalDeletes.clear(); 351 externalChanges.clear(); 352 externalAdds.clear(); 353 } catch (InterruptedException e) { 354 } finally { 355 commitLock.release(this); 356 } 357 } 358 359 protected void merge() { 360 try { 361 commitLock.acquireRead(this, ACCESS_TIMEOUT); 362 super.merge(); 363 } catch (InterruptedException e) { 364 } finally { 365 commitLock.release(this); 366 } 367 } 368 369 protected void dispose() { 370 try { 371 commitLock.acquireRead(this, ACCESS_TIMEOUT); 372 super.dispose(); 373 setFactory.disposeSet(externalDeletes); 374 externalDeletes = null; 375 mapFactory.disposeMap(externalChanges); 376 externalChanges = null; 377 mapFactory.disposeMap(externalAdds); 378 externalAdds = null; 379 } catch (InterruptedException e) { 380 } finally { 381 commitLock.release(this); 382 } 383 } 384 385 protected void finalize() throws Throwable { 386 activeTransactions.remove(this); 387 super.finalize(); 388 } 389 } 390 } 391 | Popular Tags |