package org.jboss.cache.lock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;

/**
 * A read/write lock whose read lock can be upgraded to the write lock via
 * {@link #upgradeLockAttempt(long)}.
 * <p>
 * Only the timed {@link Lock#tryLock(long, TimeUnit)} and {@link Lock#unlock()} operations are
 * supported by the locks handed out by {@link #readLock()} and {@link #writeLock()}; the other
 * {@link Lock} methods throw {@link UnsupportedOperationException}.
 * <p>
 * The reader/writer counters are updated under this object's monitor (the {@code synchronized}
 * methods below). The upgrader counter is updated under the monitor of {@link #writerLock_}.
 * Waiting threads park on the monitor of the {@link ReaderLock} or {@link WriterLock} instance.
 */
public class ReadWriteLockWithUpgrade implements ReadWriteLock
{
   /** Number of read locks currently held. */
   private long activeReaders_ = 0;
   /** Thread currently holding the write lock, or {@code null} when no writer is active. */
   protected Thread activeWriter_ = null;
   /** Number of threads blocked in {@link ReaderLock#tryLock(long, TimeUnit)}. */
   private long waitingReaders_ = 0;
   /** Number of threads blocked in {@link WriterLock#tryLock(long, TimeUnit)}. */
   private long waitingWriters_ = 0;
   /** Number of pending upgrade attempts (at most one is admitted at a time). */
   private long waitingUpgrader_ = 0;
   /**
    * Marks the calling thread as "the upgrader" of a given lock instance.
    * NOTE(review): presumably ThreadLocalMap keeps a separate map per thread - confirm against
    * its implementation; the upgrader check in {@link WriterLock} relies on that.
    */
   protected static final Map upgraderLocal_ = new ThreadLocalMap();
   /** Placeholder value stored in {@link #upgraderLocal_}; only its presence matters. */
   protected static final Object dummy_ = new Object();
   protected final ReaderLock readerLock_ = new ReaderLock();
   protected final WriterLock writerLock_ = new WriterLock();
   protected static final Log log_ = LogFactory.getLog(ReadWriteLockWithUpgrade.class);


   /**
    * @return a snapshot of the internal counters and the active writer, for diagnostics
    */
   @Override
   public String toString()
   {
      // Purely local builder: the unsynchronized StringBuilder is sufficient.
      StringBuilder sb = new StringBuilder();
      sb.append("activeReaders=").append(activeReaders_).append(", activeWriter=").append(activeWriter_);
      sb.append(", waitingReaders=").append(waitingReaders_).append(", waitingWriters=").append(waitingWriters_);
      sb.append(", waitingUpgrader=").append(waitingUpgrader_);
      return sb.toString();
   }


   /** @return the write lock of this read/write lock */
   public Lock writeLock()
   {
      return writerLock_;
   }

   /** @return the read lock of this read/write lock */
   public Lock readLock()
   {
      return readerLock_;
   }


   /**
    * Attempts to upgrade a held read lock to the write lock.
    * <p>
    * If exactly one read lock is active the upgrade happens in place. Otherwise the read lock
    * is released and the write lock is requested with the given timeout; if that fails, the
    * method tries to re-obtain the read lock (again with the given timeout).
    *
    * @param msecs timeout in milliseconds for obtaining the write lock
    * @return the write lock on success; {@code null} if the upgrade failed, timed out, or the
    *         calling thread was interrupted while waiting
    * @throws UpgradeException      if another upgrade attempt is already pending
    * @throws RuntimeException      if no read lock is currently active
    * @throws IllegalStateException if the upgrade failed and the read lock could not be
    *                               re-obtained either
    */
   public Lock upgradeLockAttempt(long msecs) throws UpgradeException
   {
      if (activeReaders_ == 0)
         throw new RuntimeException("No reader lock available for upgrade");

      synchronized (writerLock_)
      {
         if (waitingUpgrader_ >= 1)
         {
            String errStr = "upgradeLockAttempt(): more than one reader trying to simultaneously upgrade to write lock";
            log_.error(errStr);
            throw new UpgradeException(errStr);
         }
         waitingUpgrader_++;
         upgraderLocal_.put(this, dummy_);
      }

      // Sole reader: swap the read lock for the write lock directly, no waiting needed.
      if (activeReaders_ == 1)
      {
         resetWaitingUpgrader();
         return changeLock();
      }
      else
      {
         // Other readers are active: give up our read lock and queue for the write lock.
         readerLock_.unlock();
         try
         {
            if (!writerLock_.tryLock(msecs, TimeUnit.MILLISECONDS))
            {
               log_.error("upgradeLock(): failed");
               resetWaitingUpgrader();

               // Upgrade failed; fall back to the read lock the caller held on entry.
               if (!readerLock_.tryLock(msecs, TimeUnit.MILLISECONDS))
               {
                  String errStr = "ReadWriteLockWithUpgrade.upgradeLockAttempt():" +
                        " failed to upgrade to write lock and also failed to re-obtain the read lock";
                  log_.error(errStr);
                  throw new IllegalStateException(errStr);
               }
               return null;
            }
            resetWaitingUpgrader();
         }
         catch (InterruptedException ex)
         {
            resetWaitingUpgrader();
            // Preserve the interruption for the caller instead of silently swallowing it.
            // NOTE(review): on this path the read lock released above is not re-acquired.
            Thread.currentThread().interrupt();
            return null;
         }

         return writerLock_;
      }
   }

