1 22 package org.jboss.ejb.plugins.inflow; 23 24 import java.lang.reflect.Method ; 25 26 import javax.resource.ResourceException ; 27 import javax.transaction.Status ; 28 import javax.transaction.Transaction ; 29 import javax.transaction.TransactionManager ; 30 import javax.transaction.xa.XAResource ; 31 32 import org.jboss.ejb.MessageDrivenContainer; 33 import org.jboss.invocation.Invocation; 34 import org.jboss.logging.Logger; 35 import org.jboss.proxy.Interceptor; 36 37 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 38 39 45 public class MessageEndpointInterceptor extends Interceptor 46 { 47 49 50 private static final Logger log = Logger.getLogger(MessageEndpointInterceptor.class); 51 52 53 public static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpoint.Factory"; 54 55 56 public static final String MESSAGE_ENDPOINT_XARESOURCE = "MessageEndpoint.XAResource"; 57 58 60 61 private boolean trace = log.isTraceEnabled(); 62 63 64 private String cachedProxyString = null; 65 66 67 protected SynchronizedBoolean released = new SynchronizedBoolean(false); 68 69 70 protected boolean delivered = false; 71 72 73 protected Thread inUseThread = null; 74 75 76 protected ClassLoader oldClassLoader = null; 77 78 79 protected Transaction transaction = null; 80 81 82 protected Transaction suspended = null; 83 84 85 private JBossMessageEndpointFactory endpointFactory; 86 87 89 91 public MessageEndpointInterceptor() 92 { 93 } 94 95 97 99 public Object invoke(Invocation mi) throws Throwable 100 { 101 if (released.get()) 103 throw new IllegalStateException ("This message endpoint + " + getProxyString(mi) + " has been released"); 104 105 Thread currentThread = Thread.currentThread(); 107 if (inUseThread != null && inUseThread.equals(currentThread) == false) 108 throw new IllegalStateException ("This message endpoint + " + getProxyString(mi) + " is already in use by another thread " + inUseThread); 109 inUseThread = currentThread; 110 111 String method = mi.getMethod().getName(); 112 if (trace) 113 log.trace("MessageEndpoint " + getProxyString(mi) + " in use by " + method + " " + inUseThread); 114 115 if (method.equals("release")) 117 { 118 release(mi); 119 return null; 120 } 121 else if (method.equals("beforeDelivery")) 122 { 123 before(mi); 124 return null; 125 } 126 else if (method.equals("afterDelivery")) 127 { 128 after(mi); 129 return null; 130 } 131 else 132 return delivery(mi); 133 } 134 135 137 139 145 protected void release(Invocation mi) throws Throwable 146 { 147 released.set(true); 149 150 if (trace) 151 log.trace("MessageEndpoint " + getProxyString(mi) + " released"); 152 153 if (oldClassLoader != null) 155 { 156 try 157 { 158 finish("release", mi, false); 159 } 160 catch (Throwable t) 161 { 162 log.warn("Error in release ", t); 163 } 164 } 165 } 166 167 173 protected void before(Invocation mi) throws Throwable 174 { 175 if (oldClassLoader != null) 177 throw new IllegalStateException ("Missing afterDelivery from the previous beforeDelivery for message endpoint " + getProxyString(mi)); 178 179 if (trace) 180 log.trace("MessageEndpoint " + getProxyString(mi) + " released"); 181 182 MessageDrivenContainer container = getContainer(mi); 184 oldClassLoader = GetTCLAction.getContextClassLoader(inUseThread); 185 SetTCLAction.setContextClassLoader(inUseThread, container.getClassLoader()); 186 if (trace) 187 log.trace("MessageEndpoint " + getProxyString(mi) + " set context classloader to " + container.getClassLoader()); 188 189 try 191 { 192 startTransaction("beforeDelivery", mi, container); 193 } 194 catch (Throwable t) 195 { 196 resetContextClassLoader(mi); 197 throw new ResourceException (t); 198 } 199 } 200 201 207 protected void after(Invocation mi) throws Throwable 208 { 209 if (oldClassLoader == null) 211 throw new IllegalStateException ("afterDelivery without a previous beforeDelivery for message endpoint " + getProxyString(mi)); 212 213 try 215 { 216 finish("afterDelivery", mi, true); 217 } 218 catch (Throwable t) 219 { 220 throw new ResourceException (t); 221 } 222 } 223 224 231 protected Object delivery(Invocation mi) throws Throwable 232 { 233 if (delivered) 235 throw new IllegalStateException ("Multiple message delivery between before and after delivery is not allowed for message endpoint " + getProxyString(mi)); 236 237 if (trace) 238 log.trace("MessageEndpoint " + getProxyString(mi) + " delivering"); 239 240 if (oldClassLoader != null) 242 delivered = true; 243 244 245 MessageDrivenContainer container = getContainer(mi); 246 boolean commit = true; 247 try 248 { 249 if (oldClassLoader == null) 251 startTransaction("delivery", mi, container); 252 return getNext().invoke(mi); 253 } 254 catch (Throwable t) 255 { 256 if (trace) 257 log.trace("MessageEndpoint " + getProxyString(mi) + " delivery error", t); 258 if (t instanceof Error || t instanceof RuntimeException ) 259 { 260 if (transaction != null) 261 transaction.setRollbackOnly(); 262 commit = false; 263 } 264 throw t; 265 } 266 finally 267 { 268 if (oldClassLoader == null) 270 { 271 try 272 { 273 endTransaction(mi, commit); 275 } 276 finally 277 { 278 releaseThreadLock(mi); 279 } 280 } 281 } 282 } 283 284 292 protected void finish(String context, Invocation mi, boolean commit) throws Throwable 293 { 294 try 295 { 296 endTransaction(mi, commit); 297 } 298 finally 299 { 300 delivered = false; 302 resetContextClassLoader(mi); 304 releaseThreadLock(mi); 306 } 307 } 308 309 317 protected void startTransaction(String context, Invocation mi, MessageDrivenContainer container) throws Throwable 318 { 319 XAResource resource = (XAResource ) mi.getInvocationContext().getValue(MESSAGE_ENDPOINT_XARESOURCE); 321 322 Method method = null; 323 324 if ("delivery".equals(context)) 326 method = mi.getMethod(); 327 else 329 method = (Method ) mi.getArguments()[0]; 330 331 boolean isTransacted = getMessageEndpointFactory(mi).isDeliveryTransacted(method); 333 334 if (trace) 335 log.trace("MessageEndpoint " + getProxyString(mi) + " " + context + " method=" + method + " xaResource=" + resource + " transacted=" + isTransacted); 336 337 TransactionManager tm = container.getTransactionManager(); 339 suspended = tm.suspend(); 340 341 if (trace) 342 log.trace("MessageEndpoint " + getProxyString(mi) + " " + context + " currentTx=" + suspended); 343 344 if (isTransacted) 346 { 347 if (suspended == null) 349 { 350 tm.begin(); 351 transaction = tm.getTransaction(); 352 if (trace) 353 log.trace("MessageEndpoint " + getProxyString(mi) + " started transaction=" + transaction); 354 355 if (resource != null) 357 { 358 transaction.enlistResource(resource); 359 if (trace) 360 log.trace("MessageEndpoint " + getProxyString(mi) + " enlisted=" + resource); 361 } 362 } 363 else 364 { 365 try 367 { 368 tm.resume(suspended); 369 } 370 finally 371 { 372 suspended = null; 373 if (trace) 374 log.trace("MessageEndpoint " + getProxyString(mi) + " transaction=" + suspended + " already active, IGNORED=" + resource); 375 } 376 } 377 } 378 } 379 380 387 protected void endTransaction(Invocation mi, boolean commit) throws Throwable 388 { 389 TransactionManager tm = null; 390 Transaction currentTx = null; 391 try 392 { 393 if (transaction != null) 395 { 396 tm = getContainer(mi).getTransactionManager(); 397 currentTx = tm.getTransaction(); 398 399 if (currentTx != null && currentTx.equals(transaction) == false) 401 { 402 log.warn("Current transaction " + currentTx + " is not the expected transaction."); 403 tm.suspend(); 404 tm.resume(transaction); 405 } 406 else 407 { 408 currentTx = null; 410 } 411 412 if (commit == false || transaction.getStatus() == Status.STATUS_MARKED_ROLLBACK) 414 { 415 if (trace) 416 log.trace("MessageEndpoint " + getProxyString(mi) + " rollback"); 417 tm.rollback(); 418 } 419 else 420 { 421 if (trace) 422 log.trace("MessageEndpoint " + getProxyString(mi) + " commit"); 423 tm.commit(); 424 } 425 } 426 427 if (suspended != null) 429 { 430 try 431 { 432 tm = getContainer(mi).getTransactionManager(); 433 tm.resume(suspended); 434 } 435 finally 436 { 437 suspended = null; 438 } 439 } 440 } 441 finally 442 { 443 if (currentTx != null) 445 { 446 try 447 { 448 tm.resume(currentTx); 449 } 450 catch (Throwable t) 451 { 452 log.warn("MessageEndpoint " + getProxyString(mi) + " failed to resume old transaction " + currentTx); 453 454 } 455 } 456 } 457 } 458 459 464 protected void resetContextClassLoader(Invocation mi) 465 { 466 if (trace) 467 log.trace("MessageEndpoint " + getProxyString(mi) + " reset classloader " + oldClassLoader); 468 SetTCLAction.setContextClassLoader(inUseThread, oldClassLoader); 469 oldClassLoader = null; 470 } 471 472 477 protected void releaseThreadLock(Invocation mi) 478 { 479 if (trace) 480 log.trace("MessageEndpoint " + getProxyString(mi) + " no longer in use by " + inUseThread); 481 inUseThread = null; 482 } 483 484 490 protected String getProxyString(Invocation mi) 491 { 492 if (cachedProxyString == null) 493 cachedProxyString = mi.getInvocationContext().getCacheId().toString(); 494 return cachedProxyString; 495 } 496 497 502 protected JBossMessageEndpointFactory getMessageEndpointFactory(Invocation mi) 503 { 504 if (endpointFactory == null) 505 endpointFactory = (JBossMessageEndpointFactory) mi.getInvocationContext().getValue(MESSAGE_ENDPOINT_FACTORY); 506 return endpointFactory; 507 } 508 509 514 protected MessageDrivenContainer getContainer(Invocation mi) 515 { 516 return getMessageEndpointFactory(mi).getContainer(); 517 } 518 519 521 } 523 | Popular Tags |