1 25 26 package org.objectweb.perseus.concurrency.pessimistic; 27 28 import org.objectweb.perseus.concurrency.api.ConcurrencyException; 29 import org.objectweb.perseus.concurrency.api.RolledBackConcurrencyException; 30 import org.objectweb.perseus.concurrency.lib.LockValue; 31 import org.objectweb.perseus.concurrency.lib.RWLockValue; 32 import org.objectweb.perseus.concurrency.lib.ReadWriteLockValue; 33 import org.objectweb.perseus.concurrency.lib.Semaphore; 34 import org.objectweb.perseus.dependency.api.DependencyGraph; 35 import org.objectweb.util.monolog.api.BasicLevel; 36 37 import java.util.Collection ; 38 import java.util.HashSet ; 39 import java.util.Iterator ; 40 import java.util.List ; 41 import java.util.ArrayList ; 42 import java.util.Map ; 43 import java.util.Set ; 44 45 52 public final class RWFifoLock extends Lock { 53 54 protected final static LockValue LOCKS = new ReadWriteLockValue(); 55 56 59 List owners = new ArrayList (); 60 61 65 List waiters = new ArrayList (); 66 67 byte lockLevel = ReadWriteLockValue.NOLOCK; 68 69 protected final Semaphore semaphore = new Semaphore(); 70 71 public RWFifoLock() { 72 } 73 74 public RWFifoLock(Object hints, DependencyGraph dg) { 75 super(hints, dg); 76 } 77 78 85 private final boolean acceptLockRequest(final Object task, 86 final byte askedLock, final int waiterIdx) { 87 if (owners.isEmpty()) { 88 return true; 89 } else if (owners.contains(task)) { 90 if (LOCKS.isCompatibleWith(askedLock, lockLevel)) { 92 return waiterIdx == 0; } else if (owners.size() == 1) { 95 return true; 101 } 102 } else if (LOCKS.isCompatibleWith(askedLock, lockLevel) 103 && waiterIdx == 0) { 104 return true; 105 } 106 return false; 107 } 108 109 117 private final void accessIntention(final Object task, final byte askedLock) 118 throws RolledBackConcurrencyException { 119 try { 120 boolean debug = logger != null && logger.isLoggable(BasicLevel.DEBUG); 121 semaphore.P(); 122 final int ws = waiters.size(); 123 reservations --; 124 if (acceptLockRequest(task, askedLock, ws)) { 125 if (debug) { 126 logger.log(BasicLevel.DEBUG, LOCKS.str(askedLock) + " lock accepted directly," 127 + "\n\ttask:" + task + "\n\towners=" + owners); 128 } 129 this.lockLevel = (byte) Math.max(this.lockLevel, askedLock); 130 if (!owners.contains(task)) { 131 owners.add(task); 132 } 133 semaphore.V(); 134 return; 135 } 136 138 List dependencies; if (ws == 0) { 141 dependencies = owners; 143 } else { 144 LockRequest tmp = null; 146 int i; 147 for(i = ws -1; i >= 0; i--) { 149 tmp = (LockRequest) waiters.get(i); 150 if (LOCKS.isCompatibleWith(askedLock, tmp.getLockLevel())) { 151 } else { 152 break; 153 } 154 } 155 if (i>=0) { 156 dependencies = new ArrayList (); 159 byte l = tmp.getLockLevel(); 160 do { 161 tmp = (LockRequest) waiters.get(i); 162 dependencies.add(tmp.task); 163 if (!LOCKS.isCompatibleWith(tmp.getLockLevel(), l)) { 164 break; 165 } 166 i--; 167 } while (i>=0); 168 } else { 169 dependencies = owners; 171 } 172 } 173 if (dg.addVertexes(task, dependencies) != -1) { 175 if (logger != null && logger.isLoggable(BasicLevel.INFO)) { 176 logger.log(BasicLevel.INFO, LOCKS.str(askedLock) 177 + " request: dead lock detected, cancel the task," 178 + "\n\ttask:" + task); 179 } 180 dg.removeVertexes(task, dependencies); 182 semaphore.V(); 184 throw new RolledBackConcurrencyException("Deadlock"); 185 } 186 if (debug) { 187 188 } 189 LockRequest lockRequest = new LockRequest(askedLock, task); 191 waiters.add(lockRequest); 192 193 if (debug) { 194 StringBuffer sb = new StringBuffer (); 195 sb.append(LOCKS.str(lockRequest.getLockLevel())); 196 sb.append(" lock waiting, rank="); 197 sb.append(waiters.indexOf(lockRequest)); 198 sb.append(", task="); 199 sb.append(task); 200 sb.append("\n\towners="); 201 sb.append(owners); 202 printDG(sb); 203 logger.log(BasicLevel.DEBUG, sb.toString()); 204 } 205 try { 207 synchronized(lockRequest) { 208 semaphore.V(); 209 lockRequest.wait(); 210 } 211 if (lockRequest.hasRolledBack()) { 212 forgetWaiter(lockRequest); 213 throw new RolledBackConcurrencyException( 214 "The working set has been rolled back by another thread"); 215 } 216 } catch (InterruptedException e) { 217 forgetWaiter(lockRequest); 218 throw new RolledBackConcurrencyException("Waiting of a " 219 + LOCKS.str(lockRequest.getLockLevel()) 220 + " intention has been interupted:", e); 221 } 222 if (debug) { 223 logger.log(BasicLevel.DEBUG, LOCKS.str(lockRequest.getLockLevel()) 224 + " Wake up\n\ttask:" + task); 225 } 226 } catch(Error t) { 227 logger.log(BasicLevel.ERROR, "accessIntention: " + t.getMessage(),t); 228 t.printStackTrace(); 229 } 230 231 } 232 233 237 private final void forgetWaiter(final LockRequest lr) { 238 boolean debug = logger != null && logger.isLoggable(BasicLevel.DEBUG); 239 semaphore.P(); 240 int idx = waiters.indexOf(lr); 241 if (debug) { 242 logger.log(BasicLevel.DEBUG, "forgetLockRequest: lock: " 243 + LOCKS.str(lr.getLockLevel()) 244 + ", waiter rank=" + idx + "/" + waiters.size() 245 + ", task=" + lr.task); 246 } 247 if (idx == -1 || waiters.size() == 0) { 248 semaphore.V(); 249 return; 250 } 251 waiters.remove(idx); 252 253 int i; 255 LockRequest tmp = null; 256 for(i = idx -1; i>=0; i--) { 257 tmp = (LockRequest) waiters.get(i); 258 if (!LOCKS.isCompatibleWith( 259 lr.getLockLevel(), 260 tmp.getLockLevel())) { 261 break; 262 } 263 } 264 if (i>=0) { 265 byte l = tmp.getLockLevel(); 266 do { 267 tmp = (LockRequest) waiters.get(i); 268 dg.removeVertex(lr.task, tmp.task); 269 if (!LOCKS.isCompatibleWith(tmp.getLockLevel(), l)) { 270 break; 271 } 272 i--; 273 } while (i>=0); 274 } else { 275 dg.removeVertexes(lr.task, owners); 276 } 277 278 final int ws = waiters.size(); 280 for(i = idx; i<ws; i++) { 282 tmp = (LockRequest) waiters.get(i); 283 if (!LOCKS.isCompatibleWith( 284 tmp.getLockLevel(), 285 lr.getLockLevel())) { 286 break; 287 } 288 } 289 if (i<ws) { 290 byte l = tmp.getLockLevel(); 291 do { 292 tmp = (LockRequest) waiters.get(i); 293 dg.removeVertex(tmp.task, lr.task); 294 if (!LOCKS.isCompatibleWith(tmp.getLockLevel(), l)) { 295 break; 296 } 297 i++; 298 } while (i<ws); 299 } 300 semaphore.V(); 301 } 302 303 308 public void readIntention(Object task) 309 throws ConcurrencyException { 310 accessIntention(task, RWLockValue.READ); 311 } 312 313 318 public void writeIntention(Object task) 319 throws ConcurrencyException { 320 accessIntention(task, ReadWriteLockValue.WRITE); 321 } 322 323 331 public boolean close(Object task) { 332 try { 333 semaphore.P(); 334 boolean isowner = owners.remove(task); 335 if (isowner) { 336 byte tmpLock = ReadWriteLockValue.NOLOCK; 337 for(int i=0; i<waiters.size();i++) { 339 LockRequest lr = (LockRequest) waiters.get(i); 340 dg.removeVertex(lr.task, task); 341 if (LOCKS.isCompatibleWith(lr.getLockLevel(), tmpLock)) { 342 tmpLock = (byte) Math.max(lr.getLockLevel(), tmpLock); 343 } else { 344 break; 345 } 346 } 347 if (owners.isEmpty()) { 348 lockLevel = ReadWriteLockValue.NOLOCK; 349 } 350 } else { 351 } 358 int nbNotified = 0; 360 while(!waiters.isEmpty()) { 361 LockRequest lr = (LockRequest) waiters.get(0); 362 if (!acceptLockRequest(lr.task, lr.getLockLevel(), 0)) { 363 break; 364 } 365 waiters.remove(0); 366 dg.removeVertexes(lr.task, owners); 367 if (!owners.contains(lr.task)) { 368 owners.add(lr.task); 369 } 370 lockLevel = (byte) Math.max(lr.getLockLevel(), lockLevel); 371 synchronized(lr) { 372 lr.notify(); 373 } 374 nbNotified++; 375 } 376 if (logger != null && logger.isLoggable(BasicLevel.DEBUG)) { 377 StringBuffer sb = new StringBuffer (); 378 if (isowner) { 379 sb.append("task closed: "); 380 } else { 381 sb.append("task closed without lock: "); 382 } 383 sb.append(task); 384 sb.append(" / new notified: "); 385 sb.append(nbNotified); 386 sb.append(" / reservations:"); 387 sb.append(reservations); 388 sb.append(" / current lock: "); 389 sb.append(LOCKS.str(lockLevel)); 390 sb.append("\n\towners:"); 391 sb.append(owners); 392 sb.append("\n\twaiters:"); 393 sb.append(waiters); 394 printDG(sb); 395 logger.log(BasicLevel.DEBUG, sb.toString()); 396 } 397 try { 398 return reservations == 0 && owners.isEmpty(); 399 } finally { 400 semaphore.V(); 401 } 402 } catch(Error t) { 403 logger.log(BasicLevel.ERROR, "close: " + t.getMessage(),t); 404 t.printStackTrace(); 405 return false; 406 } 407 } 408 409 410 public synchronized byte getMax() { 411 return lockLevel; 412 } 413 414 private void printDG(StringBuffer sb) { 415 Map m = dg.getVertexes(); 416 if (m.size() > 0) { 417 int ws = waiters.size(); 418 sb.append("\ndependency Graph: "); 419 List waiters = new ArrayList (m.keySet()); 420 Set s = new HashSet (waiters.size() * 2); 421 s.addAll(m.keySet()); 422 for (Iterator it = ((Collection ) m.values()).iterator(); it.hasNext();) { 423 s.addAll((Collection ) it.next()); 424 } 425 List all = new ArrayList (s); 426 for (int i = 0; i <all.size() ; i++) { 427 Object t1 = all.get(i); 428 int t1Idx = all.indexOf(t1); 429 Collection dependencies = (Collection ) m.get(t1); 430 if (dependencies != null) { 431 for (Iterator it = dependencies.iterator(); it.hasNext();) { 432 sb.append("\nws"); 433 sb.append(t1Idx); 434 sb.append(" "); 435 sb.append(t1); 436 sb.append(" = > "); 437 sb.append("ws"); 438 Object t2 = it.next(); 439 sb.append(all.indexOf(t2)); 440 sb.append(" "); 441 sb.append(t2); 442 } 443 } 444 } 445 } 446 } 447 448 } 449 450 final class LockRequest { 451 454 private byte lockLevel; 455 456 459 final Object task; 460 461 469 LockRequest(final byte lockLevel, final Object task) { 470 this.task = task; 471 this.lockLevel = lockLevel; 472 } 473 474 final byte getLockLevel() { 475 return lockLevel; 476 } 477 478 Object getTask() { 479 return task; 480 } 481 482 boolean hasRolledBack() { 483 return false; 484 } 485 486 public final int hashCode() { 487 return task.hashCode(); 488 } 489 490 public final boolean equals(Object o) { 491 return lockLevel == ((LockRequest) o).lockLevel 492 && task.equals(((LockRequest) o).task); 493 } 494 495 public final String toString() { 496 return "lockLevel: " + lockLevel + " / tasks: " + task; 497 } 498 } 499 | Popular Tags |