KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > oracle > toplink > essentials > internal > helper > WriteLockManager


1 /*
2  * The contents of this file are subject to the terms
3  * of the Common Development and Distribution License
4  * (the "License"). You may not use this file except
5  * in compliance with the License.
6  *
7  * You can obtain a copy of the license at
8  * glassfish/bootstrap/legal/CDDLv1.0.txt or
9  * https://glassfish.dev.java.net/public/CDDLv1.0.html.
10  * See the License for the specific language governing
11  * permissions and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL
14  * HEADER in each file and include the License file at
15  * glassfish/bootstrap/legal/CDDLv1.0.txt. If applicable,
16  * add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your
18  * own identifying information: Portions Copyright [yyyy]
19  * [name of copyright owner]
20  */

21 // Copyright (c) 1998, 2006, Oracle. All rights reserved.
22
package oracle.toplink.essentials.internal.helper;
23
24 import java.util.*;
25
26 import oracle.toplink.essentials.exceptions.ConcurrencyException;
27 import oracle.toplink.essentials.internal.queryframework.ContainerPolicy;
28 import oracle.toplink.essentials.mappings.DatabaseMapping;
29 import oracle.toplink.essentials.descriptors.ClassDescriptor;
30 import oracle.toplink.essentials.internal.sessions.*;
31 import oracle.toplink.essentials.internal.identitymaps.*;
32 import oracle.toplink.essentials.internal.helper.linkedlist.*;
33 import oracle.toplink.essentials.logging.SessionLog;
34
35 /**
36  * INTERNAL:
37  * <p>
38  * <b>Purpose</b>: Acquires all required locks for a particular merge process.
39  * Implements a deadlock avoidance algorithm to prevent concurrent merge conflicts
40  *
41  * <p>
42  * <b>Responsibilities</b>:
43  * <ul>
44  * <li> Acquires locks for writing threads.
45  * <li> Provides deadlock avoidance behaviour.
46  * <li> Releases locks for writing threads.
47  * </ul>
48  * @author Gordon Yorke
49  * @since 10.0.3
50  */

