package org.jboss.test.jca.adapter;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.LocalTransaction;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionMetaData;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.security.auth.Subject;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.jboss.logging.Logger;
import org.jboss.tm.TxUtils;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;

/**
 * Managed connection of the JCA test adapter.
 *
 * <p>The instance acts as its own {@link XAResource} and
 * {@link LocalTransaction}: {@link #getXAResource()} and
 * {@link #getLocalTransaction()} both return <code>this</code>. The state of
 * each XA branch is kept in the map obtained from the owning factory through
 * {@link #getXids()}, keyed by {@link GlobalXID}.
 *
 * <p>Failures can be injected into <code>start</code>/<code>end</code> (for all
 * connections, static switches) and into <code>prepare</code>/<code>commit</code>
 * (per connection).
 */
public class TestManagedConnection implements ManagedConnection, XAResource, LocalTransaction
{
   /** XA branch states stored in the xid map. */
   public static final String STARTED = "STARTED";
   public static final String SUSPENDED = "SUSPENDED";
   public static final String ENDED = "ENDED";
   public static final String PREPARED = "PREPARED";

   /** Local transaction states reported by {@link #getLocalState()}. */
   public static final String LOCAL_NONE = "LOCAL_NONE";
   public static final String LOCAL_TRANSACTION = "LOCAL_TRANSACTION";
   public static final String LOCAL_COMMITTED = "LOCAL_COMMITTED";
   public static final String LOCAL_ROLLEDBACK = "LOCAL_ROLLEDBACK";

   /** Identifier shown in {@link #toString()}. */
   private final int id;

   private Logger log = Logger.getLogger(getClass());
   private TestManagedConnectionFactory mcf;

   /** Open connection handles; accessed while holding this object's monitor. */
   private HashSet handles = new HashSet();

   /** Registered event listeners; accessed while holding this object's monitor. */
   private HashSet listeners = new HashSet();

   /** Xid of the branch most recently started on this connection, or null. */
   private GlobalXID currentXid;

   private SynchronizedBoolean destroyed = new SynchronizedBoolean(false);

   private boolean failInPrepare = false;
   private boolean failInCommit = false;

   /** Static failure switches: they apply to every connection instance. */
   private static boolean failInStart = false;
   private static boolean failInEnd = false;

   /**
    * Code placed at the front of the message of injected XAExceptions.
    * It is static, so the last setter call wins for all connections.
    */
   private static int xaCode;

   private String localState = LOCAL_NONE;

   /**
    * Switches failure injection for {@link #start(Xid, int)} on or off.
    *
    * @param fis whether start should fail
    * @param xa the code to include in the exception message
    */
   public static void setFailInStart(boolean fis, int xa)
   {
      failInStart = fis;
      xaCode = xa;
   }

   /**
    * Switches failure injection for {@link #end(Xid, int)} on or off.
    *
    * @param fie whether end should fail
    * @param xa the code to include in the exception message
    */
   public static void setFailInEnd(boolean fie, int xa)
   {
      failInEnd = fie;
      xaCode = xa;
   }

   /**
    * Creates a managed connection. Only the factory and the id are retained;
    * the subject and request info are not stored.
    *
    * @param mcf the owning factory
    * @param subject unused
    * @param cri unused
    * @param id the identifier shown in {@link #toString()}
    */
   public TestManagedConnection(final TestManagedConnectionFactory mcf, final Subject subject,
         final TestConnectionRequestInfo cri, final int id)
   {
      this.mcf = mcf;
      this.id = id;
   }

   /**
    * Switches failure injection for {@link #prepare(Xid)} on this connection.
    *
    * @param fail whether prepare should fail
    * @param xaCode the code to include in the exception message (stored statically)
    */
   void setFailInPrepare(final boolean fail, final int xaCode)
   {
      this.failInPrepare = fail;
      // xaCode is a static field, so it is written through the class rather than "this".
      TestManagedConnection.xaCode = xaCode;
   }

