KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > axis > transport > jms > JMSConnectorManager


1 /*
2  * Copyright 2001, 2002,2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16
17 package org.apache.axis.transport.jms;
18
19 import org.apache.axis.AxisFault;
20 import org.apache.axis.components.jms.JMSVendorAdapter;
21 import org.apache.axis.components.logger.LogFactory;
22 import org.apache.axis.utils.Messages;
23 import org.apache.commons.logging.Log;
24
25 import java.util.HashMap JavaDoc;
26 import java.util.Iterator JavaDoc;
27
28 /**
29  * JMSConnectorManager manages a pool of connectors and works with the
30  * vendor adapters to support the reuse of JMS connections.
31  *
32  * @author Ray Chun (rchun@sonicsoftware.com)
33  */

34 public class JMSConnectorManager
35 {
36     protected static Log log =
37             LogFactory.getLog(JMSConnectorManager.class.getName());
38
39     private static JMSConnectorManager s_instance = new JMSConnectorManager();
40
41     private static HashMap JavaDoc vendorConnectorPools = new HashMap JavaDoc();
42     private int DEFAULT_WAIT_FOR_SHUTDOWN = 90000; // 1.5 minutes
43

44     private JMSConnectorManager()
45     {
46     }
47
48     public static JMSConnectorManager getInstance()
49     {
50         return s_instance;
51     }
52
53     /**
54      * Returns the pool of JMSConnectors for a particular vendor
55      */

56     public ShareableObjectPool getVendorPool(String JavaDoc vendorId)
57     {
58         return (ShareableObjectPool)vendorConnectorPools.get(vendorId);
59     }
60
61     /**
62      * Retrieves a JMSConnector that satisfies the provided connector criteria
63      */

64     public JMSConnector getConnector(HashMap JavaDoc connectorProperties,
65                                      HashMap JavaDoc connectionFactoryProperties,
66                                      String JavaDoc username,
67                                      String JavaDoc password,
68                                      JMSVendorAdapter vendorAdapter)
69         throws AxisFault
70     {
71         JMSConnector connector = null;
72
73         try
74         {
75             // check for a vendor-specific pool, and create if necessary
76
ShareableObjectPool vendorConnectors = getVendorPool(vendorAdapter.getVendorId());
77             if (vendorConnectors == null)
78             {
79                 synchronized (vendorConnectorPools)
80                 {
81                     vendorConnectors = getVendorPool(vendorAdapter.getVendorId());
82                     if (vendorConnectors == null)
83                     {
84                         vendorConnectors = new ShareableObjectPool();
85                         vendorConnectorPools.put(vendorAdapter.getVendorId(), vendorConnectors);
86                     }
87                 }
88             }
89
90             // look for a matching JMSConnector among existing connectors
91
synchronized (vendorConnectors)
92             {
93                 try
94                 {
95
96                     connector = JMSConnectorFactory.matchConnector(vendorConnectors.getElements(),
97                                                                    connectorProperties,
98                                                                    connectionFactoryProperties,
99                                                                    username,
100                                                                    password,
101                                                                    vendorAdapter);
102                 }
103                 catch (Exception JavaDoc e) {} // ignore. a new connector will be created if no match is found
104

105                 if (connector == null)
106                 {
107                         connector = JMSConnectorFactory.createClientConnector(connectorProperties,
108                                                                               connectionFactoryProperties,
109                                                                               username,
110                                                                               password,
111                                                                               vendorAdapter);
112                         connector.start();
113                 }
114             }
115         }
116         catch (Exception JavaDoc e)
117         {
118             log.error(Messages.getMessage("cannotConnectError"), e);
119
120             if(e instanceof AxisFault)
121                 throw (AxisFault)e;
122             throw new AxisFault("cannotConnect", e);
123         }
124
125         return connector;
126     }
127
128     /**
129      * Closes JMSConnectors in all pools
130      */

131     void closeAllConnectors()
132     {
133         if (log.isDebugEnabled()) {
134             log.debug("Enter: JMSConnectorManager::closeAllConnectors");
135         }
136
137         synchronized (vendorConnectorPools)
138         {
139             Iterator JavaDoc iter = vendorConnectorPools.values().iterator();
140             while (iter.hasNext())
141             {
142                 // close all connectors in the vendor pool
143
ShareableObjectPool pool = (ShareableObjectPool)iter.next();
144                 synchronized (pool)
145                 {
146                     java.util.Iterator JavaDoc connectors = pool.getElements().iterator();
147                     while (connectors.hasNext())
148                     {
149                         JMSConnector conn = (JMSConnector)connectors.next();
150                         try
151                         {
152                             // shutdown automatically decrements the ref count of a connector before closing it
153
// call reserve() to simulate the checkout
154
reserve(conn);
155                             closeConnector(conn);
156                         }
157                         catch (Exception JavaDoc e) {} // ignore. the connector is already being deactivated
158
}
159                 }
160             }
161         }
162
163         if (log.isDebugEnabled()) {
164             log.debug("Exit: JMSConnectorManager::closeAllConnectors");
165         }
166     }
167
168     /**
169      * Closes JMS connectors that match the specified endpoint address
170      */

171     void closeMatchingJMSConnectors(HashMap JavaDoc connectorProps, HashMap JavaDoc cfProps,
172                                     String JavaDoc username, String JavaDoc password,
173                                     JMSVendorAdapter vendorAdapter)
174     {
175         if (log.isDebugEnabled()) {
176             log.debug("Enter: JMSConnectorManager::closeMatchingJMSConnectors");
177         }
178
179         try
180         {
181             String JavaDoc vendorId = vendorAdapter.getVendorId();
182
183             // get the vendor-specific pool of connectors
184
ShareableObjectPool vendorConnectors = null;
185             synchronized (vendorConnectorPools)
186             {
187                 vendorConnectors = getVendorPool(vendorId);
188             }
189
190             // it's possible that there is no pool for that vendor
191
if (vendorConnectors == null)
192                 return;
193
194             synchronized (vendorConnectors)
195             {
196                 // close any matched connectors
197
JMSConnector connector = null;
198                 while ((vendorConnectors.size() > 0) &&
199                        (connector = JMSConnectorFactory.matchConnector(vendorConnectors.getElements(),
200                                                                        connectorProps,
201                                                                        cfProps,
202                                                                        username,
203                                                                        password,
204                                                                        vendorAdapter)) != null)
205                 {
206                     closeConnector(connector);
207                 }
208             }
209         }
210         catch (Exception JavaDoc e)
211         {
212             log.warn(Messages.getMessage("failedJMSConnectorShutdown"), e);
213         }
214
215         if (log.isDebugEnabled()) {
216             log.debug("Exit: JMSConnectorManager::closeMatchingJMSConnectors");
217         }
218     }
219
220     private void closeConnector(JMSConnector conn)
221     {
222         conn.stop();
223         conn.shutdown();
224     }
225
226     /**
227      * Adds a JMSConnector to the appropriate vendor pool
228      */

229     public void addConnectorToPool(JMSConnector conn)
230     {
231         if (log.isDebugEnabled()) {
232             log.debug("Enter: JMSConnectorManager::addConnectorToPool");
233         }
234
235         ShareableObjectPool vendorConnectors = null;
236         synchronized (vendorConnectorPools)
237         {
238             String JavaDoc vendorId = conn.getVendorAdapter().getVendorId();
239             vendorConnectors = getVendorPool(vendorId);
240             // it's possible the pool does not yet exist (if, for example, the connector
241
// is created before invoking the call/JMSTransport, as is the case with
242
// SimpleJMSListener)
243
if (vendorConnectors == null)
244             {
245                 vendorConnectors = new ShareableObjectPool();
246                 vendorConnectorPools.put(vendorId, vendorConnectors);
247             }
248         }
249
250         synchronized (vendorConnectors)
251         {
252             vendorConnectors.addObject(conn);
253         }
254
255         if (log.isDebugEnabled()) {
256             log.debug("Exit: JMSConnectorManager::addConnectorToPool");
257         }
258     }
259
260     /**
261      * Removes a JMSConnector from the appropriate vendor pool
262      */

263     public void removeConnectorFromPool(JMSConnector conn)
264     {
265         if (log.isDebugEnabled()) {
266             log.debug("Enter: JMSConnectorManager::removeConnectorFromPool");
267         }
268
269         ShareableObjectPool vendorConnectors = null;
270         synchronized (vendorConnectorPools)
271         {
272             vendorConnectors = getVendorPool(conn.getVendorAdapter().getVendorId());
273         }
274         if (vendorConnectors == null)
275             return;
276
277         synchronized (vendorConnectors)
278         {
279             // first release, to decrement the ref count (it is automatically incremented when
280
// the connector is matched)
281
vendorConnectors.release(conn);
282             vendorConnectors.removeObject(conn);
283         }
284
285         if (log.isDebugEnabled()) {
286             log.debug("Exit: JMSConnectorManager::removeConnectorFromPool");
287         }
288     }
289
290     /**
291      * Performs a non-exclusive checkout of the JMSConnector
292      */

293     public void reserve(JMSConnector connector) throws Exception JavaDoc
294     {
295         ShareableObjectPool pool = null;
296         synchronized (vendorConnectorPools)
297         {
298             pool = getVendorPool(connector.getVendorAdapter().getVendorId());
299         }
300         if (pool != null)
301             pool.reserve(connector);
302     }
303
304     /**
305      * Performs a non-exclusive checkin of the JMSConnector
306      */

307     public void release(JMSConnector connector)
308     {
309         ShareableObjectPool pool = null;
310         synchronized (vendorConnectorPools)
311         {
312             pool = getVendorPool(connector.getVendorAdapter().getVendorId());
313         }
314         if (pool != null)
315             pool.release(connector);
316     }
317
318     /**
319      * A simple non-blocking pool impl for objects that can be shared.
320      * Only a ref count is necessary to prevent collisions at shutdown.
321      * Todo: max size, cleanup stale connections
322      */

323     public class ShareableObjectPool
324     {
325         // maps object to ref count wrapper
326
private java.util.HashMap JavaDoc m_elements;
327
328         // holds objects which should no longer be leased (pending removal)
329
private java.util.HashMap JavaDoc m_expiring;
330
331         private int m_numElements = 0;
332
333         public ShareableObjectPool()
334         {
335             m_elements = new java.util.HashMap JavaDoc();
336             m_expiring = new java.util.HashMap JavaDoc();
337         }
338
339         /**
340          * Adds the object to the pool, if not already added
341          */

342         public void addObject(Object JavaDoc obj)
343         {
344             ReferenceCountedObject ref = new ReferenceCountedObject(obj);
345             synchronized (m_elements)
346             {
347                 if (!m_elements.containsKey(obj) && !m_expiring.containsKey(obj))
348                     m_elements.put(obj, ref);
349             }
350         }
351
352         /**
353          * Removes the object from the pool. If the object is reserved,
354          * waits the specified time before forcibly removing
355          * Todo: check expirations with the next request instead of holding up the current request
356          */

357         public void removeObject(Object JavaDoc obj, long waitTime)
358         {
359             ReferenceCountedObject ref = null;
360             synchronized (m_elements)
361             {
362                 ref = (ReferenceCountedObject)m_elements.get(obj);
363                 if (ref == null)
364                     return;
365
366                 m_elements.remove(obj);
367
368                 if (ref.count() == 0)
369                     return;
370                 else
371                     // mark the object for expiration
372
m_expiring.put(obj, ref);
373             }
374
375             // connector is now marked for expiration. wait for the ref count to drop to zero
376
long expiration = System.currentTimeMillis() + waitTime;
377             while (ref.count() > 0)
378             {
379                 try
380                 {
381                     Thread.sleep(5000);
382                 }
383                 catch (InterruptedException JavaDoc e) {} // ignore
384
if (System.currentTimeMillis() > expiration)
385                     break;
386             }
387
388             // also clear from the expiring list
389
m_expiring.remove(obj);
390         }
391
392         public void removeObject(Object JavaDoc obj)
393         {
394             removeObject(obj, DEFAULT_WAIT_FOR_SHUTDOWN);
395         }
396
397         /**
398          * Marks the connector as in use by incrementing the connector's reference count
399          */

400         public void reserve(Object JavaDoc obj) throws Exception JavaDoc
401         {
402             synchronized (m_elements)
403             {
404                 if (m_expiring.containsKey(obj))
405                     throw new Exception JavaDoc("resourceUnavailable");
406
407                 ReferenceCountedObject ref = (ReferenceCountedObject)m_elements.get(obj);
408                 ref.increment();
409             }
410         }
411
412         /**
413          * Decrements the connector's reference count
414          */

415         public void release(Object JavaDoc obj)
416         {
417             synchronized (m_elements)
418             {
419                 ReferenceCountedObject ref = (ReferenceCountedObject)m_elements.get(obj);
420                 ref.decrement();
421             }
422         }
423
424         public synchronized java.util.Set JavaDoc getElements()
425         {
426             return m_elements.keySet();
427         }
428
429         public synchronized int size()
430         {
431             return m_elements.size();
432         }
433
434         /**
435          * Wrapper to track the use count of an object
436          */

437         public class ReferenceCountedObject
438         {
439             private Object JavaDoc m_object;
440             private int m_refCount;
441
442             public ReferenceCountedObject(Object JavaDoc obj)
443             {
444                 m_object = obj;
445                 m_refCount = 0;
446             }
447
448             public synchronized void increment()
449             {
450                 m_refCount++;
451             }
452
453             public synchronized void decrement()
454             {
455                 if (m_refCount > 0)
456                     m_refCount--;
457             }
458
459             public synchronized int count()
460             {
461                 return m_refCount;
462             }
463         }
464     }
465 }
Popular Tags