KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > oracle > toplink > essentials > internal > helper > ConcurrencyManager


1 /*
2  * The contents of this file are subject to the terms
3  * of the Common Development and Distribution License
4  * (the "License"). You may not use this file except
5  * in compliance with the License.
6  *
7  * You can obtain a copy of the license at
8  * glassfish/bootstrap/legal/CDDLv1.0.txt or
9  * https://glassfish.dev.java.net/public/CDDLv1.0.html.
10  * See the License for the specific language governing
11  * permissions and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL
14  * HEADER in each file and include the License file at
15  * glassfish/bootstrap/legal/CDDLv1.0.txt. If applicable,
16  * add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your
18  * own identifying information: Portions Copyright [yyyy]
19  * [name of copyright owner]
20  */

21 // Copyright (c) 1998, 2005, Oracle. All rights reserved.
22
package oracle.toplink.essentials.internal.helper;
23
24 import java.io.*;
25 import java.util.*;
26 import oracle.toplink.essentials.exceptions.*;
27 import oracle.toplink.essentials.internal.localization.*;
28 import oracle.toplink.essentials.internal.identitymaps.CacheKey;
29 import oracle.toplink.essentials.logging.*;
30
31 /**
32  * INTERNAL:
33  * <p>
34  * <b>Purpose</b>: To maintain concurrency for a paticular task.
35  * It is a wrappers of a semaphore that allows recursive waits by a single thread.
36  * <p>
37  * <b>Responsibilities</b>:
38  * <ul>
39  * <li> Keep track of the active thread.
40  * <li> Wait all other threads until the first thread is done.
41  * <li> Maintain the depth of the active thread.
42  * </ul>
43  */

