1 10 11 package org.mule.providers.jms.xa; 12 13 import org.apache.commons.logging.Log; 14 import org.apache.commons.logging.LogFactory; 15 16 import javax.jms.Connection ; 17 import javax.jms.ConnectionFactory ; 18 import javax.jms.JMSException ; 19 import javax.jms.MessageConsumer ; 20 import javax.jms.MessageProducer ; 21 import javax.jms.QueueConnection ; 22 import javax.jms.QueueConnectionFactory ; 23 import javax.jms.QueueReceiver ; 24 import javax.jms.QueueSender ; 25 import javax.jms.QueueSession ; 26 import javax.jms.Session ; 27 import javax.jms.TopicConnection ; 28 import javax.jms.TopicConnectionFactory ; 29 import javax.jms.TopicPublisher ; 30 import javax.jms.TopicSession ; 31 import javax.jms.TopicSubscriber ; 32 import javax.jms.XAConnection ; 33 import javax.jms.XAConnectionFactory ; 34 import javax.jms.XAQueueConnection ; 35 import javax.jms.XAQueueConnectionFactory ; 36 import javax.jms.XAQueueSession ; 37 import javax.jms.XASession ; 38 import javax.jms.XATopicConnection ; 39 import javax.jms.XATopicConnectionFactory ; 40 import javax.jms.XATopicSession ; 41 import javax.transaction.Transaction ; 42 import javax.transaction.TransactionManager ; 43 import javax.transaction.xa.XAResource ; 44 45 import java.lang.reflect.InvocationHandler ; 46 import java.lang.reflect.InvocationTargetException ; 47 import java.lang.reflect.Method ; 48 import java.lang.reflect.Proxy ; 49 50 public class ConnectionFactoryWrapper 51 implements ConnectionFactory , QueueConnectionFactory , TopicConnectionFactory 52 { 53 54 protected Object factory; 55 protected TransactionManager tm; 56 protected static Log logger = LogFactory.getLog(ConnectionFactoryWrapper.class); 57 58 public ConnectionFactoryWrapper(Object factory, TransactionManager tm) 59 { 60 this.factory = factory; 61 this.tm = tm; 62 } 63 64 69 public Connection createConnection() throws JMSException 70 { 71 XAConnection xac = ((XAConnectionFactory )factory).createXAConnection(); 72 Connection proxy = (Connection )Proxy.newProxyInstance(Connection .class.getClassLoader(), 73 new Class []{Connection .class}, new ConnectionInvocationHandler(xac)); 74 return proxy; 75 } 76 77 83 public Connection createConnection(String username, String password) throws JMSException 84 { 85 XAConnection xac = ((XAConnectionFactory )factory).createXAConnection(username, password); 86 Connection proxy = (Connection )Proxy.newProxyInstance(Connection .class.getClassLoader(), 87 new Class []{Connection .class}, new ConnectionInvocationHandler(xac)); 88 return proxy; 89 } 90 91 96 public QueueConnection createQueueConnection() throws JMSException 97 { 98 XAQueueConnection xaqc = ((XAQueueConnectionFactory )factory).createXAQueueConnection(); 99 QueueConnection proxy = (QueueConnection )Proxy.newProxyInstance(Connection .class.getClassLoader(), 100 new Class []{QueueConnection .class}, new ConnectionInvocationHandler(xaqc)); 101 return proxy; 102 } 103 104 110 public QueueConnection createQueueConnection(String username, String password) throws JMSException 111 { 112 XAQueueConnection xaqc = ((XAQueueConnectionFactory )factory).createXAQueueConnection(username, 113 password); 114 QueueConnection proxy = (QueueConnection )Proxy.newProxyInstance(Connection .class.getClassLoader(), 115 new Class []{QueueConnection .class}, new ConnectionInvocationHandler(xaqc)); 116 return proxy; 117 } 118 119 124 public TopicConnection createTopicConnection() throws JMSException 125 { 126 XATopicConnection xatc = ((XATopicConnectionFactory )factory).createXATopicConnection(); 127 TopicConnection proxy = (TopicConnection )Proxy.newProxyInstance(Connection .class.getClassLoader(), 128 new Class []{TopicConnection .class}, new ConnectionInvocationHandler(xatc)); 129 return proxy; 130 } 131 132 138 public TopicConnection createTopicConnection(String username, String password) throws JMSException 139 { 140 XATopicConnection xatc = ((XATopicConnectionFactory )factory).createXATopicConnection(username, 141 password); 142 TopicConnection proxy = (TopicConnection )Proxy.newProxyInstance(Connection .class.getClassLoader(), 143 new Class []{TopicConnection .class}, new ConnectionInvocationHandler(xatc)); 144 return proxy; 145 } 146 147 protected class ConnectionInvocationHandler implements InvocationHandler 148 { 149 150 private Object xac; 151 152 public ConnectionInvocationHandler(Object xac) 153 { 154 this.xac = xac; 155 } 156 157 163 public Object invoke(Object proxy, Method method, Object [] args) throws Throwable 164 { 165 if (logger.isDebugEnabled()) 166 { 167 logger.debug("Invoking " + method); 168 } 169 if (method.getName().equals("createSession")) 170 { 171 XASession xas = ((XAConnection )xac).createXASession(); 172 return Proxy.newProxyInstance(Session .class.getClassLoader(), new Class []{Session .class}, 173 new SessionInvocationHandler(xas.getSession(), xas.getXAResource())); 174 } 175 else if (method.getName().equals("createQueueSession")) 176 { 177 XAQueueSession xaqs = ((XAQueueConnection )xac).createXAQueueSession(); 178 return Proxy.newProxyInstance(Session .class.getClassLoader(), 179 new Class []{QueueSession .class}, new SessionInvocationHandler(xaqs.getQueueSession(), 180 xaqs.getXAResource())); 181 } 182 else if (method.getName().equals("createTopicSession")) 183 { 184 XATopicSession xats = ((XATopicConnection )xac).createXATopicSession(); 185 return Proxy.newProxyInstance(Session .class.getClassLoader(), 186 new Class []{TopicSession .class}, new SessionInvocationHandler(xats.getTopicSession(), 187 xats.getXAResource())); 188 } 189 else 190 { 191 return method.invoke(xac, args); 192 } 193 } 194 195 protected class SessionInvocationHandler implements InvocationHandler 196 { 197 198 private Object session; 199 private Object xares; 200 private Transaction tx; 201 202 public SessionInvocationHandler(Object session, Object xares) 203 { 204 this.session = session; 205 this.xares = xares; 206 } 207 208 214 public Object invoke(Object proxy, Method method, Object [] args) throws Throwable 215 { 216 if (logger.isDebugEnabled()) 217 { 218 logger.debug("Invoking " + method); 219 } 220 Object result = method.invoke(session, args); 221 222 if (result instanceof TopicSubscriber ) 223 { 224 result = Proxy.newProxyInstance(Session .class.getClassLoader(), 225 new Class []{TopicSubscriber .class}, new ConsumerProducerInvocationHandler(result)); 226 } 227 else if (result instanceof QueueReceiver ) 228 { 229 result = Proxy.newProxyInstance(Session .class.getClassLoader(), 230 new Class []{QueueReceiver .class}, new ConsumerProducerInvocationHandler(result)); 231 } 232 else if (result instanceof MessageConsumer ) 233 { 234 result = Proxy.newProxyInstance(Session .class.getClassLoader(), 235 new Class []{MessageConsumer .class}, new ConsumerProducerInvocationHandler(result)); 236 } 237 else if (result instanceof TopicPublisher ) 238 { 239 result = Proxy.newProxyInstance(Session .class.getClassLoader(), 240 new Class []{TopicPublisher .class}, new ConsumerProducerInvocationHandler(result)); 241 } 242 else if (result instanceof QueueSender ) 243 { 244 result = Proxy.newProxyInstance(Session .class.getClassLoader(), 245 new Class []{QueueSender .class}, new ConsumerProducerInvocationHandler(result)); 246 } 247 else if (result instanceof MessageProducer ) 248 { 249 result = Proxy.newProxyInstance(Session .class.getClassLoader(), 250 new Class []{MessageProducer .class}, new ConsumerProducerInvocationHandler(result)); 251 } 252 return result; 253 } 254 255 protected void enlist() throws Exception 256 { 257 if (logger.isDebugEnabled()) 258 { 259 logger.debug("Enlistment request: " + this); 260 } 261 if (tx == null && tm != null) 262 { 263 tx = tm.getTransaction(); 264 if (tx != null) 265 { 266 if (logger.isDebugEnabled()) 267 { 268 logger.debug("Enlisting resource in xa transaction: " + xares); 269 } 270 XAResource xares = (XAResource )Proxy.newProxyInstance( 271 XAResource .class.getClassLoader(), new Class []{XAResource .class}, 272 new XAResourceInvocationHandler()); 273 tx.enlistResource(xares); 274 } 275 } 276 } 277 278 protected class XAResourceInvocationHandler implements InvocationHandler 279 { 280 281 287 public Object invoke(Object proxy, Method method, Object [] args) throws Throwable 288 { 289 try 290 { 291 if (logger.isDebugEnabled()) 292 { 293 logger.debug("Invoking " + method); 294 } 295 if (method.getName().equals("end")) 296 { 297 tx = null; 298 } 299 300 311 if (method.getName().equals("equals")) 312 { 313 if (Proxy.isProxyClass(args[0].getClass())) 314 { 315 return new Boolean (args[0].equals(this)); 316 } 317 else 318 { 319 return new Boolean (this.equals(args[0])); 320 } 321 } 322 323 return method.invoke(xares, args); 324 } 325 catch (InvocationTargetException e) 326 { 327 throw e.getCause(); 328 } 329 } 330 331 } 332 333 protected class ConsumerProducerInvocationHandler implements InvocationHandler 334 { 335 336 private Object target; 337 338 public ConsumerProducerInvocationHandler(Object target) 339 { 340 this.target = target; 341 } 342 343 349 public Object invoke(Object proxy, Method method, Object [] args) throws Throwable 350 { 351 if (logger.isDebugEnabled()) 352 { 353 logger.debug("Invoking " + method); 354 } 355 if (!method.getName().equals("close")) 356 { 357 enlist(); 358 } 359 return method.invoke(target, args); 360 } 361 } 362 363 } 364 } 365 366 } 367 | Popular Tags |