   /** Clears the pending-upgrader marker for the calling thread and decrements the counter. */
   private void resetWaitingUpgrader()
   {
      synchronized (writerLock_)
      {
         waitingUpgrader_--;
         upgraderLocal_.remove(this);
      }
   }

   /**
    * Gives up one read lock and immediately tries to become the active writer.
    *
    * @return the write lock, or {@code null} if the write could not be started (the read
    *         lock count has already been decremented in that case)
    */
   protected synchronized Lock changeLock()
   {
      --activeReaders_;

      if (!startWrite())
      {
         return null;
      }

      return writerLock_;
   }


   /** Bookkeeping for a reader that stopped waiting without obtaining the lock. */
   protected synchronized void cancelledWaitingReader()
   {
      --waitingReaders_;
   }

   /** Bookkeeping for a writer that stopped waiting without obtaining the lock. */
   protected synchronized void cancelledWaitingWriter()
   {
      --waitingWriters_;
   }


   /**
    * @return {@code true} if a new reader may proceed: there is no active writer, no waiting
    *         writer and no pending upgrader
    */
   protected boolean allowReader()
   {
      return activeWriter_ == null && waitingWriters_ == 0 && waitingUpgrader_ == 0;
   }

   /**
    * Registers the caller as an active reader if readers are currently allowed.
    *
    * @return {@code true} if the read lock was granted
    */
   protected synchronized boolean startRead()
   {
      boolean allowRead = allowReader();
      if (allowRead)
      {
         ++activeReaders_;
      }
      return allowRead;
   }

   /**
    * Registers the calling thread as the active writer if there is neither an active writer
    * nor any active reader.
    *
    * @return {@code true} if the write lock was granted
    */
   protected synchronized boolean startWrite()
   {
      boolean allowWrite = activeWriter_ == null && activeReaders_ == 0;
      if (allowWrite) activeWriter_ = Thread.currentThread();
      return allowWrite;
   }


   /**
    * First read attempt of a thread that is prepared to wait: on failure the thread is
    * counted as a waiting reader.
    */
   protected synchronized boolean startReadFromNewReader()
   {
      boolean pass = startRead();
      if (!pass) ++waitingReaders_;
      return pass;
   }

   /**
    * First write attempt of a thread that is prepared to wait: on failure the thread is
    * counted as a waiting writer.
    */
   protected synchronized boolean startWriteFromNewWriter()
   {
      boolean pass = startWrite();
      if (!pass) ++waitingWriters_;
      return pass;
   }

   /** Retry by an already-counted waiting reader: on success it leaves the waiting count. */
   protected synchronized boolean startReadFromWaitingReader()
   {
      boolean pass = startRead();
      if (pass) --waitingReaders_;
      return pass;
   }

   /** Retry by an already-counted waiting writer: on success it leaves the waiting count. */
   protected synchronized boolean startWriteFromWaitingWriter()
   {
      boolean pass = startWrite();
      if (pass) --waitingWriters_;
      return pass;
   }


   /**
    * Releases one read lock (if any is active).
    *
    * @return the writer lock to signal when the last reader left and writers are waiting,
    *         otherwise {@code null}
    */
   protected synchronized Signaller endRead()
   {
      if (activeReaders_ != 0 && --activeReaders_ == 0 && waitingWriters_ > 0)
         return writerLock_;
      else
         return null;
   }


   /**
    * Releases the write lock.
    *
    * @return the reader lock to signal if readers are waiting and allowed to proceed, else
    *         the writer lock if writers are waiting, else {@code null}
    */
   protected synchronized Signaller endWrite()
   {
      activeWriter_ = null;
      if (waitingReaders_ > 0 && allowReader())
         return readerLock_;
      else if (waitingWriters_ > 0)
         return writerLock_;
      else
         return null;
   }


   /** Something whose waiting threads can be woken up. */
   static interface Signaller
   {
      /** Wakes up the threads waiting on this object. */
      void signalWaiters();
   }

   /**
    * Common base of the reader and writer locks: every {@link Lock} operation implemented
    * here is rejected with {@link UnsupportedOperationException}.
    */
   static abstract class LockBase implements Lock
   {

      public void lock()
      {
         throw new UnsupportedOperationException();
      }

      public void lockInterruptibly() throws InterruptedException
      {
         throw new UnsupportedOperationException();
      }

      public Condition newCondition()
      {
         throw new UnsupportedOperationException();
      }