   /**
    * Switches failure injection for {@link #commit(Xid, boolean)} on this connection.
    *
    * @param fail whether commit should fail
    * @param xaCode the code to include in the exception message (stored statically)
    */
   void setFailInCommit(final boolean fail, final int xaCode)
   {
      this.failInCommit = fail;
      TestManagedConnection.xaCode = xaCode;
   }

   /**
    * Cleans up and marks the connection destroyed. Calling it again after the
    * connection has been destroyed only logs and returns.
    *
    * @throws ResourceException if {@link #cleanup()} fails
    */
   public synchronized void destroy() throws ResourceException
   {
      log.info("Destroying connection: " + this);
      if (destroyed.get())
         return;
      cleanup();
      destroyed.set(true);
      currentXid = null;
   }

   /**
    * Detaches every open handle from this connection and forgets it.
    *
    * @throws ResourceException if the connection is already destroyed
    */
   public synchronized void cleanup() throws ResourceException
   {
      log.info("cleanup: " + this + " handles=" + handles);

      checkDestroyedResourceException();
      for (Iterator i = handles.iterator(); i.hasNext();)
      {
         TestConnection c = (TestConnection) i.next();
         c.setMc(null);
         i.remove();
      }
   }

   /**
    * Creates a new handle bound to this connection.
    *
    * <p>When the request info carries the failure marker
    * <code>getConnectionResource</code> or <code>getConnectionRuntime</code>,
    * a ResourceException or RuntimeException is thrown instead.
    *
    * @param param1 unused
    * @param param2 the request info, may be null
    * @return the new {@link TestConnection} handle
    * @throws ResourceException if destroyed or a resource failure was requested
    */
   public synchronized Object getConnection(Subject param1, ConnectionRequestInfo param2) throws ResourceException
   {
      log.info("getConnection " + this);

      checkDestroyedResourceException();

      if (param2 != null)
      {
         TestConnectionRequestInfo info = (TestConnectionRequestInfo) param2;
         // A request without a failure marker must not blow up with a NullPointerException.
         if (info.failure != null)
         {
            if (info.failure.equals("getConnectionResource"))
               throw new ResourceException(this.toString());
            if (info.failure.equals("getConnectionRuntime"))
               throw new RuntimeException(this.toString());
         }
      }
      TestConnection c = new TestConnection(this);
      handles.add(c);
      return c;
   }

   /**
    * Rebinds an existing handle to this connection.
    *
    * @param p the handle; must be a {@link TestConnection}
    * @throws ResourceException if destroyed or <code>p</code> is of another type
    */
   public synchronized void associateConnection(Object p) throws ResourceException
   {
      log.info("associateConnecton " + this + " connection=" + p);

      checkDestroyedResourceException();

      if (p instanceof TestConnection)
      {
         ((TestConnection) p).setMc(this);
         handles.add(p);
      }
      else
      {
         throw new ResourceException("wrong kind of Connection " + p);
      }
   }

   /**
    * Registers a connection event listener.
    *
    * @param cel the listener
    */
   public synchronized void addConnectionEventListener(ConnectionEventListener cel)
   {
      log.info("addCEL: " + this + " " + cel);
      listeners.add(cel);
   }

   /**
    * Unregisters a connection event listener.
    *
    * @param cel the listener
    */
   public synchronized void removeConnectionEventListener(ConnectionEventListener cel)
   {
      log.info("removeCEL: " + this + " " + cel);
      listeners.remove(cel);
   }

   /**
    * @return this object
    * @throws ResourceException if the connection is already destroyed
    */
   public synchronized XAResource getXAResource() throws ResourceException
   {
      checkDestroyedResourceException();
      return this;
   }

   /**
    * @return this object
    */
   public LocalTransaction getLocalTransaction() throws ResourceException
   {
      return this;
   }

   /**
    * @return always null
    */
   public ManagedConnectionMetaData getMetaData() throws ResourceException
   {
      return null;
   }

   /**
    * Ignores the supplied writer.
    *
    * @param param1 unused
    */
   public void setLogWriter(PrintWriter param1) throws ResourceException
   {
   }

   /**
    * @return always null
    */
   public PrintWriter getLogWriter() throws ResourceException
   {
      return null;
   }

