1 package org.jgroups.service.lease; 2 3 4 import org.jgroups.Address; 5 import org.jgroups.Channel; 6 import org.jgroups.Message; 7 import org.jgroups.MessageListener; 8 import org.jgroups.blocks.PullPushAdapter; 9 import org.jgroups.service.AbstractService; 10 import org.jgroups.util.Util; 11 12 import java.io.IOException ; 13 import java.io.ObjectInput ; 14 import java.io.ObjectOutput ; 15 import java.io.Serializable ; 16 import java.util.HashMap ; 17 import java.util.Iterator ; 18 import java.util.Map ; 19 20 21 22 23 47 public class LeaseFactoryService extends AbstractService { 48 49 private static final String NEW_LEASE_METHOD = 50 "LeaseFactoryService.processNewLeaseRequest()"; 51 52 private static final String RENEW_LEASE_METHOD = 53 "LeaseFactoryService.processRenewLeaseRequest()"; 54 55 private static final String CANCEL_LEASE_METHOD = 56 "LeaseFactoryService.processCancelLeaseRequest()"; 57 58 private static final String DENY_METHOD = 59 "LeaseFactoryService.denyLeaseRequest()"; 60 61 62 public static final String LEASE_SERVICE_NAME = "Leasing Service"; 63 64 public static final int DEFAULT_BEST_FIT_DURATION = 10 * 1000; 65 66 public static final int MAXIMUM_DURATION = 60 * 1000; 67 68 protected final PullPushAdapter clientAdapter; 69 70 protected final Map leases; 71 72 83 public LeaseFactoryService(Channel serviceChannel, Channel clientChannel) { 84 super(serviceChannel, clientChannel); 85 86 this.clientAdapter = new PullPushAdapter( 87 clientChannel, new ClientMessageListener()); 88 89 leases = new HashMap (); 90 91 setMessageListener(new ServiceMessageListener()); 92 } 93 94 99 public String getName() { 100 return LEASE_SERVICE_NAME; 101 } 102 103 109 protected long getBestFitDuration() { 110 return DEFAULT_BEST_FIT_DURATION + System.currentTimeMillis(); 111 } 112 113 120 protected long getMaximumDuration() { 121 return MAXIMUM_DURATION + System.currentTimeMillis(); 122 } 123 124 129 protected void incorrectStateReceived(Object incorrectState) { 130 System.err.println("Incorrect state received : " + incorrectState); 131 } 132 133 136 protected void propagateStateChange(int type, LeaseInfo leaseInfo, 137 Object leaseTarget) 138 { 139 LeaseInfoReplicationHeader header = 140 new LeaseInfoReplicationHeader(type, leaseInfo); 141 142 Message msg = new Message(); 143 msg.putHeader(LeaseInfoReplicationHeader.HEADER_KEY, header); 144 msg.setObject((Serializable )leaseTarget); 145 146 try { 147 serviceChannel.send(msg); 148 } catch(Exception ex) { 149 ex.printStackTrace(); 150 } 151 } 152 153 156 protected void denyLeaseRequest(int denialType, Address requester, 157 String reason, Object leaseTarget, Object tenant) 158 { 159 160 if(log.isDebugEnabled()) log.debug("Denying request: type=" + denialType + 161 ", requester=" + requester + ", leaseTarget=" + leaseTarget + 162 ", tenant=" + tenant + ", reason : " + reason); 163 164 DenyResponseHeader responseHeader = 165 new DenyResponseHeader(denialType, reason, tenant); 166 167 Message msg = new Message(); 168 msg.putHeader(DenyResponseHeader.HEADER_KEY, responseHeader); 169 msg.setDest(requester); 170 msg.setObject((Serializable )leaseTarget); 171 172 try { 173 clientChannel.send(msg); 174 } catch(Exception ex) { 175 ex.printStackTrace(); 176 } 178 } 179 180 185 protected void processNewLeaseRequest(LeaseRequestHeader header, 186 Object leaseTarget, Address requester) 187 { 188 if (leaseTarget == null) 189 return; 190 191 192 if(log.isDebugEnabled()) log.debug("New lease request: " + 193 "target=" + leaseTarget + ", tenant=" + header.getTenant() + 194 ", requester=" + requester); 195 196 197 LeaseInfo leaseInfo = (LeaseInfo)leases.get(leaseTarget); 198 199 if (leaseInfo != null && leaseInfo.isExpired()) { 201 202 leases.remove(leaseTarget); 203 leaseInfo = null; 204 205 } 206 207 if (leaseInfo != null) { 209 210 denyLeaseRequest(DenyResponseHeader.LEASE_DENIED, requester, 211 "Lease target is currently in use. If you are owner of lease " + 212 "use lease renewal mechanism to extend lease time.", 213 leaseTarget, header.getTenant()); 214 215 return; 216 217 } else { 218 219 Object tenant = header.getTenant(); 220 221 if (tenant == null) { 224 225 denyLeaseRequest(DenyResponseHeader.LEASE_DENIED, requester, 226 "Tenant is unknown. Please check if you specified entity " + 227 "to which lease should be granted.", leaseTarget, tenant); 228 229 return; 230 } 231 232 long leaseExpiration = 0; 233 234 if (header.getDuration() == LeaseFactory.DURATION_ANY) 235 leaseExpiration = getBestFitDuration(); 236 else 237 if (header.getDuration() == LeaseFactory.DURATION_FOREVER) 238 leaseExpiration = getMaximumDuration(); 239 else { 240 leaseExpiration = header.getDuration(); 241 if (!header.isAbsolute()) 242 leaseExpiration += System.currentTimeMillis(); 243 } 244 245 leaseInfo = new LeaseInfo(tenant, leaseExpiration); 246 247 try { 248 LeaseResponseHeader responseHeader = new LeaseResponseHeader( 249 LeaseResponseHeader.LEASE_GRANTED, leaseExpiration, false, tenant); 250 251 Message msg = new Message(); 252 msg.putHeader(LeaseResponseHeader.HEADER_KEY, responseHeader); 253 msg.setDest(requester); 254 msg.setObject((Serializable )leaseTarget); 255 256 clientChannel.send(msg); 257 258 leases.put(leaseTarget, leaseInfo); 259 260 propagateStateChange( 261 LeaseInfo.NEW_LEASE_TYPE, leaseInfo, leaseTarget); 262 263 } catch(Exception ex) { 264 } 266 267 } 268 269 } 270 271 275 protected void processRenewLeaseRequest(LeaseRequestHeader header, 276 Object leaseTarget, Address requester) 277 { 278 if (leaseTarget == null) 279 return; 280 281 282 if(log.isDebugEnabled()) log.debug("Renew lease request: " + 283 "target=" + leaseTarget + ", tenant=" + header.getTenant() + 284 ", requester=" + requester); 285 286 LeaseInfo leaseInfo = (LeaseInfo)leases.get(leaseTarget); 287 288 if (leaseInfo != null && leaseInfo.isExpired()) { 290 leases.remove(leaseTarget); 291 leaseInfo = null; 292 } 293 294 if (leaseInfo == null) { 295 296 denyLeaseRequest(DenyResponseHeader.RENEW_DENIED, requester, 297 "Lease you are trying to extent is not available or expired.", 298 leaseTarget, header.getTenant()); 299 300 } else { 301 302 if (!leaseInfo.getTenant().equals(header.getTenant())) { 304 305 denyLeaseRequest(DenyResponseHeader.RENEW_DENIED, requester, 306 "You are not a tenant of this lease.", 307 leaseTarget, header.getTenant()); 308 309 return; 310 } 311 312 long leaseExpiration = 0; 313 314 if (header.getDuration() == LeaseFactory.DURATION_ANY) 315 leaseExpiration = getBestFitDuration(); 316 else 317 if (header.getDuration() == LeaseFactory.DURATION_FOREVER) 318 leaseExpiration = getMaximumDuration(); 319 else { 320 leaseExpiration = header.getDuration(); 321 if (!header.isAbsolute()) 322 leaseExpiration += System.currentTimeMillis(); 323 } 324 325 leaseInfo.extendLease(leaseExpiration); 326 327 try { 328 329 LeaseResponseHeader responseHeader = new LeaseResponseHeader ( 330 LeaseResponseHeader.LEASE_RENEWED, leaseExpiration, false, 331 header.getTenant()); 332 333 Message msg = new Message(); 334 msg.putHeader(LeaseResponseHeader.HEADER_KEY, responseHeader); 335 msg.setDest(requester); 336 msg.setObject((Serializable )leaseTarget); 337 338 clientChannel.send(msg); 339 340 propagateStateChange( 341 LeaseInfo.RENEW_LEASE_TYPE, leaseInfo, leaseTarget); 342 343 } catch(Exception ex) { 344 } 346 } 347 } 348 349 354 protected void processCancelLeaseRequest(LeaseRequestHeader header, 355 Object leaseTarget, Address requester) 356 { 357 if (leaseTarget == null) 358 return; 359 360 361 if(log.isDebugEnabled()) log.debug("Cancel lease request: " + 362 "target=" + leaseTarget + ", tenant=" + header.getTenant() + 363 ", requester=" + requester); 364 365 LeaseInfo leaseInfo = (LeaseInfo)leases.get(leaseTarget); 366 367 if (leaseInfo == null) { 369 370 denyLeaseRequest(DenyResponseHeader.CANCEL_DENIED, requester, 371 "No lease was granted for specified lease target.", 372 leaseTarget, header.getTenant()); 373 374 return; 375 } 376 377 if (!leaseInfo.getTenant().equals(header.getTenant())) { 379 380 denyLeaseRequest(DenyResponseHeader.CANCEL_DENIED, requester, 381 "Lease belongs to another tenant.", 382 leaseTarget, header.getTenant()); 383 384 return; 385 } 386 387 leases.remove(leaseTarget); 388 389 Message msg = new Message(); 390 msg.putHeader(LeaseResponseHeader.HEADER_KEY, 391 new LeaseResponseHeader(LeaseResponseHeader.LEASE_CANCELED, header.getTenant())); 392 msg.setDest(requester); 393 msg.setObject((Serializable )leaseTarget); 394 395 try { 396 clientChannel.send(msg); 397 398 propagateStateChange( 399 LeaseInfo.CANCEL_LEASE_TYPE, leaseInfo, leaseTarget); 400 401 } catch(Exception ex) { 402 ex.printStackTrace(); 403 } 406 } 407 408 413 private class ClientMessageListener implements MessageListener { 414 415 419 public byte[] getState() { 420 return null; 421 } 422 423 428 public void receive(Message msg) { 429 430 if (!isCoordinator()) 432 return; 433 434 LeaseRequestHeader leaseRequestHeader = null; 435 436 try { 437 438 leaseRequestHeader = (LeaseRequestHeader) 439 msg.getHeader(LeaseRequestHeader.HEADER_KEY); 440 441 } catch(ClassCastException ccex) { 442 ccex.printStackTrace(); 443 return; 445 } 446 447 if (leaseRequestHeader == null) { 449 return; 450 } 451 452 Object leaseTarget = null; 453 leaseTarget=msg.getObject(); 454 455 Address requester = msg.getSrc(); 456 457 switch(leaseRequestHeader.getType()) { 459 460 case LeaseRequestHeader.NEW_LEASE_REQUEST : 461 processNewLeaseRequest( 462 leaseRequestHeader, leaseTarget, requester); 463 464 break; 465 466 case LeaseRequestHeader.RENEW_LEASE_REQUEST : 467 processRenewLeaseRequest( 468 leaseRequestHeader, leaseTarget, requester); 469 470 break; 471 472 case LeaseRequestHeader.CANCEL_LEASE_REQUEST : 473 processCancelLeaseRequest( 474 leaseRequestHeader, leaseTarget, requester); 475 476 break; 477 478 default : 479 } 481 } 482 483 487 public void setState(byte[] state) { 488 } 490 } 491 492 495 private class ServiceMessageListener implements MessageListener { 496 497 501 public byte[] getState() { 502 try { 503 return Util.objectToByteBuffer(new HashMap (leases)); 504 } 505 catch(Exception ex) { 506 if(log.isErrorEnabled()) log.error("exception marshalling state: " + ex); 507 return null; 508 } 509 } 510 511 514 public void receive(Message msg) { 515 516 if (msg.getSrc().equals(getAddress())) { 518 return; 519 } 520 521 LeaseInfoReplicationHeader header = (LeaseInfoReplicationHeader) 522 msg.getHeader(LeaseInfoReplicationHeader.HEADER_KEY); 523 524 if (header == null) 525 return; 526 527 Object tmp=null; 528 529 tmp=msg.getObject(); 530 531 switch(header.getType()) { 532 533 case LeaseInfo.NEW_LEASE_TYPE : 534 leases.put(tmp, header.getLeaseInfo()); 535 break; 536 537 case LeaseInfo.RENEW_LEASE_TYPE : 538 leases.put(tmp, header.getLeaseInfo()); 539 break; 540 541 case LeaseInfo.CANCEL_LEASE_TYPE : 542 leases.remove(tmp); 543 break; 544 545 default : 546 System.err.println("Incorrect type " + header.getType()); 547 } 548 } 549 550 555 public void setState(byte[] data) { 556 Object state; 557 558 try { 559 state=Util.objectFromByteBuffer(data); 560 } 561 catch(Exception ex) { 562 if(log.isErrorEnabled()) log.error("exception unmarshalling state: " + ex); 563 return; 564 } 565 566 if (!(state instanceof Map )) { 568 incorrectStateReceived(state); 569 return; 570 } 571 572 Iterator iterator = ((Map )state).entrySet().iterator(); 573 while (iterator.hasNext()) { 574 Map.Entry entry = (Map.Entry )iterator.next(); 575 576 Object leaseTarget = entry.getKey(); 577 578 if (!(entry.getValue() instanceof LeaseInfo)) { 580 incorrectStateReceived(state); 581 return; 582 } 583 584 LeaseInfo leaseInfo = (LeaseInfo)entry.getValue(); 585 586 leases.put(leaseTarget, leaseInfo); 587 } 588 } 589 590 } 591 592 597 public static class LeaseInfo implements java.io.Externalizable { 598 599 public static final int NEW_LEASE_TYPE = 1; 600 601 public static final int RENEW_LEASE_TYPE = 2; 602 603 public static final int CANCEL_LEASE_TYPE = 3; 604 605 private long expiresAt; 606 607 private Object tenant; 608 609 613 public LeaseInfo() { 614 } 615 616 619 public LeaseInfo(Object tenant, long expiresAt) { 620 this.expiresAt = expiresAt; 621 this.tenant = tenant; 622 } 623 624 627 public LeaseInfo(LeaseRequestHeader requestHeader) { 628 this.tenant = requestHeader.getTenant(); 629 630 this.expiresAt = requestHeader.getDuration(); 631 632 if (!requestHeader.isAbsolute()) 633 this.expiresAt += System.currentTimeMillis(); 634 } 635 636 639 public long expiresAt() { 640 return expiresAt; 641 } 642 643 646 public Object getTenant() { 647 return tenant; 648 } 649 650 653 public void extendLease(long newExpiration) { 654 expiresAt = newExpiration; 655 } 656 657 660 public boolean isExpired() { 661 return expiresAt <= System.currentTimeMillis(); 662 } 663 664 public void readExternal(ObjectInput in) 665 throws IOException , ClassNotFoundException 666 { 667 this.expiresAt = in.readLong(); 668 this.tenant = in.readObject(); 669 } 670 671 public void writeExternal(ObjectOutput out) throws IOException { 672 out.writeLong(expiresAt); 673 out.writeObject(tenant); 674 } 675 676 677 } 678 679 } 680 | Popular Tags |