KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > invocation > pooled > interfaces > PooledInvokerProxy


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.invocation.pooled.interfaces;
23
24 import java.io.IOException JavaDoc;
25 import java.io.Externalizable JavaDoc;
26 import java.io.ObjectInput JavaDoc;
27 import java.io.ObjectOutput JavaDoc;
28 import java.io.BufferedOutputStream JavaDoc;
29 import java.io.BufferedInputStream JavaDoc;
30 import java.io.ObjectInputStream JavaDoc;
31 import java.io.ObjectOutputStream JavaDoc;
32 import java.io.EOFException JavaDoc;
33 import java.io.OptionalDataException JavaDoc;
34 import java.io.UnsupportedEncodingException JavaDoc;
35 import java.io.InterruptedIOException JavaDoc;
36 import java.net.Socket JavaDoc;
37 import java.net.SocketException JavaDoc;
38 import java.rmi.MarshalledObject JavaDoc;
39 import java.rmi.NoSuchObjectException JavaDoc;
40 import java.rmi.ServerException JavaDoc;
41 import java.rmi.ConnectException JavaDoc;
42 import java.util.Iterator JavaDoc;
43 import java.util.Map JavaDoc;
44 import java.util.List JavaDoc;
45 import java.util.LinkedList JavaDoc;
46
47 import javax.transaction.TransactionRolledbackException JavaDoc;
48 import javax.transaction.SystemException JavaDoc;
49 import javax.net.ssl.SSLSocket;
50 import javax.net.ssl.HandshakeCompletedListener;
51 import javax.net.ssl.HandshakeCompletedEvent;
52 import javax.net.ssl.SSLException;
53
54 import org.jboss.invocation.Invocation;
55 import org.jboss.invocation.Invoker;
56 import org.jboss.tm.TransactionPropagationContextFactory;
57 import org.jboss.logging.Logger;
58 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
59
60
61 /**
62  * Client socket connections are pooled to avoid the overhead of
63  * making a connection. RMI seems to do a new connection with each
64  * request.
65  *
66  * @author <a HREF="mailto:bill@jboss.org">Bill Burke</a>
67  * @author Scott.Stark@jboss.org
68  * @version $Revision: 45652 $
69  */

