1 24 25 package org.objectweb.perseus.concurrency.distributed.globallock.lib; 26 27 import org.objectweb.perseus.distribution.api.DistResCoordinator; 28 import org.objectweb.perseus.distribution.api.DistResCoordinatorService; 29 import org.objectweb.perseus.distribution.api.NotCoordinatorException; 30 import org.objectweb.perseus.concurrency.lib.LockValue; 31 32 import java.util.HashSet ; 33 import java.util.LinkedList ; 34 import java.util.Iterator ; 35 import java.util.Map ; 36 import java.util.Random ; 37 import java.io.Serializable ; 38 39 63 64 public class GlobalLockCoordinator implements DistResCoordinator, Serializable { 65 66 static final boolean trace = true; 67 private Object objId; 69 private HashSet users; private byte maxGrantedLock; 72 private LinkedList waiting; private byte requestedDowngrade; private int nbDowngradeRequests; 77 transient DistResCoordinatorService drcs; transient LockValue lockValue; transient private int lastCBnumber; 81 82 83 89 90 public GlobalLockCoordinator(Object resId, DistResCoordinatorService drcs, 91 LockValue lockValue) { 92 initCopy(resId, drcs, lockValue); 93 users = new HashSet (); 94 waiting = new LinkedList (); 95 maxGrantedLock = LockValue.NOLOCK; 97 requestedDowngrade = lockValue.maxValue(); 99 nbDowngradeRequests = 0; 100 lastCBnumber = (new Random ()).nextInt(); 104 } 105 106 public GlobalLockCoordinator(GlobalLockCoordinator glc) { 107 this.objId = glc.objId; 108 this.users = glc.users; 109 this.maxGrantedLock = glc.maxGrantedLock; 110 this.waiting = glc.waiting; 111 this.requestedDowngrade = glc.requestedDowngrade; 112 this.nbDowngradeRequests = glc.nbDowngradeRequests; 113 this.drcs = glc.drcs; 114 this.lockValue = glc.lockValue; 115 } 116 synchronized public void receive(Object objId, Serializable user, 118 Serializable msg) { 119 if (!objId.equals(this.objId)) throw new InternalError (); 120 if (msg instanceof GlobalLockMessage) { 121 GlobalLockMessage req = (GlobalLockMessage) msg; 122 if (req.type == GlobalLockMessage.UPGRADE_REQUEST) 123 upgrade(user, req.lck); 124 else if (req.type == GlobalLockMessage.DOWNGRADE_NOTIFY) 125 downgrade(user, req.lck, req.serialNumber); 126 else if (req.type == GlobalLockMessage.UPGRADE_CANCEL) 127 cancelUpgrade(user); 128 else throw new InternalError (); 129 } else throw new InternalError ("Unexpected message type"); 130 } 131 132 synchronized public Serializable freeze(Object resId) { 133 if (trace) trace("Freeze !!"); 134 return this; 135 } 136 137 154 synchronized public boolean joinUsersRequest(Object resId, 155 Serializable node) { 156 users.add(node); 157 if (trace) trace("Accept user: " + node); 158 return true; 159 } 160 161 public void recover(Object resId, Map userStates) { 162 163 for (Iterator it = userStates.entrySet().iterator(); it.hasNext();) { 164 Map.Entry entry = (Map.Entry ) it.next(); 165 Serializable nodeId = (Serializable ) entry.getKey(); 166 GlobalLockUser glu = (GlobalLockUser) entry.getValue(); 167 if (trace) trace("Received state from " + nodeId + ": " + glu); 168 users.add(nodeId); 169 if (maxGrantedLock < glu.locallyGranted) maxGrantedLock 170 = glu.locallyGranted; 171 if (glu.globallyRequested != LockValue.NOLOCK) 172 waiting.addLast(new Waiter(nodeId, glu.globallyRequested)); 173 } 174 if (!waiting.isEmpty()) { 175 Waiter firstWaiting = (Waiter) waiting.get(0); 176 callBackLocks(firstWaiting.nodeId, firstWaiting.lck); 177 } 178 } 179 180 public void nodeFailed(Object resId, Serializable nodeId) { 181 for (Iterator it = waiting.iterator(); it.hasNext();) { 183 Waiter waiter = (Waiter) it.next(); 184 if (waiter.nodeId.equals(nodeId)) 185 it.remove(); 186 } 187 188 } 189 190 192 void initCopy(Object resId, DistResCoordinatorService drcs, 194 LockValue lockValue) { 195 this.objId = resId; 196 this.drcs = drcs; 197 this.lockValue = lockValue; 198 199 } 200 201 202 private void upgrade(Serializable n, byte lck) { 204 try { 205 if (trace) trace( 206 "From: " + n + " GLOBAL REQUEST: " + lck); 207 208 if (requestedDowngrade != lockValue.maxValue()) { 210 if (trace) System.out.println(this 211 + "From: " + n + " ALREADY PENDING REQUEST: " + lck); 212 waiting.addLast(new Waiter(n, lck)); 213 if (trace) System.out.println(this 214 + "From: " + n + " GLOBAL REQUEST QUEUED (NOT FIRST): " 215 + lck); 216 return; 217 } 218 219 if (lockValue.isCompatibleWith(lck, maxGrantedLock)) { if (maxGrantedLock < lck) maxGrantedLock = lck; 224 225 drcs.sendToUser(objId, 227 new GlobalLockMessage(GlobalLockMessage.UPGRADE_NOTIFY, 228 lck, (long) 0, null), n); 229 return; 230 } 231 if (lockValue.isCompatibleWith(lck, maxGrantedLock) || 234 (users.size() == 1 && users.contains(n)) ) { maxGrantedLock = lck; 240 241 drcs.sendToUser(objId, 243 new GlobalLockMessage(GlobalLockMessage.UPGRADE_NOTIFY, 244 lck, (long) 0, null), n); 245 return; 246 } 247 248 byte downgradeLock = lockValue.getCompatibleWith(lockValue.maxValue(), 250 lck); 251 Waiter w= new Waiter(n, lck); 253 waiting.addLast(w); 254 callBackLocks(n, downgradeLock); 255 256 if (trace) trace("From: " 257 + n + " GLOBAL REQUEST QUEUED (FIRST): " + lck); 258 259 } catch (NotCoordinatorException e) { 260 throw new InternalError ("Not coordinator of this resource !!!"); 262 } 263 } 264 265 private void callBackLocks(Serializable n, byte downgradeLock) { 266 try { 267 268 requestedDowngrade = downgradeLock; 270 271 HashSet dest = new HashSet (users); 273 dest.remove(n); 274 nbDowngradeRequests = dest.size(); 275 if (trace) trace("From: " 276 + n + " SND CALLBACK: " + downgradeLock + " TO: " + dest); 277 lastCBnumber++; 278 drcs.sendToUsers(objId, 279 new GlobalLockMessage(GlobalLockMessage.DOWNGRADE_REQUEST, 280 downgradeLock, (long) 0, n, lastCBnumber), dest); 281 282 } catch (NotCoordinatorException e) { 283 throw new InternalError ("Not coordinator of this resource !!!"); 285 } 286 } 287 288 private void downgrade(Object n, byte lck, int serialNumber) { 289 try { 290 if (serialNumber != lastCBnumber) { 297 if (trace) 300 trace("From: " + n + " GLOBAL RELEASE IGNORED " + lck); 301 return; 302 } 303 nbDowngradeRequests--; 305 306 if (trace) trace("From: " + n + " GLOBAL RELEASE: " + lck); 307 308 if (nbDowngradeRequests > 0) 310 return; 311 312 maxGrantedLock = requestedDowngrade; 314 315 requestedDowngrade = lockValue.maxValue(); 317 318 for (Iterator it = waiting.iterator(); it.hasNext();) { 321 Waiter w = (Waiter) it.next(); 322 if (lockValue.isCompatibleWith(w.lck, maxGrantedLock)) { 323 if (maxGrantedLock < w.lck) maxGrantedLock = w.lck; 324 it.remove(); 325 if (trace) trace("To: " 326 + w.nodeId + " NOTIFY GLOBAL UPGRADE: " + lck); 327 drcs.sendToUser(objId, 328 new GlobalLockMessage(GlobalLockMessage.UPGRADE_NOTIFY, 329 w.lck, (long) 0, null), 330 w.nodeId); 331 } else { 332 callBackLocks(w.nodeId, 333 lockValue.getCompatibleWith(lockValue.maxValue(), 334 w.lck)); 335 break; 336 } 337 } 338 339 } catch (NotCoordinatorException e) { 340 throw new InternalError ("Not coordinator of this resource !!!"); 342 } 343 } 344 private void cancelUpgrade(Object n) { 345 Waiter w = null; 346 for (Iterator it = waiting.iterator(); it.hasNext();) { 347 w = (Waiter) it.next(); 348 if (w.nodeId.equals(n)) 349 break; 350 } 351 if ((w == null) || !w.nodeId.equals(n)) { 352 if (trace) trace("From: " + n + " UNEXPECTED CANCELATION !! (IGNORE IT)"); 353 } 354 355 if (waiting.indexOf(w) == 0) { 356 if (trace) trace("From: " + n + " CANCEL PENDING REQUEST (FIRST WAITING)"); 357 waiting.removeFirst(); 358 if (!waiting.isEmpty()) { 359 w = (Waiter) waiting.getFirst(); 360 if (trace) trace("From: " + n + " CANCEL PENDING REQUEST (NEW CALL BACK)"); 361 callBackLocks(w.nodeId, 362 lockValue.getCompatibleWith(lockValue.maxValue(), 363 w.lck)); 364 } else { 365 requestedDowngrade = lockValue.maxValue(); 367 nbDowngradeRequests = 0; 368 if (trace) trace("From: " + n + " CANCEL PENDING REQUEST (NO MORE PENDING)"); 369 } 370 } else { 371 waiting.remove(w); 372 if (trace) trace("From: " + n + " CANCEL PENDING REQUEST (NOT FIRST WAITING)"); 373 } 374 } 375 private class Waiter implements Serializable { 377 public Serializable nodeId; 378 byte lck; 379 Waiter(Serializable n, byte lck) { 381 this.lck = lck; 382 this.nodeId = n; 383 } 384 } 385 386 387 public String toString() { 388 return "=====MASTER OF " + objId 389 + "(" + drcs.getNodeId() + ")[" + maxGrantedLock 390 + requestedDowngrade + "(" + nbDowngradeRequests 391 + "/" + users.size() + ")] T=" 392 + Thread.currentThread().hashCode() + " "; 393 } 394 395 private void trace(String s) { 396 System.out.println(this + " " + s); 397 } 398 399 400 } 401 | Popular Tags |