1 package org.jgroups.service.lease; 2 3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.jgroups.*; 6 import org.jgroups.blocks.PullPushAdapter; 7 8 import java.io.Serializable ; 9 import java.util.HashMap ; 10 11 24 public class LeaseFactoryClient implements LeaseFactory { 25 26 private static final String LEASE_CLIENT_RECEIVE_METHOD = 27 "LeaseFactoryClient.ClientMessageListener.receive()"; 28 29 private static final String NEW_LEASE_METHOD = 30 "LeaseFactoryClient.newLease()"; 31 32 private static final String RENEW_LEASE_METHOD = 33 "LeaseFactoryClient.renewLease()"; 34 35 private static final String CANCEL_LEASE_METHOD = 36 "LeaseFactoryClient.cancelLease()"; 37 38 39 public static final int DEFAULT_LEASE_TIMEOUT = 10000; 40 41 public static final int DEFAULT_CANCEL_TIMEOUT = 1000; 42 43 protected final Channel clientChannel; 44 45 protected final PullPushAdapter clientAdapter; 46 47 protected final int leaseTimeout = DEFAULT_LEASE_TIMEOUT; 48 49 protected final int cancelTimeout = DEFAULT_CANCEL_TIMEOUT; 50 51 protected final HashMap pendingLeases = new HashMap (); 52 53 protected final HashMap pendingRenewals = new HashMap (); 54 55 protected final HashMap pendingCancels = new HashMap (); 56 57 protected final Log log=LogFactory.getLog(this.getClass()); 58 59 63 public LeaseFactoryClient(Channel clientChannel) { 64 this(clientChannel, DEFAULT_LEASE_TIMEOUT, DEFAULT_CANCEL_TIMEOUT); 65 } 66 67 78 public LeaseFactoryClient(Channel clientChannel, int leaseTimeout, 79 int cancelTimeout) 80 { 81 this.clientChannel = clientChannel; 82 this.clientAdapter = new PullPushAdapter( 83 clientChannel, new ClientMessageListener()); 84 } 85 86 89 public void cancelLease(Lease existingLease) throws UnknownLeaseException { 90 if (existingLease == null) 91 throw new UnknownLeaseException( 92 "Existing lease cannot be null.", existingLease); 93 94 if (existingLease.isExpired()) 95 throw new UnknownLeaseException( 96 "You existing lease has expired. " + 97 "You cannot use this method to obtain new lease.", existingLease); 98 99 ClientLeaseInfo leaseInfo = new ClientLeaseInfo( 100 existingLease.getLeaseTarget(), existingLease.getTenant()); 101 102 if (pendingCancels.keySet().contains(leaseInfo)) 103 throw new UnknownLeaseException("There's pending cancel " + 104 "request for specified lease target and tenant.", 105 existingLease); 106 107 try { 108 Object leaseMutex = new Object (); 111 pendingCancels.put(leaseInfo, leaseMutex); 112 113 try { 116 117 synchronized(leaseMutex) { 118 LeaseRequestHeader requestHeader = new LeaseRequestHeader( 119 LeaseRequestHeader.CANCEL_LEASE_REQUEST, 0, false, 120 existingLease.getTenant()); 121 122 Message msg = new Message(); 123 msg.putHeader(LeaseRequestHeader.HEADER_KEY, requestHeader); 124 msg.setObject((Serializable )existingLease.getLeaseTarget()); 125 126 clientChannel.send(msg); 128 129 leaseMutex.wait(leaseTimeout); 130 } 131 132 } catch(InterruptedException ex) { 133 134 throw new UnknownLeaseException( 135 "Did not get any reply before the thread was interrupted.", 136 null); 137 138 } catch(ChannelNotConnectedException ex) { 139 140 throw new UnknownLeaseException( 141 "Unable to send request, channel is not connected " + 142 ex.getMessage(), null); 143 144 } catch(ChannelClosedException ex) { 145 146 throw new UnknownLeaseException( 147 "Unable to send request, channel is closed " + 148 ex.getMessage(), null); 149 150 } 151 152 if (!(pendingCancels.get(leaseInfo) instanceof Message)) 156 return; 157 158 Message reply = (Message)pendingCancels.get(leaseInfo); 159 160 DenyResponseHeader denyHeader = (DenyResponseHeader) 162 reply.getHeader(DenyResponseHeader.HEADER_KEY); 163 164 if (denyHeader != null) 166 throw new UnknownLeaseException( 167 denyHeader.getDenialReason(), existingLease); 168 } finally { 169 pendingCancels.remove(leaseInfo); 170 } 171 } 172 173 176 public Lease newLease(Object leaseTarget, Object tenant, 177 long requestedDuration, boolean isAbsolute) throws LeaseDeniedException 178 { 179 if (leaseTarget == null || tenant == null) 180 throw new LeaseDeniedException( 181 "Lease target and tenant should be not null.", leaseTarget); 182 183 if (!(leaseTarget instanceof Serializable )) 184 throw new LeaseDeniedException( 185 "This lease factory can process only serializable lease targets", 186 leaseTarget); 187 188 if (!(tenant instanceof Serializable )) 189 throw new LeaseDeniedException( 190 "This lease factory can process only serializable tenants", 191 leaseTarget); 192 193 ClientLeaseInfo leaseInfo = new ClientLeaseInfo(leaseTarget, tenant); 194 195 if (pendingLeases.keySet().contains(leaseInfo)) 196 throw new RecursiveLeaseRequestException("There's pending lease " + 197 "request for specified lease target and tenant.", leaseTarget, tenant); 198 199 try { 200 Object leaseMutex = new Object (); 203 204 pendingLeases.put(leaseInfo, leaseMutex); 205 206 207 if(log.isDebugEnabled()) log.debug("Added lease info for leaseTarget=" + leaseTarget + 208 ", tenant=" + tenant); 209 210 try { 213 214 if (leaseMutex != null) { 215 synchronized(leaseMutex) { 216 217 LeaseRequestHeader requestHeader = new LeaseRequestHeader( 218 LeaseRequestHeader.NEW_LEASE_REQUEST, 219 requestedDuration, isAbsolute, tenant); 220 221 Message msg = new Message(); 222 msg.putHeader(LeaseRequestHeader.HEADER_KEY, requestHeader); 223 msg.setObject((Serializable )leaseTarget); 224 225 clientChannel.send(msg); 226 227 leaseMutex.wait(leaseTimeout); 228 } 229 } 230 231 } catch(InterruptedException ex) { 232 233 throw new LeaseDeniedException( 234 "Did not get any reply before the thread was interrupted."); 235 236 } catch(ChannelNotConnectedException ex) { 237 238 throw new LeaseDeniedException( 239 "Unable to send request, channel is not connected " + 240 ex.getMessage(), null); 241 242 } catch(ChannelClosedException ex) { 243 244 throw new LeaseDeniedException( 245 "Unable to send request, channel is closed " + 246 ex.getMessage(), null); 247 248 } 249 250 if (!(pendingLeases.get(leaseInfo) instanceof Message)) { 253 throw new LeaseDeniedException( 254 "Did not get reply from leasing service within specified timeframe.", 255 leaseTarget); 256 } 257 258 Message reply = (Message)pendingLeases.get(leaseInfo); 259 260 DenyResponseHeader denyHeader = (DenyResponseHeader) 262 reply.getHeader(DenyResponseHeader.HEADER_KEY); 263 264 if (denyHeader != null) { 266 Object tmp=reply.getObject(); 267 throw new LeaseDeniedException( 268 denyHeader.getDenialReason(), tmp); 269 } 270 271 LeaseResponseHeader responseHeader = (LeaseResponseHeader) 273 reply.getHeader(LeaseResponseHeader.HEADER_KEY); 274 275 return new LocalLease(leaseTarget, tenant, 278 responseHeader.getDuration()); 279 280 } finally { 281 282 283 if(log.isDebugEnabled()) log.debug("Removing lease info for leaseTarget=" + leaseTarget + 284 ", tenant=" + tenant); 285 286 pendingLeases.remove(leaseInfo); 287 } 288 } 289 290 294 public Lease renewLease(Lease existingLease, long requestedDuration, 295 boolean isAbsolute) throws LeaseDeniedException 296 { 297 if (existingLease == null) 298 throw new LeaseDeniedException( 299 "Existing lease cannot be null.", null); 300 301 if (existingLease.isExpired()) 302 throw new LeaseDeniedException( 303 "You existing lease has expired. " + 304 "You cannot use this method to obtain new lease.", 305 existingLease.getLeaseTarget()); 306 307 ClientLeaseInfo leaseInfo = new ClientLeaseInfo( 308 existingLease.getLeaseTarget(), existingLease.getTenant()); 309 310 if (pendingLeases.keySet().contains(leaseInfo)) 311 throw new RecursiveLeaseRequestException("There's pending lease " + 312 "request for specified lease target and tenant.", 313 existingLease.getLeaseTarget(), 314 existingLease.getTenant()); 315 316 try { 317 Object leaseMutex = new Object (); 320 321 pendingLeases.put(leaseInfo, leaseMutex); 322 323 if(log.isDebugEnabled()) log.debug("Added lease info for leaseTarget=" + existingLease.getLeaseTarget() + 324 ", tenant=" + existingLease.getTenant()); 325 326 327 try { 330 synchronized(leaseMutex) { 331 332 LeaseRequestHeader requestHeader = new LeaseRequestHeader( 334 LeaseRequestHeader.RENEW_LEASE_REQUEST, 335 requestedDuration, isAbsolute, existingLease.getTenant()); 336 337 Message msg = new Message(); 338 msg.putHeader(LeaseRequestHeader.HEADER_KEY, requestHeader); 339 msg.setObject((Serializable )existingLease.getLeaseTarget()); 340 341 clientChannel.send(msg); 343 344 leaseMutex.wait(leaseTimeout); 345 } 346 } catch(InterruptedException ex) { 347 348 throw new LeaseDeniedException( 349 "Did not get any reply before the thread was interrupted."); 350 351 } catch(ChannelNotConnectedException ex) { 352 353 throw new LeaseDeniedException( 354 "Unable to send request, channel is not connected " + 355 ex.getMessage(), null); 356 357 } catch(ChannelClosedException ex) { 358 359 throw new LeaseDeniedException( 360 "Unable to send request, channel is closed " + 361 ex.getMessage(), null); 362 363 } 364 365 if (!(pendingLeases.get(leaseInfo) instanceof Message)) { 368 throw new LeaseDeniedException( 369 "Did not get reply from leasing service within specified timeframe.", 370 null); 371 } 372 373 Message reply = (Message)pendingLeases.get(leaseInfo); 374 375 DenyResponseHeader denyHeader = (DenyResponseHeader) 377 reply.getHeader(DenyResponseHeader.HEADER_KEY); 378 379 if (denyHeader != null) 381 throw new LeaseDeniedException(denyHeader.getDenialReason(), null); 382 383 LeaseResponseHeader responseHeader = (LeaseResponseHeader) 385 reply.getHeader(LeaseResponseHeader.HEADER_KEY); 386 387 return new LocalLease(existingLease.getLeaseTarget(), 390 existingLease.getTenant(), responseHeader.getDuration()); 391 392 } finally { 393 394 395 if(log.isDebugEnabled()) log.debug("Removing lease info for leaseTarget=" + 396 existingLease.getLeaseTarget() + ", tenant=" + 397 existingLease.getTenant()); 398 399 pendingLeases.remove(leaseInfo); 400 } 401 } 402 403 406 public Address getAddress() { 407 return clientChannel.getLocalAddress(); 408 } 409 410 private class ClientMessageListener implements MessageListener { 411 412 416 public byte[] getState() { 417 return null; 418 } 419 420 423 public void receive(Message msg) { 424 425 DenyResponseHeader denyHeader = (DenyResponseHeader) 426 msg.getHeader(DenyResponseHeader.HEADER_KEY); 427 428 LeaseResponseHeader leaseHeader = (LeaseResponseHeader) 429 msg.getHeader(LeaseResponseHeader.HEADER_KEY); 430 431 if (denyHeader == null && leaseHeader == null) 432 return; 433 434 Object leaseTarget = msg.getObject(); 435 436 Object tenant = denyHeader != null ? 437 denyHeader.getTenant() : leaseHeader.getTenant(); 438 439 boolean cancelReply = 440 (denyHeader != null && denyHeader.getType() == DenyResponseHeader.CANCEL_DENIED) || 441 (leaseHeader != null && leaseHeader.getType() == LeaseResponseHeader.LEASE_CANCELED); 442 443 444 if(log.isDebugEnabled()) log.debug("Received response: type=" + (denyHeader != null ? "deny" : "grant") + 445 ", leaseTarget=" + leaseTarget + ", tenant=" + tenant + 446 ", cancelReply=" + cancelReply); 447 448 449 ClientLeaseInfo leaseInfo = new ClientLeaseInfo(leaseTarget, tenant); 450 451 HashMap workingMap = cancelReply ? pendingCancels : pendingLeases; 452 453 Object leaseMutex = workingMap.get(leaseInfo); 454 455 if (leaseMutex != null) 456 synchronized(leaseMutex) { 457 workingMap.put(leaseInfo, msg); 458 459 leaseMutex.notifyAll(); 460 461 462 if(log.isDebugEnabled()) log.debug("Notified mutex for leaseTarget="+ leaseTarget + 463 ", tenant=" + tenant); 464 } 465 else { 466 467 if(log.isDebugEnabled()) log.debug("Could not find mutex for leaseTarget=" + leaseTarget + 468 ", tenant=" + tenant); 469 470 workingMap.remove(leaseInfo); 471 } 472 473 } 474 475 479 public void setState(byte[] state) { 480 } 482 } 483 484 488 private static class ClientLeaseInfo { 489 490 private final Object leaseTarget; 491 492 private final Object tenant; 493 494 497 public ClientLeaseInfo(Object leaseTarget, Object tenant) { 498 this.leaseTarget = leaseTarget; 499 this.tenant = tenant; 500 } 501 502 505 public Object getLeaseTarget() { 506 return leaseTarget; 507 } 508 509 512 public Object getTenant() { 513 return tenant; 514 } 515 516 519 public boolean equals(Object obj) { 520 if (obj == this) return true; 521 522 if (!(obj instanceof ClientLeaseInfo)) return false; 523 524 ClientLeaseInfo that = (ClientLeaseInfo)obj; 525 526 return that.getLeaseTarget().equals(leaseTarget) && 527 that.getTenant().equals(tenant); 528 } 529 530 534 public int hashCode() { 535 return leaseTarget.hashCode() ^ tenant.hashCode(); 536 } 537 } 538 539 542 private class LocalLease implements Lease { 543 private final long expiresAt; 544 545 private final long creationTime; 546 547 private final Object leaseTarget; 548 549 private final Object tenant; 550 551 555 public LocalLease(Object leaseTarget, Object tenant, long expiresAt) { 556 this.leaseTarget = leaseTarget; 557 this.tenant = tenant; 558 this.expiresAt = expiresAt; 559 this.creationTime = System.currentTimeMillis(); 560 } 561 562 565 public long getExpiration() { 566 return expiresAt; 567 } 568 569 572 public long getDuration() { 573 return expiresAt > System.currentTimeMillis() ? 574 expiresAt - System.currentTimeMillis() : -1; 575 } 576 577 580 public LeaseFactory getFactory() { 581 return LeaseFactoryClient.this; 582 } 583 584 587 public Object getLeaseTarget() { 588 return leaseTarget; 589 } 590 591 594 public boolean isExpired() { 595 return System.currentTimeMillis() >= expiresAt; 596 } 597 598 601 public Object getTenant() { 602 return tenant; 603 } 604 } 605 } 606 | Popular Tags |