package org.objectweb.perseus.concurrency.distributed.globallock.lib;

import org.objectweb.perseus.distribution.api.DistResUser;
import org.objectweb.perseus.distribution.api.DistResUserService;
import org.objectweb.perseus.distribution.api.NotUserException;
import org.objectweb.perseus.concurrency.distributed.globallock.api.DeadLockException;
import org.objectweb.perseus.concurrency.distributed.globallock.lib.GlobalLockMessage;
import org.objectweb.perseus.concurrency.distributed.globallock.api.GlobalLock;
import org.objectweb.perseus.concurrency.distributed.globallock.api.GlobalLockWaiter;
import org.objectweb.perseus.concurrency.lib.RWLockValue;
import org.objectweb.perseus.concurrency.lib.LockValue;

import java.io.Serializable;
import java.util.List;
import java.util.LinkedList;
import java.util.Iterator;

/**
 * User-side state of a global lock on one distributed resource.
 * <p>
 * The object keeps three lock levels:
 * <ul>
 * <li>{@link #locallyGranted}: the highest level handed out to local callers
 *     by {@link #upgrade(byte, boolean, long)} and not yet given back through
 *     {@link #downgrade(byte)};</li>
 * <li>{@link #locallyGrantable}: the highest level that can be handed out
 *     without asking the coordinator;</li>
 * <li>{@link #globallyRequested}: the level that has been requested from the
 *     coordinator and is still pending.</li>
 * </ul>
 * Requests that exceed {@code locallyGrantable} are queued as waiters and an
 * {@code UPGRADE_REQUEST} message is sent to the coordinator through the
 * {@link DistResUserService}. Coordinator messages come back through
 * {@link #receive(Object, Serializable)}.
 * <p>
 * All state changes are made while holding the monitor of this object.
 */
