KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > perseus > concurrency > distributed > globallock > lib > GlobalLockCoordinator


1 /**
2  * Copyright (C) 2003-2004
3  * - France Telecom R&D
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18  *
19  * Release: 1.0
20  *
21  * Authors: Olivier Lobry (olivier.lobry@rd.francetelecom.com)
22  *
23  */

24
25 package org.objectweb.perseus.concurrency.distributed.globallock.lib;
26
27 import org.objectweb.perseus.distribution.api.DistResCoordinator;
28 import org.objectweb.perseus.distribution.api.DistResCoordinatorService;
29 import org.objectweb.perseus.distribution.api.NotCoordinatorException;
30 import org.objectweb.perseus.concurrency.lib.LockValue;
31
32 import java.util.HashSet JavaDoc;
33 import java.util.LinkedList JavaDoc;
34 import java.util.Iterator JavaDoc;
35 import java.util.Map JavaDoc;
36 import java.util.Random JavaDoc;
37 import java.io.Serializable JavaDoc;
38
39 /**
40  * This class is the server side of the global lock management and is not
41  * intended to be used directly. Coordinators will be dynamically created by
42  * the distributed resource management through the GlobalLockCoordinatorFactory.
43  * Coordinators communicate with users using the DistResCoordinatorService
44  * interface. As global locks are cached by users, they must be called back
45  * when necessary. The coordinator ensures that a lock is never granted to
46  * different users with incompatible levels. Incompatibility is defined by the
47  * lockValue when initializing the GlobalLockCoordinatorFactory instance. When
48  * a user request cannot be satisfied because it is not compatible with other
49  * users, a downgrade lock authoritative request (that is, a call back) is sent
50  * to users with a level compatible with the requested one. On reception, users
51  * must adjust their grantable lock level so that new
52  * local requests will be blocked, and notify the coordinator of the effective
53  * downgrading when the local granted level is less or equal to the call back
54  * level. Note that a new coordinator instance is created by the
55  * GlobalLockMasterFactory for each shared resource. Hence the instance itself
56  * is sent when coordination is migrated or replicated.
57  * @see org.objectweb.perseus.concurrency.distributed.globallock.lib.GlobalLockUser
58  * @see org.objectweb.perseus.concurrency.distributed.globallock.lib.GlobalLockCoordinatorFactory
59  * @see org.objectweb.perseus.concurrency.lib.LockValue
60  * @see org.objectweb.perseus.distribution.api.DistResCoordinator
61  * @see org.objectweb.perseus.distribution.api.DistResCoordinatorService
62  */

