KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > perseus > concurrency > distributed > globallock > lib > GlobalLockUser


/**
 * Copyright (C) 2003-2004
 * - France Telecom R&D
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Release: 1.0
 *
 * Authors: Olivier Lobry (olivier.lobry@rd.francetelecom.com)
 *
 */

package org.objectweb.perseus.concurrency.distributed.globallock.lib;

import org.objectweb.perseus.concurrency.distributed.globallock.api.DeadLockException;
import org.objectweb.perseus.concurrency.distributed.globallock.api.GlobalLock;
import org.objectweb.perseus.concurrency.distributed.globallock.api.GlobalLockWaiter;
import org.objectweb.perseus.concurrency.distributed.globallock.lib.GlobalLockMessage;
import org.objectweb.perseus.concurrency.lib.LockValue;
import org.objectweb.perseus.concurrency.lib.RWLockValue;
import org.objectweb.perseus.distribution.api.DistResUser;
import org.objectweb.perseus.distribution.api.DistResUserService;
import org.objectweb.perseus.distribution.api.NotUserException;

import java.io.Serializable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

42 /**
43  * This class the client side of global lock management. A global lock can be
44  * associated to each resource that is to be shared between different
45  * machines (actually JVMs).
46  * Global lock users are coordinated by a GlobalLockCoordinator and
47  * communicate with it through the DistResUserService interface.
48  * Global locks permit to share a resource between nodes, but not between tasks.
49  * One must ensure that the global lock that has been obtained locally
50  * is sufficient to satisfy the requirements of the local tasks, and that
51  * the local tasks are well synchronized according to the local lock levels they
52  * have obtained. Locks are managed in term of level. The lock level must be
53  * explicitly upgraded if what is grantable not sufficient to satisfy new local
54  * requirements and must be explicitly downgraded to the maximum local
55  * requirements when local requirements are released.
56  * Locks are cached: the local node indicates what is locally required but
57  * internally but what will be locally grantable can be bigger that what has
58  * been actually granted. The local lock level can be downgraded to NOLOCK,
59  * then later upgraded to some lock value without inducing a request to the
60  * coordinator provided the grantable level is bigger thant the new requested
61  * level. Lock levels are called back by the coordinator when required.
62  * For each shared resource the user state is composed of three lock level
63  * - what HAS BEEN granted locally
64  * - what CAN BE granted locally
65  * - what has been requested to the coordinator (which is hidden to users)
66  * Note that the different level of lock and their compatibility is configured
67  * through the LockValue interface. This implementation works well with
68  * Shared/Exclusive locks but should also work with more complex lock values
69  * like intention to read/write lock levels, provided lock levels are strictly
70  * ordered. This can be configured when initializing the GlobalLockMasterFactory
71  * Note that at the moment, deadlocks are detected with timeout. AE timeout can
72  * occur if the lock level cannot be granted because of its use, or because
73  * there are some communication problems. However, when in the future fault
74  * tolerence will be supported, the two behaviors will be distinguished. In
75  * this case, the timeout value will be a hint to indicate a normal amount
76  * of time a lock request should be blocked according to the application
77  * behaviour. The time spent due to failure correction should not be taken into
78  * account.
79  * @see org.objectweb.perseus.concurrency.distributed.globallock.lib.GlobalLockCoordinator
80  * @see org.objectweb.perseus.concurrency.distributed.globallock.lib.GlobalLockCoordinatorFactory
81  * @see org.objectweb.perseus.concurrency.lib.LockValue
82  * @see org.objectweb.perseus.distribution.api.DistResUser
83  * @see org.objectweb.perseus.distribution.api.DistResUserService
84  */

