KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jgroups > service > lease > LeaseFactoryClient


1 package org.jgroups.service.lease;
2
3 import org.apache.commons.logging.Log;
4 import org.apache.commons.logging.LogFactory;
5 import org.jgroups.*;
6 import org.jgroups.blocks.PullPushAdapter;
7
8 import java.io.Serializable JavaDoc;
9 import java.util.HashMap JavaDoc;
10
11 /**
12  * <code>LeaseFactoryClient</code> is an implementation of {@link LeaseFactory}
13  * interface that delegates lease granting to group containing one or more
14  * {@link LeaseFactoryService} instances.
15  * <p>
16  * This service tries to implement semi-synchronous communication pattern: each
17  * call blocks until reply from service received or timeout occurs.
18  * <p>
19  * Also this implementation assumes that pending new lease request conflicts
20  * with renewal request and request that came last is aborted.
21  *
22  * @author Roman Rokytskyy (rrokytskyy@acm.org)
23  */

24 public class LeaseFactoryClient implements LeaseFactory {
25     
26     private static final String JavaDoc LEASE_CLIENT_RECEIVE_METHOD =
27         "LeaseFactoryClient.ClientMessageListener.receive()";
28         
29     private static final String JavaDoc NEW_LEASE_METHOD =
30         "LeaseFactoryClient.newLease()";
31
32     private static final String JavaDoc RENEW_LEASE_METHOD =
33         "LeaseFactoryClient.renewLease()";
34
35     private static final String JavaDoc CANCEL_LEASE_METHOD =
36         "LeaseFactoryClient.cancelLease()";
37
38     
39     public static final int DEFAULT_LEASE_TIMEOUT = 10000;
40     
41     public static final int DEFAULT_CANCEL_TIMEOUT = 1000;
42     
43     protected final Channel clientChannel;
44
45     protected final PullPushAdapter clientAdapter;
46     
47     protected final int leaseTimeout = DEFAULT_LEASE_TIMEOUT;
48     
49     protected final int cancelTimeout = DEFAULT_CANCEL_TIMEOUT;
50     
51     protected final HashMap JavaDoc pendingLeases = new HashMap JavaDoc();
52     
53     protected final HashMap JavaDoc pendingRenewals = new HashMap JavaDoc();
54     
55     protected final HashMap JavaDoc pendingCancels = new HashMap JavaDoc();
56
57     protected final Log log=LogFactory.getLog(this.getClass());
58
59     /**
60      * Create instance of this class for specified client channel with
61      * default timeouts.
62      */

63     public LeaseFactoryClient(Channel clientChannel) {
64     this(clientChannel, DEFAULT_LEASE_TIMEOUT, DEFAULT_CANCEL_TIMEOUT);
65     }
66     
67     /**
68      * Create instance of this class for the specified channel with specified
69      * timeouts.
70      *
71      * @param clientChannel channel that will be used for client-service
72      * communication.
73      *
74      * @param leaseTimeout timeout for "new lease" and "renew lease" requests.
75      *
76      * @param cancelTimeout timeout for "cancel lease" timeout.
77      */

78     public LeaseFactoryClient(Channel clientChannel, int leaseTimeout,
79     int cancelTimeout)
80     {
81     this.clientChannel = clientChannel;
82     this.clientAdapter = new PullPushAdapter(
83         clientChannel, new ClientMessageListener());
84     }
85     
86     /**
87      * Cancel existing lease.
88      */

89     public void cancelLease(Lease existingLease) throws UnknownLeaseException {
90         if (existingLease == null)
91             throw new UnknownLeaseException(
92                     "Existing lease cannot be null.", existingLease);
93
94         if (existingLease.isExpired())
95             throw new UnknownLeaseException(
96                     "You existing lease has expired. " +
97                     "You cannot use this method to obtain new lease.", existingLease);
98
99         ClientLeaseInfo leaseInfo = new ClientLeaseInfo(
100                 existingLease.getLeaseTarget(), existingLease.getTenant());
101
102         if (pendingCancels.keySet().contains(leaseInfo))
103             throw new UnknownLeaseException("There's pending cancel " +
104                     "request for specified lease target and tenant.",
105                     existingLease);
106
107         try {
108             // here we create a mutex and associate this mutex with
109
// lease target and tenant
110
Object JavaDoc leaseMutex = new Object JavaDoc();
111             pendingCancels.put(leaseInfo, leaseMutex);
112
113             // wait on mutex until we get response from
114
// leasing service or timeout
115
try {
116
117                 synchronized(leaseMutex) {
118                     LeaseRequestHeader requestHeader = new LeaseRequestHeader(
119                             LeaseRequestHeader.CANCEL_LEASE_REQUEST, 0, false,
120                             existingLease.getTenant());
121
122                     Message msg = new Message();
123                     msg.putHeader(LeaseRequestHeader.HEADER_KEY, requestHeader);
124                     msg.setObject((Serializable JavaDoc)existingLease.getLeaseTarget());
125
126                     // send message to leasing service
127
clientChannel.send(msg);
128
129                     leaseMutex.wait(leaseTimeout);
130                 }
131
132             } catch(InterruptedException JavaDoc ex) {
133
134                 throw new UnknownLeaseException(
135                         "Did not get any reply before the thread was interrupted.",
136                         null);
137
138             } catch(ChannelNotConnectedException ex) {
139
140                 throw new UnknownLeaseException(
141                         "Unable to send request, channel is not connected " +
142                         ex.getMessage(), null);
143
144             } catch(ChannelClosedException ex) {
145
146                 throw new UnknownLeaseException(
147                         "Unable to send request, channel is closed " +
148                         ex.getMessage(), null);
149
150             }
151
152             // check type of object associated with lease target and tenant
153
// if we got response from leasing service, it should be Message
154
// otherwise we were woken up because of timeout, simply return
155
if (!(pendingCancels.get(leaseInfo) instanceof Message))
156                 return;
157
158             Message reply = (Message)pendingCancels.get(leaseInfo);
159
160             // try to fetch denial header
161
DenyResponseHeader denyHeader = (DenyResponseHeader)
162                     reply.getHeader(DenyResponseHeader.HEADER_KEY);
163
164             // throw exception if service denied lease request
165
if (denyHeader != null)
166                 throw new UnknownLeaseException(
167                         denyHeader.getDenialReason(), existingLease);
168         } finally {
169             pendingCancels.remove(leaseInfo);
170         }
171     }
172
173     /**
174      * Get new lease.
175      */

176     public Lease newLease(Object JavaDoc leaseTarget, Object JavaDoc tenant,
177     long requestedDuration, boolean isAbsolute) throws LeaseDeniedException
178     {
179     if (leaseTarget == null || tenant == null)
180         throw new LeaseDeniedException(
181         "Lease target and tenant should be not null.", leaseTarget);
182         
183     if (!(leaseTarget instanceof Serializable JavaDoc))
184         throw new LeaseDeniedException(
185         "This lease factory can process only serializable lease targets",
186         leaseTarget);
187         
188     if (!(tenant instanceof Serializable JavaDoc))
189         throw new LeaseDeniedException(
190         "This lease factory can process only serializable tenants",
191         leaseTarget);
192         
193     ClientLeaseInfo leaseInfo = new ClientLeaseInfo(leaseTarget, tenant);
194     
195     if (pendingLeases.keySet().contains(leaseInfo))
196         throw new RecursiveLeaseRequestException("There's pending lease " +
197         "request for specified lease target and tenant.", leaseTarget, tenant);
198         
199     try {
200         // here we create a mutex and associate this mutex with
201
// lease target and tenant
202
Object JavaDoc leaseMutex = new Object JavaDoc();
203             
204             pendingLeases.put(leaseInfo, leaseMutex);
205         
206
207                 if(log.isDebugEnabled()) log.debug("Added lease info for leaseTarget=" + leaseTarget +
208                     ", tenant=" + tenant);
209
210         // wait on mutex until we get response from
211
// leasing service or timeout
212
try {
213         
214                 if (leaseMutex != null) {
215                     synchronized(leaseMutex) {
216                         
217                         LeaseRequestHeader requestHeader = new LeaseRequestHeader(
218                             LeaseRequestHeader.NEW_LEASE_REQUEST,
219                             requestedDuration, isAbsolute, tenant);
220
221                         Message msg = new Message();
222                         msg.putHeader(LeaseRequestHeader.HEADER_KEY, requestHeader);
223                         msg.setObject((Serializable JavaDoc)leaseTarget);
224
225                         clientChannel.send(msg);
226                         
227                         leaseMutex.wait(leaseTimeout);
228                     }
229                 }
230         
231              } catch(InterruptedException JavaDoc ex) {
232     
233                  throw new LeaseDeniedException(
234                      "Did not get any reply before the thread was interrupted.");
235     
236              } catch(ChannelNotConnectedException ex) {
237     
238                  throw new LeaseDeniedException(
239                      "Unable to send request, channel is not connected " +
240                      ex.getMessage(), null);
241     
242              } catch(ChannelClosedException ex) {
243     
244                  throw new LeaseDeniedException(
245                      "Unable to send request, channel is closed " +
246                      ex.getMessage(), null);
247     
248              }
249         
250         // check type of object associated with lease target and tenant
251
// if we got response from leasing service, it should Message
252
if (!(pendingLeases.get(leaseInfo) instanceof Message)) {
253         throw new LeaseDeniedException(
254             "Did not get reply from leasing service within specified timeframe.",
255             leaseTarget);
256         }
257         
258         Message reply = (Message)pendingLeases.get(leaseInfo);
259         
260         // try to fetch denial header
261
DenyResponseHeader denyHeader = (DenyResponseHeader)
262         reply.getHeader(DenyResponseHeader.HEADER_KEY);
263         
264         // throw exception if service denied lease request
265
if (denyHeader != null) {
266             Object JavaDoc tmp=reply.getObject();
267             throw new LeaseDeniedException(
268                     denyHeader.getDenialReason(), tmp);
269         }
270
271         // extract header containing info about granted lease
272
LeaseResponseHeader responseHeader = (LeaseResponseHeader)
273         reply.getHeader(LeaseResponseHeader.HEADER_KEY);
274         
275         // ok, we have response from leasing service
276
// return lease to client
277
return new LocalLease(leaseTarget, tenant,
278         responseHeader.getDuration());
279         
280     } finally {
281             
282
283                 if(log.isDebugEnabled()) log.debug("Removing lease info for leaseTarget=" + leaseTarget +
284                     ", tenant=" + tenant);
285
286             pendingLeases.remove(leaseInfo);
287     }
288     }
289
290     /**
291      * Renew existing lease. This method is used to extend lease time, therefore
292      * <code>existingLease</code> must be valid.
293      */

294     public Lease renewLease(Lease existingLease, long requestedDuration,
295     boolean isAbsolute) throws LeaseDeniedException
296     {
297     if (existingLease == null)
298         throw new LeaseDeniedException(
299         "Existing lease cannot be null.", null);
300         
301     if (existingLease.isExpired())
302         throw new LeaseDeniedException(
303         "You existing lease has expired. " +
304         "You cannot use this method to obtain new lease.",
305         existingLease.getLeaseTarget());
306
307     ClientLeaseInfo leaseInfo = new ClientLeaseInfo(
308         existingLease.getLeaseTarget(), existingLease.getTenant());
309     
310     if (pendingLeases.keySet().contains(leaseInfo))
311         throw new RecursiveLeaseRequestException("There's pending lease " +
312         "request for specified lease target and tenant.",
313         existingLease.getLeaseTarget(),
314         existingLease.getTenant());
315         
316     try {
317         // here we create a mutex and associate this mutex with
318
// lease target and tenant
319
Object JavaDoc leaseMutex = new Object JavaDoc();
320             
321             pendingLeases.put(leaseInfo, leaseMutex);
322             
323             if(log.isDebugEnabled()) log.debug("Added lease info for leaseTarget=" + existingLease.getLeaseTarget() +
324                 ", tenant=" + existingLease.getTenant());
325
326         
327         // wait on mutex until we get response from
328
// leasing service or timeout
329
try {
330                 synchronized(leaseMutex) {
331
332                     // prepare to sending message to leasing service
333
LeaseRequestHeader requestHeader = new LeaseRequestHeader(
334                         LeaseRequestHeader.RENEW_LEASE_REQUEST,
335                         requestedDuration, isAbsolute, existingLease.getTenant());
336                         
337                     Message msg = new Message();
338                     msg.putHeader(LeaseRequestHeader.HEADER_KEY, requestHeader);
339                     msg.setObject((Serializable JavaDoc)existingLease.getLeaseTarget());
340                 
341                     // send message to leasing service
342
clientChannel.send(msg);
343
344                     leaseMutex.wait(leaseTimeout);
345                 }
346         } catch(InterruptedException JavaDoc ex) {
347                 
348         throw new LeaseDeniedException(
349                     "Did not get any reply before the thread was interrupted.");
350                     
351             } catch(ChannelNotConnectedException ex) {
352                 
353                 throw new LeaseDeniedException(
354                     "Unable to send request, channel is not connected " +
355                     ex.getMessage(), null);
356
357         } catch(ChannelClosedException ex) {
358
359                 throw new LeaseDeniedException(
360                     "Unable to send request, channel is closed " +
361                     ex.getMessage(), null);
362
363         }
364         
365         // check type of object associated with lease target and tenant
366
// if we got response from leasing service, it should Message
367
if (!(pendingLeases.get(leaseInfo) instanceof Message)) {
368         throw new LeaseDeniedException(
369             "Did not get reply from leasing service within specified timeframe.",
370             null);
371         }
372         
373         Message reply = (Message)pendingLeases.get(leaseInfo);
374         
375         // try to fetch denial header
376
DenyResponseHeader denyHeader = (DenyResponseHeader)
377         reply.getHeader(DenyResponseHeader.HEADER_KEY);
378         
379         // throw exception if service denied lease request
380
if (denyHeader != null)
381         throw new LeaseDeniedException(denyHeader.getDenialReason(), null);
382         
383         // extract header containing info about granted lease
384
LeaseResponseHeader responseHeader = (LeaseResponseHeader)
385         reply.getHeader(LeaseResponseHeader.HEADER_KEY);
386         
387         // ok, we have response from leasing service
388
// return lease to client
389
return new LocalLease(existingLease.getLeaseTarget(),
390         existingLease.getTenant(), responseHeader.getDuration());
391         
392     } finally {
393
394
395                 if(log.isDebugEnabled()) log.debug("Removing lease info for leaseTarget=" +
396                     existingLease.getLeaseTarget() + ", tenant=" +
397                     existingLease.getTenant());
398
399             pendingLeases.remove(leaseInfo);
400     }
401     }
402     
403     /**
404      * Get address of this client in group.
405      */

406     public Address getAddress() {
407     return clientChannel.getLocalAddress();
408     }
409     
410     private class ClientMessageListener implements MessageListener {
411     
412     /**
413      * Get group state. This method always returns <code>null</code> because
414      * we do not share group state.
415      */

416         public byte[] getState() {
417         return null;
418         }
419
420     /**
421      * Receive message from service.
422      */

423         public void receive(Message msg) {
424         
425         DenyResponseHeader denyHeader = (DenyResponseHeader)
426         msg.getHeader(DenyResponseHeader.HEADER_KEY);
427         
428         LeaseResponseHeader leaseHeader = (LeaseResponseHeader)
429         msg.getHeader(LeaseResponseHeader.HEADER_KEY);
430         
431         if (denyHeader == null && leaseHeader == null)
432         return;
433         
434         Object JavaDoc leaseTarget = msg.getObject();
435
436         Object JavaDoc tenant = denyHeader != null ?
437         denyHeader.getTenant() : leaseHeader.getTenant();
438         
439         boolean cancelReply =
440         (denyHeader != null && denyHeader.getType() == DenyResponseHeader.CANCEL_DENIED) ||
441         (leaseHeader != null && leaseHeader.getType() == LeaseResponseHeader.LEASE_CANCELED);
442
443
444                 if(log.isDebugEnabled()) log.debug("Received response: type=" + (denyHeader != null ? "deny" : "grant") +
445                     ", leaseTarget=" + leaseTarget + ", tenant=" + tenant +
446                     ", cancelReply=" + cancelReply);
447
448         
449         ClientLeaseInfo leaseInfo = new ClientLeaseInfo(leaseTarget, tenant);
450         
451         HashMap JavaDoc workingMap = cancelReply ? pendingCancels : pendingLeases;
452             
453             Object JavaDoc leaseMutex = workingMap.get(leaseInfo);
454         
455         if (leaseMutex != null)
456         synchronized(leaseMutex) {
457                     workingMap.put(leaseInfo, msg);
458                     
459             leaseMutex.notifyAll();
460                     
461
462                         if(log.isDebugEnabled()) log.debug("Notified mutex for leaseTarget="+ leaseTarget +
463                              ", tenant=" + tenant);
464         }
465         else {
466
467                     if(log.isDebugEnabled()) log.debug("Could not find mutex for leaseTarget=" + leaseTarget +
468                         ", tenant=" + tenant);
469                     
470         workingMap.remove(leaseInfo);
471         }
472         
473         }
474     
475     /**
476      * Set group state. This method is empty because we do not share any
477      * group state.
478      */

479         public void setState(byte[] state) {
480         // do nothing, we have no group state
481
}
482     }
483
484     /**
485      * This class represents temporary marker stored during lease request
486      * processing to indentify uniquely combination of lease target and tenant.
487      */

488     private static class ClientLeaseInfo {
489     
490     private final Object JavaDoc leaseTarget;
491     
492     private final Object JavaDoc tenant;
493     
494     /**
495      * Create instance of this class.
496      */

497     public ClientLeaseInfo(Object JavaDoc leaseTarget, Object JavaDoc tenant) {
498         this.leaseTarget = leaseTarget;
499         this.tenant = tenant;
500     }
501     
502     /**
503      * Get lease target.
504      */

505     public Object JavaDoc getLeaseTarget() {
506         return leaseTarget;
507     }
508     
509     /**
510      * Get tenant.
511      */

512     public Object JavaDoc getTenant() {
513         return tenant;
514     }
515
516     /**
517      * Test if <code>obj</code> is equal to this instance.
518      */

519         public boolean equals(Object JavaDoc obj) {
520             if (obj == this) return true;
521         
522         if (!(obj instanceof ClientLeaseInfo)) return false;
523         
524         ClientLeaseInfo that = (ClientLeaseInfo)obj;
525         
526         return that.getLeaseTarget().equals(leaseTarget) &&
527         that.getTenant().equals(tenant);
528         }
529
530     /**
531      * Get hash code of this object. Hash code is calculated as combination
532      * of lease target and tenant hash codes.
533      */

534         public int hashCode() {
535             return leaseTarget.hashCode() ^ tenant.hashCode();
536         }
537     }
538     
539     /**
540      * This class represents lease granted by lease factory.
541      */

542     private class LocalLease implements Lease {
543     private final long expiresAt;
544     
545     private final long creationTime;
546     
547     private final Object JavaDoc leaseTarget;
548     
549     private final Object JavaDoc tenant;
550     
551     /**
552      * Create instance of this class for the specified lease target,
553      * tenant and lease expiration time.
554      */

555     public LocalLease(Object JavaDoc leaseTarget, Object JavaDoc tenant, long expiresAt) {
556         this.leaseTarget = leaseTarget;
557         this.tenant = tenant;
558         this.expiresAt = expiresAt;
559         this.creationTime = System.currentTimeMillis();
560     }
561     
562     /**
563      * Get lease expiration time.
564      */

565     public long getExpiration() {
566         return expiresAt;
567     }
568     
569     /**
570      * Get left lease duration.
571      */

572     public long getDuration() {
573         return expiresAt > System.currentTimeMillis() ?
574         expiresAt - System.currentTimeMillis() : -1;
575     }
576     
577     /**
578      * Get lease factory that owns this lease.
579      */

580     public LeaseFactory getFactory() {
581         return LeaseFactoryClient.this;
582     }
583     
584     /**
585      * Get lease target.
586      */

587     public Object JavaDoc getLeaseTarget() {
588         return leaseTarget;
589     }
590     
591     /**
592      * Test if lease is expired.
593      */

594     public boolean isExpired() {
595         return System.currentTimeMillis() >= expiresAt;
596     }
597     
598     /**
599      * Get tenant that was granted a lease.
600      */

601     public Object JavaDoc getTenant() {
602         return tenant;
603     }
604     }
605 }
606
Popular Tags