63
64 public class GlobalLockCoordinator implements DistResCoordinator, Serializable JavaDoc {
65
66     static final boolean trace = true;
67     // the id of the resource to which the locks are attached
68
private Object JavaDoc objId;
69     private HashSet JavaDoc users; // the set of nodes that use this lock
70
private byte maxGrantedLock;// maximum lock level granted to users
71

72     private LinkedList JavaDoc waiting; // the list of nodes waiting for callbacks
73
private byte requestedDowngrade;// the level of the current call back request
74
private int nbDowngradeRequests;// the number of nodes that do not have yet
75
// answered to the call back request
76

77     // transient fields that must not be serialized for coordination operations
78
transient DistResCoordinatorService drcs; // the service to use to communicate
79
transient LockValue lockValue; // the lock value to use for lock comparison
80
transient private int lastCBnumber;
81
82
83     /**
84      * Create a new coordinator using a LockValue
85      * @param lockValue
86      * @see org.objectweb.perseus.concurrency.distributed.globallock.lib.GlobalLockCoordinator
87      * @see org.objectweb.perseus.concurrency.lib.LockValue
88      */

89
90     public GlobalLockCoordinator(Object JavaDoc resId, DistResCoordinatorService drcs,
91                                  LockValue lockValue) {
92         initCopy(resId, drcs, lockValue);
93         users = new HashSet JavaDoc();
94         waiting = new LinkedList JavaDoc();
95         // no lock level has been granted for now
96
maxGrantedLock = LockValue.NOLOCK;
97         // there is no current call back requests
98
requestedDowngrade = lockValue.maxValue();
99         nbDowngradeRequests = 0;
100         // TODO: using a random value make the probability not ignoring a obsolete
101
// call back response very very low but not null. We should take into account
102
// something like the node id
103
lastCBnumber = (new Random JavaDoc()).nextInt();
104     }
105
/**
 * Creates a coordinator that takes over the state of another coordinator.
 * The users set and the waiting list are shared with the given instance,
 * not duplicated.
 * NOTE(review): lastCBnumber is not copied and therefore keeps its default
 * value of 0 in the new instance -- confirm this is intended.
 *
 * @param glc the coordinator whose state is taken over
 */
public GlobalLockCoordinator(GlobalLockCoordinator glc) {
    // helpers used to communicate and to compare lock levels
    drcs = glc.drcs;
    lockValue = glc.lockValue;

    // identity of the coordinated resource
    objId = glc.objId;

    // granted level and current call back state
    maxGrantedLock = glc.maxGrantedLock;
    requestedDowngrade = glc.requestedDowngrade;
    nbDowngradeRequests = glc.nbDowngradeRequests;

    // collections are shared by reference
    users = glc.users;
    waiting = glc.waiting;
}
116     // DistResCoordinator interface
117
synchronized public void receive(Object JavaDoc objId, Serializable JavaDoc user,
118                                          Serializable JavaDoc msg) {
119         if (!objId.equals(this.objId)) throw new InternalError JavaDoc();
120         if (msg instanceof GlobalLockMessage) {
121             GlobalLockMessage req = (GlobalLockMessage) msg;
122             if (req.type == GlobalLockMessage.UPGRADE_REQUEST)
123                 upgrade(user, req.lck);
124             else if (req.type == GlobalLockMessage.DOWNGRADE_NOTIFY)
125                 downgrade(user, req.lck, req.serialNumber);
126             else if (req.type == GlobalLockMessage.UPGRADE_CANCEL)
127                 cancelUpgrade(user);
128             else throw new InternalError JavaDoc();
129         } else throw new InternalError JavaDoc("Unexpected message type");
130     }
131
132     synchronized public Serializable JavaDoc freeze(Object JavaDoc resId) {
133         if (trace) trace("Freeze !!");
134         return this;
135     }
136
137 /* synchronized public DistResCoordinator
138             joinCoordinators(Serializable resId, Object copy,
139                              DistResCoordinatorService drcs) {
140         GlobalLockCoordinator res;
141         this.drcs = drcs;
142         this.objId = resId;
143         if (copy != null) {
144             if (trace) trace("Get coordination w/ copy");
145             res = (GlobalLockCoordinator) copy;
146             res.lockValue = lockValue;
147             res.joinCoordinators(resId, null, drcs);
148             return res;
149         }
150         if (trace) trace("New coordination");
151         return this;
152     }
153 */

154     synchronized public boolean joinUsersRequest(Object JavaDoc resId,
155                                           Serializable JavaDoc node) {
156         users.add(node);
157         if (trace) trace("Accept user: " + node);
158         return true;
159     }
160
161     public void recover(Object JavaDoc resId, Map JavaDoc userStates) {
162
163         for (Iterator JavaDoc it = userStates.entrySet().iterator(); it.hasNext();) {
164             Map.Entry JavaDoc entry = (Map.Entry JavaDoc) it.next();
165             Serializable JavaDoc nodeId = (Serializable JavaDoc) entry.getKey();
166             GlobalLockUser glu = (GlobalLockUser) entry.getValue();
167             if (trace) trace("Received state from " + nodeId + ": " + glu);
168             users.add(nodeId);
169             if (maxGrantedLock < glu.locallyGranted) maxGrantedLock
170                     = glu.locallyGranted;
171             if (glu.globallyRequested != LockValue.NOLOCK)
172                 waiting.addLast(new Waiter(nodeId, glu.globallyRequested));
173         }
174         if (!waiting.isEmpty()) {
175             Waiter firstWaiting = (Waiter) waiting.get(0);
176             callBackLocks(firstWaiting.nodeId, firstWaiting.lck);
177         }
178     }
179
180     public void nodeFailed(Object JavaDoc resId, Serializable JavaDoc nodeId) {
181         // remove pending requests of dead node
182
for (Iterator JavaDoc it = waiting.iterator(); it.hasNext();) {
183             Waiter waiter = (Waiter) it.next();
184             if (waiter.nodeId.equals(nodeId))
185                 it.remove();
186         }
187
188     }
189
190     // package methods
191

192     // called by the factory
193
void initCopy(Object JavaDoc resId, DistResCoordinatorService drcs,
194                   LockValue lockValue) {
195         this.objId = resId;
196         this.drcs = drcs;
197         this.lockValue = lockValue;
198
199     }
200
201
202     // private methods
203
private void upgrade(Serializable JavaDoc n, byte lck) {
204         try {
205             if (trace) trace(
206                     "From: " + n + " GLOBAL REQUEST: " + lck);
207
208             // treat only one request at a time in FIFO order
209
if (requestedDowngrade != lockValue.maxValue()) {
210                 if (trace) System.out.println(this
211                         + "From: " + n + " ALREADY PENDING REQUEST: " + lck);
212                 waiting.addLast(new Waiter(n, lck));
213                 if (trace) System.out.println(this
214                         + "From: " + n + " GLOBAL REQUEST QUEUED (NOT FIRST): "
215                         + lck);
216                 return;
217             }
218
219             // check if the max granted lock level is compatible
220
// with the requested one
221
if (lockValue.isCompatibleWith(lck, maxGrantedLock)) { // if so,
222
// update the maxgranted lock
223
if (maxGrantedLock < lck) maxGrantedLock = lck;
224
225                 // notify the user that the lock level is granted
226
drcs.sendToUser(objId,
227                         new GlobalLockMessage(GlobalLockMessage.UPGRADE_NOTIFY,
228                         lck, (long) 0, null), n);
229                 return;
230             }
231 // TODO: lockValue.isCompatibleWith(lck, maxGrantedLock) not redundant with previous ??
232
// check if the requesting user is the sole user
233
if (lockValue.isCompatibleWith(lck, maxGrantedLock) ||
234                     (users.size() == 1 && users.contains(n)) ) { // if so,
235
// update the maxgranted lock: equal to what is requested
236
// because the request may follow an uncache
237
// operation on the user. So we set max GrantedLock to
238
// lck even if it was greater than it
239
maxGrantedLock = lck;
240
241                 // notify the user that the lock level is granted
242
drcs.sendToUser(objId,
243                         new GlobalLockMessage(GlobalLockMessage.UPGRADE_NOTIFY,
244                                 lck, (long) 0, null), n);
245                 return;
246             }
247
248             // call back a lock level compatible with the requested one
249
byte downgradeLock = lockValue.getCompatibleWith(lockValue.maxValue(),
250                     lck);
251             // queue request
252
Waiter w= new Waiter(n, lck);
253             waiting.addLast(w);
254             callBackLocks(n, downgradeLock);
255
256             if (trace) trace("From: "
257                     + n + " GLOBAL REQUEST QUEUED (FIRST): " + lck);
258
259         } catch (NotCoordinatorException e) {
260             // this should not happen !!
261
throw new InternalError JavaDoc("Not coordinator of this resource !!!");
262         }
263     }
264
265     private void callBackLocks(Serializable JavaDoc n, byte downgradeLock) {
266         try {
267
268             // update current callback lock level
269
requestedDowngrade = downgradeLock;
270
271             // send a callback request to all users but the requesting one
272
HashSet JavaDoc dest = new HashSet JavaDoc(users);
273             dest.remove(n);
274             nbDowngradeRequests = dest.size();
275             if (trace) trace("From: "
276                 + n + " SND CALLBACK: " + downgradeLock + " TO: " + dest);
277             lastCBnumber++;
278             drcs.sendToUsers(objId,
279                 new GlobalLockMessage(GlobalLockMessage.DOWNGRADE_REQUEST,
280                         downgradeLock, (long) 0, n, lastCBnumber), dest);
281
282         } catch (NotCoordinatorException e) {
283             // this should not happen !!
284
throw new InternalError JavaDoc("Not coordinator of this resource !!!");
285         }
286     }
287
288     private void downgrade(Object JavaDoc n, byte lck, int serialNumber) {
289         try {
290             // After a crash, the waiters' list may have been re-ordered
291
// and new global lock downgrade requests may have been sent.
292
// Hence, we may receive response related to callback requests
293
// sent before the crash so we need to ignore them.
294
// We use a serial number to distinguish responses. It it initialized
295
// with a random value
296
if (serialNumber != lastCBnumber) {
297 // if ((waiting == null) || waiting.isEmpty() ||
298
// (!pending.equals(((Waiter) waiting.getFirst()).nodeId))) {
299
if (trace)
300                     trace("From: " + n + " GLOBAL RELEASE IGNORED " + lck);
301                 return;
302             }
303             // one less waiting request
304
nbDowngradeRequests--;
305
306             if (trace) trace("From: " + n + " GLOBAL RELEASE: " + lck);
307
308             // if there still are waiting request, nothing else to do
309
if (nbDowngradeRequests > 0)
310                 return;
311
312             // max granted lock level now equals the requested level
313
maxGrantedLock = requestedDowngrade;
314
315             // no callback is currently processed
316
requestedDowngrade = lockValue.maxValue();
317
318             // for each waiting node, check that its request can be granted,
319
// and if so, notify it
320
for (Iterator JavaDoc it = waiting.iterator(); it.hasNext();) {
321                 Waiter w = (Waiter) it.next();
322                 if (lockValue.isCompatibleWith(w.lck, maxGrantedLock)) {
323                     if (maxGrantedLock < w.lck) maxGrantedLock = w.lck;
324                     it.remove();
325                     if (trace) trace("To: "
326                             + w.nodeId + " NOTIFY GLOBAL UPGRADE: " + lck);
327                     drcs.sendToUser(objId,
328                             new GlobalLockMessage(GlobalLockMessage.UPGRADE_NOTIFY,
329                                     w.lck, (long) 0, null),
330                             w.nodeId);
331                 } else {
332                     callBackLocks(w.nodeId,
333                             lockValue.getCompatibleWith(lockValue.maxValue(),
334                                     w.lck));
335                     break;
336                 }
337             }
338
339         } catch (NotCoordinatorException e) {
340             // this should not happen !!
341
throw new InternalError JavaDoc("Not coordinator of this resource !!!");
342         }
343     }
344     private void cancelUpgrade(Object JavaDoc n) {
345         Waiter w = null;
346         for (Iterator JavaDoc it = waiting.iterator(); it.hasNext();) {
347             w = (Waiter) it.next();
348             if (w.nodeId.equals(n))
349                 break;
350         }
351         if ((w == null) || !w.nodeId.equals(n)) {
352             if (trace) trace("From: " + n + " UNEXPECTED CANCELATION !! (IGNORE IT)");
353         }
354
355         if (waiting.indexOf(w) == 0) {
356             if (trace) trace("From: " + n + " CANCEL PENDING REQUEST (FIRST WAITING)");
357             waiting.removeFirst();
358             if (!waiting.isEmpty()) {
359                 w = (Waiter) waiting.getFirst();
360                 if (trace) trace("From: " + n + " CANCEL PENDING REQUEST (NEW CALL BACK)");
361                 callBackLocks(w.nodeId,
362                         lockValue.getCompatibleWith(lockValue.maxValue(),
363                         w.lck));
364             } else {
365                 // no more downgrade requests
366
requestedDowngrade = lockValue.maxValue();
367                 nbDowngradeRequests = 0;
368                 if (trace) trace("From: " + n + " CANCEL PENDING REQUEST (NO MORE PENDING)");
369             }
370         } else {
371             waiting.remove(w);
372             if (trace) trace("From: " + n + " CANCEL PENDING REQUEST (NOT FIRST WAITING)");
373         }
374     }
375     // class used to queue waiting request
376
private class Waiter implements Serializable JavaDoc {
377         public Serializable JavaDoc nodeId;
378         byte lck;
379         //int serialNumber;
380
Waiter(Serializable JavaDoc n, byte lck) {
381             this.lck = lck;
382             this.nodeId = n;
383         }
384     }
385
386
387     public String JavaDoc toString() {
388         return "=====MASTER OF " + objId
389                 + "(" + drcs.getNodeId() + ")[" + maxGrantedLock
390                 + requestedDowngrade + "(" + nbDowngradeRequests
391                 + "/" + users.size() + ")] T="
392                 + Thread.currentThread().hashCode() + " ";
393     }
394
395     private void trace(String JavaDoc s) {
396         System.out.println(this + " " + s);
397     }
398
399
400 }
401
Popular Tags