   /**
    * @return a copy of the currently registered listeners
    */
   public List getListeners()
   {
      return snapshotListeners();
   }

   /**
    * Copies the listener set so listeners can be notified without holding a lock.
    * The set is mutated under this object's monitor by
    * add/removeConnectionEventListener, so the copy is taken under the same monitor.
    *
    * @return a new list holding the current listeners
    */
   private synchronized List snapshotListeners()
   {
      return new ArrayList(listeners);
   }

   /**
    * Starts work on a transaction branch.
    *
    * <p>Optionally sleeps first (factory setting, only for TMNOFLAGS). With
    * failure injection enabled the error is broadcast to the listeners and
    * thrown. Otherwise the branch must be unknown (TMNOFLAGS), SUSPENDED,
    * ENDED, or STARTED with TMJOIN/TMRESUME; it is then recorded as STARTED.
    *
    * @param xid the branch id
    * @param flags the XA start flags
    * @throws XAException on injected failure, destroyed connection, invalid
    *         state, or a join while the factory forbids joins
    */
   public void start(Xid xid, int flags) throws XAException
   {
      long sleepInStart = mcf.getSleepInStart();
      if (flags == TMNOFLAGS && sleepInStart != 0)
         doSleep(sleepInStart);

      synchronized (this)
      {
         if (failInStart)
         {
            XAException xaex = new XAException(xaCode + "for" + this);
            broadcastConnectionError(xaex);
            // Throw the very exception the listeners were told about.
            throw xaex;
         }

         GlobalXID gid = new GlobalXID(xid);
         String flagString = TxUtils.getXAResourceFlagsAsString(flags);
         log.info("start with xid=" + gid + " flags=" + flagString + " for " + this);
         checkDestroyedXAException();
         Map xids = getXids();
         synchronized (xids)
         {
            String state = (String) xids.get(gid);
            if (state == null && flags != TMNOFLAGS)
               throw new XAException("Invalid start state=" + state + " xid=" + gid + " flags=" + flagString + " for " + this);

            // A known branch may be (re)started when suspended or ended, or
            // when already started and the caller joins or resumes it.
            boolean joinOrResume = (flags & (TMJOIN | TMRESUME)) != 0;
            boolean restartable = SUSPENDED.equals(state) || ENDED.equals(state)
                  || (STARTED.equals(state) && joinOrResume);
            if (state != null && !restartable)
               throw new XAException("Invalid start state=" + state + " xid=" + gid + " flags=" + flagString + " for " + this);

            if ((flags & TMJOIN) != 0 && mcf.failJoin)
               throw new XAException("Join is not allowed " + state + " xid=" + gid + " flags=" + flagString + " for " + this);
            xids.put(gid, STARTED);
         }

         this.currentXid = gid;
      }
   }

   /**
    * Ends work on a transaction branch.
    *
    * <p>With failure injection enabled the error is broadcast and thrown.
    * Optionally sleeps (factory setting, only when flags differ from
    * TMSUCCESS). The branch must be STARTED, SUSPENDED or ENDED; it becomes
    * SUSPENDED when TMSUSPEND is set and ENDED otherwise.
    *
    * @param xid the branch id
    * @param flags the XA end flags
    * @throws XAException on injected failure or invalid state
    */
   public void end(final Xid xid, final int flags) throws XAException
   {
      if (failInEnd)
      {
         XAException xaex = new XAException(xaCode + "for" + this);
         broadcastConnectionError(xaex);
         throw xaex;
      }

      long sleepInEnd = mcf.getSleepInEnd();
      if (flags != TMSUCCESS && sleepInEnd != 0)
         doSleep(sleepInEnd);

      synchronized (this)
      {
         GlobalXID gid = new GlobalXID(xid);
         String flagString = TxUtils.getXAResourceFlagsAsString(flags);
         log.info("end with xid=" + gid + " flags=" + flagString + " for " + this);
         Map xids = getXids();
         synchronized (xids)
         {
            String state = (String) xids.get(gid);
            if (!STARTED.equals(state) && !SUSPENDED.equals(state) && !ENDED.equals(state))
               throw new XAException("Invalid end state=" + state + " xid=" + gid + " " + this);
            if ((flags & TMSUSPEND) == 0)
               xids.put(gid, ENDED);
            else
               xids.put(gid, SUSPENDED);
         }

         this.currentXid = null;
      }
   }