public class GlobalLockUser
        implements GlobalLock, DistResUser, Serializable {

    /** When true, every step is printed on {@code System.out} by {@link #trace(String)}. */
    static final boolean trace = true;

    /** Service used to reach the coordinator; transient, so null on a deserialized copy. */
    transient public DistResUserService drus;

    /** Identifier of the resource this lock protects; used as resource id when sending. */
    transient Serializable objId;

    /** Highest lock level currently handed out to local callers. */
    byte locallyGranted;

    /** Highest lock level that may be handed out without contacting the coordinator. */
    byte locallyGrantable;

    /** Lock level requested from the coordinator and not yet obtained. */
    byte globallyRequested;

    /** Queue of {@link GlobalLockWaiterImpl} waiting for a coordinator grant, in arrival order. */
    transient private List waiters;

    /**
     * Serial number of the coordinator call-back whose reply is deferred until
     * local holders downgrade. It is set by {@code globalDowngrade} and reset
     * to -1 by {@link #downgrade(byte)} once the reply has been sent.
     */
    transient int callBackSN;

    /**
     * Creates the lock user for one resource.
     *
     * @param objId identifier of the locked resource
     * @param drus  service used to send messages to the coordinator
     */
    public GlobalLockUser(Serializable objId, DistResUserService drus) {
        common_init(objId, drus);
    }

    /** Resets the three lock levels to {@code NOLOCK} and creates an empty waiter queue. */
    private void createState() {
        locallyGrantable = LockValue.NOLOCK;
        locallyGranted = LockValue.NOLOCK;
        globallyRequested = LockValue.NOLOCK;
        waiters = new LinkedList();
    }

    /** Stores the resource id and the distribution service, then initialises the lock state. */
    private void common_init(Serializable objId, DistResUserService drus) {
        this.objId = objId;
        this.drus = drus;
        createState();
    }

    /**
     * @return the level that can currently be granted without contacting the coordinator
     */
    synchronized public byte getGrantable() {
        return locallyGrantable;
    }

    /**
     * Requests the lock at level {@code lck}.
     * <p>
     * If the level is already grantable locally it is granted at once and
     * {@code null} is returned. Otherwise a waiter is queued (and a global
     * request is sent if needed). In synchronous mode the calling thread then
     * blocks on that waiter; in asynchronous mode the waiter is returned to the
     * caller.
     *
     * @param lck     the requested lock level
     * @param sync    true to block until the lock is obtained or the timeout expires
     * @param timeout passed to the coordinator request and to the waiter's {@code waitLock}
     * @return {@code null} when the lock has been obtained, or the waiter to
     *         block on when {@code sync} is false and the lock is not yet available
     * @throws DeadLockException    if the synchronous wait does not succeed
     *                              ({@code waitLock} returned false)
     * @throws InterruptedException declared for the blocking wait
     */
    public GlobalLockWaiter upgrade(byte lck, boolean sync, long timeout)
            throws DeadLockException, InterruptedException {
        GlobalLockWaiterImpl w;
        synchronized (this) {
            if (trace) trace("LOCAL REQUEST: " + lck);

            if (lck <= locallyGrantable) {
                // The coordinator already allows this level: grant locally.
                if (lck > locallyGranted) locallyGranted = lck;
                if (trace) trace("LOCAL REQUEST GRANTED: " + lck);
                return null;
            }
            w = waitForGrantable(lck, timeout);
        }

        // The wait itself happens outside the monitor so that receive() can
        // deliver the coordinator's answer.
        if (sync) {
            boolean ok = w.waitLock(timeout);
            if (!ok) {
                if (trace) trace("WAKE UP TIMED OUT (Waiting: " + lck + ")");
                // NOTE(review): the waiter is left in the waiters queue here;
                // confirm that GlobalLockWaiterImpl.signalLock copes with a
                // waiter whose thread has already given up.
                throw new DeadLockException();
            }

            if (trace) trace("WAKE UP (Waiting: " + lck + ")");
            w.signalHandled();
            return null;
        } else {
            return w;
        }
    }

    /**
     * Lowers the locally granted level to {@code lck}.
     * <p>
     * If a coordinator call-back was deferred (the grantable level is below
     * the granted one) and the new level satisfies it, a
     * {@code DOWNGRADE_NOTIFY} carrying the stored serial number is sent. If a
     * global request is pending, its level is recomputed from the queued
     * waiters; when no waiter is left an {@code UPGRADE_CANCEL} is sent.
     *
     * @param lck the level to keep
     */
    synchronized public void downgrade(byte lck) {
        if (trace) trace("LOCAL DOWNGRADE: " + lck);
        // A call-back reply is owed when the coordinator lowered the grantable
        // level below what is held locally, and this downgrade reaches it.
        boolean replyCB = ((locallyGrantable < locallyGranted)
                && (lck <= locallyGrantable));
        if (locallyGranted > lck) {
            locallyGranted = lck;
            if (replyCB) {
                if (trace) trace("CALL BACK RESP: " + lck + " SN=" + callBackSN);
                sendCoordinator(
                        new GlobalLockMessage(GlobalLockMessage.DOWNGRADE_NOTIFY,
                                lck, (long) 0, null, callBackSN));
                callBackSN = -1;
            }
        }

        if (globallyRequested > LockValue.NOLOCK) {
            // Highest level still wanted by a queued waiter.
            byte l = LockValue.NOLOCK;
            for (Iterator it = waiters.iterator(); it.hasNext();) {
                GlobalLockWaiterImpl waiter = (GlobalLockWaiterImpl) it.next();
                byte wl = waiter.getLockLevel();
                if (wl > l) l = wl;
            }
            if (l == LockValue.NOLOCK) {
                if (trace) trace("CANCEL PENDING REQUEST " + lck);
                sendCoordinator(new GlobalLockMessage(GlobalLockMessage.UPGRADE_CANCEL,
                        lck, (long) 0, null));
            }
            globallyRequested = l;
        }
    }

    /** Drops the locally grantable level to {@code NOLOCK}; no message is sent. */
    synchronized public void uncache() {
        if (trace) trace("UNCACHE");
        locallyGrantable = LockValue.NOLOCK;
    }

    /**
     * Handles a {@code DOWNGRADE_REQUEST} call-back from the coordinator.
     * <p>
     * The grantable level is lowered to {@code lck}. If nothing stronger is
     * held locally the coordinator is answered immediately; otherwise the
     * serial number is stored and the answer is sent later by
     * {@link #downgrade(byte)}.
     *
     * @param lck       the level the coordinator asks this user to fall back to
     * @param requester the requester carried by the message, echoed in the reply
     * @param SN        serial number of the call-back, echoed in the reply
     */
    synchronized private void globalDowngrade(byte lck, Object requester, int SN) {
        if (trace) trace("RCV GLOBAL DOWNGRADE (CB): " + lck + " for " + requester);
        if (locallyGrantable > lck) locallyGrantable = lck;
        if (lck >= locallyGranted) {
            if (trace) trace("SEND GLOBAL DOWNGRADE OK: " + lck);
            sendCoordinator(new GlobalLockMessage(GlobalLockMessage.DOWNGRADE_NOTIFY,
                    lck, (long) 0, requester, SN));
            return;
        }
        // A stronger lock is still held locally: remember the call-back so
        // that downgrade() can answer it. Nothing is sent at this point.
        this.callBackSN = SN;
        if (trace) trace("SEND GLOBAL DOWNGRADE NOT OK: " + lck);
    }

    /**
     * Handles an {@code UPGRADE_NOTIFY} from the coordinator.
     * <p>
     * Raises the grantable level to {@code lck}, clears the pending request if
     * it was for exactly that level, then signals queued waiters in order
     * until one of them refuses the signal. Each signalled waiter is removed
     * from the queue and its level is recorded as locally granted.
     *
     * @param lck the level granted by the coordinator
     */
    synchronized private void globalUpgrade(byte lck) {
        GlobalLockWaiterImpl prevWaiter = null;
        if (trace) trace("RCV GLOBAL UPGRADE: " + lck);
        if (locallyGrantable < lck) locallyGrantable = lck;
        if (globallyRequested == lck) globallyRequested = RWLockValue.NOLOCK;

        if (trace) trace("WAKEUP waiters: " + lck);
        for (Iterator it = waiters.iterator(); it.hasNext();) {
            GlobalLockWaiterImpl waiter = (GlobalLockWaiterImpl) it.next();
            if (trace) trace("SIGNAL LOCK TO WAITER: " + waiter);
            byte wLock = waiter.getLockLevel();
            // Stop at the first waiter that does not accept the signal; the
            // remaining waiters stay queued.
            if (!waiter.signalLock(lck, prevWaiter)) break;
            if (locallyGranted < wLock) locallyGranted = wLock;
            if (trace) trace("SIGNAL OK: " + waiter);
            prevWaiter = waiter;
            it.remove();
        }
    }

    /**
     * Queues a waiter for level {@code lck}, sending an {@code UPGRADE_REQUEST}
     * to the coordinator first unless a request of at least that level is
     * already pending. Called from {@link #upgrade(byte, boolean, long)} with
     * the monitor held.
     *
     * @param lck     the level to wait for
     * @param timeout forwarded in the request message
     * @return the newly queued waiter
     */
    private GlobalLockWaiterImpl waitForGrantable(byte lck, long timeout) {
        if (lck > globallyRequested) {
            globallyRequested = lck;
            if (trace) trace("SEND GLOBAL REQUEST: " + lck);
            sendCoordinator(
                    new GlobalLockMessage(GlobalLockMessage.UPGRADE_REQUEST,
                            lck, timeout, drus.getNodeId()));
        } else {
            if (trace) trace("ALREADY AE PENDING REQUEST : " + globallyRequested);
        }

        if (trace) {
            trace(" BLOCKED (Waiting: " + lck + " for " + timeout
                    + " millis)");
        }
        GlobalLockWaiterImpl w = new GlobalLockWaiterImpl(lck);
        if (trace) trace(" ADD WAITER: " + w);
        waiters.add(w);
        return w;
    }

    /**
     * Sends a message about this resource to the coordinator.
     * A {@link NotUserException} is reported on the standard error stream and
     * otherwise ignored, so callers never see a failure.
     *
     * @param message the message to send
     */
    protected void sendCoordinator(Serializable message) {
        try {
            drus.sendToCoordinator(objId, message);
        } catch (NotUserException e) {
            e.printStackTrace();
        }
    }

    /**
     * Entry point for messages coming from the coordinator.
     * {@code UPGRADE_NOTIFY} and {@code DOWNGRADE_REQUEST} are dispatched to
     * the matching handler.
     *
     * @param objId   identifier of the resource the message is about (not used here)
     * @param message the received message
     * @throws InternalError if the message is not a {@link GlobalLockMessage}
     *                       or has any other type
     */
    public void receive(Object objId, Serializable message) {
        if (message instanceof GlobalLockMessage) {
            GlobalLockMessage req = (GlobalLockMessage) message;
            if (req.type == GlobalLockMessage.UPGRADE_NOTIFY) {
                globalUpgrade(req.lck);
            } else if (req.type == GlobalLockMessage.DOWNGRADE_REQUEST) {
                globalDowngrade(req.lck, req.requester, req.serialNumber);
            } else {
                throw new InternalError();
            }
        } else {
            throw new InternalError();
        }
    }

    /**
     * Returns this object as the state of the resource.
     * Only the non-transient fields (the three lock levels) survive
     * serialization of the returned value.
     *
     * @param resId identifier of the resource (not used here)
     * @return this lock user
     */
    public synchronized Serializable getState(Object resId) {
        if (trace) trace("COORD SEEMS TO BE DOWN: return state");
        return this;
    }

    /**
     * @return a trace prefix showing the resource id, the node id (null when
     *         no service is attached), the three lock levels and the hash code
     *         of the current thread
     */
    public synchronized String toString() {
        Object nodeId = (drus == null) ? null : drus.getNodeId();
        return "USER OF " + objId + " (" + nodeId + ")["
                + locallyGrantable + locallyGranted + globallyRequested
                + "] T=" + Thread.currentThread().hashCode() + " ";
    }

    /**
     * Prints a trace line, prefixed with {@link #toString()}, on {@code System.out}.
     *
     * @param s the text to print
     */
    protected void trace(String s) {
        System.out.println(this + " " + s);
    }

    /**
     * Empty method.
     * NOTE(review): the name looks like a misspelling of {@code finalize}; as
     * written it does not override {@code Object.finalize()}.
     */
    void finilaze() {
    }
}