1 package org.jgroups.blocks; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jgroups.ChannelException; 6 7 import java.io.Serializable ; 8 import java.util.HashMap ; 9 10 16 public class DistributedLockManager implements TwoPhaseVotingListener, LockManager { 17 18 23 private static final long ACQUIRE_EXPIRATION = 5000; 24 25 29 private static final long VOTE_TIMEOUT = 10000; 30 31 private final HashMap preparedLocks = new HashMap (); 33 34 private final HashMap preparedReleases = new HashMap (); 36 37 private final HashMap heldLocks = new HashMap (); 39 40 private final TwoPhaseVotingAdapter votingAdapter; 41 42 private final Object id; 43 44 protected final Log log=LogFactory.getLog(getClass()); 45 46 47 58 public DistributedLockManager(VotingAdapter voteChannel, Object id) { 59 this(new TwoPhaseVotingAdapter(voteChannel), id); 60 } 61 62 72 public DistributedLockManager(TwoPhaseVotingAdapter channel, Object id) { 73 this.id = id; 74 this.votingAdapter = channel; 75 this.votingAdapter.addListener(this); 76 } 77 78 82 private boolean localLock(LockDecree lockDecree) { 83 removeExpired(lockDecree); 85 86 LockDecree localLock = 87 (LockDecree) heldLocks.get(lockDecree.getKey()); 88 89 if (localLock == null) { 90 91 lockDecree.commit(); 93 94 if (lockDecree.managerId.equals(id)) 97 heldLocks.put(lockDecree.getKey(), lockDecree); 98 99 return true; 101 } else 102 if (localLock.requester.equals(lockDecree.requester)) 103 return true; 105 else 106 return false; 108 109 } 110 111 118 private boolean canLock(LockDecree decree) { 119 removeExpired(decree); 121 122 LockDecree lock = (LockDecree)heldLocks.get(decree.getKey()); 123 if (lock == null) 124 return true; 125 else 126 return lock.requester.equals(decree.requester); 127 } 128 129 136 private boolean canRelease(LockDecree decree) { 137 removeExpired(decree); 139 140 LockDecree lock = (LockDecree)heldLocks.get(decree.getKey()); 143 if (lock == null) 144 return true; 146 else 147 return lock.requester.equals(decree.requester); 148 } 149 150 155 private void removeExpired(LockDecree decree) { 156 LockDecree localLock = (LockDecree)heldLocks.get(decree.getKey()); 158 if (localLock != null && !localLock.isValid()) 159 heldLocks.remove(localLock.getKey()); 160 } 161 162 167 private boolean localRelease(LockDecree lockDecree) { 168 removeExpired(lockDecree); 170 171 LockDecree localLock= 172 (LockDecree) heldLocks.get(lockDecree.getKey()); 173 174 if(localLock == null) { 175 return true; 177 } 178 else if(localLock.requester.equals(lockDecree.requester)) { 179 heldLocks.remove(lockDecree.getKey()); 181 return true; 182 } 183 else 184 return false; 186 } 187 188 203 public void lock(Object lockId, Object owner, int timeout) 204 throws LockNotGrantedException, ChannelException 205 { 206 if (!(lockId instanceof Serializable ) || !(owner instanceof Serializable )) 207 throw new ClassCastException ("DistributedLockManager " + 208 "works only with serializable objects."); 209 210 boolean acquired = votingAdapter.vote( 211 new AcquireLockDecree(lockId, owner, id), timeout); 212 213 if (!acquired) 214 throw new LockNotGrantedException("Lock cannot be granted."); 215 } 216 217 226 public void unlock(Object lockId, Object owner) 227 throws LockNotReleasedException, ChannelException 228 { 229 230 if (!(lockId instanceof Serializable ) || !(owner instanceof Serializable )) 231 throw new ClassCastException ("DistributedLockManager " + 232 "works only with serializable objects."); 233 234 235 boolean released = votingAdapter.vote( 236 new ReleaseLockDecree(lockId, owner, id), VOTE_TIMEOUT); 237 238 if (!released) 239 throw new LockNotReleasedException("Lock cannot be unlocked."); 240 } 241 242 254 private boolean checkPrepared(HashMap preparedContainer, 255 LockDecree requestedDecree) 256 { 257 LockDecree preparedDecree = 258 (LockDecree)preparedContainer.get(requestedDecree.getKey()); 259 260 if ((preparedDecree != null) && !preparedDecree.isValid()) { 262 preparedContainer.remove(preparedDecree.getKey()); 263 264 preparedDecree = null; 265 } 266 267 if (preparedDecree != null) { 268 if (requestedDecree.requester.equals(preparedDecree.requester)) 269 return true; 270 else 271 return false; 272 } else 273 return true; 275 } 276 277 288 public synchronized boolean prepare(Object decree) throws VoteException { 289 if (!(decree instanceof LockDecree)) 290 throw new VoteException("Uknown decree type. Ignore me."); 291 292 if (decree instanceof AcquireLockDecree) { 293 AcquireLockDecree acquireDecree = (AcquireLockDecree)decree; 294 if(log.isDebugEnabled()) log.debug("Preparing to acquire decree " + acquireDecree.lockId); 295 296 if (!checkPrepared(preparedLocks, acquireDecree)) 297 return false; 299 300 if (canLock(acquireDecree)) { 301 preparedLocks.put(acquireDecree.getKey(), acquireDecree); 302 return true; 303 } else 304 return false; 306 } else 307 if (decree instanceof ReleaseLockDecree) { 308 ReleaseLockDecree releaseDecree = (ReleaseLockDecree)decree; 309 310 311 if(log.isDebugEnabled()) log.debug("Preparing to release decree " + releaseDecree.lockId); 312 313 if (!checkPrepared(preparedReleases, releaseDecree)) 314 return false; 316 317 if (canRelease(releaseDecree)) { 318 preparedReleases.put(releaseDecree.getKey(), releaseDecree); 319 return true; 321 } else 322 return false; 324 } 325 326 return false; 328 } 329 330 341 public synchronized boolean commit(Object decree) throws VoteException { 342 if (!(decree instanceof LockDecree)) 343 throw new VoteException("Uknown decree type. Ignore me."); 344 345 if (decree instanceof AcquireLockDecree) { 346 347 348 if(log.isDebugEnabled()) log.debug("Committing decree acquisition " + ((LockDecree)decree).lockId); 349 350 if (!checkPrepared(preparedLocks, (LockDecree)decree)) 351 return false; 353 354 if (localLock((LockDecree)decree)) { 355 preparedLocks.remove(((LockDecree)decree).getKey()); 356 return true; 357 } else 358 return false; 359 } else 360 if (decree instanceof ReleaseLockDecree) { 361 362 363 if(log.isDebugEnabled()) log.debug("Committing decree release " + ((LockDecree)decree).lockId); 364 365 if (!checkPrepared(preparedReleases, (LockDecree)decree)) 366 return false; 368 369 if (localRelease((LockDecree)decree)) { 370 preparedReleases.remove(((LockDecree)decree).getKey()); 371 return true; 372 } else 373 return false; 374 } 375 376 return false; 378 } 379 380 389 public synchronized void abort(Object decree) throws VoteException { 390 if (!(decree instanceof LockDecree)) 391 throw new VoteException("Uknown decree type. Ignore me."); 392 393 if (decree instanceof AcquireLockDecree) { 394 395 396 if(log.isDebugEnabled()) log.debug("Aborting decree acquisition " + ((LockDecree)decree).lockId); 397 398 if (!checkPrepared(preparedLocks, (LockDecree)decree)) 399 return; 401 402 preparedLocks.remove(((LockDecree)decree).getKey()); 403 } else 404 if (decree instanceof ReleaseLockDecree) { 405 406 407 if(log.isDebugEnabled()) log.debug("Aborting decree release " + ((LockDecree)decree).lockId); 408 409 if (!checkPrepared(preparedReleases, (LockDecree)decree)) 410 return; 412 413 preparedReleases.remove(((LockDecree)decree).getKey()); 414 } 415 416 } 417 418 421 public static class LockDecree implements Serializable { 422 423 protected final Object lockId; 424 protected final Object requester; 425 protected final Object managerId; 426 427 protected boolean commited; 428 429 private LockDecree(Object lockId, Object requester, Object managerId) { 430 this.lockId = lockId; 431 this.requester = requester; 432 this.managerId = managerId; 433 } 434 435 438 public Object getKey() { return lockId; } 439 440 443 public boolean isValid() { return true; } 444 445 public void commit() { this.commited = true; } 446 447 448 451 public int hashCode() { 452 return lockId.hashCode(); 453 } 454 455 public boolean equals(Object other) { 456 457 if (other instanceof LockDecree) { 458 return ((LockDecree)other).lockId.equals(this.lockId); 459 } else { 460 return false; 461 } 462 } 463 } 464 465 466 469 public static class AcquireLockDecree extends LockDecree { 470 private final long creationTime; 471 472 private AcquireLockDecree(LockDecree lockDecree) { 473 this(lockDecree.lockId, lockDecree.requester, lockDecree.managerId); 474 } 475 476 private AcquireLockDecree(Object lockId, Object requester, Object managerId) { 477 super(lockId, requester, managerId); 478 this.creationTime = System.currentTimeMillis(); 479 } 480 481 486 public boolean isValid() { 487 boolean result = super.isValid(); 488 489 if (!commited && result) 490 result = ((creationTime + ACQUIRE_EXPIRATION) > System.currentTimeMillis()); 491 492 return result; 493 } 494 495 } 496 497 500 public static class ReleaseLockDecree extends LockDecree { 501 ReleaseLockDecree(Object lockId, Object requester, Object managerId) { 502 super(lockId, requester, managerId); 503 } 504 } 505 } 506 | Popular Tags |