   /**
    * Commits a branch and removes it from the xid map.
    *
    * <p>One-phase commit requires the branch to be SUSPENDED or ENDED,
    * two-phase commit requires PREPARED.
    *
    * @param xid the branch id
    * @param onePhase whether this is a one-phase commit
    * @throws XAException on injected failure, destroyed connection or invalid state
    */
   public synchronized void commit(Xid xid, boolean onePhase) throws XAException
   {
      GlobalXID gid = new GlobalXID(xid);
      log.info("commit with xid=" + gid + " onePhase=" + onePhase + " for " + this);
      checkDestroyedXAException();
      if (failInCommit)
         throw new XAException(xaCode + " for " + this);
      Map xids = getXids();
      synchronized (xids)
      {
         String state = (String) xids.get(gid);
         if (onePhase)
         {
            if (!SUSPENDED.equals(state) && !ENDED.equals(state))
               throw new XAException("Invalid one phase commit state=" + state + " xid=" + gid + " " + this);
         }
         else
         {
            if (!PREPARED.equals(state))
               throw new XAException("Invalid two phase commit state=" + state + " xid=" + gid + " " + this);
         }
         xids.remove(gid);
      }
   }

   /**
    * Rolls back a branch and removes it from the xid map. The branch must be
    * SUSPENDED, ENDED or PREPARED.
    *
    * @param xid the branch id
    * @throws XAException if destroyed or the branch is in another state
    */
   public synchronized void rollback(Xid xid) throws XAException
   {
      GlobalXID gid = new GlobalXID(xid);
      log.info("rollback with xid=" + gid + " for " + this);
      checkDestroyedXAException();
      Map xids = getXids();
      synchronized (xids)
      {
         String state = (String) xids.get(gid);
         if (!SUSPENDED.equals(state) && !ENDED.equals(state) && !PREPARED.equals(state))
            throw new XAException("Invalid rollback state=" + state + " xid=" + gid + " " + this);
         xids.remove(gid);
      }
   }

   /**
    * Prepares a branch. The branch must be SUSPENDED or ENDED; it is recorded
    * as PREPARED unless failure injection is enabled.
    *
    * @param xid the branch id
    * @return XA_OK
    * @throws XAException on destroyed connection, invalid state or injected failure
    */
   public synchronized int prepare(Xid xid) throws XAException
   {
      GlobalXID gid = new GlobalXID(xid);
      log.info("prepare with xid=" + gid + " for " + this);
      checkDestroyedXAException();
      Map xids = getXids();
      synchronized (xids)
      {
         String state = (String) xids.get(gid);
         if (!SUSPENDED.equals(state) && !ENDED.equals(state))
            throw new XAException("Invalid prepare state=" + state + " xid=" + gid + " " + this);
         if (failInPrepare)
            throw new XAException(xaCode + " for " + this);
         xids.put(gid, PREPARED);
         return XA_OK;
      }
   }

   /**
    * Removes a branch from the xid map regardless of its state.
    *
    * @param xid the branch id
    * @throws XAException if the connection is already destroyed
    */
   public synchronized void forget(Xid xid) throws XAException
   {
      GlobalXID gid = new GlobalXID(xid);
      log.info("forget with xid=" + gid + " for " + this);
      checkDestroyedXAException();
      Map xids = getXids();
      synchronized (xids)
      {
         xids.remove(gid);
      }
   }

   /**
    * No recovery scan is performed.
    *
    * @param param1 unused
    * @return always null
    */
   public Xid[] recover(int param1) throws XAException
   {
      return null;
   }