51 public class WriteLockManager {
52
53     /* This attribute stores the list of threads that have had a problem acquiring locks */
54     /* the first element in this list will be the prevailing thread */
55     protected ExposedNodeLinkedList prevailingQueue;
56
57     public WriteLockManager() {
58         this.prevailingQueue = new ExposedNodeLinkedList();
59     }
60
61     // this will allow us to prevent a readlock thread form looping forever.
62
public static int MAXTRIES = 10000;
63
64     /**
65      * INTERNAL:
66      * This method will return once the object is locked and all non-indirect
67      * related objects are also locked.
68      */

69     public Map acquireLocksForClone(Object JavaDoc objectForClone, ClassDescriptor descriptor, Vector primaryKeys, AbstractSession session) {
70         boolean successful = false;
71         TopLinkIdentityHashMap lockedObjects = new TopLinkIdentityHashMap();
72         try {
73             // if the descriptor has indirectin for all mappings then wait as there will be no deadlock risks
74
CacheKey toWaitOn = acquireLockAndRelatedLocks(objectForClone, lockedObjects, primaryKeys, descriptor, session);
75             int tries = 0;
76             while (toWaitOn != null) {// loop until we've tried too many times.
77
for (Iterator lockedList = lockedObjects.values().iterator(); lockedList.hasNext();) {
78                     ((CacheKey)lockedList.next()).releaseReadLock();
79                     lockedList.remove();
80                 }
81                 synchronized (toWaitOn.getMutex()) {
82                     try {
83                         if (toWaitOn.isAcquired()) {//last minute check to insure it is still locked.
84
toWaitOn.getMutex().wait();// wait for lock on object to be released
85
}
86                     } catch (InterruptedException JavaDoc ex) {
87                         // Ignore exception thread should continue.
88
}
89                 }
90                 toWaitOn = acquireLockAndRelatedLocks(objectForClone, lockedObjects, primaryKeys, descriptor, session);
91                 if ((toWaitOn != null) && ((++tries) > MAXTRIES)) {
92                     // If we've tried too many times abort.
93
throw ConcurrencyException.maxTriesLockOnCloneExceded(objectForClone);
94                 }
95             }
96             successful = true;//successfully acquired all locks
97
} finally {
98             if (!successful) {//did not acquire locks but we are exiting
99
for (Iterator lockedList = lockedObjects.values().iterator(); lockedList.hasNext();) {
100                     ((CacheKey)lockedList.next()).releaseReadLock();
101                     lockedList.remove();
102                 }
103             }
104         }
105         return lockedObjects;
106     }
107
108     /**
109      * INTERNAL:
110      * This is a recursive method used to acquire read locks on all objects that
111      * will be cloned. These include all related objects for which there is no
112      * indirection.
113      * The returned object is the first object that the lock could not be acquired for.
114      * The caller must try for exceptions and release locked objects in the case
115      * of an exception.
116      */

117     public CacheKey acquireLockAndRelatedLocks(Object JavaDoc objectForClone, Map lockedObjects, Vector primaryKeys, ClassDescriptor descriptor, AbstractSession session) {
118         // Attempt to get a read-lock, null is returned if cannot be read-locked.
119
CacheKey lockedCacheKey = session.getIdentityMapAccessorInstance().acquireReadLockOnCacheKeyNoWait(primaryKeys, descriptor.getJavaClass(), descriptor);
120         if (lockedCacheKey != null) {
121             if (lockedCacheKey.getObject() == null) {
122                 // this will be the case for deleted objects, NoIdentityMap, and aggregates
123
lockedObjects.put(objectForClone, lockedCacheKey);
124             } else {
125                 objectForClone = lockedCacheKey.getObject();
126                 if (lockedObjects.containsKey(objectForClone)) {
127                     //this is a check for loss of identity, the orignal check in
128
//checkAndLockObject() will shortcircut in the usual case
129
lockedCacheKey.releaseReadLock();
130                     return null;
131                 }
132                 lockedObjects.put(objectForClone, lockedCacheKey);//store locked cachekey for release later
133
}
134             return traverseRelatedLocks(objectForClone, lockedObjects, descriptor, session);
135         } else {
136             // Return the cache key that could not be locked.
137
return session.getIdentityMapAccessorInstance().getCacheKeyForObject(primaryKeys, descriptor.getJavaClass(), descriptor);
138         }
139     }
140
141     /**
142      * INTERNAL:
143      * Traverse the object and acquire locks on all related objects.
144      */

145     public CacheKey traverseRelatedLocks(Object JavaDoc objectForClone, Map lockedObjects, ClassDescriptor descriptor, AbstractSession session) {
146         // If all mappings have indirection short-circuit.
147
if (descriptor.shouldAcquireCascadedLocks()) {
148             for (Iterator mappings = descriptor.getLockableMappings().iterator();
149                      mappings.hasNext();) {
150                 DatabaseMapping mapping = (DatabaseMapping)mappings.next();
151     
152                 // any mapping in this list must not have indirection.
153
Object JavaDoc objectToLock = mapping.getAttributeValueFromObject(objectForClone);
154                 if (mapping.isCollectionMapping()) {
155                     ContainerPolicy cp = mapping.getContainerPolicy();
156                     Object JavaDoc iterator = cp.iteratorFor(objectToLock);
157                     while (cp.hasNext(iterator)) {
158                         Object JavaDoc object = cp.next(iterator, session);
159                         if (mapping.getReferenceDescriptor().hasWrapperPolicy()) {
160                             object = mapping.getReferenceDescriptor().getWrapperPolicy().unwrapObject(object, session);
161                         }
162                         CacheKey toWaitOn = checkAndLockObject(object, lockedObjects, mapping, session);
163                         if (toWaitOn != null) {
164                             return toWaitOn;
165                         }
166                     }
167                 } else {
168                     if (mapping.getReferenceDescriptor().hasWrapperPolicy()) {
169                         objectToLock = mapping.getReferenceDescriptor().getWrapperPolicy().unwrapObject(objectToLock, session);
170                     }
171                     CacheKey toWaitOn = checkAndLockObject(objectToLock, lockedObjects, mapping, session);
172                     if (toWaitOn != null) {
173                         return toWaitOn;
174                     }
175                 }
176             }
177         }
178         return null;
179     }
180     
181     /**
182      * INTERNAL:
183      * This method will be the entry point for threads attempting to acquire locks for all objects that have
184      * a changeset. This method will hand off the processing of the deadlock algorithm to other member
185      * methods. The mergeManager must be the active mergemanager for the calling thread.
186      * Returns true if all required locks were acquired
187      */

188     public void acquireRequiredLocks(MergeManager mergeManager, UnitOfWorkChangeSet changeSet) {
189         if (!MergeManager.LOCK_ON_MERGE) {//lockOnMerge is a backdoor and not public
190
return;
191         }
192         boolean locksToAcquire = true;
193         boolean isForDistributedMerge = false;
194
195         //while that thread has locks to acquire continue to loop.
196
try {
197             AbstractSession session = mergeManager.getSession();
198             if (session.isUnitOfWork()) {
199                 session = ((UnitOfWorkImpl)session).getParent();
200             } else {
201                 // if the session in the mergemanager is not a unit of work then the
202
//merge is of a changeSet into a distributed session.
203
isForDistributedMerge = true;
204             }
205             while (locksToAcquire) {
206                 //lets assume all locks will be acquired
207
locksToAcquire = false;
208                 //first access the changeSet and begin to acquire locks
209
Iterator classIterator = changeSet.getObjectChanges().keySet().iterator();
210                 while (classIterator.hasNext()) {
211                     // Bug 3294426 - objectChanges is now indexed by class name instead of class
212
String JavaDoc objectClassName = (String JavaDoc)classIterator.next();
213                     Hashtable changeSetTable = (Hashtable)changeSet.getObjectChanges().get(objectClassName);
214
215                     //the order here does not matter as the deadlock avoidance code will handle any conflicts and maintaining
216
//order would be costly
217
Iterator changeSetIterator = changeSetTable.keySet().iterator();
218
219                     // Perf: Bug 3324418 - Reduce the number of Class.forName() calls
220
Class JavaDoc objectClass = null;
221                     while (changeSetIterator.hasNext()) {
222                         ObjectChangeSet objectChangeSet = (ObjectChangeSet)changeSetIterator.next();
223                         if (objectChangeSet.getCacheKey() == null) {
224                             //skip this process as we will be unable to acquire the correct cachekey anyway
225
//this is a new object with identity after write sequencing
226
continue;
227                         }
228                         if (objectClass == null) {
229                             objectClass = objectChangeSet.getClassType(session);
230                         }
231                         // It would be so much nicer if the change set was keyed by the class instead of class name,
232
// so this could be done once. We should key on class, and only convert to keying on name when broadcasting changes.
233
ClassDescriptor descriptor = session.getDescriptor(objectClass);
234                         CacheKey activeCacheKey = attemptToAcquireLock(objectClass, objectChangeSet.getCacheKey(), session);
235                         if (activeCacheKey == null) {
236                             //if cacheKey is null then the lock was not available
237
//no need to synchronize this block,because if the check fails then this thread
238
//will just return to the queue until it gets woken up.
239
if (this.prevailingQueue.getFirst() == mergeManager) {
240                                 //wait on this object until it is free, because this thread is the prevailing thread
241
activeCacheKey = waitOnObjectLock(objectClass, objectChangeSet.getCacheKey(), session);
242                                 mergeManager.getAcquiredLocks().add(activeCacheKey);
243                             } else {
244                                 //failed to acquire lock, release all acquired locks and place thread on waiting list
245
releaseAllAcquiredLocks(mergeManager);
246                                 //get cacheKey
247
activeCacheKey = session.getIdentityMapAccessorInstance().getCacheKeyForObject(objectChangeSet.getCacheKey().getKey(), objectClass, descriptor);
248                                 Object JavaDoc[] params = new Object JavaDoc[2];
249                                 params[0] = activeCacheKey.getObject();
250                                 params[1] = Thread.currentThread().getName();
251                                 session.log(SessionLog.FINER, SessionLog.CACHE, "dead_lock_encountered_on_write", params, null, true);
252                                 if (mergeManager.getWriteLockQueued() == null) {
253                                     //thread is entering the wait queue for the first time
254
//set the QueueNode to be the node from the linked list for quick removal upon
255
//acquiring all locks
256
mergeManager.setQueueNode(this.prevailingQueue.addLast(mergeManager));
257                                 }
258
259                                 //set the cache key on the merge manager for the object that could not be acquired
260
mergeManager.setWriteLockQueued(objectChangeSet.getCacheKey());
261                                 try {
262                                     //wait on the lock of the object that we couldn't get.
263
synchronized (activeCacheKey.getMutex()) {
264                                         // verify that the cache key is still locked before we wait on it, as
265
//it may have been releases since we tried to acquire it.
266
if (activeCacheKey.getMutex().isAcquired() && (activeCacheKey.getMutex().getActiveThread() != Thread.currentThread())) {
267                                             activeCacheKey.getMutex().wait();
268                                         }
269                                     }
270                                 } catch (InterruptedException JavaDoc exception) {
271                                     throw oracle.toplink.essentials.exceptions.ConcurrencyException.waitWasInterrupted(exception.getMessage());
272                                 }
273                                 locksToAcquire = true;
274                                 //failed to acquire, exit this loop and ensure that the original loop will continue
275
break;
276                             }
277                         } else {
278                             mergeManager.getAcquiredLocks().add(activeCacheKey);
279                         }
280                     }
281
282                     //if a lock failed reset to the beginning
283
if (locksToAcquire) {
284                         break;
285                     }
286                 }
287             }
288         } catch (RuntimeException JavaDoc exception) {
289             // if there was an exception then release.
290
//must not release in a finally block as release only occurs in this method
291
// if there is a problem or all of the locks can not be acquired.
292
releaseAllAcquiredLocks(mergeManager);
293             throw exception;
294         } finally {
295             if (mergeManager.getWriteLockQueued() != null) {
296                 //the merge manager entered the wait queue and must be cleaned up
297
this.prevailingQueue.remove(mergeManager.getQueueNode());
298                 mergeManager.setWriteLockQueued(null);
299             }
300         }
301     }
302
303     /**
304      * INTERNAL:
305      * This method will be called by a merging thread that is attempting to lock
306      * a new object that was not locked previously. Unlike the other methods
307      * within this class this method will lock only this object.
308      */

309     public Object JavaDoc appendLock(Vector primaryKeys, Object JavaDoc objectToLock, ClassDescriptor descriptor, MergeManager mergeManager, AbstractSession session){
310         for (int tries = 0; tries < 1000; ++tries) { //lets try a fixed number of times
311
CacheKey lockedCacheKey = session.getIdentityMapAccessorInstance().acquireLockNoWait(primaryKeys, descriptor.getJavaClass(), true, descriptor);
312             if (lockedCacheKey == null){
313                 //acquire readlock and wait for owning thread to populate cachekey
314
//bug 4483312
315
lockedCacheKey = session.getIdentityMapAccessorInstance().acquireReadLockOnCacheKey(primaryKeys, descriptor.getJavaClass(), descriptor);
316                 Object JavaDoc cachedObject = lockedCacheKey.getObject();
317                 lockedCacheKey.releaseReadLock();
318                 if (cachedObject == null){
319                     session.getSessionLog().log(SessionLog.FINEST, SessionLog.CACHE, "Found null object in identity map on appendLock, retrying");
320                     continue;
321                 }else{
322                     return cachedObject;
323                 }
324             }
325             if (lockedCacheKey.getObject() == null){
326                 lockedCacheKey.setObject(objectToLock); // set the object in the cachekey
327
// for others to find an prevent cycles
328
}
329             mergeManager.getAcquiredLocks().add(lockedCacheKey);
330             return objectToLock;
331         }
332         throw ConcurrencyException.maxTriesLockOnMergeExceded(objectToLock);
333     }
334     
335     /**
336      * INTERNAL:
337      * This method performs the operations of finding the cacheKey and locking it if possible.
338      * Returns True if the lock was acquired, false otherwise
339      */

340     protected CacheKey attemptToAcquireLock(Class JavaDoc objectClass, CacheKey cacheKey, AbstractSession session) {
341         return session.getIdentityMapAccessorInstance().acquireLockNoWait(cacheKey.getKey(), objectClass, true, session.getDescriptor(objectClass));
342     }
343
344     /**
345      * INTERNAL:
346      * Simply check that the object is not already locked then pass it on to the locking method
347      */

348     protected CacheKey checkAndLockObject(Object JavaDoc objectToLock, Map lockedObjects, DatabaseMapping mapping, AbstractSession session) {
349         //the cachekey should always reference an object otherwise what would we be cloning.
350
if ((objectToLock != null) && !lockedObjects.containsKey(objectToLock)) {
351             Vector primaryKeysToLock = null;
352             ClassDescriptor referenceDescriptor = null;
353             if (mapping.getReferenceDescriptor().hasInheritance()) {
354                 referenceDescriptor = session.getDescriptor(objectToLock);
355             } else {
356                 referenceDescriptor = mapping.getReferenceDescriptor();
357             }
358             // Need to traverse aggregates, but not lock aggregates directly.
359
if (referenceDescriptor.isAggregateDescriptor() || referenceDescriptor.isAggregateCollectionDescriptor()) {
360                 traverseRelatedLocks(objectToLock, lockedObjects, referenceDescriptor, session);
361             } else {
362                 primaryKeysToLock = referenceDescriptor.getObjectBuilder().extractPrimaryKeyFromObject(objectToLock, session);
363                 CacheKey toWaitOn = acquireLockAndRelatedLocks(objectToLock, lockedObjects, primaryKeysToLock, referenceDescriptor, session);
364                 if (toWaitOn != null) {
365                     return toWaitOn;
366                 }
367             }
368         }
369         return null;
370     }
371
372     /**
373      * INTERNAL:
374      * This method will release all acquired locks
375      */

376     public void releaseAllAcquiredLocks(MergeManager mergeManager) {
377         if (!MergeManager.LOCK_ON_MERGE) {//lockOnMerge is a backdoor and not public
378
return;
379         }
380         Iterator locks = mergeManager.getAcquiredLocks().iterator();
381         while (locks.hasNext()) {
382             CacheKey cacheKeyToRemove = (CacheKey)locks.next();
383             if (cacheKeyToRemove.getObject() == null && cacheKeyToRemove.getOwningMap() != null){
384                 cacheKeyToRemove.getOwningMap().remove(cacheKeyToRemove);
385             }
386             cacheKeyToRemove.release();
387             locks.remove();
388         }
389     }
390
391     /**
392       * INTERNAL:
393       * This method performs the operations of finding the cacheKey and locking it if possible.
394       * Waits until the lock can be acquired
395       */

396     protected CacheKey waitOnObjectLock(Class JavaDoc objectClass, CacheKey cacheKey, AbstractSession session) {
397         return session.getIdentityMapAccessorInstance().acquireLock(cacheKey.getKey(), objectClass, true, session.getDescriptor(objectClass));
398     }
399 }
400
Popular Tags