1 22 package org.jboss.ejb3.mdb.inflow; 23 24 import java.lang.reflect.Method ; 25 import java.lang.reflect.InvocationHandler ; 26 27 import javax.resource.ResourceException ; 28 import javax.resource.spi.endpoint.MessageEndpointFactory ; 29 import javax.transaction.Status ; 30 import javax.transaction.Transaction ; 31 import javax.transaction.TransactionManager ; 32 import javax.transaction.xa.XAResource ; 33 34 import org.jboss.aop.joinpoint.Invocation; 35 import org.jboss.aop.joinpoint.MethodInvocation; 36 import org.jboss.aop.MethodInfo; 37 import org.jboss.ejb3.mdb.MessagingContainer; 38 import org.jboss.ejb3.mdb.MDB; 39 import org.jboss.ejb3.tx.TxUtil; 40 import org.jboss.logging.Logger; 41 42 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 43 44 48 public class MessageInflowLocalProxy implements InvocationHandler 49 { 50 private static final Logger log = Logger.getLogger(MessageInflowLocalProxy.class); 51 52 53 public static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpoint.Factory"; 54 55 56 public static final String MESSAGE_ENDPOINT_XARESOURCE = "MessageEndpoint.XAResource"; 57 58 59 private boolean trace = log.isTraceEnabled(); 60 61 62 private String cachedProxyString = null; 63 64 65 protected SynchronizedBoolean released = new SynchronizedBoolean(false); 66 67 68 protected boolean delivered = false; 69 70 71 protected Thread inUseThread = null; 72 73 74 protected ClassLoader oldClassLoader = null; 75 76 77 protected Transaction transaction = null; 78 79 80 protected Transaction suspended = null; 81 82 83 private JBossMessageEndpointFactory endpointFactory; 84 85 private XAResource resource; 86 private MessageEndpointFactory messageEndpointFactory; 87 88 MessagingContainer container; 89 90 protected MessageInflowLocalProxy(MessagingContainer container) 91 { 92 this.container = container; 93 } 94 95 public void setMessageEndpointFactory(MessageEndpointFactory messageEndpointFactory) 96 { 97 this.messageEndpointFactory = messageEndpointFactory; 98 } 99 100 public void setXaResource(XAResource resource) 101 { 102 this.resource = resource; 103 } 104 105 public Object invoke(Object proxy, Method method, Object [] args) 106 throws Throwable 107 { 108 if (released.get()) 110 throw new IllegalStateException ("This message endpoint + " + getProxyString(proxy) + " has been released"); 111 112 Thread currentThread = Thread.currentThread(); 114 if (inUseThread != null && inUseThread.equals(currentThread) == false) 115 throw new IllegalStateException ("This message endpoint + " + getProxyString(proxy) + " is already in use by another thread " + inUseThread); 116 inUseThread = currentThread; 117 118 if (trace) 119 log.trace("MessageEndpoint " + getProxyString(proxy) + " in use by " + method + " " + inUseThread); 120 121 if (method.getName().equals("release")) 123 { 124 release(proxy); 125 return null; 126 } 127 else if (method.getName().equals("beforeDelivery")) 128 { 129 before(proxy, container, method, args); 130 return null; 131 } 132 else if (method.getName().equals("afterDelivery")) 133 { 134 after(proxy); 135 return null; 136 } 137 else 138 return delivery(proxy, container, method, args); 139 } 140 141 public String toString() 142 { 143 return container.getEjbName().toString(); 144 } 145 146 148 154 protected void release(Object proxy) throws Throwable 155 { 156 released.set(true); 158 159 if (trace) 160 log.trace("MessageEndpoint " + getProxyString(proxy) + " released"); 161 162 if (oldClassLoader != null) 164 { 165 try 166 { 167 finish("release", proxy, false); 168 } 169 catch (Throwable t) 170 { 171 log.warn("Error in release ", t); 172 } 173 } 174 } 175 176 182 protected void before(Object proxy, MessagingContainer container, Method method, Object [] args) throws Throwable 183 { 184 if (oldClassLoader != null) 186 throw new IllegalStateException ("Missing afterDelivery from the previous beforeDelivery for message endpoint " + getProxyString(proxy)); 187 188 if (trace) 189 log.trace("MessageEndpoint " + getProxyString(proxy) + " released"); 190 191 oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread); 193 SetTCLAction.setContextClassLoader(inUseThread, container.getClassloader()); 194 if (trace) 195 log.trace("MessageEndpoint " + getProxyString(proxy) + " set context classloader to " + container.getClassloader()); 196 197 try 199 { 200 MethodInfo methodInfo = container.getMethodInfo((Method )args[0]); 202 boolean isTransacted = messageEndpointFactory.isDeliveryTransacted(methodInfo.getAdvisedMethod()); 203 204 startTransaction("beforeDelivery", proxy, container, method, args, isTransacted); 205 } 206 catch (Throwable t) 207 { 208 resetContextClassLoader(proxy); 209 throw new ResourceException (t); 210 } 211 } 212 213 219 protected void after(Object proxy) throws Throwable 220 { 221 if (oldClassLoader == null) 223 throw new IllegalStateException ("afterDelivery without a previous beforeDelivery for message endpoint " + getProxyString(proxy)); 224 225 try 227 { 228 finish("afterDelivery", proxy, true); 229 } 230 catch (Throwable t) 231 { 232 throw new ResourceException (t); 233 } 234 } 235 236 243 protected Object delivery(Object proxy, MessagingContainer container, Method method, Object [] args) throws Throwable 244 { 245 if (delivered) 247 throw new IllegalStateException ("Multiple message delivery between before and after delivery is not allowed for message endpoint " + getProxyString(proxy)); 248 249 if (trace) 250 log.trace("MessageEndpoint " + getProxyString(proxy) + " delivering"); 251 252 if (oldClassLoader != null) 254 delivered = true; 255 256 boolean commit = true; 257 MethodInfo methodInfo = container.getMethodInfo(method); 259 260 try 261 { 262 if (oldClassLoader == null) 264 { 265 boolean isTransacted = messageEndpointFactory.isDeliveryTransacted(methodInfo.getAdvisedMethod()); 266 startTransaction("delivery", proxy, container, method, args, isTransacted); 267 } 268 return container.localInvoke(methodInfo, args); 269 } 270 catch (Throwable t) 271 { 272 if (trace) 273 log.trace("MessageEndpoint " + getProxyString(proxy) + " delivery error", t); 274 if (t instanceof Error || t instanceof RuntimeException ) 275 { 276 if (transaction != null) 277 transaction.setRollbackOnly(); 278 commit = false; 279 } 280 throw t; 281 } 282 finally 283 { 284 if (oldClassLoader == null) 286 { 287 try 288 { 289 endTransaction(proxy, commit); 291 } 292 finally 293 { 294 releaseThreadLock(proxy); 295 } 296 } 297 } 298 } 299 300 308 protected void finish(String context, Object proxy, boolean commit) throws Throwable 309 { 310 try 311 { 312 endTransaction(proxy, commit); 313 } 314 finally 315 { 316 delivered = false; 318 resetContextClassLoader(proxy); 320 releaseThreadLock(proxy); 322 } 323 } 324 325 333 protected void startTransaction(String context, Object proxy, MessagingContainer container, Method m, Object [] args, boolean isTransacted) throws Throwable 334 { 335 Method method; 336 337 if ("delivery".equals(context)) 339 method = m; 340 else 342 method = (Method )args[0]; 343 344 if (trace) 345 log.trace("MessageEndpoint " + getProxyString(proxy) + " " + context + " method=" + method + " xaResource=" + resource + " transacted=" + isTransacted); 346 347 TransactionManager tm = TxUtil.getTransactionManager(); suspended = tm.suspend(); 350 351 if (trace) 352 log.trace("MessageEndpoint " + getProxyString(proxy) + " " + context + " currentTx=" + suspended); 353 354 if (isTransacted) 356 { 357 if (suspended == null) 359 { 360 tm.begin(); 361 transaction = tm.getTransaction(); 362 if (trace) 363 log.trace("MessageEndpoint " + getProxyString(proxy) + " started transaction=" + transaction); 364 365 if (resource != null) 367 { 368 transaction.enlistResource(resource); 369 if (trace) 370 log.trace("MessageEndpoint " + getProxyString(proxy) + " enlisted=" + resource); 371 } 372 } 373 else 374 { 375 try 377 { 378 tm.resume(suspended); 379 } 380 finally 381 { 382 suspended = null; 383 if (trace) 384 log.trace("MessageEndpoint " + getProxyString(proxy) + " transaction=" + suspended + " already active, IGNORED=" + resource); 385 } 386 } 387 } 388 } 389 390 397 protected void endTransaction(Object proxy, boolean commit) throws Throwable 398 { 399 TransactionManager tm = null; 400 Transaction currentTx = null; 401 try 402 { 403 if (transaction != null) 405 { 406 tm = TxUtil.getTransactionManager(); currentTx = tm.getTransaction(); 408 409 if (currentTx != null && currentTx.equals(transaction) == false) 411 { 412 log.warn("Current transaction " + currentTx + " is not the expected transaction."); 413 tm.suspend(); 414 tm.resume(transaction); 415 } 416 else 417 { 418 currentTx = null; 420 } 421 422 if (commit == false || transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK) 424 { 425 if (trace) 426 log.trace("MessageEndpoint " + getProxyString(proxy) + " rollback"); 427 tm.rollback(); 428 } 429 else 430 { 431 if (trace) 432 log.trace("MessageEndpoint " + getProxyString(proxy) + " commit"); 433 tm.commit(); 434 } 435 } 436 437 if (suspended != null) 439 { 440 try 441 { 442 tm = TxUtil.getTransactionManager(); tm.resume(suspended); 444 } 445 finally 446 { 447 suspended = null; 448 } 449 } 450 } 451 finally 452 { 453 if (currentTx != null) 455 { 456 try 457 { 458 tm.resume(currentTx); 459 } 460 catch (Throwable t) 461 { 462 log.warn("MessageEndpoint " + getProxyString(proxy) + " failed to resume old transaction " + currentTx); 463 464 } 465 } 466 } 467 } 468 469 474 protected void resetContextClassLoader(Object proxy) 475 { 476 if (trace) 477 log.trace("MessageEndpoint " + getProxyString(proxy) + " reset classloader " + oldClassLoader); 478 SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader); 479 oldClassLoader = null; 480 } 481 482 487 protected void releaseThreadLock(Object proxy) 488 { 489 if (trace) 490 log.trace("MessageEndpoint " + getProxyString(proxy) + " no longer in use by " + inUseThread); 491 inUseThread = null; 492 } 493 494 500 protected String getProxyString(Object proxy) 501 { 502 if (cachedProxyString == null) 503 cachedProxyString = proxy.toString(); 504 return cachedProxyString; 505 } 506 507 512 protected JBossMessageEndpointFactory getMessageEndpointFactory(Invocation invocation) 513 { 514 if (endpointFactory == null) 515 { 516 MethodInvocation mi = (MethodInvocation)invocation; 517 endpointFactory = (JBossMessageEndpointFactory) mi.getResponseAttachment(MESSAGE_ENDPOINT_FACTORY); 518 } 519 return endpointFactory; 520 } 521 522 527 protected MessagingContainer getContainer(Invocation mi) 528 { 529 return getMessageEndpointFactory(mi).getContainer(); 530 } 531 } 532 | Popular Tags |