KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > server > cluster > ClusterClient


/*
 * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
 *
 * This file is part of Resin(R) Open Source
 *
 * Each copy or derived work must preserve the copyright notice and this
 * notice unmodified.
 *
 * Resin Open Source is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * Resin Open Source is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
 * of NON-INFRINGEMENT. See the GNU General Public License for more
 * details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Resin Open Source; if not, write to the
 *
 * Free Software Foundation, Inc.
 * 59 Temple Place, Suite 330
 * Boston, MA 02111-1307 USA
 *
 * @author Scott Ferguson
 */

29
30 package com.caucho.server.cluster;
31
import com.caucho.util.Alarm;
import com.caucho.vfs.ReadStream;
import com.caucho.vfs.ReadWritePair;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
41
42 /**
43  * Represents a connection to one of the servers in a distribution group.
44  * A {@link ClusterServer} is used to define the properties of the server
45  * that is connected to.
46  *
47  * <pre>
48  * resin:name=web-a,type=ClusterServer
49  * </pre>
50  */

51 public final class ClusterClient {
52   private static final Logger JavaDoc log
53     = Logger.getLogger(ClusterClient.class.getName());
54
55   private static final int DISABLED = 0;
56   private static final int ENABLED = 1;
57   private static final int ENABLED_SESSION = 2;
58
59   private static final int ST_NEW = 0;
60   private static final int ST_STANDBY = 1;
61   private static final int ST_SESSION_ONLY = 2;
62   // the following 5 are the active states
63
private static final int ST_STARTING = 3;
64   private static final int ST_WARMUP = 4;
65   private static final int ST_BUSY = 5;
66   private static final int ST_FAIL = 6;
67   private static final int ST_ACTIVE = 7;
68   private static final int ST_CLOSED = 8;
69
70   private ServerConnector _server;
71
72   private String JavaDoc _debugId;
73
74   private int _maxConnections = Integer.MAX_VALUE / 2;
75
76   private volatile int _enabledMode = ENABLED;
77   
78   private ClusterStream []_idle = new ClusterStream[64];
79   private volatile int _idleHead;
80   private volatile int _idleTail;
81   private int _idleSize = 16;
82
83   private int _streamCount;
84
85   private long _failRecoverTime;
86   private long _warmupTime;
87
88   private volatile int _state = ST_NEW;
89
90   private volatile long _lastFailTime;
91   private volatile long _lastBusyTime;
92   private volatile long _firstConnectTime;
93
94   private volatile int _activeCount;
95   private volatile int _startingCount;
96   
97   private volatile long _keepaliveCountTotal;
98   private volatile long _connectCountTotal;
99   private volatile long _failCountTotal;
100   private volatile long _busyCountTotal;
101
102   private volatile double _cpuLoadAvg;
103   private volatile long _cpuSetTime;
104
105   ClusterClient(ServerConnector server)
106   {
107     _server = server;
108
109     Cluster cluster = Cluster.getLocal();
110
111     String JavaDoc selfId = null;
112     if (cluster != null)
113       selfId = cluster.getId();
114
115     if (selfId == null || selfId.equals(""))
116       selfId = "default";
117
118     String JavaDoc targetId = server.getId();
119     if (targetId == null || targetId.equals(""))
120       targetId = String.valueOf(server.getIndex());
121
122     _debugId = selfId + "->" + targetId;
123
124     _failRecoverTime = server.getLoadBalanceRecoverTime();
125     _warmupTime = server.getLoadBalanceWarmupTime();
126
127     _state = ST_STARTING;
128   }
129
130   /**
131    * Returns the cluster server.
132    */

133   public ServerConnector getServer()
134   {
135     return _server;
136   }
137
138   /**
139    * Returns the number of active connections.
140    */

141   public int getActiveCount()
142   {
143     return _activeCount;
144   }
145
146   /**
147    * Returns the number of idle connections.
148    */

149   public int getIdleCount()
150   {
151     return (_idleHead - _idleTail + _idle.length) % _idle.length;
152   }
153
154   /**
155    * Returns the total number of successful socket connections
156    */

157   public long getConnectCountTotal()
158   {
159     return _connectCountTotal;
160   }
161
162   /**
163    * Returns the number of times a keepalive connection has been used.
164    */

165   public long getKeepaliveCountTotal()
166   {
167     return _keepaliveCountTotal;
168   }
169
170   /**
171    * Returns the total number of failed connect attempts.
172    */

173   public long getFailCountTotal()
174   {
175     return _failCountTotal;
176   }
177
178   /**
179    * Returns the time of the last failure.
180    */

181   public Date JavaDoc getLastFailTime()
182   {
183     return new Date JavaDoc(_lastFailTime);
184   }
185
186   /**
187    * Returns the count of busy connections.
188    */

189   public long getBusyCountTotal()
190   {
191     return _busyCountTotal;
192   }
193
194   /**
195    * Returns the time of the last busy.
196    */

197   public Date JavaDoc getLastBusyTime()
198   {
199     return new Date JavaDoc(_lastBusyTime);
200   }
201
202   /**
203    * Sets the CPU load avg (from backend).
204    */

205   public void setCpuLoadAvg(double load)
206   {
207     _cpuSetTime = Alarm.getCurrentTime();
208     _cpuLoadAvg = load;
209   }
210
211   /**
212    * Gets the CPU load avg
213    */

214   public double getCpuLoadAvg()
215   {
216     double avg = _cpuLoadAvg;
217     long time = _cpuSetTime;
218
219     long now = Alarm.getCurrentTime();
220
221     if (now - time < 10000L)
222       return avg;
223     else
224       return avg * 10000L / (now - time);
225   }
226
227   /**
228    * Returns the debug id.
229    */

230   public String JavaDoc getDebugId()
231   {
232     return _debugId;
233   }
234
235   /**
236    * Returns true if the server is active.
237    */

238   public final boolean isActive()
239   {
240     switch (_state) {
241     case ST_ACTIVE:
242       return true;
243
244     case ST_STANDBY:
245     case ST_CLOSED:
246       return false;
247
248     case ST_FAIL:
249       return (_lastFailTime + _failRecoverTime <= Alarm.getCurrentTime());
250       
251     default:
252       return false;
253     }
254   }
255
256   /**
257    * Returns the lifecycle state.
258    */

259   public String JavaDoc getState()
260   {
261     switch (_state) {
262     case ST_NEW:
263       return "init";
264     case ST_STANDBY:
265       return "standby";
266     case ST_SESSION_ONLY:
267       return "session-only";
268     case ST_STARTING:
269       return "starting";
270     case ST_WARMUP:
271       return "warmup";
272     case ST_BUSY:
273       return "busy";
274     case ST_FAIL:
275       return "fail";
276     case ST_ACTIVE:
277       return "active";
278     case ST_CLOSED:
279       return "closed";
280     default:
281       return "unknown(" + _state + ")";
282     }
283   }
284
285   /**
286    * Returns true if the server can open a connection.
287    */

288   public boolean canOpenSoft()
289   {
290     int state = _state;
291     
292     if (state == ST_ACTIVE)
293       return true;
294     else if (ST_STARTING <= state && state < ST_ACTIVE) {
295       long now = Alarm.getCurrentTime();
296
297       if (now < _lastFailTime + _failRecoverTime)
298     return false;
299       if (now < _lastBusyTime + _failRecoverTime)
300     return false;
301
302       long warmupCount;
303       long warmupTime = _warmupTime;
304
305       if (_firstConnectTime <= 0) {
306     toWarmup();
307     warmupCount = 0;
308       }
309       else if (warmupTime <= 0)
310     warmupCount = Integer.MAX_VALUE;
311       else
312     warmupCount = 16 * (now - _firstConnectTime) / warmupTime;
313
314       // Warmup time splits into 16 parts. The first 4 allow 1 request
315
// at a time. After that, each segment doubles the allowed requests
316

317       if (warmupCount > 16) {
318     toActive();
319       
320     return true;
321       }
322
323       int idleCount = getIdleCount();
324       int totalCount = _activeCount + _startingCount + idleCount;
325       
326       if (totalCount <= 1)
327     return true;
328       else if (warmupCount < 8)
329     return false;
330       else if (totalCount < (1 << (warmupCount - 8))) {
331     return true;
332       }
333       else
334     return false;
335     }
336     else {
337       return false;
338     }
339   }
340
341   /**
342    * Return true if active.
343    */

344   public boolean isEnabled()
345   {
346     int state = _state;
347     
348     return ST_STARTING <= state && state <= ST_ACTIVE;
349   }
350   
351   private void toActive()
352   {
353     synchronized (this) {
354       if (_state < ST_CLOSED)
355     _state = ST_ACTIVE;
356     }
357   }
358   
359   private void toWarmup()
360   {
361     synchronized (this) {
362       if (ST_STARTING <= _state && _state < ST_CLOSED) {
363     _state = ST_WARMUP;
364     _firstConnectTime = Alarm.getCurrentTime();
365       }
366     }
367   }
368   
369   public void toBusy()
370   {
371     _lastBusyTime = Alarm.getCurrentTime();
372     _firstConnectTime = 0;
373     
374     synchronized (this) {
375       _busyCountTotal++;
376       
377       if (_state < ST_CLOSED)
378     _state = ST_BUSY;
379     }
380   }
381   
382   public void toFail()
383   {
384     _lastFailTime = Alarm.getCurrentTime();
385     _firstConnectTime = 0;
386     
387     synchronized (this) {
388       _failCountTotal++;
389       
390       if (_state < ST_CLOSED)
391     _state = ST_FAIL;
392     }
393
394     clearRecycle();
395   }
396
397   /**
398    * Enable the client.
399    */

400   public void start()
401   {
402     synchronized (this) {
403       if (_state == ST_ACTIVE) {
404       }
405       else if (_state < ST_CLOSED)
406     _state = ST_STARTING;
407     }
408   }
409
410   /**
411    * Disable the client.
412    */

413   public void stop()
414   {
415     synchronized (this) {
416       if (_state < ST_CLOSED)
417     _state = ST_STANDBY;
418     }
419   }
420
421   /**
422    * Session only
423    */

424   public void enableSessionOnly()
425   {
426     synchronized (this) {
427       if (_state < ST_CLOSED && _state != ST_STANDBY)
428     _state = ST_SESSION_ONLY;
429     }
430   }
431
432   /**
433    * Open a stream to the target server.
434    *
435    * @return the socket's read/write pair.
436    */

437   public ClusterStream openSoft()
438   {
439     int state = _state;
440
441     if (! (ST_STARTING <= state && state <= ST_ACTIVE))
442       return null;
443
444     long now = Alarm.getCurrentTime();
445
446     if (now < _lastFailTime + _failRecoverTime)
447       return null;
448     if (now < _lastBusyTime + _failRecoverTime)
449       return null;
450
451     ClusterStream stream = openRecycle();
452
453     if (stream != null)
454       return stream;
455
456     if (canOpenSoft()) {
457       return connect();
458     }
459     else
460       return null;
461   }
462
463   /**
464    * Open a stream to the target server object persistence.
465    *
466    * @return the socket's read/write pair.
467    */

468   public ClusterStream openIfLive()
469   {
470     if (_state == ST_CLOSED)
471       return null;
472
473     ClusterStream stream = openRecycle();
474
475     if (stream != null)
476       return stream;
477
478     long now = Alarm.getCurrentTime();
479
480     if (now < _lastFailTime + _failRecoverTime)
481       return null;
482
483     return connect();
484   }
485
486   /**
487    * Open a stream to the target server for a session.
488    *
489    * @return the socket's read/write pair.
490    */

491   public ClusterStream openForSession()
492   {
493     int state = _state;
494     if (! (ST_SESSION_ONLY <= state && state < ST_CLOSED)) {
495       return null;
496     }
497
498     ClusterStream stream = openRecycle();
499
500     if (stream != null)
501       return stream;
502
503     long now = Alarm.getCurrentTime();
504
505     if (now < _lastFailTime + _failRecoverTime) {
506       return null;
507     }
508     
509     if (now < _lastBusyTime + _failRecoverTime) {
510       return null;
511     }
512
513     return connect();
514   }
515
516   /**
517    * Open a stream to the target server for the load balancer.
518    *
519    * @return the socket's read/write pair.
520    */

521   public ClusterStream open()
522   {
523     int state = _state;
524     if (! (ST_STARTING <= state && state < ST_CLOSED))
525       return null;
526
527     ClusterStream stream = openRecycle();
528
529     if (stream != null)
530       return stream;
531
532     return connect();
533   }
534
535   /**
536    * Returns a valid recycled stream from the idle pool to the backend.
537    *
538    * If the stream has been in the pool for too long (> live_time),
539    * close it instead.
540    *
541    * @return the socket's read/write pair.
542    */

543   private ClusterStream openRecycle()
544   {
545     long now = Alarm.getCurrentTime();
546     ClusterStream stream = null;
547
548     synchronized (this) {
549       if (_idleHead != _idleTail) {
550         stream = _idle[_idleHead];
551         long freeTime = stream.getFreeTime();
552
553         _idle[_idleHead] = null;
554         _idleHead = (_idleHead + _idle.length - 1) % _idle.length;
555
556         if (now < freeTime + _server.getLoadBalanceIdleTime()) {
557           _activeCount++;
558       _keepaliveCountTotal++;
559
560           return stream;
561         }
562       }
563     }
564
565     if (stream != null)
566       stream.closeImpl();
567
568     return null;
569   }
570
571   /**
572    * Connect to the backend server.
573    *
574    * @return the socket's read/write pair.
575    */

576   private ClusterStream connect()
577   {
578     synchronized (this) {
579       if (_maxConnections <= _activeCount + _startingCount)
580     return null;
581       
582       _startingCount++;
583     }
584       
585     try {
586       ReadWritePair pair = _server.openTCPPair();
587       ReadStream rs = pair.getReadStream();
588       rs.setAttribute("timeout", new Integer JavaDoc((int) _server.getSocketTimeout()));
589
590       synchronized (this) {
591         _activeCount++;
592     _connectCountTotal++;
593
594     if (_firstConnectTime <= 0) {
595       if (ST_STARTING <= _state && _state < ST_ACTIVE) {
596         _state = ST_WARMUP;
597         _firstConnectTime = Alarm.getCurrentTime();
598       }
599     }
600       }
601
602       ClusterStream stream = new ClusterStream(_streamCount++, this,
603                            rs, pair.getWriteStream());
604       
605       if (log.isLoggable(Level.FINER))
606     log.finer("connect " + stream);
607       
608       return stream;
609     } catch (IOException JavaDoc e) {
610       log.log(Level.FINER, e.toString(), e);
611
612       toFail();
613       
614       return null;
615     } finally {
616       synchronized (this) {
617     _startingCount--;
618       }
619     }
620   }
621
622   /**
623    * We now know that the server is live, e.g. if a sibling has
624    * contacted us.
625    */

626   public void wake()
627   {
628     synchronized (this) {
629       _lastFailTime = 0;
630       if (_state == ST_FAIL) {
631     _state = ST_STARTING;
632       }
633     }
634   }
635
636   /**
637    * Free the read/write pair for reuse. Called only from
638    * ClusterStream.free()
639    */

640   void free(ClusterStream stream)
641   {
642     synchronized (this) {
643       _activeCount--;
644
645       int size = (_idleHead - _idleTail + _idle.length) % _idle.length;
646
647       if (_state != ST_CLOSED && size < _idleSize) {
648         _idleHead = (_idleHead + 1) % _idle.length;
649         _idle[_idleHead] = stream;
650
651     stream = null;
652       }
653     }
654
655     long now = Alarm.getCurrentTime();
656     long maxIdleTime = _server.getLoadBalanceIdleTime();
657     ClusterStream oldStream = null;
658     
659     do {
660       oldStream = null;
661
662       synchronized (this) {
663     if (_idleHead != _idleTail) {
664       int nextTail = (_idleTail + 1) % _idle.length;
665       
666       oldStream = _idle[nextTail];
667
668       if (oldStream != null
669           && oldStream.getFreeTime() + maxIdleTime < now) {
670         _idle[nextTail] = null;
671         _idleTail = nextTail;
672       }
673       else
674         oldStream = null;
675     }
676       }
677
678       if (oldStream != null)
679     oldStream.closeImpl();
680     } while (oldStream != null);
681
682     if (stream != null)
683       stream.closeImpl();
684   }
685
686   /**
687    * Closes the read/write pair for reuse. Called only
688    * from ClusterStream.close().
689    */

690   void close(ClusterStream stream)
691   {
692     if (log.isLoggable(Level.FINER))
693       log.finer("close " + stream);
694     
695     synchronized (this) {
696       _activeCount--;
697     }
698   }
699
700   /**
701    * Clears the recycled connections, e.g. on detection of backend
702    * server going down.
703    */

704   public void clearRecycle()
705   {
706     ArrayList JavaDoc<ClusterStream> recycleList = null;
707
708     synchronized (this) {
709       _idleHead = _idleTail = 0;
710
711       for (int i = 0; i < _idle.length; i++) {
712         ClusterStream stream;
713
714         stream = _idle[i];
715         _idle[i] = null;
716
717         if (stream != null) {
718           if (recycleList == null)
719             recycleList = new ArrayList JavaDoc<ClusterStream>();
720
721           recycleList.add(stream);
722         }
723       }
724     }
725
726     if (recycleList != null) {
727       for (ClusterStream stream : recycleList) {
728         stream.closeImpl();
729       }
730     }
731   }
732
733   /**
734    * Close the client
735    */

736   public void close()
737   {
738     synchronized (this) {
739       if (_state == ST_CLOSED)
740     return;
741
742       _state = ST_CLOSED;
743     }
744     
745     synchronized (this) {
746       _idleHead = _idleTail = 0;
747     }
748
749     for (int i = 0; i < _idle.length; i++) {
750       ClusterStream stream;
751
752       synchronized (this) {
753         stream = _idle[i];
754         _idle[i] = null;
755       }
756
757       if (stream != null)
758         stream.closeImpl();
759     }
760   }
761
762   public String JavaDoc toString()
763   {
764     return ("ClusterClient[" + _server + "]");
765   }
766 }
767
Popular Tags