KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > server > port > Port


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  *
23  * Free Software Foundation, Inc.
24  * 59 Temple Place, Suite 330
25  * Boston, MA 02111-1307 USA
26  *
27  * @author Scott Ferguson
28  */

29
30 package com.caucho.server.port;
31
32 import com.caucho.config.Config;
33 import com.caucho.config.ConfigException;
34 import com.caucho.config.types.Period;
35 import com.caucho.lifecycle.Lifecycle;
36 import com.caucho.loader.Environment;
37 import com.caucho.loader.EnvironmentBean;
38 import com.caucho.loader.EnvironmentClassLoader;
39 import com.caucho.loader.EnvironmentListener;
40 import com.caucho.log.Log;
41 import com.caucho.management.server.PortMXBean;
42 import com.caucho.server.cluster.ClusterServer;
43 import com.caucho.server.cluster.Server;
44 import com.caucho.util.FreeList;
45 import com.caucho.util.L10N;
46 import com.caucho.util.ThreadPool;
47 import com.caucho.vfs.JsseSSLFactory;
48 import com.caucho.vfs.QJniServerSocket;
49 import com.caucho.vfs.QServerSocket;
50 import com.caucho.vfs.QSocket;
51 import com.caucho.vfs.SSLFactory;
52
53 import javax.annotation.PostConstruct;
54 import java.net.ConnectException JavaDoc;
55 import java.net.InetAddress JavaDoc;
56 import java.net.InetSocketAddress JavaDoc;
57 import java.net.Socket JavaDoc;
58 import java.net.UnknownHostException JavaDoc;
59 import java.util.logging.Level JavaDoc;
60 import java.util.logging.Logger JavaDoc;
61
62 /**
63  * Represents a protocol connection.
64  */