   /**
    * Two connections are the same resource manager when they share the same
    * factory instance.
    *
    * @param xar the other resource
    * @return true if <code>xar</code> is a TestManagedConnection of the same factory
    */
   public boolean isSameRM(XAResource xar) throws XAException
   {
      // instanceof is false for null, so no separate null check is needed.
      if (!(xar instanceof TestManagedConnection))
         return false;
      TestManagedConnection other = (TestManagedConnection) xar;
      return mcf == other.mcf;
   }

   /**
    * @return always 0
    */
   public int getTransactionTimeout() throws XAException
   {
      return 0;
   }

   /**
    * @param param1 unused
    * @return always false
    */
   public boolean setTransactionTimeout(int param1) throws XAException
   {
      return false;
   }

   /**
    * @return the current local transaction state, one of the LOCAL_* constants
    */
   public String getLocalState()
   {
      return localState;
   }

   /** Marks the local transaction as begun. */
   public void begin() throws ResourceException
   {
      localState = LOCAL_TRANSACTION;
   }

   /**
    * Begins the local transaction and notifies every listener with
    * LOCAL_TRANSACTION_STARTED. Listener failures are logged and ignored.
    */
   public void sendBegin() throws ResourceException
   {
      begin();
      ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
      Collection copy = snapshotListeners();
      for (Iterator i = copy.iterator(); i.hasNext();)
      {
         ConnectionEventListener cel = (ConnectionEventListener) i.next();
         try
         {
            cel.localTransactionStarted(event);
         }
         catch (Throwable ignored)
         {
            log.warn("Ignored", ignored);
         }
      }
   }

   /** Marks the local transaction as committed. */
   public void commit() throws ResourceException
   {
      localState = LOCAL_COMMITTED;
   }

   /**
    * Commits the local transaction and notifies every listener with
    * LOCAL_TRANSACTION_COMMITTED. Listener failures are logged and ignored.
    */
   public void sendCommit() throws ResourceException
   {
      commit();

      ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
      Collection copy = snapshotListeners();
      for (Iterator i = copy.iterator(); i.hasNext();)
      {
         ConnectionEventListener cel = (ConnectionEventListener) i.next();
         try
         {
            cel.localTransactionCommitted(event);
         }
         catch (Throwable ignored)
         {
            log.warn("Ignored", ignored);
         }
      }
   }

   /** Marks the local transaction as rolled back. */
   public void rollback() throws ResourceException
   {
      localState = LOCAL_ROLLEDBACK;
   }

   /**
    * Rolls back the local transaction and notifies every listener with
    * LOCAL_TRANSACTION_ROLLEDBACK. Listener failures are logged and ignored.
    */
   public void sendRollback() throws ResourceException
   {
      rollback();

      ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
      Collection copy = snapshotListeners();
      for (Iterator i = copy.iterator(); i.hasNext();)
      {
         ConnectionEventListener cel = (ConnectionEventListener) i.next();
         try
         {
            cel.localTransactionRolledback(event);
         }
         catch (Throwable ignored)
         {
            log.warn("Ignored", ignored);
         }
      }
   }

   /**
    * @return true while a branch started on this connection has not been ended
    */
   synchronized boolean isInTx()
   {
      log.info("isInTx: " + this);
      return currentXid != null;
   }

   /**
    * @return the xid state map of the owning factory
    */
   Map getXids()
   {
      return mcf.getXids();
   }

   /**
    * Notifies every listener that a handle was closed and then forgets the
    * handle. Does nothing once the connection is destroyed. Listener failures
    * are logged and ignored.
    *
    * @param handle the handle that was closed
    */
   void connectionClosed(TestConnection handle)
   {
      if (destroyed.get())
         return;

      log.info("Connetion closed handle=" + handle + " for " + this);

      ConnectionEvent ce = new ConnectionEvent(this, ConnectionEvent.CONNECTION_CLOSED);
      ce.setConnectionHandle(handle);
      Collection copy = snapshotListeners();
      for (Iterator i = copy.iterator(); i.hasNext();)
      {
         log.info("notifying 1 cel connectionClosed");
         ConnectionEventListener cel = (ConnectionEventListener) i.next();
         try
         {
            cel.connectionClosed(ce);
         }
         catch (Throwable ignored)
         {
            log.warn("Ignored", ignored);
         }
      }
      synchronized (this)
      {
         handles.remove(handle);
      }
   }

