1 29 30 package com.caucho.server.cluster; 31 32 import com.caucho.util.Alarm; 33 import com.caucho.vfs.ReadStream; 34 import com.caucho.vfs.ReadWritePair; 35 36 import java.io.IOException ; 37 import java.util.ArrayList ; 38 import java.util.Date ; 39 import java.util.logging.Level ; 40 import java.util.logging.Logger ; 41 42 51 public final class ClusterClient { 52 private static final Logger log 53 = Logger.getLogger(ClusterClient.class.getName()); 54 55 private static final int DISABLED = 0; 56 private static final int ENABLED = 1; 57 private static final int ENABLED_SESSION = 2; 58 59 private static final int ST_NEW = 0; 60 private static final int ST_STANDBY = 1; 61 private static final int ST_SESSION_ONLY = 2; 62 private static final int ST_STARTING = 3; 64 private static final int ST_WARMUP = 4; 65 private static final int ST_BUSY = 5; 66 private static final int ST_FAIL = 6; 67 private static final int ST_ACTIVE = 7; 68 private static final int ST_CLOSED = 8; 69 70 private ServerConnector _server; 71 72 private String _debugId; 73 74 private int _maxConnections = Integer.MAX_VALUE / 2; 75 76 private volatile int _enabledMode = ENABLED; 77 78 private ClusterStream []_idle = new ClusterStream[64]; 79 private volatile int _idleHead; 80 private volatile int _idleTail; 81 private int _idleSize = 16; 82 83 private int _streamCount; 84 85 private long _failRecoverTime; 86 private long _warmupTime; 87 88 private volatile int _state = ST_NEW; 89 90 private volatile long _lastFailTime; 91 private volatile long _lastBusyTime; 92 private volatile long _firstConnectTime; 93 94 private volatile int _activeCount; 95 private volatile int _startingCount; 96 97 private volatile long _keepaliveCountTotal; 98 private volatile long _connectCountTotal; 99 private volatile long _failCountTotal; 100 private volatile long _busyCountTotal; 101 102 private volatile double _cpuLoadAvg; 103 private volatile long _cpuSetTime; 104 105 ClusterClient(ServerConnector server) 106 { 107 _server = server; 108 109 Cluster cluster = Cluster.getLocal(); 110 111 String selfId = null; 112 if (cluster != null) 113 selfId = cluster.getId(); 114 115 if (selfId == null || selfId.equals("")) 116 selfId = "default"; 117 118 String targetId = server.getId(); 119 if (targetId == null || targetId.equals("")) 120 targetId = String.valueOf(server.getIndex()); 121 122 _debugId = selfId + "->" + targetId; 123 124 _failRecoverTime = server.getLoadBalanceRecoverTime(); 125 _warmupTime = server.getLoadBalanceWarmupTime(); 126 127 _state = ST_STARTING; 128 } 129 130 133 public ServerConnector getServer() 134 { 135 return _server; 136 } 137 138 141 public int getActiveCount() 142 { 143 return _activeCount; 144 } 145 146 149 public int getIdleCount() 150 { 151 return (_idleHead - _idleTail + _idle.length) % _idle.length; 152 } 153 154 157 public long getConnectCountTotal() 158 { 159 return _connectCountTotal; 160 } 161 162 165 public long getKeepaliveCountTotal() 166 { 167 return _keepaliveCountTotal; 168 } 169 170 173 public long getFailCountTotal() 174 { 175 return _failCountTotal; 176 } 177 178 181 public Date getLastFailTime() 182 { 183 return new Date (_lastFailTime); 184 } 185 186 189 public long getBusyCountTotal() 190 { 191 return _busyCountTotal; 192 } 193 194 197 public Date getLastBusyTime() 198 { 199 return new Date (_lastBusyTime); 200 } 201 202 205 public void setCpuLoadAvg(double load) 206 { 207 _cpuSetTime = Alarm.getCurrentTime(); 208 _cpuLoadAvg = load; 209 } 210 211 214 public double getCpuLoadAvg() 215 { 216 double avg = _cpuLoadAvg; 217 long time = _cpuSetTime; 218 219 long now = Alarm.getCurrentTime(); 220 221 if (now - time < 10000L) 222 return avg; 223 else 224 return avg * 10000L / (now - time); 225 } 226 227 230 public String getDebugId() 231 { 232 return _debugId; 233 } 234 235 238 public final boolean isActive() 239 { 240 switch (_state) { 241 case ST_ACTIVE: 242 return true; 243 244 case ST_STANDBY: 245 case ST_CLOSED: 246 return false; 247 248 case ST_FAIL: 249 return (_lastFailTime + _failRecoverTime <= Alarm.getCurrentTime()); 250 251 default: 252 return false; 253 } 254 } 255 256 259 public String getState() 260 { 261 switch (_state) { 262 case ST_NEW: 263 return "init"; 264 case ST_STANDBY: 265 return "standby"; 266 case ST_SESSION_ONLY: 267 return "session-only"; 268 case ST_STARTING: 269 return "starting"; 270 case ST_WARMUP: 271 return "warmup"; 272 case ST_BUSY: 273 return "busy"; 274 case ST_FAIL: 275 return "fail"; 276 case ST_ACTIVE: 277 return "active"; 278 case ST_CLOSED: 279 return "closed"; 280 default: 281 return "unknown(" + _state + ")"; 282 } 283 } 284 285 288 public boolean canOpenSoft() 289 { 290 int state = _state; 291 292 if (state == ST_ACTIVE) 293 return true; 294 else if (ST_STARTING <= state && state < ST_ACTIVE) { 295 long now = Alarm.getCurrentTime(); 296 297 if (now < _lastFailTime + _failRecoverTime) 298 return false; 299 if (now < _lastBusyTime + _failRecoverTime) 300 return false; 301 302 long warmupCount; 303 long warmupTime = _warmupTime; 304 305 if (_firstConnectTime <= 0) { 306 toWarmup(); 307 warmupCount = 0; 308 } 309 else if (warmupTime <= 0) 310 warmupCount = Integer.MAX_VALUE; 311 else 312 warmupCount = 16 * (now - _firstConnectTime) / warmupTime; 313 314 317 if (warmupCount > 16) { 318 toActive(); 319 320 return true; 321 } 322 323 int idleCount = getIdleCount(); 324 int totalCount = _activeCount + _startingCount + idleCount; 325 326 if (totalCount <= 1) 327 return true; 328 else if (warmupCount < 8) 329 return false; 330 else if (totalCount < (1 << (warmupCount - 8))) { 331 return true; 332 } 333 else 334 return false; 335 } 336 else { 337 return false; 338 } 339 } 340 341 344 public boolean isEnabled() 345 { 346 int state = _state; 347 348 return ST_STARTING <= state && state <= ST_ACTIVE; 349 } 350 351 private void toActive() 352 { 353 synchronized (this) { 354 if (_state < ST_CLOSED) 355 _state = ST_ACTIVE; 356 } 357 } 358 359 private void toWarmup() 360 { 361 synchronized (this) { 362 if (ST_STARTING <= _state && _state < ST_CLOSED) { 363 _state = ST_WARMUP; 364 _firstConnectTime = Alarm.getCurrentTime(); 365 } 366 } 367 } 368 369 public void toBusy() 370 { 371 _lastBusyTime = Alarm.getCurrentTime(); 372 _firstConnectTime = 0; 373 374 synchronized (this) { 375 _busyCountTotal++; 376 377 if (_state < ST_CLOSED) 378 _state = ST_BUSY; 379 } 380 } 381 382 public void toFail() 383 { 384 _lastFailTime = Alarm.getCurrentTime(); 385 _firstConnectTime = 0; 386 387 synchronized (this) { 388 _failCountTotal++; 389 390 if (_state < ST_CLOSED) 391 _state = ST_FAIL; 392 } 393 394 clearRecycle(); 395 } 396 397 400 public void start() 401 { 402 synchronized (this) { 403 if (_state == ST_ACTIVE) { 404 } 405 else if (_state < ST_CLOSED) 406 _state = ST_STARTING; 407 } 408 } 409 410 413 public void stop() 414 { 415 synchronized (this) { 416 if (_state < ST_CLOSED) 417 _state = ST_STANDBY; 418 } 419 } 420 421 424 public void enableSessionOnly() 425 { 426 synchronized (this) { 427 if (_state < ST_CLOSED && _state != ST_STANDBY) 428 _state = ST_SESSION_ONLY; 429 } 430 } 431 432 437 public ClusterStream openSoft() 438 { 439 int state = _state; 440 441 if (! (ST_STARTING <= state && state <= ST_ACTIVE)) 442 return null; 443 444 long now = Alarm.getCurrentTime(); 445 446 if (now < _lastFailTime + _failRecoverTime) 447 return null; 448 if (now < _lastBusyTime + _failRecoverTime) 449 return null; 450 451 ClusterStream stream = openRecycle(); 452 453 if (stream != null) 454 return stream; 455 456 if (canOpenSoft()) { 457 return connect(); 458 } 459 else 460 return null; 461 } 462 463 468 public ClusterStream openIfLive() 469 { 470 if (_state == ST_CLOSED) 471 return null; 472 473 ClusterStream stream = openRecycle(); 474 475 if (stream != null) 476 return stream; 477 478 long now = Alarm.getCurrentTime(); 479 480 if (now < _lastFailTime + _failRecoverTime) 481 return null; 482 483 return connect(); 484 } 485 486 491 public ClusterStream openForSession() 492 { 493 int state = _state; 494 if (! (ST_SESSION_ONLY <= state && state < ST_CLOSED)) { 495 return null; 496 } 497 498 ClusterStream stream = openRecycle(); 499 500 if (stream != null) 501 return stream; 502 503 long now = Alarm.getCurrentTime(); 504 505 if (now < _lastFailTime + _failRecoverTime) { 506 return null; 507 } 508 509 if (now < _lastBusyTime + _failRecoverTime) { 510 return null; 511 } 512 513 return connect(); 514 } 515 516 521 public ClusterStream open() 522 { 523 int state = _state; 524 if (! (ST_STARTING <= state && state < ST_CLOSED)) 525 return null; 526 527 ClusterStream stream = openRecycle(); 528 529 if (stream != null) 530 return stream; 531 532 return connect(); 533 } 534 535 543 private ClusterStream openRecycle() 544 { 545 long now = Alarm.getCurrentTime(); 546 ClusterStream stream = null; 547 548 synchronized (this) { 549 if (_idleHead != _idleTail) { 550 stream = _idle[_idleHead]; 551 long freeTime = stream.getFreeTime(); 552 553 _idle[_idleHead] = null; 554 _idleHead = (_idleHead + _idle.length - 1) % _idle.length; 555 556 if (now < freeTime + _server.getLoadBalanceIdleTime()) { 557 _activeCount++; 558 _keepaliveCountTotal++; 559 560 return stream; 561 } 562 } 563 } 564 565 if (stream != null) 566 stream.closeImpl(); 567 568 return null; 569 } 570 571 576 private ClusterStream connect() 577 { 578 synchronized (this) { 579 if (_maxConnections <= _activeCount + _startingCount) 580 return null; 581 582 _startingCount++; 583 } 584 585 try { 586 ReadWritePair pair = _server.openTCPPair(); 587 ReadStream rs = pair.getReadStream(); 588 rs.setAttribute("timeout", new Integer ((int) _server.getSocketTimeout())); 589 590 synchronized (this) { 591 _activeCount++; 592 _connectCountTotal++; 593 594 if (_firstConnectTime <= 0) { 595 if (ST_STARTING <= _state && _state < ST_ACTIVE) { 596 _state = ST_WARMUP; 597 _firstConnectTime = Alarm.getCurrentTime(); 598 } 599 } 600 } 601 602 ClusterStream stream = new ClusterStream(_streamCount++, this, 603 rs, pair.getWriteStream()); 604 605 if (log.isLoggable(Level.FINER)) 606 log.finer("connect " + stream); 607 608 return stream; 609 } catch (IOException e) { 610 log.log(Level.FINER, e.toString(), e); 611 612 toFail(); 613 614 return null; 615 } finally { 616 synchronized (this) { 617 _startingCount--; 618 } 619 } 620 } 621 622 626 public void wake() 627 { 628 synchronized (this) { 629 _lastFailTime = 0; 630 if (_state == ST_FAIL) { 631 _state = ST_STARTING; 632 } 633 } 634 } 635 636 640 void free(ClusterStream stream) 641 { 642 synchronized (this) { 643 _activeCount--; 644 645 int size = (_idleHead - _idleTail + _idle.length) % _idle.length; 646 647 if (_state != ST_CLOSED && size < _idleSize) { 648 _idleHead = (_idleHead + 1) % _idle.length; 649 _idle[_idleHead] = stream; 650 651 stream = null; 652 } 653 } 654 655 long now = Alarm.getCurrentTime(); 656 long maxIdleTime = _server.getLoadBalanceIdleTime(); 657 ClusterStream oldStream = null; 658 659 do { 660 oldStream = null; 661 662 synchronized (this) { 663 if (_idleHead != _idleTail) { 664 int nextTail = (_idleTail + 1) % _idle.length; 665 666 oldStream = _idle[nextTail]; 667 668 if (oldStream != null 669 && oldStream.getFreeTime() + maxIdleTime < now) { 670 _idle[nextTail] = null; 671 _idleTail = nextTail; 672 } 673 else 674 oldStream = null; 675 } 676 } 677 678 if (oldStream != null) 679 oldStream.closeImpl(); 680 } while (oldStream != null); 681 682 if (stream != null) 683 stream.closeImpl(); 684 } 685 686 690 void close(ClusterStream stream) 691 { 692 if (log.isLoggable(Level.FINER)) 693 log.finer("close " + stream); 694 695 synchronized (this) { 696 _activeCount--; 697 } 698 } 699 700 704 public void clearRecycle() 705 { 706 ArrayList <ClusterStream> recycleList = null; 707 708 synchronized (this) { 709 _idleHead = _idleTail = 0; 710 711 for (int i = 0; i < _idle.length; i++) { 712 ClusterStream stream; 713 714 stream = _idle[i]; 715 _idle[i] = null; 716 717 if (stream != null) { 718 if (recycleList == null) 719 recycleList = new ArrayList <ClusterStream>(); 720 721 recycleList.add(stream); 722 } 723 } 724 } 725 726 if (recycleList != null) { 727 for (ClusterStream stream : recycleList) { 728 stream.closeImpl(); 729 } 730 } 731 } 732 733 736 public void close() 737 { 738 synchronized (this) { 739 if (_state == ST_CLOSED) 740 return; 741 742 _state = ST_CLOSED; 743 } 744 745 synchronized (this) { 746 _idleHead = _idleTail = 0; 747 } 748 749 for (int i = 0; i < _idle.length; i++) { 750 ClusterStream stream; 751 752 synchronized (this) { 753 stream = _idle[i]; 754 _idle[i] = null; 755 } 756 757 if (stream != null) 758 stream.closeImpl(); 759 } 760 } 761 762 public String toString() 763 { 764 return ("ClusterClient[" + _server + "]"); 765 } 766 } 767 | Popular Tags |