KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > perseus > concurrency > pessimistic > RWFifoLock


1 /**
2  * Copyright (C) 2001-2002
3  * - France Telecom R&D
4  * - Laboratoire Logiciels, Systemes, Reseaux - UMR 5526, CNRS-INPG-UJF
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * Release: 1.0
21  *
22  * Authors:
23  *
24  */

25
26 package org.objectweb.perseus.concurrency.pessimistic;
27
28 import org.objectweb.perseus.concurrency.api.ConcurrencyException;
29 import org.objectweb.perseus.concurrency.api.RolledBackConcurrencyException;
30 import org.objectweb.perseus.concurrency.lib.LockValue;
31 import org.objectweb.perseus.concurrency.lib.RWLockValue;
32 import org.objectweb.perseus.concurrency.lib.ReadWriteLockValue;
33 import org.objectweb.perseus.concurrency.lib.Semaphore;
34 import org.objectweb.perseus.dependency.api.DependencyGraph;
35 import org.objectweb.util.monolog.api.BasicLevel;
36
37 import java.util.Collection JavaDoc;
38 import java.util.HashSet JavaDoc;
39 import java.util.Iterator JavaDoc;
40 import java.util.List JavaDoc;
41 import java.util.ArrayList JavaDoc;
42 import java.util.Map JavaDoc;
43 import java.util.Set JavaDoc;
44
45 /**
46  * A lock associated to an oid (see the "locks" map within the pessimistic
47  * concurrency manager).
48  * Provides "one writer/multiple readers" concurrency policy with a FIFO
49  * scheduling.
50  * @author E. Bruneton, P. Dechamboux, S.Chassande-Barrioz
51  */

