KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > jms > xa > ConnectionFactoryWrapper


1 /*
2  * $Id: ConnectionFactoryWrapper.java 3798 2006-11-04 04:07:14Z aperepel $
3  * --------------------------------------------------------------------------------------
4  * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
5  *
6  * The software in this package is published under the terms of the MuleSource MPL
7  * license, a copy of which has been included with this distribution in the
8  * LICENSE.txt file.
9  */

10
11 package org.mule.providers.jms.xa;
12
13 import org.apache.commons.logging.Log;
14 import org.apache.commons.logging.LogFactory;
15
16 import javax.jms.Connection JavaDoc;
17 import javax.jms.ConnectionFactory JavaDoc;
18 import javax.jms.JMSException JavaDoc;
19 import javax.jms.MessageConsumer JavaDoc;
20 import javax.jms.MessageProducer JavaDoc;
21 import javax.jms.QueueConnection JavaDoc;
22 import javax.jms.QueueConnectionFactory JavaDoc;
23 import javax.jms.QueueReceiver JavaDoc;
24 import javax.jms.QueueSender JavaDoc;
25 import javax.jms.QueueSession JavaDoc;
26 import javax.jms.Session JavaDoc;
27 import javax.jms.TopicConnection JavaDoc;
28 import javax.jms.TopicConnectionFactory JavaDoc;
29 import javax.jms.TopicPublisher JavaDoc;
30 import javax.jms.TopicSession JavaDoc;
31 import javax.jms.TopicSubscriber JavaDoc;
32 import javax.jms.XAConnection JavaDoc;
33 import javax.jms.XAConnectionFactory JavaDoc;
34 import javax.jms.XAQueueConnection JavaDoc;
35 import javax.jms.XAQueueConnectionFactory JavaDoc;
36 import javax.jms.XAQueueSession JavaDoc;
37 import javax.jms.XASession JavaDoc;
38 import javax.jms.XATopicConnection JavaDoc;
39 import javax.jms.XATopicConnectionFactory JavaDoc;
40 import javax.jms.XATopicSession JavaDoc;
41 import javax.transaction.Transaction JavaDoc;
42 import javax.transaction.TransactionManager JavaDoc;
43 import javax.transaction.xa.XAResource JavaDoc;
44
45 import java.lang.reflect.InvocationHandler JavaDoc;
46 import java.lang.reflect.InvocationTargetException JavaDoc;
47 import java.lang.reflect.Method JavaDoc;
48 import java.lang.reflect.Proxy JavaDoc;
49
50 public class ConnectionFactoryWrapper
51     implements ConnectionFactory JavaDoc, QueueConnectionFactory JavaDoc, TopicConnectionFactory JavaDoc
52 {
53
54     protected Object JavaDoc factory;
55     protected TransactionManager JavaDoc tm;
56     protected static Log logger = LogFactory.getLog(ConnectionFactoryWrapper.class);
57
58     public ConnectionFactoryWrapper(Object JavaDoc factory, TransactionManager JavaDoc tm)
59     {
60         this.factory = factory;
61         this.tm = tm;
62     }
63
64     /*
65      * (non-Javadoc)
66      *
67      * @see javax.jms.ConnectionFactory#createConnection()
68      */

69     public Connection JavaDoc createConnection() throws JMSException JavaDoc
70     {
71         XAConnection JavaDoc xac = ((XAConnectionFactory JavaDoc)factory).createXAConnection();
72         Connection JavaDoc proxy = (Connection JavaDoc)Proxy.newProxyInstance(Connection JavaDoc.class.getClassLoader(),
73             new Class JavaDoc[]{Connection JavaDoc.class}, new ConnectionInvocationHandler(xac));
74         return proxy;
75     }
76
77     /*
78      * (non-Javadoc)
79      *
80      * @see javax.jms.ConnectionFactory#createConnection(java.lang.String,
81      * java.lang.String)
82      */

83     public Connection JavaDoc createConnection(String JavaDoc username, String JavaDoc password) throws JMSException JavaDoc
84     {
85         XAConnection JavaDoc xac = ((XAConnectionFactory JavaDoc)factory).createXAConnection(username, password);
86         Connection JavaDoc proxy = (Connection JavaDoc)Proxy.newProxyInstance(Connection JavaDoc.class.getClassLoader(),
87             new Class JavaDoc[]{Connection JavaDoc.class}, new ConnectionInvocationHandler(xac));
88         return proxy;
89     }
90
91     /*
92      * (non-Javadoc)
93      *
94      * @see javax.jms.QueueConnectionFactory#createQueueConnection()
95      */

96     public QueueConnection JavaDoc createQueueConnection() throws JMSException JavaDoc
97     {
98         XAQueueConnection JavaDoc xaqc = ((XAQueueConnectionFactory JavaDoc)factory).createXAQueueConnection();
99         QueueConnection JavaDoc proxy = (QueueConnection JavaDoc)Proxy.newProxyInstance(Connection JavaDoc.class.getClassLoader(),
100             new Class JavaDoc[]{QueueConnection JavaDoc.class}, new ConnectionInvocationHandler(xaqc));
101         return proxy;
102     }
103
104     /*
105      * (non-Javadoc)
106      *
107      * @see javax.jms.QueueConnectionFactory#createQueueConnection(java.lang.String,
108      * java.lang.String)
109      */

110     public QueueConnection JavaDoc createQueueConnection(String JavaDoc username, String JavaDoc password) throws JMSException JavaDoc
111     {
112         XAQueueConnection JavaDoc xaqc = ((XAQueueConnectionFactory JavaDoc)factory).createXAQueueConnection(username,
113             password);
114         QueueConnection JavaDoc proxy = (QueueConnection JavaDoc)Proxy.newProxyInstance(Connection JavaDoc.class.getClassLoader(),
115             new Class JavaDoc[]{QueueConnection JavaDoc.class}, new ConnectionInvocationHandler(xaqc));
116         return proxy;
117     }
118
119     /*
120      * (non-Javadoc)
121      *
122      * @see javax.jms.TopicConnectionFactory#createTopicConnection()
123      */

124     public TopicConnection JavaDoc createTopicConnection() throws JMSException JavaDoc
125     {
126         XATopicConnection JavaDoc xatc = ((XATopicConnectionFactory JavaDoc)factory).createXATopicConnection();
127         TopicConnection JavaDoc proxy = (TopicConnection JavaDoc)Proxy.newProxyInstance(Connection JavaDoc.class.getClassLoader(),
128             new Class JavaDoc[]{TopicConnection JavaDoc.class}, new ConnectionInvocationHandler(xatc));
129         return proxy;
130     }
131
132     /*
133      * (non-Javadoc)
134      *
135      * @see javax.jms.TopicConnectionFactory#createTopicConnection(java.lang.String,
136      * java.lang.String)
137      */

138     public TopicConnection JavaDoc createTopicConnection(String JavaDoc username, String JavaDoc password) throws JMSException JavaDoc
139     {
140         XATopicConnection JavaDoc xatc = ((XATopicConnectionFactory JavaDoc)factory).createXATopicConnection(username,
141             password);
142         TopicConnection JavaDoc proxy = (TopicConnection JavaDoc)Proxy.newProxyInstance(Connection JavaDoc.class.getClassLoader(),
143             new Class JavaDoc[]{TopicConnection JavaDoc.class}, new ConnectionInvocationHandler(xatc));
144         return proxy;
145     }
146
147     protected class ConnectionInvocationHandler implements InvocationHandler JavaDoc
148     {
149
150         private Object JavaDoc xac;
151
152         public ConnectionInvocationHandler(Object JavaDoc xac)
153         {
154             this.xac = xac;
155         }
156
157         /*
158          * (non-Javadoc)
159          *
160          * @see java.lang.reflect.InvocationHandler#invoke(java.lang.Object,
161          * java.lang.reflect.Method, java.lang.Object[])
162          */

163         public Object JavaDoc invoke(Object JavaDoc proxy, Method JavaDoc method, Object JavaDoc[] args) throws Throwable JavaDoc
164         {
165             if (logger.isDebugEnabled())
166             {
167                 logger.debug("Invoking " + method);
168             }
169             if (method.getName().equals("createSession"))
170             {
171                 XASession JavaDoc xas = ((XAConnection JavaDoc)xac).createXASession();
172                 return Proxy.newProxyInstance(Session JavaDoc.class.getClassLoader(), new Class JavaDoc[]{Session JavaDoc.class},
173                     new SessionInvocationHandler(xas.getSession(), xas.getXAResource()));
174             }
175             else if (method.getName().equals("createQueueSession"))
176             {
177                 XAQueueSession JavaDoc xaqs = ((XAQueueConnection JavaDoc)xac).createXAQueueSession();
178                 return Proxy.newProxyInstance(Session JavaDoc.class.getClassLoader(),
179                     new Class JavaDoc[]{QueueSession JavaDoc.class}, new SessionInvocationHandler(xaqs.getQueueSession(),
180                         xaqs.getXAResource()));
181             }
182             else if (method.getName().equals("createTopicSession"))
183             {
184                 XATopicSession JavaDoc xats = ((XATopicConnection JavaDoc)xac).createXATopicSession();
185                 return Proxy.newProxyInstance(Session JavaDoc.class.getClassLoader(),
186                     new Class JavaDoc[]{TopicSession JavaDoc.class}, new SessionInvocationHandler(xats.getTopicSession(),
187                         xats.getXAResource()));
188             }
189             else
190             {
191                 return method.invoke(xac, args);
192             }
193         }
194
195         protected class SessionInvocationHandler implements InvocationHandler JavaDoc
196         {
197
198             private Object JavaDoc session;
199             private Object JavaDoc xares;
200             private Transaction JavaDoc tx;
201
202             public SessionInvocationHandler(Object JavaDoc session, Object JavaDoc xares)
203             {
204                 this.session = session;
205                 this.xares = xares;
206             }
207
208             /*
209              * (non-Javadoc)
210              *
211              * @see java.lang.reflect.InvocationHandler#invoke(java.lang.Object,
212              * java.lang.reflect.Method, java.lang.Object[])
213              */

214             public Object JavaDoc invoke(Object JavaDoc proxy, Method JavaDoc method, Object JavaDoc[] args) throws Throwable JavaDoc
215             {
216                 if (logger.isDebugEnabled())
217                 {
218                     logger.debug("Invoking " + method);
219                 }
220                 Object JavaDoc result = method.invoke(session, args);
221
222                 if (result instanceof TopicSubscriber JavaDoc)
223                 {
224                     result = Proxy.newProxyInstance(Session JavaDoc.class.getClassLoader(),
225                         new Class JavaDoc[]{TopicSubscriber JavaDoc.class}, new ConsumerProducerInvocationHandler(result));
226                 }
227                 else if (result instanceof QueueReceiver JavaDoc)
228                 {
229                     result = Proxy.newProxyInstance(Session JavaDoc.class.getClassLoader(),
230                         new Class JavaDoc[]{QueueReceiver JavaDoc.class}, new ConsumerProducerInvocationHandler(result));
231                 }
232                 else if (result instanceof MessageConsumer JavaDoc)
233                 {
234                     result = Proxy.newProxyInstance(Session JavaDoc.class.getClassLoader(),
235                         new Class JavaDoc[]{MessageConsumer JavaDoc.class}, new ConsumerProducerInvocationHandler(result));
236                 }
237                 else if (result instanceof TopicPublisher JavaDoc)
238                 {
239                     result = Proxy.newProxyInstance(Session JavaDoc.class.getClassLoader(),
240                         new Class JavaDoc[]{TopicPublisher JavaDoc.class}, new ConsumerProducerInvocationHandler(result));
241                 }
242                 else if (result instanceof QueueSender JavaDoc)
243                 {
244                     result = Proxy.newProxyInstance(Session JavaDoc.class.getClassLoader(),
245                         new Class JavaDoc[]{QueueSender JavaDoc.class}, new ConsumerProducerInvocationHandler(result));
246                 }
247                 else if (result instanceof MessageProducer JavaDoc)
248                 {
249                     result = Proxy.newProxyInstance(Session JavaDoc.class.getClassLoader(),
250                         new Class JavaDoc[]{MessageProducer JavaDoc.class}, new ConsumerProducerInvocationHandler(result));
251                 }
252                 return result;
253             }
254
255             protected void enlist() throws Exception JavaDoc
256             {
257                 if (logger.isDebugEnabled())
258                 {
259                     logger.debug("Enlistment request: " + this);
260                 }
261                 if (tx == null && tm != null)
262                 {
263                     tx = tm.getTransaction();
264                     if (tx != null)
265                     {
266                         if (logger.isDebugEnabled())
267                         {
268                             logger.debug("Enlisting resource in xa transaction: " + xares);
269                         }
270                         XAResource JavaDoc xares = (XAResource JavaDoc)Proxy.newProxyInstance(
271                             XAResource JavaDoc.class.getClassLoader(), new Class JavaDoc[]{XAResource JavaDoc.class},
272                             new XAResourceInvocationHandler());
273                         tx.enlistResource(xares);
274                     }
275                 }
276             }
277
278             protected class XAResourceInvocationHandler implements InvocationHandler JavaDoc
279             {
280
281                 /*
282                  * (non-Javadoc)
283                  *
284                  * @see java.lang.reflect.InvocationHandler#invoke(java.lang.Object,
285                  * java.lang.reflect.Method, java.lang.Object[])
286                  */

287                 public Object JavaDoc invoke(Object JavaDoc proxy, Method JavaDoc method, Object JavaDoc[] args) throws Throwable JavaDoc
288                 {
289                     try
290                     {
291                         if (logger.isDebugEnabled())
292                         {
293                             logger.debug("Invoking " + method);
294                         }
295                         if (method.getName().equals("end"))
296                         {
297                             tx = null;
298                         }
299
300                         /*
301                          * This has been added, since JOTM checks if the resource has
302                          * actually been enlisted & tries to compare two proxy
303                          * classes with eachother. Since the equals method is
304                          * proxied, it will effectivly compare a proxy with the
305                          * ConnectionFactory & this will fail. To solve this, if the
306                          * object passed as a parameter is actually another proxy,
307                          * call equals on the proxy passing this class as a
308                          * parameter, effictively we would be comparing the two
309                          * proxied classes.
310                          */

311                         if (method.getName().equals("equals"))
312                         {
313                             if (Proxy.isProxyClass(args[0].getClass()))
314                             {
315                                 return new Boolean JavaDoc(args[0].equals(this));
316                             }
317                             else
318                             {
319                                 return new Boolean JavaDoc(this.equals(args[0]));
320                             }
321                         }
322
323                         return method.invoke(xares, args);
324                     }
325                     catch (InvocationTargetException JavaDoc e)
326                     {
327                         throw e.getCause();
328                     }
329                 }
330
331             }
332
333             protected class ConsumerProducerInvocationHandler implements InvocationHandler JavaDoc
334             {
335
336                 private Object JavaDoc target;
337
338                 public ConsumerProducerInvocationHandler(Object JavaDoc target)
339                 {
340                     this.target = target;
341                 }
342
343                 /*
344                  * (non-Javadoc)
345                  *
346                  * @see java.lang.reflect.InvocationHandler#invoke(java.lang.Object,
347                  * java.lang.reflect.Method, java.lang.Object[])
348                  */

349                 public Object JavaDoc invoke(Object JavaDoc proxy, Method JavaDoc method, Object JavaDoc[] args) throws Throwable JavaDoc
350                 {
351                     if (logger.isDebugEnabled())
352                     {
353                         logger.debug("Invoking " + method);
354                     }
355                     if (!method.getName().equals("close"))
356                     {
357                         enlist();
358                     }
359                     return method.invoke(target, args);
360                 }
361             }
362
363         }
364     }
365
366 }
367
Popular Tags