65 public class Port
66   implements EnvironmentListener, Runnable JavaDoc
67 {
68   private static final L10N L = new L10N(Port.class);
69
70   private static final Logger JavaDoc log = Log.open(Port.class);
71
72   private static final int DEFAULT = -0xcafe;
73
74   // started at 128, but that seems wasteful since the active threads
75
// themselves are buffering the free connections
76
private FreeList<TcpConnection> _freeConn
77     = new FreeList<TcpConnection>(32);
78
79   // The owning server
80
private ProtocolDispatchServer _server;
81
82   // The id
83
private String JavaDoc _serverId = "";
84
85   // The address
86
private String JavaDoc _address;
87   // The port
88
private int _port;
89
90   // The protocol
91
private Protocol _protocol;
92
93   // The SSL factory, if any
94
private SSLFactory _sslFactory;
95
96   private InetAddress JavaDoc _socketAddress;
97
98   // default timeout
99
private long _socketTimeout = DEFAULT;
100
101   private int _connectionMax = 512;
102   private int _minSpareConnection = 16;
103
104   private int _keepaliveMax = DEFAULT;
105   
106   private long _keepaliveTimeout = DEFAULT;
107   private long _keepaliveSelectThreadTimeout = DEFAULT;
108
109   private int _acceptThreadMin = DEFAULT;
110   private int _acceptThreadMax = DEFAULT;
111
112   private int _acceptListenBacklog = DEFAULT;
113
114   // The virtual host name
115
private String JavaDoc _virtualHost;
116
117   private boolean _tcpNoDelay = true;
118
119   private final PortAdmin _admin = new PortAdmin(this);
120
121   // the server socket
122
private QServerSocket _serverSocket;
123
124   // the selection manager
125
private AbstractSelectManager _selectManager;
126
127   private volatile int _threadCount;
128   private final Object JavaDoc _threadCountLock = new Object JavaDoc();
129
130   private volatile int _idleThreadCount;
131   private volatile int _startThreadCount;
132
133   private volatile int _connectionCount;
134
135   private volatile long _lifetimeRequestCount;
136   private volatile long _lifetimeKeepaliveCount;
137   private volatile long _lifetimeClientDisconnectCount;
138   private volatile long _lifetimeRequestTime;
139   private volatile long _lifetimeReadBytes;
140   private volatile long _lifetimeWriteBytes;
141
142   private volatile int _keepaliveCount;
143   private final Object JavaDoc _keepaliveCountLock = new Object JavaDoc();
144
145   // True if the port has been bound
146
private volatile boolean _isBound;
147
148   // The port lifecycle
149
private final Lifecycle _lifecycle = new Lifecycle();
150
151   public Port()
152   {
153   }
154
155   public Port(ClusterServer server)
156   {
157   }
158
159   /**
160    * Sets the containing server.
161    */

162   public void setParent(ProtocolDispatchServer parent)
163   {
164     setServer(parent);
165   }
166
167   /**
168    * Sets the server.
169    */

170   public void setServer(ProtocolDispatchServer protocolServer)
171   {
172     _server = protocolServer;
173
174     if (_protocol != null)
175       _protocol.setServer(protocolServer);
176
177     if (protocolServer instanceof Server) {
178       Server server = (Server) protocolServer;
179
180       if (_acceptThreadMax == DEFAULT)
181     _acceptThreadMax = server.getAcceptThreadMax();
182
183       if (_acceptThreadMin == DEFAULT)
184     _acceptThreadMin = server.getAcceptThreadMin();
185
186       if (_acceptListenBacklog == DEFAULT)
187     _acceptListenBacklog = server.getAcceptListenBacklog();
188
189       if (_keepaliveMax == DEFAULT)
190     _keepaliveMax = server.getKeepaliveMax();
191
192       if (_keepaliveTimeout == DEFAULT)
193     _keepaliveTimeout = server.getKeepaliveTimeout();
194
195       if (_keepaliveSelectThreadTimeout == DEFAULT) {
196     _keepaliveSelectThreadTimeout
197       = server.getKeepaliveSelectThreadTimeout();
198       }
199
200       if (_socketTimeout == DEFAULT)
201     _socketTimeout = server.getSocketTimeout();
202     }
203   }
204
205   /**
206    * Gets the server.
207    */

208   public ProtocolDispatchServer getServer()
209   {
210     return _server;
211   }
212
213   /**
214    * Sets the id.
215    */

216   public void setId(String JavaDoc id)
217   {
218     _serverId = id;
219   }
220
221   /**
222    * Sets the server id.
223    */

224   public void setServerId(String JavaDoc id)
225   {
226     _serverId = id;
227   }
228
229   /**
230    * Gets the server id.
231    */

232   public String JavaDoc getServerId()
233   {
234     return _serverId;
235   }
236
237   public PortMXBean getAdmin()
238   {
239     return _admin;
240   }
241
242   /**
243    * Sets protocol class.
244    */

245   public void setType(Class JavaDoc cl)
246     throws InstantiationException JavaDoc, IllegalAccessException JavaDoc
247   {
248     setClass(cl);
249   }
250
251   /**
252    * Sets protocol class.
253    */

254   public void setClass(Class JavaDoc cl)
255     throws InstantiationException JavaDoc, IllegalAccessException JavaDoc
256   {
257     Config.validate(cl, Protocol.class);
258
259     _protocol = (Protocol) cl.newInstance();
260   }
261
262   public Object JavaDoc createInit()
263     throws ConfigException
264   {
265     if (_protocol == null)
266       throw new ConfigException(L.l("<init> requires a protocol class"));
267     
268     return _protocol;
269   }
270   
271   /**
272    * Set protocol.
273    */

274   public void setProtocol(Protocol protocol)
275     throws ConfigException
276   {
277     /* server/0170
278     if (_server == null)
279       throw new IllegalStateException(L.l("Server is not set."));
280     */

281
282     _protocol = protocol;
283     _protocol.setServer(_server);
284   }
285
286   /**
287    * Set protocol.
288    */

289   public Protocol getProtocol()
290   {
291     return _protocol;
292   }
293
294   /**
295    * Gets the protocol name.
296    */

297   public String JavaDoc getProtocolName()
298   {
299     if (_protocol != null)
300       return _protocol.getProtocolName();
301     else
302       return null;
303   }
304
305   /**
306    * Sets the address
307    */

308   public void setAddress(String JavaDoc address)
309     throws UnknownHostException JavaDoc
310   {
311     if ("*".equals(address))
312       address = null;
313
314     _address = address;
315     if (address != null)
316       _socketAddress = InetAddress.getByName(address);
317   }
318
319   /**
320    * @deprecated
321    */

322   public void setHost(String JavaDoc address)
323     throws UnknownHostException JavaDoc
324   {
325     setAddress(address);
326   }
327
328   /**
329    * Gets the IP address
330    */

331   public String JavaDoc getAddress()
332   {
333     return _address;
334   }
335
336   /**
337    * Sets the port.
338    */

339   public void setPort(int port)
340   {
341     _port = port;
342   }
343
344   /**
345    * Gets the port.
346    */

347   public int getPort()
348   {
349     return _port;
350   }
351
352   /**
353    * Sets the virtual host for IP-based virtual host.
354    */

355   public void setVirtualHost(String JavaDoc host)
356   {
357     _virtualHost = host;
358   }
359
360   /**
361    * Gets the virtual host for IP-based virtual host.
362    */

363   public String JavaDoc getVirtualHost()
364   {
365     return _virtualHost;
366   }
367
368   /**
369    * Sets the SSL factory
370    */

371   public void setSSL(SSLFactory factory)
372   {
373     _sslFactory = factory;
374   }
375
376   /**
377    * Sets the SSL factory
378    */

379   public SSLFactory createOpenssl()
380     throws ConfigException
381   {
382     try {
383       Class JavaDoc cl = Class.forName("com.caucho.vfs.OpenSSLFactory");
384
385       _sslFactory = (SSLFactory) cl.newInstance();
386
387       return _sslFactory;
388     } catch (Throwable JavaDoc e) {
389       log.log(Level.FINER, e.toString(), e);
390
391       throw new ConfigException(L.l("<openssl> requires Resin Professional. See http://www.caucho.com for more information."));
392     }
393   }
394
395   /**
396    * Sets the SSL factory
397    */

398   public JsseSSLFactory createJsse()
399   {
400     // should probably check that openssl exists
401
return new JsseSSLFactory();
402   }
403
404   /**
405    * Sets the SSL factory
406    */

407   public void setJsseSsl(JsseSSLFactory factory)
408   {
409     _sslFactory = factory;
410   }
411
412   /**
413    * Gets the SSL factory.
414    */

415   public SSLFactory getSSL()
416   {
417     return _sslFactory;
418   }
419
420   /**
421    * Returns true for ssl.
422    */

423   public boolean isSSL()
424   {
425     return _sslFactory != null;
426   }
427
428   /**
429    * Sets the server socket.
430    */

431   public void setServerSocket(QServerSocket socket)
432   {
433     _serverSocket = socket;
434   }
435
436   /**
437    * Sets the minimum spare listen.
438    */

439   public void setMinSpareListen(int minSpare)
440     throws ConfigException
441   {
442     setAcceptThreadMin(minSpare);
443   }
444
445   /**
446    * Sets the maximum spare listen.
447    */

448   public void setMaxSpareListen(int maxSpare)
449     throws ConfigException
450   {
451     setAcceptThreadMax(maxSpare);
452   }
453
454   /**
455    * Sets the minimum spare listen.
456    */

457   public void setAcceptThreadMin(int minSpare)
458     throws ConfigException
459   {
460     if (minSpare < 1)
461       throw new ConfigException(L.l("min-spare-listen must be at least 1."));
462
463     _acceptThreadMin = minSpare;
464   }
465
466   /**
467    * Sets the maximum spare listen.
468    */

469   public void setAcceptThreadMax(int maxSpare)
470     throws ConfigException
471   {
472     if (maxSpare < 1)
473       throw new ConfigException(L.l("max-spare-listen must be at least 1."));
474
475     _acceptThreadMax = maxSpare;
476   }
477
478   /**
479    * Gets the tcp-no-delay property
480    */

481   public boolean getTcpNoDelay()
482   {
483     return _tcpNoDelay;
484   }
485
486   /**
487    * Sets the tcp-no-delay property
488    */

489   public void setTcpNoDelay(boolean tcpNoDelay)
490   {
491     _tcpNoDelay = tcpNoDelay;
492   }
493
494   /**
495    * Returns true for ignore-client-disconnect.
496    */

497   public boolean isIgnoreClientDisconnect()
498   {
499     return _server.isIgnoreClientDisconnect();
500   }
501
502   /**
503    * Returns the thread count.
504    */

505   public int getThreadCount()
506   {
507     return _threadCount;
508   }
509
510   /**
511    * Sets the default read/write timeout for the accepted sockets.
512    */

513   public void setSocketTimeout(Period period)
514   {
515     _socketTimeout = period.getPeriod();
516   }
517
518   /**
519    * Gets the read timeout for the accepted sockets.
520    */

521   public long getSocketTimeout()
522   {
523     return _socketTimeout;
524   }
525
526   /**
527    * Sets the read timeout for the accepted sockets.
528    *
529    * @deprecated
530    */

531   public void setReadTimeout(Period period)
532   {
533     setSocketTimeout(period);
534   }
535
536   /**
537    * Sets the write timeout for the accepted sockets.
538    *
539    * @deprecated
540    */

541   public void setWriteTimeout(Period period)
542   {
543   }
544
545   /**
546    * Returns the active thread count.
547    */

548   public int getActiveThreadCount()
549   {
550     return _threadCount - _idleThreadCount;
551   }
552
553   /**
554    * Returns the count of idle threads.
555    */

556   public int getIdleThreadCount()
557   {
558     return _idleThreadCount;
559   }
560
561   /**
562    * Sets the connection max.
563    */

564   public void setConnectionMax(int max)
565   {
566     _connectionMax = max;
567   }
568
569   /**
570    * Gets the connection max.
571    */

572   public int getConnectionMax()
573   {
574     return _connectionMax;
575   }
576
577   /**
578    * Returns the number of connections
579    */

580   public int getConnectionCount()
581   {
582     return _connectionCount;
583   }
584
585   public long getLifetimeRequestCount()
586   {
587     return _lifetimeRequestCount;
588   }
589
590   public long getLifetimeKeepaliveCount()
591   {
592     return _lifetimeKeepaliveCount;
593   }
594
595   public long getLifetimeClientDisconnectCount()
596   {
597     return _lifetimeClientDisconnectCount;
598   }
599
600   public long getLifetimeRequestTime()
601   {
602     return _lifetimeRequestTime;
603   }
604
605   public long getLifetimeReadBytes()
606   {
607     return _lifetimeReadBytes;
608   }
609
610   public long getLifetimeWriteBytes()
611   {
612     return _lifetimeWriteBytes;
613   }
614
615   /**
616    * Sets the keepalive max.
617    */

618   public void setKeepaliveMax(int max)
619   {
620     _keepaliveMax = max;
621   }
622
623   /**
624    * Gets the keepalive max.
625    */

626   public int getKeepaliveMax()
627   {
628     return _keepaliveMax;
629   }
630
631   public void setKeepaliveTimeout(Period period)
632   {
633     _keepaliveTimeout = period.getPeriod();
634   }
635
636   public long getKeepaliveTimeout()
637   {
638     return _keepaliveTimeout;
639   }
640
641   public long getKeepaliveSelectThreadTimeout()
642   {
643     return _keepaliveSelectThreadTimeout;
644   }
645
646   /**
647    * Returns the number of keepalive connections
648    */

649   public int getKeepaliveCount()
650   {
651     synchronized (_keepaliveCountLock) {
652       return _keepaliveCount;
653     }
654   }
655
656   public Lifecycle getLifecycleState()
657   {
658     return _lifecycle;
659   }
660
661   /**
662    * Returns true if the port is active.
663    */

664   public boolean isActive()
665   {
666     return _lifecycle.isActive();
667   }
668
669   /**
670    * Returns the accept pool.
671    */

672   public int getFreeKeepalive()
673   {
674     int freeKeepalive = _keepaliveMax - _keepaliveCount;
675     int freeConnections = _connectionMax - _connectionCount - _minSpareConnection;
676     int freeSelect = _server.getFreeSelectKeepalive();
677
678     if (freeKeepalive < freeConnections)
679       return freeSelect < freeKeepalive ? freeSelect : freeKeepalive;
680     else
681       return freeSelect < freeConnections ? freeSelect : freeConnections;
682   }
683
684   /**
685    * Returns true if the port matches the server id.
686    */

687   public boolean matchesServerId(String JavaDoc serverId)
688   {
689     return getServerId().equals("*") || getServerId().equals(serverId);
690   }
691
692   /**
693    * Initializes the port.
694    */

695   @PostConstruct
696   public void init()
697     throws ConfigException
698   {
699     if (! _lifecycle.toInit())
700       return;
701
702     if (_server instanceof EnvironmentBean)
703       Environment.addEnvironmentListener(this, ((EnvironmentBean) _server).getClassLoader());
704   }
705
706   /**
707    * Starts the port listening.
708    */

709   public void bind()
710     throws Exception JavaDoc
711   {
712     synchronized (this) {
713       if (_isBound)
714         return;
715       _isBound = true;
716     }
717
718     if (_protocol == null)
719       throw new IllegalStateException JavaDoc(L.l("`{0}' must have a configured protocol before starting.", this));
720
721     if (_port == 0)
722       return;
723     
724     if (_serverSocket != null) {
725       if (_port == 0) {
726       }
727       else if (_address != null)
728         log.info("listening to " + _address + ":" + _port);
729       else
730         log.info("listening to " + _port);
731     }
732     else if (_sslFactory != null && _socketAddress != null) {
733       _serverSocket = _sslFactory.create(_socketAddress, _port);
734
735       log.info(_protocol.getProtocolName() + "s listening to " + _socketAddress.getHostName() + ":" + _port);
736     }
737     else if (_sslFactory != null) {
738       if (_address == null) {
739         _serverSocket = _sslFactory.create(null, _port);
740         log.info(_protocol.getProtocolName() + "s listening to *:" + _port);
741       }
742       else {
743         InetAddress JavaDoc addr = InetAddress.getByName(_address);
744
745         _serverSocket = _sslFactory.create(addr, _port);
746
747         log.info(_protocol.getProtocolName() + "s listening to " + _address + ":" + _port);
748       }
749     }
750     else if (_socketAddress != null) {
751       _serverSocket = QJniServerSocket.create(_socketAddress, _port,
752                                               _acceptListenBacklog);
753
754       log.info(_protocol.getProtocolName() + " listening to " + _socketAddress.getHostName() + ":" + _port);
755     }
756     else {
757       _serverSocket = QJniServerSocket.create(_port, _acceptListenBacklog);
758
759       log.info(_protocol.getProtocolName() + " listening to *:" + _port);
760     }
761
762     if (_tcpNoDelay)
763       _serverSocket.setTcpNoDelay(_tcpNoDelay);
764
765     _serverSocket.setConnectionSocketTimeout((int) getSocketTimeout());
766   }
767
768   /**
769    * Starts the port listening.
770    */

771   public void bind(QServerSocket ss)
772     throws Exception JavaDoc
773   {
774     if (ss == null)
775       throw new NullPointerException JavaDoc();
776
777     _isBound = true;
778
779     if (_protocol == null)
780       throw new IllegalStateException JavaDoc(L.l("`{0}' must have a configured protocol before starting.", this));
781
782     _admin.register();
783
784     _serverSocket = ss;
785
786     String JavaDoc scheme = _protocol.getProtocolName();
787     
788     if (_address != null)
789       log.info(scheme + " listening to " + _address + ":" + _port);
790     else
791       log.info(scheme + " listening to *:" + _port);
792
793     if (_sslFactory != null) {
794       throw new UnsupportedOperationException JavaDoc();
795     }
796
797     if (_tcpNoDelay)
798       _serverSocket.setTcpNoDelay(_tcpNoDelay);
799
800     _serverSocket.setConnectionSocketTimeout((int) getSocketTimeout());
801   }
802
803   /**
804    * binds for the watchdog.
805    */

806   public QServerSocket bindForWatchdog()
807     throws java.io.IOException JavaDoc
808   {
809     QServerSocket ss;
810     
811     if (_socketAddress != null) {
812       ss = QJniServerSocket.createJNI(_socketAddress, _port,
813                       _acceptListenBacklog);
814
815       if (ss == null)
816     return null;
817
818       log.fine("watchdog binding to " + _socketAddress.getHostName() + ":" + _port);
819     }
820     else {
821       ss = QJniServerSocket.createJNI(null, _port, _acceptListenBacklog);
822
823       if (ss == null)
824     return null;
825
826       log.fine("watchdog binding to *:" + _port);
827     }
828
829     if (! ss.isJNI()) {
830       ss.close();
831
832       return ss;
833     }
834
835     if (_tcpNoDelay)
836       ss.setTcpNoDelay(_tcpNoDelay);
837
838     ss.setConnectionSocketTimeout((int) getSocketTimeout());
839
840     _admin.register();
841
842     return ss;
843   }
844
845   /**
846    * Starts the port listening.
847    */

848   public void start()
849     throws Throwable JavaDoc
850   {
851     if (_port == 0)
852       return;
853     
854     if (! _lifecycle.toActive())
855       return;
856
857     try {
858       bind();
859
860       if (_serverSocket.isJNI() && _server.isEnableSelectManager()) {
861         _selectManager = _server.getSelectManager();
862
863         if (_selectManager == null) {
864           throw new IllegalStateException JavaDoc(L.l("Cannot load select manager"));
865         }
866       }
867
868       if (_keepaliveMax < 0)
869         _keepaliveMax = _server.getKeepaliveMax();
870
871       if (_keepaliveMax < 0)
872         _keepaliveMax = 256;
873
874       String JavaDoc name = "resin-port-" + _serverSocket.getLocalPort();
875       Thread JavaDoc thread = new Thread JavaDoc(this, name);
876       thread.setDaemon(true);
877
878       thread.start();
879
880       _admin.register();
881     } catch (Throwable JavaDoc e) {
882       close();
883
884       log.log(Level.WARNING, e.toString(), e);
885
886       throw e;
887     }
888   }
889
890   /**
891    * Returns the active connections.
892    */

893   public int getActiveConnectionCount()
894   {
895     return _threadCount - _idleThreadCount;
896   }
897
898   /**
899    * Returns the keepalive connections.
900    */

901   public int getKeepaliveConnectionCount()
902   {
903     return getKeepaliveCount();
904   }
905
906   /**
907    * returns the select manager.
908    */

909   public AbstractSelectManager getSelectManager()
910   {
911     return _selectManager;
912   }
913
914   /**
915    * Returns the number of connections in the select.
916    */

917   public int getSelectConnectionCount()
918   {
919     if (_selectManager != null)
920       return _selectManager.getSelectCount();
921     else
922       return -1;
923   }
924
925   /**
926    * Accepts a new connection.
927    */

928   public boolean accept(TcpConnection conn, boolean isFirst)
929   {
930     try {
931       synchronized (this) {
932         _idleThreadCount++;
933
934         if (isFirst) {
935           _startThreadCount--;
936
937           if (_startThreadCount < 0) {
938             Thread.dumpStack();
939           }
940         }
941
942         if (_acceptThreadMax < _idleThreadCount) {
943           return false;
944         }
945       }
946
947       while (_lifecycle.isActive()) {
948         QSocket socket = conn.startSocket();
949
950         Thread.interrupted();
951         if (_serverSocket.accept(socket)) {
952           conn.initSocket();
953
954           return true;
955         }
956         else {
957           if (_acceptThreadMax < _idleThreadCount) {
958             return false;
959           }
960         }
961       }
962     } catch (Throwable JavaDoc e) {
963       if (_lifecycle.isActive() && log.isLoggable(Level.FINER))
964         log.log(Level.FINER, e.toString(), e);
965     } finally {
966       synchronized (this) {
967         _idleThreadCount--;
968
969         if (_idleThreadCount + _startThreadCount < _acceptThreadMin) {
970           notify();
971         }
972       }
973     }
974
975     return false;
976   }
977
978   /**
979    * Registers the new connection as started
980    */

981   void startConnection(TcpConnection conn)
982   {
983     synchronized (this) {
984       _startThreadCount--;
985     }
986   }
987
988   /**
989    * Marks a new thread as running.
990    */

991   void threadBegin(TcpConnection conn)
992   {
993     synchronized (_threadCountLock) {
994       _threadCount++;
995     }
996   }
997
998   /**
999    * Marks a new thread as stopped.
1000   */

1001  void threadEnd(TcpConnection conn)
1002  {
1003    synchronized (_threadCountLock) {
1004      _threadCount--;
1005    }
1006  }
1007
1008  /**
1009   * Marks a keepalive as starting running. Called only from TcpConnection.
1010   */

1011  boolean keepaliveBegin(TcpConnection conn)
1012  {
1013    synchronized (_keepaliveCountLock) {
1014      if (_keepaliveMax <= _keepaliveCount)
1015        return false;
1016      else if (_connectionMax <= _connectionCount + _minSpareConnection)
1017        return false;
1018
1019      _keepaliveCount++;
1020
1021      return true;
1022    }
1023  }
1024
1025  /**
1026   * Marks the keepalive as ending. Called only from TcpConnection.
1027   */

1028  void keepaliveEnd(TcpConnection conn)
1029  {
1030    synchronized (_keepaliveCountLock) {
1031      _keepaliveCount--;
1032
1033      if (_keepaliveCount < 0) {
1034        int count = _keepaliveCount;
1035        _keepaliveCount = 0;
1036
1037        log.warning("internal error: negative keepalive count " + count);
1038      }
1039    }
1040  }
1041
1042  /**
1043   * Returns true if the port is closed.
1044   */

1045  public boolean isClosed()
1046  {
1047    return _lifecycle.isAfterActive();
1048  }
1049
1050  /**
1051   * The port thread is responsible for creating new connections.
1052   */

1053  public void run()
1054  {
1055    while (_lifecycle.isActive()) {
1056      boolean isStart;
1057
1058      try {
1059        // need delay to avoid spawing too many threads over a short time,
1060
// when the load doesn't justify it
1061
Thread.yield();
1062        Thread.sleep(10);
1063
1064        synchronized (this) {
1065          isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin;
1066          if (_connectionMax <= _connectionCount)
1067            isStart = false;
1068
1069          if (! isStart) {
1070            Thread.interrupted();
1071            wait(60000);
1072          }
1073
1074          if (isStart) {
1075            _connectionCount++;
1076            _startThreadCount++;
1077          }
1078        }
1079
1080        if (isStart && _lifecycle.isActive()) {
1081          TcpConnection conn = _freeConn.allocate();
1082          if (conn == null) {
1083            conn = new TcpConnection(this, _serverSocket.createSocket());
1084            conn.setRequest(_protocol.createRequest(conn));
1085          }
1086
1087      conn.start();
1088
1089          ThreadPool.getThreadPool().schedule(conn);
1090        }
1091      } catch (Throwable JavaDoc e) {
1092        e.printStackTrace();
1093      }
1094    }
1095  }
1096
1097  /**
1098   * Handles the case where the environment is starting (after init).
1099   */

1100  public void environmentStart(EnvironmentClassLoader loader)
1101  {
1102  }
1103
1104  /**
1105   * Handles the case where the environment is stopping
1106   */

1107  public void environmentStop(EnvironmentClassLoader loader)
1108  {
1109    close();
1110  }
1111
1112  /**
1113   * Frees the connection.
1114   *
1115   * Called only from TcpConnection
1116   */

1117  void free(TcpConnection conn)
1118  {
1119    closeConnection(conn);
1120
1121    if (! _freeConn.free(conn))
1122      conn.destroy();
1123  }
1124
1125  /**
1126   * Frees the connection.
1127   *
1128   * Called only from TcpConnection
1129   */

1130  void kill(TcpConnection conn)
1131  {
1132    closeConnection(conn);
1133  }
1134
1135  /**
1136   * Closes the stats for the connection.
1137   */

1138  private void closeConnection(TcpConnection conn)
1139  {
1140    synchronized (this) {
1141      if (_connectionCount-- == _connectionMax) {
1142        try {
1143          notify();
1144        } catch (Throwable JavaDoc e) {
1145        }
1146      }
1147    }
1148  }
1149
1150  /**
1151   * Shuts the Port down. The server gives connections 30
1152   * seconds to complete.
1153   */

1154  public void close()
1155  {
1156    Environment.removeEnvironmentListener(this);
1157
1158    if (! _lifecycle.toDestroy())
1159      return;
1160
1161    if (log.isLoggable(Level.FINE))
1162      log.fine("closing " + this);
1163
1164    QServerSocket serverSocket = _serverSocket;
1165    _serverSocket = null;
1166
1167    _selectManager = null;
1168    AbstractSelectManager selectManager = null;
1169
1170    if (_server != null) {
1171      selectManager = _server.getSelectManager();
1172      _server.initSelectManager(null);
1173    }
1174
1175    InetAddress JavaDoc localAddress = null;
1176    int localPort = 0;
1177    if (serverSocket != null) {
1178      localAddress = serverSocket.getLocalAddress();
1179      localPort = serverSocket.getLocalPort();
1180    }
1181
1182    // close the server socket
1183
if (serverSocket != null) {
1184      try {
1185        serverSocket.close();
1186      } catch (Throwable JavaDoc e) {
1187      }
1188
1189      try {
1190        synchronized (serverSocket) {
1191          serverSocket.notifyAll();
1192        }
1193      } catch (Throwable JavaDoc e) {
1194      }
1195    }
1196
1197    if (selectManager != null) {
1198      try {
1199        selectManager.close();
1200      } catch (Throwable JavaDoc e) {
1201      }
1202    }
1203
1204    /*
1205    ArrayList<TcpConnection> connections = new ArrayList<TcpConnection>();
1206    synchronized (this) {
1207      connections.addAll(_activeConnections);
1208    }
1209    */

1210
1211    // Close the socket server socket and send some request to make
1212
// sure the Port accept thread is woken and dies.
1213
// The ping is before the server socket closes to avoid
1214
// confusing the threads
1215

1216    // ping the accept port to wake the listening threads
1217
if (localPort > 0) {
1218      int idleCount = _idleThreadCount + _startThreadCount;
1219
1220      for (int i = 0; i < idleCount + 10; i++) {
1221        try {
1222          Socket socket = new Socket();
1223          InetSocketAddress JavaDoc addr;
1224
1225          if (localAddress == null ||
1226              localAddress.getHostAddress().startsWith("0."))
1227            addr = new InetSocketAddress JavaDoc("127.0.0.1", localPort);
1228          else
1229            addr = new InetSocketAddress JavaDoc(localAddress, localPort);
1230
1231          socket.connect(addr, 100);
1232
1233          socket.close();
1234        } catch (ConnectException JavaDoc e) {
1235        } catch (Throwable JavaDoc e) {
1236          log.log(Level.FINEST, e.toString(), e);
1237        }
1238      }
1239    }
1240    
1241    TcpConnection conn;
1242    while ((conn = _freeConn.allocate()) != null) {
1243      conn.destroy();
1244    }
1245
1246    log.finest("closed " + this);
1247  }
1248
1249  public String JavaDoc toString()
1250  {
1251    return "Port[" + getAddress() + ":" + getPort() + "]";
1252  }
1253}
1254
Popular Tags