      public boolean tryLock()
      {
         throw new UnsupportedOperationException();
      }

   }

   /** The read side of the lock; blocked readers wait on this object's monitor. */
   protected class ReaderLock extends LockBase implements Signaller, Lock
   {

      /** Releases one read lock and wakes whichever side {@link #endRead()} designates. */
      public void unlock()
      {
         Signaller s = endRead();
         if (s != null)
         {
            s.signalWaiters();
         }
      }

      /** Wakes all threads waiting on this reader lock. */
      public synchronized void signalWaiters()
      {
         ReaderLock.this.notifyAll();
      }

      /**
       * Tries to obtain a read lock, waiting up to the given time.
       *
       * @param time maximum time to wait; a value of zero or less means a single attempt
       * @param unit unit of {@code time}
       * @return {@code true} if the read lock was obtained, {@code false} on timeout
       * @throws InterruptedException if the thread is interrupted on entry or while waiting
       */
      public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
      {
         if (Thread.interrupted()) throw new InterruptedException();
         long msecs = unit.toMillis(time);
         InterruptedException ie = null;
         synchronized (this)
         {
            if (msecs <= 0)
               return startRead();
            else if (startReadFromNewReader())
               return true;
            else
            {
               long waitTime = msecs;
               long start = System.currentTimeMillis();
               while (true)
               {
                  try
                  {
                     ReaderLock.this.wait(waitTime);
                  }
                  catch (InterruptedException ex)
                  {
                     cancelledWaitingReader();
                     ie = ex;
                     break;
                  }
                  if (startReadFromWaitingReader())
                     return true;
                  else
                  {
                     // Woken without getting the lock: wait only for the remaining time.
                     waitTime = msecs - (System.currentTimeMillis() - start);
                     if (waitTime <= 0)
                     {
                        cancelledWaitingReader();
                        break;
                     }
                  }
               }
            }
         }
         // This reader gave up (timeout or interrupt): let waiting writers re-check state.
         writerLock_.signalWaiters();
         if (ie != null)
            throw ie;
         else
            return false;
      }

   }

   /** The write side of the lock; blocked writers wait on this object's monitor. */
   protected class WriterLock extends LockBase implements Signaller, Lock
   {

      /** Releases the write lock and wakes whichever side {@link #endWrite()} designates. */
      public void unlock()
      {
         Signaller s = endWrite();
         if (s != null) s.signalWaiters();
      }

      /** Wakes all threads waiting on this writer lock. */
      public synchronized void signalWaiters()
      {
         WriterLock.this.notifyAll();
      }

      /**
       * Tries to obtain the write lock, waiting up to the given time. While an upgrade is
       * pending, only the thread marked as the upgrader may obtain the lock after waiting.
       *
       * @param time maximum time to wait; a value of zero or less means a single attempt
       * @param unit unit of {@code time}
       * @return {@code true} if the write lock was obtained, {@code false} on timeout
       * @throws InterruptedException if the thread is interrupted on entry or while waiting
       */
      public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
      {
         if (Thread.interrupted()) throw new InterruptedException();
         InterruptedException ie = null;
         long msecs = unit.toMillis(time);

         synchronized (WriterLock.this)
         {
            if (msecs <= 0)
            {
               // Single attempt: with an upgrade pending, only the upgrader may try.
               if (waitingUpgrader_ != 0)
               {
                  if (upgraderLocal_.get(ReadWriteLockWithUpgrade.this) != null)
                  {
                     log_.info("attempt(): upgrade to write lock");
                     return startWrite();
                  }
                  else
                     return false;
               }
               else
                  return startWrite();
            }
            else if (startWriteFromNewWriter())
               return true;
            else
            {
               long waitTime = msecs;
               long start = System.currentTimeMillis();
               while (true)
               {
                  try
                  {
                     WriterLock.this.wait(waitTime);
                  }
                  catch (InterruptedException ex)
                  {
                     cancelledWaitingWriter();
                     WriterLock.this.notifyAll();
                     ie = ex;
                     break;
                  }

                  // With an upgrade pending, only the upgrading thread may take the lock;
                  // everyone else keeps waiting.
                  boolean upgradePending = waitingUpgrader_ != 0;
                  boolean mayAcquire = !upgradePending
                        || upgraderLocal_.get(ReadWriteLockWithUpgrade.this) != null;
                  if (mayAcquire && startWriteFromWaitingWriter())
                     return true;

                  // Always account for elapsed time, including for writers held back by a
                  // pending upgrade, so that the caller's timeout is honoured.
                  waitTime = msecs - (System.currentTimeMillis() - start);
                  if (waitTime <= 0)
                  {
                     cancelledWaitingWriter();
                     WriterLock.this.notifyAll();
                     break;
                  }
               }
            }
         }

         // This writer gave up (timeout or interrupt): let waiting readers re-check state.
         readerLock_.signalWaiters();
         if (ie != null)
            throw ie;
         else
            return false;
      }

   }


}