KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > cache > lock > ReadWriteLockWithUpgrade


1 /*
2  * JBoss, the OpenSource J2EE webOS
3  *
4  * Distributable under LGPL license.
5  * See terms of license at gnu.org.
6  */

7
8 package org.jboss.cache.lock;
9
10 import org.apache.commons.logging.Log;
11 import org.apache.commons.logging.LogFactory;
12
13 import java.util.Map JavaDoc;
14 import java.util.concurrent.TimeUnit JavaDoc;
15 import java.util.concurrent.locks.Condition JavaDoc;
16 import java.util.concurrent.locks.Lock JavaDoc;
17 import java.util.concurrent.locks.ReadWriteLock JavaDoc;
18
19 /*
20  * <p> This class is similar to PreferredWriterReadWriteLock except that
21  * the read lock is upgradable to write lock. I.e., when a user calls
22  * upgradeLock(), it will release the read lock, wait for
23  * all the read locks to clear and obtain a write lock afterwards. In
24  * particular, the write lock is obtained with priority to prevent deadlock
25  * situation. The current design is based in part from Doug Lea's
26  * PreferredWriterReadWriteLock.
27  *
28  * <p>Note that the pre-requisite to use upgrade lock is the pre-existing of a read
29  * lock. Otherwise, a RuntimeException will be thrown.</p>
30  * <p>Also note that currently lock can only be obtained through <code>attempt</code>
31  * api with specified timeout instead <code>acquire</code> is not supported.
32  *
33  * Internally, the upgrade is done through a Semaphore where a thread with
34  * a higher priority will obtain the write lock first. The following scenarios then can
35  * happen:
36  * <ul>
37  * <li>If there are multiple read locks granted (and no write lock request in waiting),
38  * an upgrade will release one read lock
39  * (decrease the counter), bump up upagrade counter, increase the current thread priority,
40  * set a thread local as upgrade thread,
41  * and place a write lock acquire() call. Upon waken up, it will check if the current
42  * thread is an upgrade. If it is, restore the thread priority, and decrease the
43  * upgrade counter.</li>
44  * <li>If there are mutiple write locks request in waiting (and only one read lock granted),
45  * decrease the read lock counter,
46  * bump up the upgrade counter, and increase the current thread priority.
47  * When one of the writer gets wake up, it will first check
48  * if upgrade counter is zero. If not, it will first release the semaphore so the upgrade
49  * thread can grab it, check the semaphore is gone, do notify, and issue myself another
50  * acquire to grab the next available semaphore.</li>
51  * </ul>
52  *
53  * @author Ben Wang
54 */