   /**
    * Notifies every listener of a connection error that is not tied to a
    * handle. Does nothing once the connection is destroyed. A non-Exception
    * throwable is wrapped in a ResourceAdapterInternalException.
    *
    * @param e the error
    */
   protected void broadcastConnectionError(Throwable e)
   {
      if (destroyed.get())
         return;

      Exception ex;
      if (e instanceof Exception)
         ex = (Exception) e;
      else
         ex = new ResourceAdapterInternalException("Unexpected error", e);
      ConnectionEvent ce = new ConnectionEvent(this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, ex);
      Collection copy = snapshotListeners();
      for (Iterator i = copy.iterator(); i.hasNext();)
      {
         ConnectionEventListener cel = (ConnectionEventListener) i.next();
         try
         {
            cel.connectionErrorOccurred(ce);
         }
         catch (Throwable ignored)
         {
            // Keep notifying the remaining listeners, but leave a trace.
            log.warn("Ignored", ignored);
         }
      }
   }

   /**
    * Notifies every listener of an error reported by a handle. Does nothing
    * once the connection is destroyed.
    *
    * @param handle the handle that reported the error
    * @param e the error
    */
   void connectionError(TestConnection handle, Exception e)
   {
      if (destroyed.get())
         return;

      log.info("Connetion error handle=" + handle + " for " + this, e);

      ConnectionEvent ce = new ConnectionEvent(this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, e);
      ce.setConnectionHandle(handle);
      Collection copy = snapshotListeners();
      for (Iterator i = copy.iterator(); i.hasNext();)
      {
         ConnectionEventListener cel = (ConnectionEventListener) i.next();
         try
         {
            cel.connectionErrorOccurred(ce);
         }
         catch (Throwable ignored)
         {
            // Keep notifying the remaining listeners, but leave a trace.
            log.warn("Ignored", ignored);
         }
      }
   }

   /**
    * @throws ResourceException if the connection is already destroyed
    */
   void checkDestroyedResourceException() throws ResourceException
   {
      if (destroyed.get())
         throw new ResourceException("Already destroyed " + this);
   }

   /**
    * @throws XAException if the connection is already destroyed
    */
   void checkDestroyedXAException() throws XAException
   {
      if (destroyed.get())
         throw new XAException("Already destroyed " + this);
   }

   /**
    * @return the id, the current xid and the destroyed flag
    */
   public synchronized String toString()
   {
      StringBuffer buffer = new StringBuffer();
      buffer.append("TestManagedConnection#").append(id);
      buffer.append("{");
      buffer.append("xid=").append(currentXid);
      buffer.append(" destroyed=").append(destroyed.get());
      buffer.append("}");
      return buffer.toString();
   }

   /**
    * Sleeps for the given time. If the sleep is interrupted it returns early
    * and restores the thread's interrupt status.
    *
    * @param sleep the time to sleep in milliseconds
    */
   public void doSleep(long sleep)
   {
      try
      {
         Thread.sleep(sleep);
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
      }
   }

   /**
    * Map key built from the global transaction id of an Xid.
    *
    * <p>Identity is the trimmed string form of the id bytes; both
    * {@link #equals(Object)} and {@link #hashCode()} derive from it.
    */
   public class GlobalXID
   {
      byte[] gid;
      int hashCode;
      String toString;

      /**
       * @param xid the xid whose global transaction id is used
       */
      public GlobalXID(Xid xid)
      {
         gid = xid.getGlobalTransactionId();
         toString = new String(gid).trim();
         // equals() compares the trimmed string, so the hash must come from
         // the same value; hashing the raw bytes could give equal keys
         // different hash codes.
         hashCode = toString.hashCode();
      }

      public int hashCode()
      {
         return hashCode;
      }

      public String toString()
      {
         return toString;
      }

      public boolean equals(Object obj)
      {
         if (!(obj instanceof GlobalXID))
            return false;
         GlobalXID other = (GlobalXID) obj;
         return toString.equals(other.toString);
      }
   }
}