85
86 public class GlobalLockUser
87         implements GlobalLock, DistResUser, Serializable JavaDoc {
88
89     static final boolean trace = true;
90
91     transient public DistResUserService drus;
92
93     // name of user for tracing purpose (default is its nodeId)
94
// should be removed at term
95
// transient private String name;
96

97     // object id to which is associated the lock
98
transient Serializable JavaDoc objId;
99     // max lock level that has been granted locally
100
byte locallyGranted;
101     // max lock level that can be granted locally without contacting the master
102
byte locallyGrantable;
103     // max lock level that has been requested to the master
104
byte globallyRequested;
105
106     // list of waiters
107
transient private List JavaDoc waiters;
108
109     // id of a global requester of a lock
110
//transient private Object requester;
111
transient int callBackSN;
112
113     public GlobalLockUser(Serializable JavaDoc objId, DistResUserService drus) {
114         common_init(objId, drus);
115         // this.name = name;
116
}
117
118 /* public String getName() {
119         // return name;
120     }
121 */

122
123     private void createState(){
124     // state = new State();
125
locallyGrantable = LockValue.NOLOCK;
126         locallyGranted = LockValue.NOLOCK;
127         globallyRequested = LockValue.NOLOCK;
128         // to limit memory allocation, waiters object could be allocated only
129
// when there are waiters
130
waiters = new LinkedList JavaDoc();
131
132     }
133     private void common_init(Serializable JavaDoc objId, DistResUserService drus) {
134         this.objId = objId;
135         //if (this.drus == null)
136
this.drus = drus;
137         //else if (this.drus != drus)
138
// throw new IllegalArgumentException("Cannot handle multiple " +
139
// "DistResUserService");
140
createState();
141     }
142
143     /* local interfaces */
144     synchronized public byte getGrantable() {
145         return locallyGrantable;
146     }
147
148
149     public GlobalLockWaiter upgrade(byte lck, boolean sync, long timeout)
150             throws DeadLockException, InterruptedException JavaDoc {
151         GlobalLockWaiterImpl w;
152         synchronized(this) {
153             if (trace) trace("LOCAL REQUEST: " + lck);
154
155             // check if the lock is grantable
156
if (lck <= locallyGrantable) {
157                 // adjust what has been granted
158
if (lck > locallyGranted) locallyGranted = lck;
159                 if (trace) trace("LOCAL REQUEST GRANTED: " + lck);
160                 return null;
161             }
162             w = waitForGrantable(lck, timeout);
163         }
164
165         if (sync) {
166             boolean ok = w.waitLock(timeout);
167             if (!ok) {
168                     if (trace) trace("WAKE UP TIMED OUT (Waiting: " + lck + ")");
169                     throw new DeadLockException();
170             }
171
172             if (trace) trace("WAKE UP (Waiting: " + lck + ")");
173             w.signalHandled();
174             return null;
175         } else {
176             return w;
177         }
178     }
179
180
181     synchronized public void downgrade(byte lck) {
182         if (trace) trace("LOCAL DOWNGRADE: " + lck);
183         //if (state == null) createState();
184
// if grantable is less than granted, this means that a call-back
185
// request has been received from the coord for a lock that was locally
186
// granted at that time. Hence we must satisfy the request now that we
187
// can do it
188
boolean replyCB = ((locallyGrantable < locallyGranted)
189                 && (lck <= locallyGrantable));
190         if (locallyGranted > lck) {
191             locallyGranted = lck;
192             if (replyCB) {
193                 if (trace) trace("CALL BACK RESP: " + lck + " SN=" + callBackSN);
194                 sendCoordinator(
195                         new GlobalLockMessage(GlobalLockMessage.DOWNGRADE_NOTIFY,
196                                 lck, (long) 0, null, callBackSN));
197                 callBackSN = -1;
198             }
199         }
200
201         // cancel pending request if not valid anymore
202
// this happens when aborting a tx (e.g. after a dealock) which was
203
// waiting for a global lock to be granted by the coord
204
// Take care that there may be other threads waiting
205
if (globallyRequested > LockValue.NOLOCK) {
206             byte l = LockValue.NOLOCK;
207             for (Iterator JavaDoc it = waiters.iterator(); it.hasNext();) {
208                 GlobalLockWaiterImpl waiter = (GlobalLockWaiterImpl) it.next();
209                 byte wl = waiter.getLockLevel();
210                 if (wl > l) l = wl;
211             }
212             if (l == LockValue.NOLOCK) {
213                 if (trace) trace("CANCEL PENDING REQUEST " + lck);
214                 sendCoordinator(new GlobalLockMessage(GlobalLockMessage.UPGRADE_CANCEL,
215                                     lck, (long) 0, null));
216             }
217             globallyRequested = l;
218         }
219     }
220
221
222     synchronized public void uncache() {
223         if (trace) trace("UNCACHE");
224         locallyGrantable = LockValue.NOLOCK;
225     }
226
227     // private methods called on message reception
228

229     // called when the coord wants to call a lock back
230
synchronized private void globalDowngrade(byte lck, Object JavaDoc requester, int SN) {
231         if (trace) trace("RCV GLOBAL DOWNGRADE (CB): " + lck + " for " + requester);
232         if (locallyGrantable > lck) locallyGrantable = lck;
233         if (lck >= locallyGranted) {
234             if (trace) trace("SEND GLOBAL DOWNGRADE OK: " + lck);
235             sendCoordinator(new GlobalLockMessage(GlobalLockMessage.DOWNGRADE_NOTIFY,
236                     lck, (long) 0, requester, SN));
237         return;
238         }
239         //this.requester = requester;
240
this.callBackSN = SN;
241         if (trace) trace("SEND GLOBAL DOWNGRADE NOT OK: " + lck);
242 //TODO: notifies local that a downgrade is requested
243
// callbackListner.notifyCallback(lck);
244
}
245
246
247     // called when the coord notifies a upgrade of the local lock
248
synchronized private void globalUpgrade(byte lck) {
249         GlobalLockWaiterImpl prevWaiter = null;
250         if (trace) trace("RCV GLOBAL UPGRADE: " + lck);
251         if (locallyGrantable < lck) locallyGrantable = lck;
252         if (globallyRequested == lck) globallyRequested = RWLockValue.NOLOCK;
253
254         if (trace) trace("WAKEUP waiters: " + lck);
255         for (Iterator JavaDoc it = waiters.iterator(); it.hasNext();) {
256             GlobalLockWaiterImpl waiter = (GlobalLockWaiterImpl) it.next();
257             if (trace) trace("SIGNAL LOCK TO WAITER: " + waiter);
258             // remember value of waiting lock because signalLock will modify it
259
byte wLock = waiter.getLockLevel();
260             if (!waiter.signalLock(lck, prevWaiter)) break;
261             if (locallyGranted < wLock) locallyGranted = wLock;
262             if (trace) trace("SIGNAL OK: " + waiter);
263             prevWaiter = waiter;
264             it.remove();
265         }
266     }
267
268
269     private GlobalLockWaiterImpl waitForGrantable(byte lck, long timeout) {
270
271         // send a request to the coord only if a not stronger enough request
272
// has been already sent
273
if (lck > globallyRequested) {
274             globallyRequested = lck;
275             if (trace) trace("SEND GLOBAL REQUEST: " + lck);
276             sendCoordinator(
277                     new GlobalLockMessage(GlobalLockMessage.UPGRADE_REQUEST,
278                             lck, timeout, drus.getNodeId()));
279         } else
280             if (trace) trace("ALREADY AE PENDING REQUEST : " + globallyRequested);
281
282         if (trace) trace(" BLOCKED (Waiting: " + lck + " for " + timeout
283                 + " millis)");
284         // create and insert the waiter object
285
GlobalLockWaiterImpl w = new GlobalLockWaiterImpl(lck);
286         if (trace) trace(" ADD WAITER: " + w);
287         waiters.add(w);
288         return w;
289     }
290
291     protected void sendCoordinator(Serializable JavaDoc message) {
292         try {
293             Serializable JavaDoc resId = objId;
294             drus.sendToCoordinator(resId, message);
295         } catch (NotUserException e) {
296             e.printStackTrace();
297         }
298
299     }
300
301     // called on reception of a message from the coordinator
302
public void receive(Object JavaDoc objId, Serializable JavaDoc message) {
303 // if (!objId.equals(this.objId)) throw new InternalError();
304
if (message instanceof GlobalLockMessage) {
305             GlobalLockMessage req = (GlobalLockMessage) message;
306             if (req.type == GlobalLockMessage.UPGRADE_NOTIFY) {
307                 globalUpgrade(req.lck);
308                 // check for attached downgrade request
309
//TODO: should use generic (lockValue) value here
310
/* if ((req.auxLck != LockValue.UNDEFINED)
311                     && (req.auxLck != RWLockValue.WRITE))
312                     globalDowngrade(req.auxLck, req.requester);
313                     */

314             } else if (req.type == GlobalLockMessage.DOWNGRADE_REQUEST)
315                 globalDowngrade(req.lck, req.requester, req.serialNumber);
316             else throw new InternalError JavaDoc();
317         } else throw new InternalError JavaDoc();
318     }
319
320     public synchronized Serializable JavaDoc getState(Object JavaDoc resId) {
321         if (trace) trace("COORD SEEMS TO BE DOWN: return state");
322         return this;
323     }
324
325     public synchronized String JavaDoc toString() {
326         // handle case when drus is null (this can happen
327
// during serialization)
328
Object JavaDoc nodeId = (drus == null) ? null : drus.getNodeId();
329             return "USER OF " + objId + " ("+ nodeId + ")["
330                     + locallyGrantable + locallyGranted + globallyRequested
331                     +"] T=" + Thread.currentThread().hashCode() + " ";
332     }
333
334     protected void trace(String JavaDoc s) {
335         System.out.println(this + " " + s);
336     }
337
338     void finilaze() {
339         //TODO: should leave user's group
340
//drus.leaveUsers(objId);
341
}
342 }
343
344
Popular Tags