KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > resource > connectionmanager > InternalManagedConnectionPool


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.resource.connectionmanager;
23
24 import java.util.ArrayList JavaDoc;
25 import java.util.Collections JavaDoc;
26 import java.util.HashSet JavaDoc;
27 import java.util.Iterator JavaDoc;
28 import java.util.Set JavaDoc;
29
30 import javax.resource.ResourceException JavaDoc;
31 import javax.resource.spi.ConnectionRequestInfo JavaDoc;
32 import javax.resource.spi.ManagedConnection JavaDoc;
33 import javax.resource.spi.ManagedConnectionFactory JavaDoc;
34 import javax.resource.spi.ValidatingManagedConnectionFactory JavaDoc;
35 import javax.security.auth.Subject JavaDoc;
36
37 import org.jboss.logging.Logger;
38 import org.jboss.resource.JBossResourceException;
39 import org.jboss.util.UnreachableStatementException;
40
41 import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
42 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
43
44 /**
45  * The internal pool implementation
46  *
47  * @author <a HREF="mailto:d_jencks@users.sourceforge.net">David Jencks</a>
48  * @author <a HREF="mailto:adrian@jboss.org">Adrian Brock</a>
49  * @author <a HREF="mailto:weston.price@jboss.com">Weston Price</a>
50  * @version $Revision: 57101 $
51  */