55 public class ReadWriteLockWithUpgrade implements ReadWriteLock JavaDoc
56 {
57    private long activeReaders_ = 0;
58    protected Thread JavaDoc activeWriter_ = null;
59    private long waitingReaders_ = 0;
60    private long waitingWriters_ = 0;
61    private long waitingUpgrader_ = 0;
62    // Store a default object to signal that we are upgrade thread.
63
//protected final ThreadLocal upgraderLocal_ = new ThreadLocal();
64
protected static final Map JavaDoc upgraderLocal_ = new ThreadLocalMap();
65    protected static final Object JavaDoc dummy_ = new Object JavaDoc();
66    protected final ReaderLock readerLock_ = new ReaderLock();
67    protected final WriterLock writerLock_ = new WriterLock();
68    protected static final Log log_ = LogFactory.getLog(ReadWriteLockWithUpgrade.class);
69
70
71    public String JavaDoc toString()
72    {
73       StringBuffer JavaDoc sb = new StringBuffer JavaDoc();
74       sb.append("activeReaders=").append(activeReaders_).append(", activeWriter=").append(activeWriter_);
75       sb.append(", waitingReaders=").append(waitingReaders_).append(", waitingWriters=").append(waitingWriters_);
76       sb.append(", waitingUpgrader=").append(waitingUpgrader_);
77       return sb.toString();
78    }
79
80
81    public Lock JavaDoc writeLock()
82    {
83       return writerLock_;
84    }
85
86    public Lock JavaDoc readLock()
87    {
88       return readerLock_;
89    }
90
91    /**
92     * Attempt to obtain an upgrade to writer lock. If successful, the read lock is upgraded to
93     * write lock. If fails, the owner retains the read lock.
94     *
95     * @param msecs Time to wait in millisecons.
96     * @return Sync object. Null if not successful or timeout.
97     */

98    public Lock JavaDoc upgradeLockAttempt(long msecs) throws UpgradeException
99    {
100       if (activeReaders_ == 0)
101          throw new RuntimeException JavaDoc("No reader lock available for upgrade");
102
103       synchronized (writerLock_)
104       {
105          if (waitingUpgrader_ >= 1)
106          {
107             String JavaDoc errStr = "upgradeLockAttempt(): more than one reader trying to simultaneously upgrade to write lock";
108             log_.error(errStr);
109             throw new UpgradeException(errStr);
110          }
111          waitingUpgrader_++;
112          upgraderLocal_.put(this, dummy_);
113       }
114
115       // If there is only one reader left, switch to write lock immediately.
116
// If there is more than one reader, release this one and acquire the write
117
// lock. There is still a chance for deadlock when there are two reader locks
118
// and suddenly the second lock is released when this lock is released and acquired
119
// as is else case. Solution is to let it timeout.
120
if (activeReaders_ == 1)
121       {
122          resetWaitingUpgrader();
123          return changeLock();
124       }
125       else
126       {
127          readerLock_.unlock();
128          try
129          {
130             if (!writerLock_.tryLock(msecs, TimeUnit.MILLISECONDS))
131             {
132                log_.error("upgradeLock(): failed");
133                resetWaitingUpgrader();
134
135                if (!readerLock_.tryLock(msecs, TimeUnit.MILLISECONDS))
136                {
137                   String JavaDoc errStr = "ReadWriteLockWithUpgrade.upgradeLockAttempt():" +
138                           " failed to upgrade to write lock and also failed to re-obtain the read lock";
139                   log_.error(errStr);
140                   throw new IllegalStateException JavaDoc(errStr);
141                }
142                return null;
143             }
144             resetWaitingUpgrader();
145          }
146          catch (InterruptedException JavaDoc ex)
147          {
148             resetWaitingUpgrader();
149             return null;
150          }
151
152          return writerLock_;
153       }
154    }
155
156    private void resetWaitingUpgrader()
157    {
158       synchronized (writerLock_)
159       {
160          waitingUpgrader_--;
161          upgraderLocal_.remove(this);
162       }
163    }
164
165    protected synchronized Lock JavaDoc changeLock()
166    {
167       --activeReaders_;
168
169       if (!startWrite())
170       {
171          // Something is wrong.
172
return null;
173       }
174
175       return writerLock_;
176    }
177
178    /*
179      A bunch of small synchronized methods are needed
180      to allow communication from the Lock objects
181      back to this object, that serves as controller
182    */

183    protected synchronized void cancelledWaitingReader()
184    {
185       --waitingReaders_;
186    }
187
188    protected synchronized void cancelledWaitingWriter()
189    {
190       --waitingWriters_;
191    }
192
193    /**
194     * Override this method to change to reader preference *
195     */

196    protected boolean allowReader()
197    {
198       return activeWriter_ == null && waitingWriters_ == 0 && waitingUpgrader_ == 0;
199    }
200
201    protected synchronized boolean startRead()
202    {
203       boolean allowRead = allowReader();
204       if (allowRead)
205       {
206          ++activeReaders_;
207       }
208       return allowRead;
209    }
210
211    protected synchronized boolean startWrite()
212    {
213       // The allowWrite expression cannot be modified without
214
// also changing startWrite, so is hard-wired
215
boolean allowWrite = activeWriter_ == null && activeReaders_ == 0;
216       if (allowWrite) activeWriter_ = Thread.currentThread();
217       return allowWrite;
218    }
219
220    /*
221       Each of these variants is needed to maintain atomicity of wait counts during wait loops. They could be
222       made faster by manually inlining each other. We hope that compilers do this for us though.
223    */

224    protected synchronized boolean startReadFromNewReader()
225    {
226       boolean pass = startRead();
227       if (!pass) ++waitingReaders_;
228       return pass;
229    }
230
231    protected synchronized boolean startWriteFromNewWriter()
232    {
233       boolean pass = startWrite();
234       if (!pass) ++waitingWriters_;
235       return pass;
236    }
237
238    protected synchronized boolean startReadFromWaitingReader()
239    {
240       boolean pass = startRead();
241       if (pass) --waitingReaders_;
242       return pass;
243    }
244
245    protected synchronized boolean startWriteFromWaitingWriter()
246    {
247       boolean pass = startWrite();
248       if (pass) --waitingWriters_;
249       return pass;
250    }
251
252    /**
253     * Called upon termination of a read.
254     * Returns the object to signal to wake up a waiter, or null if no such
255     */

256    protected synchronized Signaller endRead()
257    {
258       if (activeReaders_ != 0 && --activeReaders_ == 0 && waitingWriters_ > 0)
259          return writerLock_;
260       else
261          return null;
262    }
263
264
265    /**
266     * Called upon termination of a write.
267     * Returns the object to signal to wake up a waiter, or null if no such
268     */

269    protected synchronized Signaller endWrite()
270    {
271       activeWriter_ = null;
272       if (waitingReaders_ > 0 && allowReader())
273          return readerLock_;
274       else if (waitingWriters_ > 0)
275          return writerLock_;
276       else
277          return null;
278    }
279
280
281    /**
282     * Reader and Writer requests are maintained in two different
283     * wait sets, by two different objects. These objects do not
284     * know whether the wait sets need notification since they
285     * don't know preference rules. So, each supports a
286     * method that can be selected by main controlling object
287     * to perform the notifications. This base class simplifies mechanics.
288     */

289
290    static interface Signaller
291    { // base for ReaderLock and WriterLock
292

293       void signalWaiters();
294    }
295
296    static abstract class LockBase implements Lock JavaDoc
297    {
298
299       public void lock()
300       {
301          throw new UnsupportedOperationException JavaDoc();
302       }
303
304       public void lockInterruptibly() throws InterruptedException JavaDoc
305       {
306          throw new UnsupportedOperationException JavaDoc();
307       }
308
309       public Condition JavaDoc newCondition()
310       {
311          throw new UnsupportedOperationException JavaDoc();
312       }
313
314       public boolean tryLock()
315       {
316          throw new UnsupportedOperationException JavaDoc();
317       }
318
319       /*
320       public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
321       {
322          throw new UnsupportedOperationException();
323       }
324
325       public void unlock()
326       {
327          throw new UnsupportedOperationException();
328       }
329       */

330
331    }
332
333    protected class ReaderLock extends LockBase implements Signaller, Lock JavaDoc
334    {
335
336       public void unlock()
337       {
338          Signaller s = endRead();
339          if (s != null)
340          {
341             s.signalWaiters();
342          }
343       }
344
345       public synchronized void signalWaiters()
346       {
347          ReaderLock.this.notifyAll();
348       }
349
350       public boolean tryLock(long time, TimeUnit JavaDoc unit) throws InterruptedException JavaDoc
351       {
352          if (Thread.interrupted()) throw new InterruptedException JavaDoc();
353          long msecs = unit.toMillis(time);
354          InterruptedException JavaDoc ie = null;
355          synchronized (this)
356          {
357             if (msecs <= 0)
358                return startRead();
359             else if (startReadFromNewReader())
360                return true;
361             else
362             {
363                long waitTime = msecs;
364                long start = System.currentTimeMillis();
365                while (true)
366                {
367                   try
368                   {
369                      ReaderLock.this.wait(waitTime);
370                   }
371                   catch (InterruptedException JavaDoc ex)
372                   {
373                      cancelledWaitingReader();
374                      ie = ex;
375                      break;
376                   }
377                   if (startReadFromWaitingReader())
378                      return true;
379                   else
380                   {
381                      waitTime = msecs - (System.currentTimeMillis() - start);
382                      if (waitTime <= 0)
383                      {
384                         cancelledWaitingReader();
385                         break;
386                      }
387                   }
388                }
389             }
390          }
391          // safeguard on interrupt or timeout:
392
writerLock_.signalWaiters();
393          if (ie != null)
394             throw ie;
395          else
396             return false; // timed out
397
}
398
399    }
400
401    protected class WriterLock extends LockBase implements Signaller, Lock JavaDoc
402    {
403
404       public void unlock()
405       {
406          Signaller s = endWrite();
407          if (s != null) s.signalWaiters();
408       }
409
410       // Waking up all thread in waiting now for them to compete.
411
// Thread with higher priority, i.e., upgrading, will win.
412
public synchronized void signalWaiters()
413       {
414          WriterLock.this.notifyAll();
415       }
416
417       public boolean tryLock(long time, TimeUnit JavaDoc unit) throws InterruptedException JavaDoc
418       {
419          if (Thread.interrupted()) throw new InterruptedException JavaDoc();
420          InterruptedException JavaDoc ie = null;
421          long msecs = unit.toMillis(time);
422
423          synchronized (WriterLock.this)
424          {
425             if (msecs <= 0)
426             {
427                // Upgrade thread has prioirty.
428
if (waitingUpgrader_ != 0)
429                {
430                   if (upgraderLocal_.get(ReadWriteLockWithUpgrade.this) != null)
431                   {
432                      log_.info("attempt(): upgrade to write lock");
433                      return startWrite();
434                   }
435                   else
436                      return false;
437                }
438                else
439                   return startWrite();
440             }
441             else if (startWriteFromNewWriter())
442                return true;
443             else
444             {
445                long waitTime = msecs;
446                long start = System.currentTimeMillis();
447                while (true)
448                {
449                   try
450                   {
451                      WriterLock.this.wait(waitTime);
452                   }
453                   catch (InterruptedException JavaDoc ex)
454                   {
455                      cancelledWaitingWriter();
456                      WriterLock.this.notifyAll();
457                      ie = ex;
458                      break;
459                   }
460
461                   if (waitingUpgrader_ != 0)
462                   { // Has upgrade request
463
if (upgraderLocal_.get(ReadWriteLockWithUpgrade.this) != null)
464                      { // Upgrade thread
465
if (startWriteFromWaitingWriter())
466                            return true;
467                      }
468                      else
469                      { // Normal write thread, go back to wait.
470
continue;
471                      }
472                   }
473                   else
474                   { // Every one is normal write thread. Compete; if fail go back to wait.
475
if (startWriteFromWaitingWriter())
476                         return true;
477                   }
478
479                   waitTime = msecs - (System.currentTimeMillis() - start);
480                   if (waitTime <= 0)
481                   {
482                      cancelledWaitingWriter();
483                      WriterLock.this.notifyAll();
484                      break;
485                   }
486                }
487             }
488          }
489
490          readerLock_.signalWaiters();
491          if (ie != null)
492             throw ie;
493          else
494             return false; // timed out
495
}
496
497    }
498
499
500 }
501
502
Popular Tags