KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > server > port > TcpConnection


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  *
23  * Free Software Foundation, Inc.
24  * 59 Temple Place, Suite 330
25  * Boston, MA 02111-1307 USA
26  *
27  * @author Scott Ferguson
28  */

29
30 package com.caucho.server.port;
31
32 import com.caucho.loader.Environment;
33 import com.caucho.management.server.AbstractManagedObject;
34 import com.caucho.management.server.TcpConnectionMXBean;
35 import com.caucho.server.connection.BroadcastTask;
36 import com.caucho.util.Alarm;
37 import com.caucho.util.ThreadPool;
38 import com.caucho.util.ThreadTask;
39 import com.caucho.vfs.ClientDisconnectException;
40 import com.caucho.vfs.QSocket;
41 import com.caucho.vfs.ReadStream;
42
43 import java.io.IOException JavaDoc;
44 import java.net.InetAddress JavaDoc;
45 import java.util.logging.Level JavaDoc;
46 import java.util.logging.Logger JavaDoc;
47
48 /**
49  * A protocol-independent TcpConnection. TcpConnection controls the
50  * TCP Socket and provides buffered streams.
51  *
52  * <p>Each TcpConnection has its own thread.
53  */

54 public class TcpConnection extends PortConnection implements ThreadTask
55 {
56   private static final Logger JavaDoc log
57     = Logger.getLogger(TcpConnection.class.getName());
58
59   private final QSocket _socket;
60
61   private boolean _isInUse;
62   private boolean _isActive;
63   private boolean _isClosed;
64
65   private boolean _isKeepalive;
66   private boolean _isDead;
67
68   private final Object JavaDoc _requestLock = new Object JavaDoc();
69
70   private final String JavaDoc _id;
71   private String JavaDoc _name;
72
73   private final Admin _admin = new Admin();
74   
75   private String JavaDoc _state = "unknown";
76   private long _startTime;
77   private Thread JavaDoc _thread;
78
79   /**
80    * Creates a new TcpConnection.
81    *
82    * @param server The TCP server controlling the connections
83    * @param request The protocol Request
84    */

85   TcpConnection(Port port, QSocket socket)
86   {
87     setPort(port);
88
89     int id = getId();
90
91     if (port.getAddress() == null) {
92       _id = "resin-tcp-connection-*:" + port.getPort() + "-" + id;
93       _name = "INADDR_ANY-" + port.getPort() + "-" + id;
94     }
95     else {
96       _id = ("resin-tcp-connection-" + port.getAddress() + ":" +
97              port.getPort() + "-" + id);
98       _name = port.getAddress() + "-" + port.getPort() + "-" + id;
99     }
100
101     _socket = socket;
102   }
103
104   /**
105    * Returns the object name for jmx.
106    */

107   public String JavaDoc getName()
108   {
109     return _name;
110   }
111
112   /**
113    * Initialize the socket.
114    */

115   public void initSocket()
116     throws IOException JavaDoc
117   {
118     _isClosed = false;
119     _isInUse = true;
120     _isKeepalive = false;
121
122     getWriteStream().init(_socket.getStream());
123     getReadStream().init(_socket.getStream(), getWriteStream());
124
125     if (log.isLoggable(Level.FINE)) {
126       Port port = getPort();
127
128       if (port != null)
129     log.fine("starting connection " + this + ", total=" + port.getConnectionCount());
130       else
131     log.fine("starting connection " + this);
132     }
133   }
134
135   /**
136    * Returns the connection's socket
137    */

138   public QSocket getSocket()
139   {
140     return _socket;
141   }
142
143   /**
144    * Returns the connection's socket
145    */

146   public QSocket startSocket()
147   {
148     _isClosed = false;
149     
150     return _socket;
151   }
152
153   /**
154    * Try to read nonblock
155    */

156   private boolean waitForKeepalive()
157     throws IOException JavaDoc
158   {
159     Port port = getPort();
160     QSocket socket = _socket;
161     
162     if (port.isClosed())
163       return false;
164
165     ReadStream is = getReadStream();
166     
167     if (getReadStream().getBufferAvailable() > 0)
168       return true;
169     
170     long timeout = port.getKeepaliveTimeout();
171
172     boolean isSelectManager = port.getServer().isEnableSelectManager();
173     
174     if (isSelectManager) {
175       timeout = port.getKeepaliveSelectThreadTimeout();
176     }
177
178     if (timeout > 0 && timeout < port.getSocketTimeout())
179       return is.fillWithTimeout(timeout);
180     else if (isSelectManager)
181       return false;
182     else
183       return true;
184   }
185
186   public boolean isSecure()
187   {
188     if (_isClosed)
189       return false;
190     else
191       return _socket.isSecure();
192   }
193
194   /**
195    * Returns true for closed.
196    */

197   public boolean isClosed()
198   {
199     return _isClosed;
200   }
201   
202   /**
203    * Set true for active.
204    */

205   public void setActive(boolean isActive)
206   {
207     _isActive = isActive;
208   }
209
210   /**
211    * Returns true for active.
212    */

213   public boolean isActive()
214   {
215     return _isActive;
216   }
217
218   /**
219    * Sets the keepalive state. Called only by the SelectManager and Port.
220    */

221   public void setKeepalive()
222   {
223     if (_isKeepalive)
224       log.warning("illegal state: setting keepalive with active keepalive: " + this);
225     
226     _isKeepalive = true;
227   }
228
229   /**
230    * Sets the keepalive state. Called only by the SelectManager and Port.
231    */

232   public void clearKeepalive()
233   {
234     if (! _isKeepalive)
235       log.warning("illegal state: clearing keepalive with inactive keepalive: " + this);
236     
237     _isKeepalive = false;
238   }
239
240   /**
241    * Returns the local address of the socket.
242    */

243   public InetAddress JavaDoc getLocalAddress()
244   {
245     // The extra cases handle Kaffe problems.
246
try {
247       return _socket.getLocalAddress();
248     } catch (Exception JavaDoc e) {
249       try {
250     return InetAddress.getLocalHost();
251       } catch (Exception JavaDoc e1) {
252     try {
253       return InetAddress.getByName("127.0.0.1");
254     } catch (Exception JavaDoc e2) {
255       return null;
256     }
257       }
258     }
259   }
260
261   /**
262    * Returns the socket's local TCP port.
263    */

264   public int getLocalPort()
265   {
266     return _socket.getLocalPort();
267   }
268
269   /**
270    * Returns the socket's remote address.
271    */

272   public InetAddress JavaDoc getRemoteAddress()
273   {
274     return _socket.getRemoteAddress();
275   }
276
277   /**
278    * Returns the socket's remote host name.
279    */

280   public String JavaDoc getRemoteHost()
281   {
282     return _socket.getRemoteHost();
283   }
284
285   /**
286    * Adds from the socket's remote address.
287    */

288   public int getRemoteAddress(byte []buffer, int offset, int length)
289   {
290     return _socket.getRemoteAddress(buffer, offset, length);
291   }
292
293   /**
294    * Returns the socket's remote port
295    */

296   public int getRemotePort()
297   {
298     return _socket.getRemotePort();
299   }
300
301   /**
302    * Returns the virtual host.
303    */

304   public String JavaDoc getVirtualHost()
305   {
306     return getPort().getVirtualHost();
307   }
308
309   /**
310    * Returns the state string.
311    */

312   public final String JavaDoc getState()
313   {
314     return _state;
315   }
316
317   /**
318    * Sets the state string.
319    */

320   public final void setState(String JavaDoc state)
321   {
322     _state = state;
323   }
324
325   /**
326    * Begins an active connection.
327    */

328   public final void beginActive()
329   {
330     _state = "active";
331     _startTime = Alarm.getCurrentTime();
332   }
333
334   /**
335    * Ends an active connection.
336    */

337   public final void endActive()
338   {
339     _state = "idle";
340     _startTime = 0;
341   }
342
343   /**
344    * Returns the thread id.
345    */

346   public final long getThreadId()
347   {
348     Thread JavaDoc thread = _thread;
349
350     if (thread != null)
351       return thread.getId();
352     else
353       return -1;
354   }
355
356   /**
357    * Returns the time the current request has taken.
358    */

359   public final long getRequestActiveTime()
360   {
361     if (_startTime > 0)
362       return Alarm.getCurrentTime() - _startTime;
363     else
364       return -1;
365   }
366
367   /**
368    * Tries to mark the connection as a keepalive connection
369    *
370    * At exit, the connection is either:
371    * 1) freed (no keepalive)
372    * 2) rescheduled (keepalive with new thread)
373    * 3) in select pool (keepalive with poll)
374    */

375   private void keepalive()
376   {
377     Port port = getPort();
378
379     if (! port.keepaliveBegin(this)) {
380       if (log.isLoggable(Level.FINE))
381         log.fine("[" + getId() + "] failed keepalive");
382
383       free();
384     }
385     else if (port.getSelectManager() != null) {
386       if (! port.getSelectManager().keepalive(this)) {
387         // XXX: s/b
388
// setKeepalive();
389
// ThreadPool.schedule(this);
390
if (log.isLoggable(Level.FINE))
391           log.fine("[" + getId() + "] FAILED keepalive (select)");
392
393         port.keepaliveEnd(this);
394     free();
395       }
396       else {
397         if (log.isLoggable(Level.FINE))
398           log.fine("[" + getId() + "] keepalive (select)");
399       }
400     }
401     else {
402       if (log.isLoggable(Level.FINE))
403         log.fine("[" + getId() + "] keepalive (thread)");
404
405       setKeepalive();
406       ThreadPool.getThreadPool().schedule(this);
407     }
408   }
409
410   /**
411    * Starts the connection.
412    */

413   public void start()
414   {
415     Thread JavaDoc thread = Thread.currentThread();
416     ClassLoader JavaDoc oldLoader = thread.getContextClassLoader();
417   }
418   
419   /**
420    * Runs as a task.
421    */

422   public void run()
423   {
424     Port port = getPort();
425
426     boolean isKeepalive = _isKeepalive;
427     _isKeepalive = false;
428
429     boolean isFirst = ! isKeepalive;
430     
431     ServerRequest request = getRequest();
432     boolean isWaitForRead = request.isWaitForRead();
433     
434     Thread JavaDoc thread = Thread.currentThread();
435     String JavaDoc oldThreadName = thread.getName();
436            
437     thread.setName(_id);
438     
439     if (isKeepalive)
440       port.keepaliveEnd(this);
441
442     port.threadBegin(this);
443
444     ClassLoader JavaDoc systemLoader = ClassLoader.getSystemClassLoader();
445
446     long startTime = Alarm.getExactTime();
447
448     thread.setContextClassLoader(systemLoader);
449     
450     //_admin.register();
451

452     try {
453       _thread = thread;
454       
455       while (! _isDead) {
456     if (isKeepalive) {
457     }
458     else if (! port.accept(this, isFirst)) {
459       return;
460     }
461
462         isFirst = false;
463     
464     try {
465       thread.interrupted();
466       // clear the interrupted flag
467

468       do {
469         thread.setContextClassLoader(systemLoader);
470
471             isKeepalive = false;
472
473         if (! port.isClosed() &&
474         (! isWaitForRead || getReadStream().waitForRead())) {
475
476               synchronized (_requestLock) {
477         isKeepalive = request.handleRequest();
478           }
479         }
480       } while (isKeepalive && waitForKeepalive() && ! port.isClosed());
481
482           if (isKeepalive) {
483         return;
484       }
485       else {
486         getRequest().protocolCloseEvent();
487       }
488     }
489     catch (ClientDisconnectException e) {
490       isKeepalive = false;
491
492           if (log.isLoggable(Level.FINER))
493         log.finer("[" + getId() + "] " + e);
494     }
495     catch (IOException JavaDoc e) {
496       isKeepalive = false;
497       
498       if (log.isLoggable(Level.FINE))
499         log.log(Level.FINE, "[" + getId() + "] " + e, e);
500       
501         }
502     finally {
503       thread.setContextClassLoader(systemLoader);
504
505           if (! isKeepalive)
506         closeImpl();
507     }
508       }
509     } catch (Throwable JavaDoc e) {
510       log.log(Level.WARNING, e.toString(), e);
511       isKeepalive = false;
512     } finally {
513       thread.setContextClassLoader(systemLoader);
514     
515       //_admin.unregister();
516

517       port.threadEnd(this);
518
519       if (isKeepalive)
520     keepalive();
521       else
522     free();
523
524       _thread = null;
525       thread.setName(oldThreadName);
526     }
527   }
528
529   /**
530    * Sends a broadcast request.
531    */

532   public void sendBroadcast(BroadcastTask task)
533   {
534     synchronized (_requestLock) {
535       task.execute(this);
536     }
537   }
538
539   /**
540    * Closes on shutdown.
541    */

542   public void closeOnShutdown()
543   {
544     QSocket socket = _socket;
545
546     if (socket != null) {
547       try {
548         socket.close();
549       } catch (Throwable JavaDoc e) {
550         log.log(Level.FINE, e.toString(), e);
551       }
552
553       Thread.currentThread().yield();
554     }
555   }
556
557   /**
558    * Closes the connection.
559    */

560   private void closeImpl()
561   {
562     QSocket socket = _socket;
563
564     boolean isClosed;
565
566     synchronized (this) {
567       isClosed = _isClosed;
568       _isClosed = true;
569     }
570
571     if (! isClosed) {
572       _isActive = false;
573       boolean isKeepalive = _isKeepalive;
574       _isKeepalive = false;
575
576       Port port = getPort();
577
578       if (isKeepalive)
579     port.keepaliveEnd(this);
580       
581       if (log.isLoggable(Level.FINE) && _isInUse) {
582     Object JavaDoc serverId = Environment.getAttribute("caucho.server-id");
583     String JavaDoc prefix = "";
584
585     if (serverId != null)
586       prefix = "[" + serverId + "] ";
587     
588     if (port != null)
589       log.fine(prefix + "closing connection " + this + ", total=" + port.getConnectionCount());
590     else
591       log.fine(prefix + "closing connection " + this);
592       }
593
594       _isInUse = false;
595
596       try {
597         getWriteStream().close();
598       } catch (Throwable JavaDoc e) {
599         log.log(Level.FINE, e.toString(), e);
600       }
601
602       try {
603         getReadStream().close();
604       } catch (Throwable JavaDoc e) {
605         log.log(Level.FINE, e.toString(), e);
606       }
607
608       if (socket != null) {
609     try {
610       socket.close();
611     } catch (Throwable JavaDoc e) {
612       log.log(Level.FINE, e.toString(), e);
613     }
614       }
615     }
616   }
617   
618   /**
619    * Destroys the connection()
620    */

621   public final void destroy()
622   {
623     _isDead = true;
624     
625     closeImpl();
626   }
627   
628   /**
629    * Frees the connection()
630    */

631   final void free()
632   {
633     closeImpl();
634
635     setState("free");
636     
637     if (! _isDead)
638       getPort().free(this);
639     else
640       getPort().kill(this);
641   }
642
643   public String JavaDoc toString()
644   {
645     if (_isActive)
646       return "TcpConnection[id=" + _id + ",socket=" + _socket + ",active]";
647     else
648       return "TcpConnection[id=" + _id + ",socket=" + _socket + ",port=" + getPort() + "]";
649   }
650
651   class Admin extends AbstractManagedObject implements TcpConnectionMXBean {
652     Admin()
653     {
654       super(ClassLoader.getSystemClassLoader());
655     }
656     
657     public String JavaDoc getName()
658     {
659       return _name;
660     }
661
662     public long getThreadId()
663     {
664       return TcpConnection.this.getThreadId();
665     }
666
667     public long getRequestActiveTime()
668     {
669       return TcpConnection.this.getRequestActiveTime();
670     }
671
672     public String JavaDoc getState()
673     {
674       return TcpConnection.this.getState();
675     }
676
677     void register()
678     {
679       registerSelf();
680     }
681
682     void unregister()
683     {
684       unregisterSelf();
685     }
686   }
687 }
688
Popular Tags