52 public class InternalManagedConnectionPool implements IdleConnectionRemovalSupport
53 {
54    /** The managed connection factory */
55    private final ManagedConnectionFactory JavaDoc mcf;
56
57    /** The connection listener factory */
58    private final ConnectionListenerFactory clf;
59
60    /** The default subject */
61    private final Subject JavaDoc defaultSubject;
62
63    /** The default connection request information */
64    private final ConnectionRequestInfo JavaDoc defaultCri;
65
66    /** The pooling parameters */
67    private final PoolParams poolParams;
68
69    /** Copy of the maximum size from the pooling parameters.
70     * Dynamic changes to this value are not compatible with
71     * the semaphore which cannot change be dynamically changed.
72     */

73    private int maxSize;
74
75    /** The available connection event listeners */
76    private ArrayList JavaDoc cls;
77
78    /** The permits used to control who can checkout a connection */
79    private final FIFOSemaphore permits;
80
81    /** The log */
82    private final Logger log;
83
84    /** Whether trace is enabled */
85    private final boolean trace;
86
87    /** Stats */
88    private final Counter connectionCounter = new Counter();
89
90    /** The checked out connections */
91    private final HashSet JavaDoc checkedOut = new HashSet JavaDoc();
92
93    /** Whether the pool has been started */
94    private boolean started = false;
95
96    /** Whether the pool has been shutdown */
97    private SynchronizedBoolean shutdown = new SynchronizedBoolean(false);
98
99    /** the max connections ever checked out **/
100    private volatile int maxUsedConnections = 0;
101
/**
 * Create a new internal pool.
 *
 * @param mcf the managed connection factory
 * @param clf the connection listener factory
 * @param subject the default subject
 * @param cri the default connection request information
 * @param poolParams the pooling parameters
 * @param log the log
 */
protected InternalManagedConnectionPool(ManagedConnectionFactory mcf, ConnectionListenerFactory clf, Subject subject,
      ConnectionRequestInfo cri, PoolParams poolParams, Logger log)
{
   this.mcf = mcf;
   this.clf = clf;
   this.defaultSubject = subject;
   this.defaultCri = cri;
   this.poolParams = poolParams;

   // Snapshot the configured maximum: the list capacity and the semaphore
   // are both sized from this one value.
   this.maxSize = poolParams.maxSize;

   this.log = log;
   this.trace = log.isTraceEnabled();

   this.cls = new ArrayList(this.maxSize);
   this.permits = new FIFOSemaphore(this.maxSize);

   // Hand the pool to the filler immediately when prefill is configured.
   if (poolParams.prefill)
   {
      PoolFiller.fillPool(this);
   }
}
131
132    /**
133     * Initialize the pool
134     */

135    protected void initialize()
136    {
137       if (poolParams.idleTimeout != 0)
138          IdleRemover.registerPool(this, poolParams.idleTimeout);
139
140       if (poolParams.backgroundValidation)
141       {
142
143          log.debug("Registering for background validation at interval " + poolParams.backgroundInterval);
144          ConnectionValidator.registerPool(this, poolParams.backgroundInterval);
145
146       }
147    }
148
149    public long getAvailableConnections()
150    {
151       return permits.permits();
152    }
153
154    public int getMaxConnectionsInUseCount()
155    {
156       return maxUsedConnections;
157    }
158
159    public int getConnectionInUseCount()
160    {
161       return checkedOut.size();
162    }
163
164    /**
165     * todo distinguish between connection dying while match called
166     * and bad match strategy. In latter case we should put it back in
167     * the pool.
168     */

169    public ConnectionListener getConnection(Subject JavaDoc subject, ConnectionRequestInfo JavaDoc cri) throws ResourceException JavaDoc
170    {
171       subject = (subject == null) ? defaultSubject : subject;
172       cri = (cri == null) ? defaultCri : cri;
173       long startWait = System.currentTimeMillis();
174       try
175       {
176          connectionCounter.updateBlockTime(System.currentTimeMillis() - startWait);
177          
178          if (permits.attempt(poolParams.blockingTimeout))
179          {
180             //We have a permit to get a connection. Is there one in the pool already?
181
ConnectionListener cl = null;
182             do
183             {
184                synchronized (cls)
185                {
186                   if (shutdown.get())
187                   {
188                      permits.release();
189                      throw new ResourceException JavaDoc("The pool has been shutdown");
190                   }
191
192                   if (cls.size() > 0)
193                   {
194                      cl = (ConnectionListener) cls.remove(cls.size() - 1);
195                      checkedOut.add(cl);
196                      int size = (int) (maxSize - permits.permits());
197                      if (size > maxUsedConnections)
198                         maxUsedConnections = size;
199                   }
200                }
201                if (cl != null)
202                {
203                   //Yes, we retrieved a ManagedConnection from the pool. Does it match?
204
try
205                   {
206                      Object JavaDoc matchedMC = mcf.matchManagedConnections(Collections.singleton(cl.getManagedConnection()),
207                            subject, cri);
208                      if (matchedMC != null)
209                      {
210                         if (trace)
211                            log.trace("supplying ManagedConnection from pool: " + cl);
212                         cl.grantPermit(true);
213                         return cl;
214                      }
215
216                      //Match did not succeed but no exception was thrown.
217
//Either we have the matching strategy wrong or the
218
//connection died while being checked. We need to
219
//distinguish these cases, but for now we always
220
//destroy the connection.
221
log.warn("Destroying connection that could not be successfully matched: " + cl);
222                      synchronized (cls)
223                      {
224                         checkedOut.remove(cl);
225                      }
226                      doDestroy(cl);
227                      cl = null;
228                   }
229                   catch (Throwable JavaDoc t)
230                   {
231                      log.warn("Throwable while trying to match ManagedConnection, destroying connection: " + cl, t);
232                      synchronized (cls)
233                      {
234                         checkedOut.remove(cl);
235                      }
236                      doDestroy(cl);
237                      cl = null;
238                   }
239                }
240             }
241             while (cls.size() > 0);//end of do loop
242

243             //OK, we couldnt find a working connection from the pool. Make a new one.
244
try
245             {
246                //No, the pool was empty, so we have to make a new one.
247
cl = createConnectionEventListener(subject, cri);
248                synchronized (cls)
249                {
250                   checkedOut.add(cl);
251                   int size = (int) (maxSize - permits.permits());
252                   if (size > maxUsedConnections)
253                      maxUsedConnections = size;
254                }
255
256                //lack of synch on "started" probably ok, if 2 reads occur we will just
257
//run fillPool twice, no harm done.
258
if (started == false)
259                {
260                   started = true;
261                   if (poolParams.minSize > 0)
262                      PoolFiller.fillPool(this);
263                }
264                if (trace)
265                   log.trace("supplying new ManagedConnection: " + cl);
266                cl.grantPermit(true);
267                return cl;
268             }
269             catch (Throwable JavaDoc t)
270             {
271                log.warn("Throwable while attempting to get a new connection: " + cl, t);
272                //return permit and rethrow
273
synchronized (cls)
274                {
275                   checkedOut.remove(cl);
276                }
277                permits.release();
278                JBossResourceException.rethrowAsResourceException("Unexpected throwable while trying to create a connection: " + cl, t);
279                throw new UnreachableStatementException();
280             }
281          }
282          else
283          {
284             // we timed out
285
throw new ResourceException JavaDoc("No ManagedConnections available within configured blocking timeout ( "
286                   + poolParams.blockingTimeout + " [ms] )");
287          }
288
289       }
290       catch (InterruptedException JavaDoc ie)
291       {
292          long end = System.currentTimeMillis() - startWait;
293          throw new ResourceException JavaDoc("Interrupted while requesting permit! Waited " + end + " ms");
294       }
295    }
296
297    public void returnConnection(ConnectionListener cl, boolean kill)
298    {
299       if (cl.getState() == ConnectionListener.DESTROYED)
300       {
301          log.trace("ManagedConnection is being returned after it was destroyed" + cl);
302          if (cl.hasPermit())
303          {
304             // release semaphore
305
cl.grantPermit(false);
306             permits.release();
307          }
308
309          return;
310       }
311
312       if (trace)
313          log.trace("putting ManagedConnection back into pool kill=" + kill + " cl=" + cl);
314       try
315       {
316          cl.getManagedConnection().cleanup();
317       }
318       catch (ResourceException JavaDoc re)
319       {
320          log.warn("ResourceException cleaning up ManagedConnection: " + cl, re);
321          kill = true;
322       }
323
324       // We need to destroy this one
325
if (cl.getState() == ConnectionListener.DESTROY)
326          kill = true;
327
328       synchronized (cls)
329       {
330          checkedOut.remove(cl);
331
332          // This is really an error
333
if (kill == false && cls.size() >= poolParams.maxSize)
334          {
335             log.warn("Destroying returned connection, maximum pool size exceeded " + cl);
336             kill = true;
337          }
338
339          // If we are destroying, check the connection is not in the pool
340
if (kill)
341          {
342             // Adrian Brock: A resource adapter can asynchronously notify us that
343
// a connection error occurred.
344
// This could happen while the connection is not checked out.
345
// e.g. JMS can do this via an ExceptionListener on the connection.
346
// I have twice had to reinstate this line of code, PLEASE DO NOT REMOTE IT!
347
cls.remove(cl);
348          }
349          // return to the pool
350
else
351          {
352             cl.used();
353             cls.add(cl);
354          }
355
356          if (cl.hasPermit())
357          {
358             // release semaphore
359
cl.grantPermit(false);
360             permits.release();
361          }
362       }
363
364       if (kill)
365       {
366          if (trace)
367             log.trace("Destroying returned connection " + cl);
368          doDestroy(cl);
369       }
370
371    }
372
373    public void flush()
374    {
375       ArrayList JavaDoc destroy = null;
376       synchronized (cls)
377       {
378          if (trace)
379             log.trace("Flushing pool checkedOut=" + checkedOut + " inPool=" + cls);
380
381          // Mark checked out connections as requiring destruction
382
for (Iterator JavaDoc i = checkedOut.iterator(); i.hasNext();)
383          {
384             ConnectionListener cl = (ConnectionListener) i.next();
385             if (trace)
386                log.trace("Flush marking checked out connection for destruction " + cl);
387             cl.setState(ConnectionListener.DESTROY);
388          }
389          // Destroy connections in the pool
390
while (cls.size() > 0)
391          {
392             ConnectionListener cl = (ConnectionListener) cls.remove(0);
393             if (destroy == null)
394                destroy = new ArrayList JavaDoc();
395             destroy.add(cl);
396          }
397       }
398
399       // We need to destroy some connections
400
if (destroy != null)
401       {
402          for (int i = 0; i < destroy.size(); ++i)
403          {
404             ConnectionListener cl = (ConnectionListener) destroy.get(i);
405             if (trace)
406                log.trace("Destroying flushed connection " + cl);
407             doDestroy(cl);
408          }
409
410          // We destroyed something, check the minimum.
411
if (shutdown.get() == false && poolParams.minSize > 0)
412             PoolFiller.fillPool(this);
413       }
414    }
415
416    public void removeIdleConnections()
417    {
418       ArrayList JavaDoc destroy = null;
419       long timeout = System.currentTimeMillis() - poolParams.idleTimeout;
420       while (true)
421       {
422          synchronized (cls)
423          {
424             
425             // Nothing left to destroy
426
if (cls.size() == 0)
427                break;
428
429             // Check the first in the list
430
ConnectionListener cl = (ConnectionListener) cls.get(0);
431             if (cl.isTimedOut(timeout) && shouldRemove())
432             {
433                connectionCounter.incTimedOut();
434                // We need to destroy this one
435
cls.remove(0);
436                if (destroy == null)
437                   destroy = new ArrayList JavaDoc();
438                destroy.add(cl);
439             }
440             else
441             {
442                //They were inserted chronologically, so if this one isn't timed out, following ones won't be either.
443
break;
444             }
445          }
446       }
447
448       // We found some connections to destroy
449
if (destroy != null)
450       {
451          for (int i = 0; i < destroy.size(); ++i)
452          {
453             ConnectionListener cl = (ConnectionListener) destroy.get(i);
454             if (trace)
455                log.trace("Destroying timedout connection " + cl);
456             doDestroy(cl);
457          }
458
459          // We destroyed something, check the minimum.
460
if (shutdown.get() == false && poolParams.minSize > 0)
461             PoolFiller.fillPool(this);
462       }
463    }
464
465    
466    /**
467     * For testing
468     */

469    public void shutdownWithoutClear()
470    {
471       IdleRemover.unregisterPool(this);
472       IdleRemover.waitForBackgroundThread();
473       ConnectionValidator.unRegisterPool(this);
474       ConnectionValidator.waitForBackgroundThread();
475
476       fillToMin();
477       shutdown.set(true);
478    }
479
480    public void shutdown()
481    {
482       shutdown.set(true);
483       IdleRemover.unregisterPool(this);
484       ConnectionValidator.unRegisterPool(this);
485       flush();
486    }
487
488    public void fillToMin()
489    {
490       while (true)
491       {
492          // Get a permit - avoids a race when the pool is nearly full
493
// Also avoids unnessary fill checking when all connections are checked out
494
try
495          {
496             if (permits.attempt(poolParams.blockingTimeout))
497             {
498                try
499                {
500                   if (shutdown.get())
501                      return;
502
503                   // We already have enough connections
504
if (getMinSize() - connectionCounter.getGuaranteedCount() <= 0)
505                      return;
506
507                   // Create a connection to fill the pool
508
try
509                   {
510                      ConnectionListener cl = createConnectionEventListener(defaultSubject, defaultCri);
511                      synchronized (cls)
512                      {
513                         if (trace)
514                            log.trace("Filling pool cl=" + cl);
515                         cls.add(cl);
516                      }
517                   }
518                   catch (ResourceException JavaDoc re)
519                   {
520                      log.warn("Unable to fill pool ", re);
521                      return;
522                   }
523                }
524                finally
525                {
526                   permits.release();
527                }
528             }
529          }
530          catch (InterruptedException JavaDoc ignored)
531          {
532             log.trace("Interrupted while requesting permit in fillToMin");
533          }
534       }
535    }
536
537    public int getConnectionCount()
538    {
539       return connectionCounter.getCount();
540    }
541    
542    public long getTotalBlockTime(){
543       
544       return connectionCounter.getTotalBlockTime();
545       
546    }
547    
548    public int getTimedOut(){
549       
550       return connectionCounter.getTimedOut();
551    }
552    
553    public long getAverageBlockTime(){
554       
555       return connectionCounter.getTotalBlockTime() / getConnectionCreatedCount();
556             
557    }
558    public int getConnectionCreatedCount()
559    {
560       return connectionCounter.getCreatedCount();
561    }
562
563    public int getConnectionDestroyedCount()
564    {
565       return connectionCounter.getDestroyedCount();
566    }
567
568    /**
569     * Create a connection event listener
570     *
571     * @param subject the subject
572     * @param cri the connection request information
573     * @return the new listener
574     * @throws ResourceException for any error
575     */

576    private ConnectionListener createConnectionEventListener(Subject JavaDoc subject, ConnectionRequestInfo JavaDoc cri)
577          throws ResourceException JavaDoc
578    {
579       ManagedConnection JavaDoc mc = mcf.createManagedConnection(subject, cri);
580       connectionCounter.inc();
581       try
582       {
583          return clf.createConnectionListener(mc, this);
584       }
585       catch (ResourceException JavaDoc re)
586       {
587          connectionCounter.dec();
588          mc.destroy();
589          throw re;
590       }
591    }
592
593    /**
594     * Destroy a connection
595     *
596     * @param cl the connection to destroy
597     */

598    private void doDestroy(ConnectionListener cl)
599    {
600       if (cl.getState() == ConnectionListener.DESTROYED)
601       {
602          log.trace("ManagedConnection is already destroyed " + cl);
603          return;
604       }
605
606       connectionCounter.dec();
607       cl.setState(ConnectionListener.DESTROYED);
608       try
609       {
610          cl.getManagedConnection().destroy();
611       }
612       catch (Throwable JavaDoc t)
613       {
614          log.warn("Exception destroying ManagedConnection " + cl, t);
615       }
616
617    }
618    
619    private boolean shouldRemove()
620    {
621       boolean remove = true;
622       
623       if(poolParams.stictMin)
624       {
625          remove = cls.size() > poolParams.minSize;
626          
627          log.trace("StrictMin is active. Current connection will be removed is " + remove);
628          
629       }
630       
631       return remove;
632       
633    }
634    
635    public void validateConnections() throws Exception JavaDoc
636    {
637
638       if (trace)
639          log.trace("Attempting to validate connections for pool " + this);
640
641       if (permits.attempt(poolParams.blockingTimeout))
642       {
643
644          boolean destroyed = false;
645
646          try
647          {
648
649             while (true)
650             {
651
652                ConnectionListener cl = null;
653
654                synchronized (cls)
655                {
656                   if (cls.size() == 0)
657                   {
658
659                      break;
660
661                   }
662
663                   cl = removeForFrequencyCheck();
664
665                }
666
667                if (cl == null)
668                {
669
670                   break;
671                }
672
673                try
674                {
675
676                   Set JavaDoc candidateSet = Collections.singleton(cl.getManagedConnection());
677
678                   if (mcf instanceof ValidatingManagedConnectionFactory JavaDoc)
679                   {
680                      ValidatingManagedConnectionFactory JavaDoc vcf = (ValidatingManagedConnectionFactory JavaDoc) mcf;
681                      candidateSet = vcf.getInvalidConnections(candidateSet);
682
683                      if (candidateSet != null && candidateSet.size() > 0)
684                      {
685
686                         if (cl.getState() != ConnectionListener.DESTROY)
687                         {
688                            log.warn("");
689                            doDestroy(cl);
690                            destroyed = true;
691
692                         }
693                      }
694
695                   }
696                   else
697                   {
698                      log.warn("warning: background validation was specified with a non compliant ManagedConnectionFactory interface.");
699                   }
700
701                }
702                finally
703                {
704                   synchronized (cls)
705                   {
706
707                      returnForFrequencyCheck(cl);
708
709                   }
710
711                }
712
713             }
714
715          }
716          finally
717          {
718             permits.release();
719
720             if (destroyed && shutdown.get() == false && poolParams.minSize > 0)
721             {
722                PoolFiller.fillPool(this);
723             }
724
725          }
726
727       }
728
729    }
730    private ConnectionListener removeForFrequencyCheck()
731    {
732
733       log.debug("Checking for connection within frequency");
734
735       ConnectionListener cl = null;
736
737       for (Iterator JavaDoc iter = cls.iterator(); iter.hasNext();)
738       {
739
740          cl = (ConnectionListener) iter.next();
741          long lastCheck = cl.getLastValidatedTime();
742
743          if ((System.currentTimeMillis() - lastCheck) >= poolParams.backgroundInterval)
744          {
745             cls.remove(cl);
746             break;
747
748          }
749          else
750          {
751             cl = null;
752          }
753
754       }
755
756       return cl;
757    }
758
759    private void returnForFrequencyCheck(ConnectionListener cl)
760    {
761
762       log.debug("Returning for connection within frequency");
763
764       cl.setLastValidatedTime(System.currentTimeMillis());
765       cls.add(cl);
766
767    }
768    /**
769     * Guard against configurations or
770     * dynamic changes that may increase the minimum
771     * beyond the maximum
772     */

773    private int getMinSize()
774    {
775       if (poolParams.minSize > maxSize)
776          return maxSize;
777       
778       return poolParams.minSize;
779    }
780
781    public static class PoolParams
782    {
783        public int minSize = 0;
784
785         public int maxSize = 10;
786
787         public int blockingTimeout = 30000; // milliseconds
788

789         public long idleTimeout = 1000 * 60 * 30; // milliseconds, 30 minutes.
790

791         public boolean backgroundValidation; // set to false by default
792

793         public long backgroundInterval = 1000 * 60 * 10; // milliseconds, 10
794
// minutes;
795
public boolean prefill;
796       
797         public boolean stictMin;
798    }
799
800    /**
801      * Stats
802      */

803    private static class Counter
804    {
805       private int created = 0;
806
807       private int destroyed = 0;
808
809       private long totalBlockTime;
810       
811       private int timedOut;
812       
813       synchronized int getGuaranteedCount()
814       {
815          return created - destroyed;
816       }
817
818       int getCount()
819       {
820          return created - destroyed;
821       }
822
823       int getCreatedCount()
824       {
825          return created;
826       }
827
828       int getDestroyedCount()
829       {
830          return destroyed;
831       }
832
833       synchronized void inc()
834       {
835          ++created;
836       }
837
838       synchronized void dec()
839       {
840          ++destroyed;
841       }
842    
843       synchronized void updateBlockTime(long latest){
844          
845          totalBlockTime += latest;
846          
847       }
848
849       long getTotalBlockTime(){
850       
851          return totalBlockTime;
852          
853       }
854  
855       int getTimedOut(){
856          
857          return timedOut;
858          
859       }
860       
861       synchronized void incTimedOut(){
862          
863          ++timedOut;
864          
865       }
866    }
867 }
Popular Tags