70 public class PooledInvokerProxy
71    implements Invoker, Externalizable JavaDoc
72 {
73    // Attributes ----------------------------------------------------
74
private static final Logger log = Logger.getLogger(PooledInvokerProxy.class);
75    /** The serialVersionUID @since 1.1.4.3 */
76    private static final long serialVersionUID = -1456509931095566410L;
77    /** The current wire format we write */
78    private static final int WIRE_VERSION = 1;
79
80    /**
81     * Factory for transaction propagation contexts.
82     *
83     * @todo marcf remove all transaction spill from here
84     *
85     * When set to a non-null value, it is used to get transaction
86     * propagation contexts for remote method invocations.
87     * If <code>null</code>, transactions are not propagated on
88     * remote method invocations.
89     */

90    protected static TransactionPropagationContextFactory tpcFactory = null;
91
92    // @todo: MOVE TO TRANSACTION
93
//
94
// TPC factory
95
public static void setTPCFactory(TransactionPropagationContextFactory tpcf) {
96       tpcFactory = tpcf;
97    }
98
99    // Simple performance measurements, not thread safe
100
public static long getSocketTime = 0;
101    public static long readTime = 0;
102    public static long writeTime = 0;
103    public static long serializeTime = 0;
104    public static long deserializeTime = 0;
105    /** The number of times a connection has been obtained from a pool */
106    public static long usedPooled = 0;
107    /** The number of connections in use */
108    private static int inUseCount = 0;
109    /** The number of socket connections made */
110    private static long socketConnectCount = 0;
111    /** The number of socket close calls made */
112    private static long socketCloseCount = 0;
113
114    /**
115     * Set number of retries in getSocket method
116     */

117    public static int MAX_RETRIES = 10;
118
119    /** A class wide pool Map<ServerAddres, LinkedList<ClientSocket>> */
120    protected static final Map JavaDoc connectionPools = new ConcurrentReaderHashMap();
121
122    /**
123     * connection information
124     */

125    protected ServerAddress address;
126
127    /**
128     * Pool for this invoker. This is shared between all
129     * instances of proxies attached to a specific invoker
130     * This should not be serializable, but is for backward compatibility.
131     */

132    protected LinkedList JavaDoc pool = null;
133    /** */
134    protected int maxPoolSize;
135    /** The number of times to retry after seeing a ConnectionException */
136    protected int retryCount = 1;
137    /** The logging trace flag */
138    private transient boolean trace;
139
140    /**
141     * An encapsulation of a client connection
142     */

143    protected static class ClientSocket
144       implements HandshakeCompletedListener
145    {
146       public ObjectOutputStream JavaDoc out;
147       public ObjectInputStream JavaDoc in;
148       public Socket JavaDoc socket;
149       public int timeout;
150       public String JavaDoc sessionID;
151       private boolean handshakeComplete = false;
152       private boolean trace;
153
154       public ClientSocket(Socket JavaDoc socket, int timeout) throws Exception JavaDoc
155       {
156          this.socket = socket;
157          trace = log.isTraceEnabled();
158          boolean needHandshake = false;
159
160          if( socket instanceof SSLSocket )
161          {
162             SSLSocket ssl = (SSLSocket) socket;
163             ssl.addHandshakeCompletedListener(this);
164             if( trace )
165                log.trace("Starting SSL handshake");
166             needHandshake = true;
167             handshakeComplete = false;
168             ssl.startHandshake();
169          }
170          socket.setSoTimeout(timeout);
171          this.timeout = timeout;
172          out = new OptimizedObjectOutputStream(new BufferedOutputStream JavaDoc(socket.getOutputStream()));
173          out.flush();
174          in = new OptimizedObjectInputStream(new BufferedInputStream JavaDoc(socket.getInputStream()));
175          if( needHandshake )
176          {
177             // Loop waiting for the handshake to complete
178
socket.setSoTimeout(1000);
179             for(int n = 0; handshakeComplete == false && n < 60; n ++)
180             {
181                try
182                {
183                   int b = in.read();
184                }
185                catch(SSLException e)
186                {
187                   if( trace )
188                      log.trace("Error while waiting for handshake to complete", e);
189                   throw e;
190                }
191                catch(IOException JavaDoc e)
192                {
193                   if( trace )
194                      log.trace("Handshaked read()", e);
195                }
196             }
197             if( handshakeComplete == false )
198                throw new SSLException("Handshaked failed to complete in 60 seconds");
199             // Restore the original timeout
200
socket.setSoTimeout(timeout);
201          }
202
203       }
204
205       public void handshakeCompleted(HandshakeCompletedEvent event)
206       {
207          handshakeComplete = true;
208          byte[] id = event.getSession().getId();
209          try
210          {
211             sessionID = new String JavaDoc(id, "UTF-8");
212          }
213          catch (UnsupportedEncodingException JavaDoc e)
214          {
215             log.warn("Failed to create session id using UTF-8, using default", e);
216             sessionID = new String JavaDoc(id);
217          }
218          if( trace )
219          {
220             log.trace("handshakeCompleted, event="+event+", sessionID="+sessionID);
221          }
222       }
223
224       public String JavaDoc toString()
225       {
226          StringBuffer JavaDoc tmp = new StringBuffer JavaDoc("ClientSocket@");
227          tmp.append(System.identityHashCode(this));
228          tmp.append('[');
229          tmp.append("socket=");
230          tmp.append(socket.toString());
231          tmp.append(']');
232          return tmp.toString();
233       }
234
235       /**
236        * @todo should this be handled with weak references as this should
237        * work better with gc
238        */

239       protected void finalize()
240       {
241          if (socket != null)
242          {
243             if( trace )
244                log.trace("Closing socket in finalize: "+socket);
245             try
246             {
247                socketCloseCount --;
248                socket.close();
249             }
250             catch (Exception JavaDoc ignored) {}
251             finally
252             {
253                socket = null;
254             }
255          }
256       }
257    }
258
259    /**
260     * Clear all class level stats
261     */

262    public static void clearStats()
263    {
264       getSocketTime = 0;
265       readTime = 0;
266       writeTime = 0;
267       serializeTime = 0;
268       deserializeTime = 0;
269       usedPooled = 0;
270    }
271
272    /**
273     * @return the active number of client connections
274     */

275    public static long getInUseCount()
276    {
277       return inUseCount;
278    }
279
280    /**
281     * @return the number of times a connection was returned from a pool
282     */

283    public static long getUsedPooled()
284    {
285       return usedPooled;
286    }
287    public static long getSocketConnectCount()
288    {
289       return socketConnectCount;
290    }
291    public static long getSocketCloseCount()
292    {
293       return socketCloseCount;
294    }
295
296    /**
297     * @return the total number of pooled connections across all ServerAddresses
298     */

299    public static int getTotalPoolCount()
300    {
301       int count = 0;
302       Iterator JavaDoc iter = connectionPools.values().iterator();
303       while( iter.hasNext() )
304       {
305          List JavaDoc pool = (List JavaDoc) iter.next();
306          if( pool != null )
307             count += pool.size();
308       }
309       return count;
310    }
311
312    /**
313     * @return the proxy local pool count
314     */

315    public long getPoolCount()
316    {
317       return pool.size();
318    }
319
320    /**
321     * Exposed for externalization.
322     */

323    public PooledInvokerProxy()
324    {
325       super();
326       trace = log.isTraceEnabled();
327    }
328
329    /**
330     * Create a new Proxy.
331     *
332     */

333    public PooledInvokerProxy(ServerAddress sa, int maxPoolSize)
334    {
335       this(sa, maxPoolSize, MAX_RETRIES);
336    }
337    public PooledInvokerProxy(ServerAddress sa, int maxPoolSize, int retryCount)
338    {
339       this.address = sa;
340       this.maxPoolSize = maxPoolSize;
341       this.retryCount = retryCount;
342    }
343
344    /**
345     * Close all sockets in a specific pool.
346     */

347    public static void clearPool(ServerAddress sa)
348    {
349       boolean trace = log.isTraceEnabled();
350       if( trace )
351          log.trace("clearPool, sa: "+sa);
352       try
353       {
354          LinkedList JavaDoc thepool = (LinkedList JavaDoc)connectionPools.get(sa);
355          if (thepool == null) return;
356          synchronized (thepool)
357          {
358             int size = thepool.size();
359             for (int i = 0; i < size; i++)
360             {
361                ClientSocket cs = null;
362                try
363                {
364                   ClientSocket socket = (ClientSocket)thepool.removeFirst();
365                   cs = socket;
366                   if( trace )
367                      log.trace("Closing, ClientSocket: "+socket);
368                   socketCloseCount --;
369                   socket.socket.close();
370                }
371                catch (Exception JavaDoc ignored)
372                {
373                }
374                finally
375                {
376                   if( cs != null )
377                      cs.socket = null;
378                }
379             }
380          }
381       }
382       catch (Exception JavaDoc ex)
383       {
384          // ignored
385
}
386    }
387    /**
388     * Close all sockets in all pools
389     */

390    public static void clearPools()
391    {
392       synchronized (connectionPools)
393       {
394          Iterator JavaDoc it = connectionPools.keySet().iterator();
395          while (it.hasNext())
396          {
397             ServerAddress sa = (ServerAddress)it.next();
398             clearPool(sa);
399          }
400       }
401    }
402
403    protected void initPool()
404    {
405       synchronized (connectionPools)
406       {
407          pool = (LinkedList JavaDoc)connectionPools.get(address);
408          if (pool == null)
409          {
410             pool = new LinkedList JavaDoc();
411             connectionPools.put(address, pool);
412          }
413       }
414    }
415
416    protected ClientSocket getConnection() throws Exception JavaDoc
417    {
418       Socket JavaDoc socket = null;
419       ClientSocket cs = null;
420
421       //
422
// Need to retry a few times
423
// on socket connection because, at least on Windoze,
424
// if too many concurrent threads try to connect
425
// at same time, you get ConnectionRefused
426
//
427
// Retrying seems to be the most performant.
428
//
429
// This problem always happens with RMI and seems to
430
// have nothing to do with backlog or number of threads
431
// waiting in accept() on the server.
432
//
433
for (int i = 0; i < retryCount; i++)
434       {
435          ClientSocket pooled = getPooledConnection();
436          if (pooled != null)
437          {
438             usedPooled++;
439             inUseCount ++;
440             return pooled;
441          }
442
443          try
444          {
445             if( trace)
446             {
447                log.trace("Connecting to addr: "+address.address
448                   +", port: "+address.port
449                   +",clientSocketFactory: "+address.clientSocketFactory
450                   +",enableTcpNoDelay: "+address.enableTcpNoDelay
451                   +",timeout: "+address.timeout);
452             }
453             if( address.clientSocketFactory != null )
454                socket = address.clientSocketFactory.createSocket(address.address, address.port);
455             else
456                socket = new Socket JavaDoc(address.address, address.port);
457             socketConnectCount ++;
458             if( trace )
459                log.trace("Connected, socket="+socket);
460
461             socket.setTcpNoDelay(address.enableTcpNoDelay);
462             cs = new ClientSocket(socket, address.timeout);
463             inUseCount ++;
464             if( trace )
465             {
466                log.trace("New ClientSocket: "+cs
467                   +", usedPooled="+ usedPooled
468                   +", inUseCount="+ inUseCount
469                   +", socketConnectCount="+ socketConnectCount
470                   +", socketCloseCount="+ socketCloseCount
471                );
472             }
473             break;
474          }
475          catch (Exception JavaDoc ex)
476          {
477             if( ex instanceof InterruptedIOException JavaDoc || ex instanceof SocketException JavaDoc )
478             {
479                if( trace )
480                   log.trace("Connect failed", ex);
481                if (i + 1 < retryCount)
482                {
483                   Thread.sleep(1);
484                   continue;
485                }
486             }
487             throw ex;
488          }
489       }
490       // Should not happen
491
if( cs == null )
492          throw new ConnectException JavaDoc("Failed to obtain a socket, tries="+retryCount);
493       return cs;
494    }
495
496    protected synchronized ClientSocket getPooledConnection()
497    {
498       ClientSocket socket = null;
499       while (pool.size() > 0)
500       {
501          try
502          {
503             synchronized( pool )
504             {
505                socket = (ClientSocket)pool.removeFirst();
506             }
507             // Test to see if socket is alive by send ACK message
508
if( trace )
509                log.trace("Checking pooled socket: "+socket+", address: "+socket.socket.getLocalSocketAddress());
510             final byte ACK = 1;
511             socket.out.writeByte(ACK);
512             socket.out.flush();
513             socket.in.readByte();
514             if( trace )
515             {
516                log.trace("Using pooled ClientSocket: "+socket
517                   +", usedPooled="+ usedPooled
518                   +", inUseCount="+ inUseCount
519                   +", socketConnectCount="+ socketConnectCount
520                   +", socketCloseCount="+ socketCloseCount
521                );
522             }
523             return socket;
524          }
525          catch (Exception JavaDoc ex)
526          {
527             if( trace )
528                log.trace("Failed to validate pooled socket: "+socket, ex);
529             try
530             {
531                if( socket != null )
532                {
533                   socketCloseCount --;
534                   socket.socket.close();
535                }
536             }
537             catch (Exception JavaDoc ignored)
538             {
539             }
540             finally
541             {
542                if( socket != null )
543                   socket.socket = null;
544             }
545          }
546       }
547       return null;
548    }
549
550    /**
551     * Return a socket to the pool
552     * @param socket
553     * @return true if socket was added to the pool, false if the pool
554     * was full
555     */

556    protected synchronized boolean returnConnection(ClientSocket socket)
557    {
558       boolean pooled = false;
559       synchronized( pool )
560       {
561          if (pool.size() < maxPoolSize)
562          {
563             pool.add(socket);
564             inUseCount --;
565             pooled = true;
566          }
567       }
568       return pooled;
569    }
570
571    /**
572     * The name of of the server.
573     */

574    public String JavaDoc getServerHostName() throws Exception JavaDoc
575    {
576       return address.address;
577    }
578
579    /**
580     * ???
581     *
582     * @todo MOVE TO TRANSACTION
583     *
584     * @return the transaction propagation context of the transaction
585     * associated with the current thread.
586     * Returns <code>null</code> if the transaction manager was never
587     * set, or if no transaction is associated with the current thread.
588     */

589    public Object JavaDoc getTransactionPropagationContext()
590       throws SystemException JavaDoc
591    {
592       return (tpcFactory == null) ? null : tpcFactory.getTransactionPropagationContext();
593    }
594
595
596    /**
597     * The invocation on the delegate, calls the right invoker. Remote if we are remote,
598     * local if we are local.
599     */

600    public Object JavaDoc invoke(Invocation invocation)
601       throws Exception JavaDoc
602    {
603       boolean trace = log.isTraceEnabled();
604       // We are going to go through a Remote invocation, switch to a Marshalled Invocation
605
PooledMarshalledInvocation mi = new PooledMarshalledInvocation(invocation);
606
607       // Set the transaction propagation context
608
// @todo: MOVE TO TRANSACTION
609
mi.setTransactionPropagationContext(getTransactionPropagationContext());
610
611       Object JavaDoc response = null;
612       long start = System.currentTimeMillis();
613       ClientSocket socket = getConnection();
614       long end = System.currentTimeMillis() - start;
615       getSocketTime += end;
616       // Add the socket session if it exists
617
if( socket.sessionID != null )
618       {
619          mi.setValue("SESSION_ID", socket.sessionID);
620          if( trace )
621             log.trace("Added SESSION_ID to invocation");
622       }
623
624       try
625       {
626          if( trace )
627             log.trace("Sending invocation to: "+mi.getObjectName());
628          socket.out.writeObject(mi);
629          socket.out.reset();
630          socket.out.writeObject(Boolean.TRUE); // for stupid ObjectInputStream reset
631
socket.out.flush();
632          socket.out.reset();
633          end = System.currentTimeMillis() - start;
634          writeTime += end;
635          start = System.currentTimeMillis();
636          response = socket.in.readObject();
637          // to make sure stream gets reset
638
// Stupid ObjectInputStream holds object graph
639
// can only be set by the client/server sending a TC_RESET
640
socket.in.readObject();
641          end = System.currentTimeMillis() - start;
642          readTime += end;
643       }
644       catch (Exception JavaDoc ex)
645       {
646          if( trace )
647             log.trace("Failure during invoke", ex);
648          try
649          {
650             socketCloseCount --;
651             socket.socket.close();
652          }
653          catch (Exception JavaDoc ignored) {}
654          finally
655          {
656             socket.socket = null;
657          }
658          throw new java.rmi.ConnectException JavaDoc("Failure during invoke", ex);
659       }
660
661       // Put socket back in pool for reuse
662
if( returnConnection(socket) == false )
663       {
664          // Failed, close the socket
665
if( trace )
666             log.trace("Closing unpooled socket: "+socket);
667          try
668          {
669             socketCloseCount --;
670             socket.socket.close();
671          }
672          catch (Exception JavaDoc ignored) {}
673          finally
674          {
675             socket.socket = null;
676          }
677       }
678
679       // Return response
680

681       try
682       {
683          if (response instanceof Exception JavaDoc)
684          {
685             throw ((Exception JavaDoc)response);
686          }
687          if (response instanceof MarshalledObject JavaDoc)
688          {
689             return ((MarshalledObject JavaDoc)response).get();
690          }
691          return response;
692       }
693       catch (ServerException JavaDoc ex)
694       {
695          // Suns RMI implementation wraps NoSuchObjectException in
696
// a ServerException. We cannot have that if we want
697
// to comply with the spec, so we unwrap here.
698
if (ex.detail instanceof NoSuchObjectException JavaDoc)
699          {
700             throw (NoSuchObjectException JavaDoc) ex.detail;
701          }
702          //likewise
703
if (ex.detail instanceof TransactionRolledbackException JavaDoc)
704          {
705             throw (TransactionRolledbackException JavaDoc) ex.detail;
706          }
707          throw ex;
708       }
709    }
710
711    /**
712     * Write out the serializable data
713     * @serialData address ServerAddress
714     * @serialData maxPoolSize int
715     * @serialData WIRE_VERSION int version
716     * @serialData retryCount int
717     * @param out
718     * @throws IOException
719     */

720    public void writeExternal(final ObjectOutput JavaDoc out)
721       throws IOException JavaDoc
722    {
723       // The legacy wire format is address, maxPoolSize
724
out.writeObject(address);
725       out.writeInt(maxPoolSize);
726       // Write out the current version format and its data
727
out.writeInt(WIRE_VERSION);
728       out.writeInt(retryCount);
729    }
730
731    public void readExternal(final ObjectInput JavaDoc in)
732       throws IOException JavaDoc, ClassNotFoundException JavaDoc
733    {
734       trace = log.isTraceEnabled();
735       address = (ServerAddress)in.readObject();
736       maxPoolSize = in.readInt();
737       int version = 0;
738       try
739       {
740          version = in.readInt();
741       }
742       catch(EOFException JavaDoc e)
743       {
744          // No version written and there is no more data
745
}
746       catch(OptionalDataException JavaDoc e)
747       {
748          // No version written and there is data from other objects
749
}
750
751       switch( version )
752       {
753          case 0:
754             // This has no retryCount, default it to the hard-coded value
755
retryCount = MAX_RETRIES;
756             break;
757          case 1:
758             readVersion1(in);
759             break;
760          default:
761             /* Assume a newer version that only adds defaultable values.
762             The alternative would be to thrown an exception
763             */

764             break;
765       }
766       initPool();
767    }
768
769    private void readVersion1(final ObjectInput JavaDoc in)
770       throws IOException JavaDoc
771    {
772       retryCount = in.readInt();
773    }
774 }
775
Popular Tags