52 public final class RWFifoLock extends Lock {
53     
54     protected final static LockValue LOCKS = new ReadWriteLockValue();
55     
56     /**
57      * The current active tasks which can access to the resource
58      */

59     List JavaDoc owners = new ArrayList JavaDoc();
60     
61     /**
62      * The LockRequest instances that wait for lock for the oid to which this
63      * object is associated.
64      */

65     List JavaDoc waiters = new ArrayList JavaDoc();
66     
67     byte lockLevel = ReadWriteLockValue.NOLOCK;
68     
69     protected final Semaphore semaphore = new Semaphore();
70
71     public RWFifoLock() {
72     }
73
74     public RWFifoLock(Object JavaDoc hints, DependencyGraph dg) {
75         super(hints, dg);
76     }
77
78     /**
79      * @param task is the task identifier
80      * @param askedLock is the lock level asked by the task
81      * @param waiterIdx is the position of the lock request among waiters
82      * (0 means the request is the first)
83      * @return true if the lock request can be accepted.
84      */

85     private final boolean acceptLockRequest(final Object JavaDoc task,
86             final byte askedLock, final int waiterIdx) {
87         if (owners.isEmpty()) {
88             return true;
89         } else if (owners.contains(task)) {
90             //already has a lock
91
if (LOCKS.isCompatibleWith(askedLock, lockLevel)) {
92                 //current lock is compatible
93
return waiterIdx == 0; // no waiter
94
} else if (owners.size() == 1) {
95                 //only the task has the lock ==> up to to max level
96
//we do not take account of waiters, in order to avoid dead lock
97
//NOTE: this case happens when
98
// - a task asks again a non shareable lock
99
// - OR a task increase it lock level
100
return true;
101             }
102         } else if (LOCKS.isCompatibleWith(askedLock, lockLevel)
103                 && waiterIdx == 0) {
104             return true;
105         }
106         return false;
107     }
108     
109     /**
110      * Try to acquire a lock level for a given task.
111      *
112      * @param task is the task identifier
113      * @param askedLock is the lock level asked by the task
114      * @throws RolledBackConcurrencyException if the lock request creates a
115      * dea lock, or if the thread is intererupted during the waiting step.
116      */

117     private final void accessIntention(final Object JavaDoc task, final byte askedLock)
118             throws RolledBackConcurrencyException {
119         try {
120         boolean debug = logger != null && logger.isLoggable(BasicLevel.DEBUG);
121         semaphore.P();
122         final int ws = waiters.size();
123         reservations --;
124         if (acceptLockRequest(task, askedLock, ws)) {
125             if (debug) {
126                 logger.log(BasicLevel.DEBUG, LOCKS.str(askedLock) + " lock accepted directly,"
127                         + "\n\ttask:" + task + "\n\towners=" + owners);
128             }
129             this.lockLevel = (byte) Math.max(this.lockLevel, askedLock);
130             if (!owners.contains(task)) {
131                 owners.add(task);
132             }
133             semaphore.V();
134             return;
135         }
136         //must wait to obtain the lock level requested
137

138         //Compute dependencies
139
List JavaDoc dependencies; // The dependency of the task
140
if (ws == 0) {
141             //no waiter, it depends to the owner,
142
dependencies = owners;
143         } else {
144             //There are waiters
145
LockRequest tmp = null;
146             int i;
147             //skip compatible requests
148
for(i = ws -1; i >= 0; i--) {
149                 tmp = (LockRequest) waiters.get(i);
150                 if (LOCKS.isCompatibleWith(askedLock, tmp.getLockLevel())) {
151                 } else {
152                     break;
153                 }
154             }
155             if (i>=0) {
156                 //dependencies is the list of compatible requests after
157
// the skipped requests
158
dependencies = new ArrayList JavaDoc();
159                 byte l = tmp.getLockLevel();
160                 do {
161                     tmp = (LockRequest) waiters.get(i);
162                     dependencies.add(tmp.task);
163                     if (!LOCKS.isCompatibleWith(tmp.getLockLevel(), l)) {
164                         break;
165                     }
166                     i--;
167                 } while (i>=0);
168             } else {
169                 // all first are compatible, ==> dependencies are the owners
170
dependencies = owners;
171             }
172         }
173         //add a vertex for each dependency
174
if (dg.addVertexes(task, dependencies) != -1) {
175             if (logger != null && logger.isLoggable(BasicLevel.INFO)) {
176                 logger.log(BasicLevel.INFO, LOCKS.str(askedLock)
177                         + " request: dead lock detected, cancel the task,"
178                         + "\n\ttask:" + task);
179             }
180             //remove dependencies which can be added before the dead lock
181
dg.removeVertexes(task, dependencies);
182             //TODO: we could remove also the locks or requests of the same task
183
semaphore.V();
184             throw new RolledBackConcurrencyException("Deadlock");
185         }
186         if (debug) {
187             
188         }
189         //Create LockRequest and add it among waiters
190
LockRequest lockRequest = new LockRequest(askedLock, task);
191         waiters.add(lockRequest);
192         
193         if (debug) {
194             StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
195             sb.append(LOCKS.str(lockRequest.getLockLevel()));
196             sb.append(" lock waiting, rank=");
197             sb.append(waiters.indexOf(lockRequest));
198             sb.append(", task=");
199             sb.append(task);
200             sb.append("\n\towners=");
201             sb.append(owners);
202             printDG(sb);
203             logger.log(BasicLevel.DEBUG, sb.toString());
204         }
205         // wait
206
try {
207             synchronized(lockRequest) {
208                 semaphore.V();
209                 lockRequest.wait();
210             }
211             if (lockRequest.hasRolledBack()) {
212                 forgetWaiter(lockRequest);
213                 throw new RolledBackConcurrencyException(
214                     "The working set has been rolled back by another thread");
215             }
216         } catch (InterruptedException JavaDoc e) {
217             forgetWaiter(lockRequest);
218             throw new RolledBackConcurrencyException("Waiting of a "
219                     + LOCKS.str(lockRequest.getLockLevel())
220                     + " intention has been interupted:", e);
221         }
222         if (debug) {
223             logger.log(BasicLevel.DEBUG, LOCKS.str(lockRequest.getLockLevel())
224                     + " Wake up\n\ttask:" + task);
225         }
226         } catch(Error JavaDoc t) {
227             logger.log(BasicLevel.ERROR, "accessIntention: " + t.getMessage(),t);
228             t.printStackTrace();
229         }
230
231     }
232     
233     /**
234      * Eliminates a lock request among waiter. The dependencies are removed.
235      * @param lr is the lock request to forget
236      */

237     private final void forgetWaiter(final LockRequest lr) {
238         boolean debug = logger != null && logger.isLoggable(BasicLevel.DEBUG);
239         semaphore.P();
240         int idx = waiters.indexOf(lr);
241         if (debug) {
242             logger.log(BasicLevel.DEBUG, "forgetLockRequest: lock: "
243                     + LOCKS.str(lr.getLockLevel())
244                     + ", waiter rank=" + idx + "/" + waiters.size()
245                     + ", task=" + lr.task);
246         }
247         if (idx == -1 || waiters.size() == 0) {
248             semaphore.V();
249             return;
250         }
251         waiters.remove(idx);
252         
253         //Remove dependencies of the request to forget
254
int i;
255         LockRequest tmp = null;
256         for(i = idx -1; i>=0; i--) {
257             tmp = (LockRequest) waiters.get(i);
258             if (!LOCKS.isCompatibleWith(
259                     lr.getLockLevel(),
260                     tmp.getLockLevel())) {
261                 break;
262             }
263         }
264         if (i>=0) {
265             byte l = tmp.getLockLevel();
266             do {
267                 tmp = (LockRequest) waiters.get(i);
268                 dg.removeVertex(lr.task, tmp.task);
269                 if (!LOCKS.isCompatibleWith(tmp.getLockLevel(), l)) {
270                     break;
271                 }
272                 i--;
273             } while (i>=0);
274         } else {
275             dg.removeVertexes(lr.task, owners);
276         }
277         
278         //Remove dependencies to the request to forget
279
final int ws = waiters.size();
280         //skip compatible request
281
for(i = idx; i<ws; i++) {
282             tmp = (LockRequest) waiters.get(i);
283             if (!LOCKS.isCompatibleWith(
284                     tmp.getLockLevel(),
285                     lr.getLockLevel())) {
286                 break;
287             }
288         }
289         if (i<ws) {
290             byte l = tmp.getLockLevel();
291             do {
292                 tmp = (LockRequest) waiters.get(i);
293                 dg.removeVertex(tmp.task, lr.task);
294                 if (!LOCKS.isCompatibleWith(tmp.getLockLevel(), l)) {
295                     break;
296                 }
297                 i++;
298             } while (i<ws);
299         }
300         semaphore.V();
301     }
302
303     /**
304      * Acquires this lock in read mode for the given context. This method
305      * blocks until the lock can be acquired in read mode by this context.
306      * @param task a context.
307      */

308     public void readIntention(Object JavaDoc task)
309             throws ConcurrencyException {
310         accessIntention(task, RWLockValue.READ);
311     }
312     
313     /**
314      * Acquires this lock in write mode for the given context. This method
315      * blocks until the lock can be acquired in write mode by this context.
316      * @param task a context.
317      */

318     public void writeIntention(Object JavaDoc task)
319             throws ConcurrencyException {
320         accessIntention(task, ReadWriteLockValue.WRITE);
321     }
322     
323     /**
324      * Removes the given context from the reader and writer lists of this
325      * lock.
326      * @param task a context
327      * @return true if the reader and writer list are empty, after the
328      * context has been removed from these lists. In such a case, this
329      * object can be removed from the 'locks' map.
330      */

331     public boolean close(Object JavaDoc task) {
332         try {
333         semaphore.P();
334         boolean isowner = owners.remove(task);
335         if (isowner) {
336             byte tmpLock = ReadWriteLockValue.NOLOCK;
337             //remove depedencies to the ended task
338
for(int i=0; i<waiters.size();i++) {
339                 LockRequest lr = (LockRequest) waiters.get(i);
340                 dg.removeVertex(lr.task, task);
341                 if (LOCKS.isCompatibleWith(lr.getLockLevel(), tmpLock)) {
342                     tmpLock = (byte) Math.max(lr.getLockLevel(), tmpLock);
343                 } else {
344                     break;
345                 }
346             }
347             if (owners.isEmpty()) {
348                 lockLevel = ReadWriteLockValue.NOLOCK;
349             }
350         } else {
351             //This case occurs when dead lock has been detected or when the
352
// thread has been intereupted. In both case the fail lock request
353
// has been cleaned (@see #accessIntention method).
354
// However if tasks are multithreaded (several thread with
355
// the same task identifier), it can stay lock request among the
356
// waiters. This case is not yet managed
357
}
358         //try to notify waiters if possible
359
int nbNotified = 0;
360         while(!waiters.isEmpty()) {
361             LockRequest lr = (LockRequest) waiters.get(0);
362             if (!acceptLockRequest(lr.task, lr.getLockLevel(), 0)) {
363                 break;
364             }
365             waiters.remove(0);
366             dg.removeVertexes(lr.task, owners);
367             if (!owners.contains(lr.task)) {
368                 owners.add(lr.task);
369             }
370             lockLevel = (byte) Math.max(lr.getLockLevel(), lockLevel);
371             synchronized(lr) {
372                 lr.notify();
373             }
374             nbNotified++;
375         }
376         if (logger != null && logger.isLoggable(BasicLevel.DEBUG)) {
377             StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
378             if (isowner) {
379                 sb.append("task closed: ");
380             } else {
381                 sb.append("task closed without lock: ");
382             }
383             sb.append(task);
384             sb.append(" / new notified: ");
385             sb.append(nbNotified);
386             sb.append(" / reservations:");
387             sb.append(reservations);
388             sb.append(" / current lock: ");
389             sb.append(LOCKS.str(lockLevel));
390             sb.append("\n\towners:");
391             sb.append(owners);
392             sb.append("\n\twaiters:");
393             sb.append(waiters);
394             printDG(sb);
395             logger.log(BasicLevel.DEBUG, sb.toString());
396         }
397         try {
398             return reservations == 0 && owners.isEmpty();
399         } finally {
400             semaphore.V();
401         }
402         } catch(Error JavaDoc t) {
403             logger.log(BasicLevel.ERROR, "close: " + t.getMessage(),t);
404             t.printStackTrace();
405             return false;
406         }
407     }
408
409
410     public synchronized byte getMax() {
411         return lockLevel;
412     }
413
414     private void printDG(StringBuffer JavaDoc sb) {
415         Map JavaDoc m = dg.getVertexes();
416         if (m.size() > 0) {
417             int ws = waiters.size();
418             sb.append("\ndependency Graph: ");
419             List JavaDoc waiters = new ArrayList JavaDoc(m.keySet());
420             Set JavaDoc s = new HashSet JavaDoc(waiters.size() * 2);
421             s.addAll(m.keySet());
422             for (Iterator JavaDoc it = ((Collection JavaDoc) m.values()).iterator(); it.hasNext();) {
423                 s.addAll((Collection JavaDoc) it.next());
424             }
425             List JavaDoc all = new ArrayList JavaDoc(s);
426             for (int i = 0; i <all.size() ; i++) {
427                 Object JavaDoc t1 = all.get(i);
428                 int t1Idx = all.indexOf(t1);
429                 Collection JavaDoc dependencies = (Collection JavaDoc) m.get(t1);
430                 if (dependencies != null) {
431                     for (Iterator JavaDoc it = dependencies.iterator(); it.hasNext();) {
432                         sb.append("\nws");
433                         sb.append(t1Idx);
434                         sb.append(" ");
435                         sb.append(t1);
436                         sb.append(" = > ");
437                         sb.append("ws");
438                         Object JavaDoc t2 = it.next();
439                         sb.append(all.indexOf(t2));
440                         sb.append(" ");
441                         sb.append(t2);
442                     }
443                 }
444             }
445         }
446     }
447     
448 }
449
450 final class LockRequest {
451     /**
452      * indicates the lock level required by the task
453      */

454     private byte lockLevel;
455
456     /**
457      * The task identifier
458      */

459     final Object JavaDoc task;
460     
461     /**
462      * Create a TaskGroup with a single element. If this is a reader group,
463      * additional reader can be added with the #addReader(Object) method. If
464      * this is writer group, the group can not be modified after.
465      *
466      * @param reader indicates the access mode of the task
467      * @param object is a task (not null)
468      */

469     LockRequest(final byte lockLevel, final Object JavaDoc task) {
470         this.task = task;
471         this.lockLevel = lockLevel;
472     }
473
474     final byte getLockLevel() {
475         return lockLevel;
476     }
477     
478     Object JavaDoc getTask() {
479         return task;
480     }
481     
482     boolean hasRolledBack() {
483         return false;
484     }
485     
486     public final int hashCode() {
487         return task.hashCode();
488     }
489     
490     public final boolean equals(Object JavaDoc o) {
491         return lockLevel == ((LockRequest) o).lockLevel
492             && task.equals(((LockRequest) o).task);
493     }
494     
495     public final String JavaDoc toString() {
496         return "lockLevel: " + lockLevel + " / tasks: " + task;
497     }
498 }
499
Popular Tags