44 public class ConcurrencyManager implements Serializable {
45     protected int numberOfReaders;
46     protected int depth;
47     protected int numberOfWritersWaiting;
48     protected transient Thread JavaDoc activeThread;
49     public static Hashtable deferredLockManagers;
50     protected boolean lockedByMergeManager;
51
52     /** Cachkey owner set when ConcurrencyMananger is used within an cachekey on an idenity map
53      * Used to store the owner so that the object involved can be retrieved from the cachekey
54      */

55     protected CacheKey ownerCacheKey;
56
57     /**
58      * Initialize the newly allocated instance of this class.
59      * Set the depth to zero.
60      */

61     public ConcurrencyManager() {
62         this.depth = 0;
63         this.numberOfReaders = 0;
64         this.numberOfWritersWaiting = 0;
65     }
66
67     /**
68      * Initialize a new ConcurrencyManger, seting depth to zero and setting the
69      * owner cacheKey.
70      */

71     public ConcurrencyManager(CacheKey cacheKey) {
72         this();
73         this.ownerCacheKey = cacheKey;
74     }
75
76     /**
77      * Wait for all threads except the active thread.
78      * If the active thread just increament the depth.
79      * This should be called before entering a critical section.
80      */

81     public synchronized void acquire() throws ConcurrencyException {
82         this.acquire(false);
83     }
84
85     /**
86      * Wait for all threads except the active thread.
87      * If the active thread just increament the depth.
88      * This should be called before entering a critical section.
89      * called with true from the merge process, if true then the refresh will not refresh the object
90      */

91     public synchronized void acquire(boolean forMerge) throws ConcurrencyException {
92         while (!((getActiveThread() == Thread.currentThread()) || ((getActiveThread() == null) && (getNumberOfReaders() == 0)))) {
93             // This must be in a while as multiple threads may be released, or another thread may rush the acquire after one is released.
94
try {
95                 setNumberOfWritersWaiting(getNumberOfWritersWaiting() + 1);
96                 wait();
97                 setNumberOfWritersWaiting(getNumberOfWritersWaiting() - 1);
98             } catch (InterruptedException JavaDoc exception) {
99                 throw ConcurrencyException.waitWasInterrupted(exception.getMessage());
100             }
101         }
102         if (getActiveThread() == null) {
103             setActiveThread(Thread.currentThread());
104         }
105         setIsLockedByMergeManager(forMerge);
106         setDepth(getDepth() + 1);
107
108     }
109
110     /**
111      * If the lock is not acquired allready acquire it and return true.
112      * If it has been acquired allready return false
113      * Added for CR 2317
114      */

115     public synchronized boolean acquireNoWait() throws ConcurrencyException {
116         if (!isAcquired() || getActiveThread() == Thread.currentThread()) {
117             //if I own the lock increment depth
118
acquire(false);
119             return true;
120         } else {
121             return false;
122         }
123     }
124
125     /**
126      * If the lock is not acquired allready acquire it and return true.
127      * If it has been acquired allready return false
128      * Added for CR 2317
129      * called with true from the merge process, if true then the refresh will not refresh the object
130      */

131     public synchronized boolean acquireNoWait(boolean forMerge) throws ConcurrencyException {
132         if (!isAcquired() || getActiveThread() == Thread.currentThread()) {
133             //if I own the lock increment depth
134
acquire(forMerge);
135             return true;
136         } else {
137             return false;
138         }
139     }
140
141     /**
142      * Add deferred lock into a hashtable to avoid deadlock
143      */

144     public void acquireDeferredLock() throws ConcurrencyException {
145         Thread JavaDoc currentThread = Thread.currentThread();
146         DeferredLockManager lockManager = getDeferredLockManager(currentThread);
147         if (lockManager == null) {
148             lockManager = new DeferredLockManager();
149             putDeferredLock(currentThread, lockManager);
150         }
151         lockManager.incrementDepth();
152         synchronized (this) {
153             while (!(getNumberOfReaders() == 0)) {
154                 // There are readers of this object, wait until they are done before determining if
155
//there are any other writers. If not we will wait on the readers for acquire. If another
156
//thread is also waiting on the acquire then a deadlock could occur. See bug 3049635
157
//We could release all active locks before relesing defered but the object may not be finished building
158
//we could make the readers get a hard lock, but then we would just build a defered lock even though
159
//the object is not being built.
160
try {
161                     setNumberOfWritersWaiting(getNumberOfWritersWaiting() + 1);
162                     wait();
163                     setNumberOfWritersWaiting(getNumberOfWritersWaiting() - 1);
164                 } catch (InterruptedException JavaDoc exception) {
165                     throw ConcurrencyException.waitWasInterrupted(exception.getMessage());
166                 }
167             }
168             if ((getActiveThread() == currentThread) || (!isAcquired())) {
169                 lockManager.addActiveLock(this);
170                 acquire();
171             } else {
172                 lockManager.addDeferredLock(this);
173                 Object JavaDoc[] params = new Object JavaDoc[2];
174                 params[0] = this.getOwnerCacheKey().getObject();
175                 params[1] = currentThread.getName();
176                 AbstractSessionLog.getLog().log(SessionLog.FINER, "acquiring_deferred_lock", params, true);
177             }
178         }
179     }
180     
181     /**
182      * Check the lock state, if locked, acquire and release a read lock.
183      * This optimizes out the normal read-lock check if not locked.
184      */

185     public void checkReadLock() throws ConcurrencyException {
186         // If it is not locked, then just return.
187
if (getActiveThread() == null) {
188             return;
189         }
190         acquireReadLock();
191         releaseReadLock();
192     }
193     
194     /**
195      * Wait on any writer.
196      * Allow concurrent reads.
197      */

198     public synchronized void acquireReadLock() throws ConcurrencyException {
199         // Cannot check for starving writers as will lead to deadlocks.
200
while (!((getActiveThread() == Thread.currentThread()) || (getActiveThread() == null))) {
201             try {
202                 wait();
203             } catch (InterruptedException JavaDoc exception) {
204                 throw ConcurrencyException.waitWasInterrupted(exception.getMessage());
205             }
206         }
207
208         setNumberOfReaders(getNumberOfReaders() + 1);
209     }
210
211     /**
212      * If this is acquired return false otherwise acquire readlock and return true
213      */

214     public synchronized boolean acquireReadLockNoWait() {
215         if (!isAcquired()) {
216             acquireReadLock();
217             return true;
218         } else {
219             return false;
220         }
221     }
222
223     /**
224          * Return the active thread.
225          */

226     public Thread JavaDoc getActiveThread() {
227         return activeThread;
228     }
229
230     /**
231      * Return the deferred lock manager from the thread
232      */

233     public synchronized static DeferredLockManager getDeferredLockManager(Thread JavaDoc thread) {
234         return (DeferredLockManager)getDeferredLockManagers().get(thread);
235     }
236
237     /**
238      * Return the deferred lock manager hashtable (thread - DeferredLockManager).
239      */

240     protected static Hashtable getDeferredLockManagers() {
241         if (deferredLockManagers == null) {
242             deferredLockManagers = new Hashtable(50);
243         }
244
245         return deferredLockManagers;
246     }
247
248     /**
249      * Return the current depth of the active thread.
250      */

251     public int getDepth() {
252         return depth;
253     }
254
255     /**
256      * Number of writer that want the lock.
257      * This is used to ensure that a writer is not starved.
258      */

259     public int getNumberOfReaders() {
260         return numberOfReaders;
261     }
262
263     /**
264      * Number of writers that want the lock.
265      * This is used to ensure that a writer is not starved.
266      */

267     public int getNumberOfWritersWaiting() {
268         return numberOfWritersWaiting;
269     }
270
271     /**
272      * Returns the owner cache key for this concurrency manager
273      */

274     public CacheKey getOwnerCacheKey() {
275         return this.ownerCacheKey;
276     }
277
278     /**
279      * Return if a thread has aquire this manager.
280      */

281     public boolean isAcquired() {
282         return depth > 0;
283     }
284
285     /**
286      * INTERNAL:
287      * Used byt the refresh process to determine if this concurrency manager is locked by
288      * the merge process. If it is then the refresh should not refresh the object
289      */

290     public boolean isLockedByMergeManager() {
291         return this.lockedByMergeManager;
292     }
293
294     /**
295      * Check if the deferred locks of a thread are all released
296      */

297     public synchronized static boolean isBuildObjectOnThreadComplete(Thread JavaDoc thread, IdentityHashtable recursiveSet) {
298         if (recursiveSet.containsKey(thread)) {
299             return true;
300         }
301         recursiveSet.put(thread, thread);
302
303         DeferredLockManager lockManager = getDeferredLockManager(thread);
304         if (lockManager == null) {
305             return true;
306         }
307
308         Vector deferredLocks = lockManager.getDeferredLocks();
309         for (Enumeration deferredLocksEnum = deferredLocks.elements();
310                  deferredLocksEnum.hasMoreElements();) {
311             ConcurrencyManager deferedLock = (ConcurrencyManager)deferredLocksEnum.nextElement();
312             Thread JavaDoc activeThread = null;
313             if (deferedLock.isAcquired()) {
314                 activeThread = deferedLock.getActiveThread();
315
316                 // the active thread may be set to null at anypoint
317
// if added for CR 2330
318
if (activeThread != null) {
319                     DeferredLockManager currentLockManager = getDeferredLockManager(activeThread);
320                     if (currentLockManager == null) {
321                         return false;
322                     } else if (currentLockManager.isThreadComplete()) {
323                         activeThread = deferedLock.getActiveThread();
324                         // The lock may suddenly finish and no longer have an active thread.
325
if (activeThread != null) {
326                             if (!isBuildObjectOnThreadComplete(activeThread, recursiveSet)) {
327                                 return false;
328                             }
329                         }
330                     } else {
331                         return false;
332                     }
333                 }
334             }
335         }
336         return true;
337     }
338
339     /**
340      * Return if this manager is within a nested aquire.
341      */

342     public boolean isNested() {
343         return depth > 1;
344     }
345
346     public synchronized void putDeferredLock(Thread JavaDoc thread, DeferredLockManager lockManager) {
347         getDeferredLockManagers().put(thread, lockManager);
348     }
349
350     /**
351      * Decrement the depth for the active thread.
352      * Assume the current thread is the active one.
353      * Raise an error if the depth become < 0.
354      * The notify will release the first thread waiting on the object,
355      * if no threads are waiting it will do nothing.
356      */

357     public synchronized void release() throws ConcurrencyException {
358         if (getDepth() == 0) {
359             throw ConcurrencyException.signalAttemptedBeforeWait();
360         } else {
361             setDepth(getDepth() - 1);
362         }
363         if (getDepth() == 0) {
364             setActiveThread(null);
365             setIsLockedByMergeManager(false);
366             notifyAll();
367         }
368     }
369
370     /**
371      * Release the deferred lock.
372      * This uses a deadlock detection and resoultion algorthm to avoid cache deadlocks.
373      * The deferred lock manager keeps track of the lock for a thread, so that other
374      * thread know when a deadlock has occured and can resolve it.
375      */

376     public void releaseDeferredLock() throws ConcurrencyException {
377         Thread JavaDoc currentThread = Thread.currentThread();
378         DeferredLockManager lockManager = getDeferredLockManager(currentThread);
379         if (lockManager == null) {
380             return;
381         }
382         int depth = lockManager.getThreadDepth();
383
384         if (depth > 1) {
385             lockManager.decrementDepth();
386             return;
387         }
388
389         // If the set is null or empty, means there is no deferred lock for this thread, return.
390
if (!lockManager.hasDeferredLock()) {
391             lockManager.releaseActiveLocksOnThread();
392             removeDeferredLockManager(currentThread);
393             return;
394         }
395
396         lockManager.setIsThreadComplete(true);
397
398         // Thread have three stages, one where they are doing work (i.e. building objects)
399
// two where they are done their own work but may be waiting on other threads to finish their work,
400
// and a third when they and all the threads they are waiting on are done.
401
// This is essentially a busy wait to determine if all the other threads are done.
402
while (true) {
403             // 2612538 - the default size of IdentityHashtable (32) is appropriate
404
IdentityHashtable recursiveSet = new IdentityHashtable();
405             if (isBuildObjectOnThreadComplete(currentThread, recursiveSet)) {// Thread job done.
406
lockManager.releaseActiveLocksOnThread();
407                 removeDeferredLockManager(currentThread);
408                 Object JavaDoc[] params = new Object JavaDoc[1];
409                 params[0] = currentThread.getName();
410                 AbstractSessionLog.getLog().log(SessionLog.FINER, "deferred_locks_released", params, true);
411                 return;
412             } else {// Not done yet, wait and check again.
413
try {
414                     Thread.sleep(10);
415                 } catch (InterruptedException JavaDoc ignoreAndContinue) {
416                 }
417             }
418         }
419     }
420
421     /**
422      * Decrement the number of readers.
423      * Used to allow concurrent reads.
424      */

425     public synchronized void releaseReadLock() throws ConcurrencyException {
426         if (getNumberOfReaders() == 0) {
427             throw ConcurrencyException.signalAttemptedBeforeWait();
428         } else {
429             setNumberOfReaders(getNumberOfReaders() - 1);
430         }
431         if (getNumberOfReaders() == 0) {
432             notifyAll();
433         }
434     }
435
436     /**
437      * Remove the deferred lock manager for the thread
438      */

439     public synchronized static DeferredLockManager removeDeferredLockManager(Thread JavaDoc thread) {
440         return (DeferredLockManager)getDeferredLockManagers().remove(thread);
441     }
442
443     /**
444      * Set the active thread.
445      */

446     public void setActiveThread(Thread JavaDoc activeThread) {
447         this.activeThread = activeThread;
448     }
449
450     /**
451      * Set the current depth of the active thread.
452      */

453     protected void setDepth(int depth) {
454         this.depth = depth;
455     }
456
457     /**
458      * INTERNAL:
459      * Used by the mergemanager to let the read know not to refresh this object as it is being
460      * loaded by the merge process.
461      */

462     public void setIsLockedByMergeManager(boolean state) {
463         this.lockedByMergeManager = state;
464     }
465
466     /**
467      * Track the number of readers.
468      */

469     protected void setNumberOfReaders(int numberOfReaders) {
470         this.numberOfReaders = numberOfReaders;
471     }
472
473     /**
474      * Number of writers that want the lock.
475      * This is used to ensure that a writer is not starved.
476      */

477     protected void setNumberOfWritersWaiting(int numberOfWritersWaiting) {
478         this.numberOfWritersWaiting = numberOfWritersWaiting;
479     }
480
481     /**
482      * Print the nested depth.
483      */

484     public String JavaDoc toString() {
485         Object JavaDoc[] args = { new Integer JavaDoc(getDepth()) };
486         return Helper.getShortClassName(getClass()) + ToStringLocalization.buildMessage("nest_level", args);
487     }
488 }
489
Popular Tags