1 22 package org.jboss.test.jca.mbean; 23 24 import java.io.Serializable ; 25 import java.util.Collections ; 26 import java.util.HashMap ; 27 import java.util.HashSet ; 28 import java.util.Map ; 29 import java.util.Set ; 30 31 import javax.naming.InitialContext ; 32 import javax.resource.cci.Connection ; 33 import javax.resource.cci.ConnectionFactory ; 34 import javax.sql.DataSource ; 35 import javax.transaction.Synchronization ; 36 import javax.transaction.Transaction ; 37 import javax.transaction.TransactionManager ; 38 39 import org.jboss.logging.Logger; 40 import org.jboss.tm.TxUtils; 41 42 50 public class MTOperation implements Serializable 51 { 52 54 55 private static final long serialVersionUID = 1L; 56 57 58 public static final int TM_GET_STATUS = 0; 59 public static final int TM_BEGIN = 1; 60 public static final int TM_SUSPEND = 2; 61 public static final int TM_RESUME = 3; 62 public static final int TM_COMMIT = 4; 63 public static final int TX_COMMIT = 5; 64 public static final int TX_REGISTER_SYNC = 6; 65 66 public static final int CF_LOOKUP = 10; 67 public static final int CF_BY_TX_LOOKUP = 11; 68 public static final int CF_GET_CONN = 12; 69 public static final int CN_CLOSE_CONN = 13; 70 71 public static final int DS_TEST_LOOKUP = 15; 72 public static final int DS_DEFAULT_LOOKUP = 16; 73 public static final int DS_GET_CONN = 17; 74 public static final int DS_CLOSE_CONN = 18; 75 76 public static final int XX_SLEEP_200 = 20; 77 public static final int XX_SLEEP_RANDOM = 21; 78 public static final int XX_POST_SIGNAL = 22; 79 public static final int XX_WAIT_FOR_SIGNAL = 23; 80 public static final int XX_WAIT_FOR_TX = 24; 81 public static final int XX_WAIT_FOR_CONN = 25; 82 83 84 protected static Logger log; 85 86 87 protected static TransactionManager tm = null; 88 89 90 protected static Map connections = Collections.synchronizedMap(new HashMap ()); 91 92 93 protected static Map transactions = Collections.synchronizedMap(new HashMap ()); 94 95 96 protected static Set signals = Collections.synchronizedSet(new HashSet ()); 97 98 99 protected static ConnectionFactory cf = null; 100 101 102 protected static DataSource ds = null; 103 104 105 protected static boolean testMarkedForExit; 106 107 109 110 protected Integer id; 111 112 113 protected int op; 114 115 116 protected Throwable throwable; 117 118 120 123 public static void init(Logger log) throws Exception 124 { 125 MTOperation.log = log; 126 127 if (getTM().getTransaction() != null) 128 { 129 throw new IllegalStateException ("Invalid thread association " + getTM().getTransaction()); 130 } 131 connections.clear(); 132 transactions.clear(); 133 signals.clear(); 134 135 setTestMarkedForExit(false); 137 } 138 139 142 public static TransactionManager getTM() throws Exception 143 { 144 if (tm == null) 145 { 146 tm = (TransactionManager ) new InitialContext ().lookup("java:/TransactionManager"); 147 } 148 return tm; 149 } 150 151 154 public static void destroy() 155 { 156 connections.clear(); 157 transactions.clear(); 158 signals.clear(); 159 } 160 161 164 public static boolean isTestMarkedForExit() 165 { 166 return testMarkedForExit; 167 } 168 169 172 public static void setTestMarkedForExit(boolean testMarkedForExit) 173 { 174 MTOperation.testMarkedForExit = testMarkedForExit; 175 } 176 177 182 public static void checkTestMarkedForExit() throws Exception 183 { 184 if (testMarkedForExit) 185 { 186 throw new MarkedForExitException(); 187 } 188 } 189 190 193 private static class MarkedForExitException extends Exception 194 { 195 } 197 198 200 public MTOperation(int op) 201 { 202 this(op, 0); 203 } 204 205 public MTOperation(int op, int id) 206 { 207 this.id = new Integer (id); 208 this.op = op; 209 } 210 211 public MTOperation(int op, int id, Throwable throwable) 212 { 213 this.id = new Integer (id); 214 this.op = op; 215 this.throwable = throwable; 216 } 217 218 220 public void perform() throws Exception 221 { 222 Throwable caught = null; 223 try 224 { 225 switch (op) 226 { 227 case TM_GET_STATUS: 228 tmGetStatus(); 229 break; 230 231 case TM_BEGIN: 232 tmBegin(); 233 break; 234 235 case TM_SUSPEND: 236 tmSuspend(); 237 break; 238 239 case TM_RESUME: 240 tmResume(); 241 break; 242 243 case TM_COMMIT: 244 tmCommit(); 245 break; 246 247 case TX_COMMIT: 248 txCommit(); 249 break; 250 251 case TX_REGISTER_SYNC: 252 txRegisterSync(); 253 break; 254 255 case XX_SLEEP_200: 256 xxSleep200(); 257 break; 258 259 case XX_SLEEP_RANDOM: 260 xxSleepRandom(); 261 break; 262 263 case XX_POST_SIGNAL: 264 xxPostSignal(); 265 break; 266 267 case XX_WAIT_FOR_SIGNAL: 268 xxWaitForSignal(); 269 break; 270 271 case XX_WAIT_FOR_TX: 272 xxWaitForTx(); 273 break; 274 275 case XX_WAIT_FOR_CONN: 276 xxWaitForConn(); 277 break; 278 279 case CF_LOOKUP: 280 cfLookup(); 281 break; 282 283 case CF_BY_TX_LOOKUP: 284 cfByTxLookup(); 285 break; 286 287 case DS_TEST_LOOKUP: 288 dsTestLookup(); 289 break; 290 291 case DS_DEFAULT_LOOKUP: 292 dsDefaultLookup(); 293 break; 294 295 case DS_GET_CONN: 296 dsGetConn(); 297 break; 298 299 case DS_CLOSE_CONN: 300 dsCloseConn(); 301 break; 302 303 case CF_GET_CONN: 304 cfGetConn(); 305 break; 306 307 case CN_CLOSE_CONN: 308 cnCloseConn(); 309 break; 310 311 default: 312 throw new IllegalArgumentException ("Invalid operation " + op); 313 } 314 } 315 catch (MarkedForExitException e) 316 { 317 log.info(tid() + "Early exit"); 318 return; 319 } 320 catch (Throwable t) 321 { 322 caught = t; 323 } 324 325 if (throwable != null && caught == null) 327 { 328 setTestMarkedForExit(true); 329 throw new Exception ("Expected throwable ", throwable); 330 } 331 332 if (throwable != null && (throwable.getClass().isAssignableFrom(caught.getClass())) == false) 334 { 335 log.warn("Caught wrong throwable", caught); 336 setTestMarkedForExit(true); 337 throw new Exception ("Expected throwable " + throwable + " caught ", caught); 338 } 339 340 if (throwable == null && caught != null) 342 { 343 log.warn("Caught unexpected throwable", caught); 344 setTestMarkedForExit(true); 345 throw new Exception ("Unexpected throwable ", caught); 346 } 347 } 348 349 public void cfLookup() throws Exception 350 { 351 log.info(tid() + " CF_LOOKUP"); 352 InitialContext ctx = new InitialContext (); 353 cf = (ConnectionFactory )ctx.lookup("java:JBossTestCF"); 354 } 355 356 public void cfByTxLookup() throws Exception 357 { 358 log.info(tid() + " CF_BY_TX_LOOKUP"); 359 InitialContext ctx = new InitialContext (); 360 cf = (ConnectionFactory )ctx.lookup("java:JBossTestCFByTx"); 361 } 362 363 public void cfGetConn() throws Exception 364 { 365 log.info(tid() + " CF_GET_CONN (" + id + ")"); 366 Connection conn = cf.getConnection(); 367 connections.put(id, conn); 368 } 369 370 public void cnCloseConn() throws Exception 371 { 372 log.info(tid() + " CN_CLOSE_CONN (" + id + ")"); 373 Connection conn = (Connection )connections.get(id); 374 conn.close(); 375 } 376 377 public void dsTestLookup() throws Exception 378 { 379 log.info(tid() + " DS_TEST_LOOKUP"); 380 InitialContext ctx = new InitialContext (); 381 ds = (DataSource )ctx.lookup("java:StatementTestsConnectionDS"); 382 383 } 384 385 public void dsDefaultLookup() throws Exception 386 { 387 log.info(tid() + " DS_DEFAULT_LOOKUP"); 388 InitialContext ctx = new InitialContext (); 389 ds = (DataSource )ctx.lookup("java:DefaultDS"); 390 } 391 392 public void dsGetConn() throws Exception 393 { 394 log.info(tid() + " DS_GET_CONN (" + id + ")"); 395 java.sql.Connection conn = ds.getConnection(); 396 connections.put(id, conn); 397 } 398 399 public void dsCloseConn() throws Exception 400 { 401 log.info(tid() + " DS_CLOSE_CONN (" + id + ")"); 402 java.sql.Connection conn = (java.sql.Connection )connections.get(id); 403 conn.close(); 404 } 405 406 public void tmGetStatus() throws Exception 407 { 408 log.info(tid() + " " + TxUtils.getStatusAsString(getTM().getStatus())); 409 } 410 411 public void tmBegin() throws Exception 412 { 413 log.info(tid() + " TM_BEGIN (" + id + ")"); 414 getTM().begin(); 415 Transaction tx = getTM().getTransaction(); 416 synchronized (transactions) 417 { 418 transactions.put(id, tx); 419 transactions.notifyAll(); 420 } 421 } 422 423 public void tmSuspend() throws Exception 424 { 425 log.info(tid() + " TM_SUSPEND (" + id + ")"); 426 Transaction tx = getTM().suspend(); 427 transactions.put(id, tx); 428 } 429 430 public void tmResume() throws Exception 431 { 432 log.info(tid() + " TM_RESUME (" + id + ")"); 433 Transaction tx = (Transaction )transactions.get(id); 434 if (tx == null) 435 { 436 throw new IllegalStateException ("Tx not found:" + id); 437 } 438 else 439 { 440 getTM().resume(tx); 441 } 442 } 443 444 public void tmCommit() throws Exception 445 { 446 log.info(tid() + " TM_COMMIT"); 447 getTM().commit(); 448 } 449 450 public void txCommit() throws Exception 451 { 452 log.info(tid() + " TX_COMMIT (" + id + ")"); 453 Transaction tx = (Transaction )transactions.get(id); 454 if (tx == null) 455 { 456 throw new IllegalStateException ("Tx not found: " + id); 457 } 458 else 459 { 460 tx.commit(); 461 } 462 } 463 464 public void txRegisterSync() throws Exception 465 { 466 log.info(tid() + " TX_REGISTER_SYNC (" + id + ")"); 467 Transaction tx = (Transaction )transactions.get(id); 468 if (tx == null) 469 { 470 throw new IllegalStateException ("Tx not found: " + id); 471 } 472 Synchronization sync = new Synchronization () 473 { 474 public void beforeCompletion() 475 { 476 log.info(tid() + " beforeCompletion() called"); 477 } 478 479 public void afterCompletion(int status) 480 { 481 log.info (tid() + " afterCompletion(" + TxUtils.getStatusAsString(status) + ") called"); 482 } 483 }; 484 tx.registerSynchronization(sync); 485 } 486 487 public void xxWaitForTx() throws Exception 488 { 489 log.info(tid() + " XX_WAIT_FOR_TX (" + id + ")"); 490 491 Transaction tx = (Transaction )transactions.get(id); 492 while (tx == null) 493 { 494 checkTestMarkedForExit(); 495 496 log.info(tid() + " Sleeping for 100 msecs"); 497 498 synchronized (transactions) 499 { 500 try 501 { 502 transactions.wait(100); 503 } 504 catch (InterruptedException ignore) {} 505 } 506 tx = (Transaction )transactions.get(id); 507 } 508 log.info(tid() + " Got it"); 509 } 510 511 public void xxWaitForConn() throws Exception 512 { 513 log.info(tid() + " XX_WAIT_FOR_CONN (" + id + ")"); 514 515 boolean contained = connections.containsKey(id); 516 while (contained == false) 517 { 518 checkTestMarkedForExit(); 519 520 log.info(tid() + " Sleeping for 100 msecs"); 521 522 synchronized (connections) 523 { 524 try 525 { 526 connections.wait(100); 527 } 528 catch (InterruptedException ignore) {} 529 } 530 contained = connections.containsKey(id); 531 } 532 log.info(tid() + " Got it"); 533 } 534 535 public void xxSleep200() throws Exception 536 { 537 log.info(tid() + " XX_SLEEP_200"); 538 Thread.sleep(200); 539 } 540 541 public void xxSleepRandom() throws Exception 542 { 543 long random = Math.round((Math.random() * 100)); 544 log.info(tid() + " XX_SLEEP_RANDOM (" + random + ")"); 545 Thread.sleep(random); 546 } 547 548 public void xxPostSignal() throws Exception 549 { 550 log.info(tid() + " XX_POST_SIGNAL (" + id + ")"); 551 synchronized (signals) 552 { 553 signals.add(id); 554 signals.notifyAll(); 555 } 556 } 557 558 public void xxWaitForSignal() throws Exception 559 { 560 log.info(tid() + " XX_WAIT_FOR_SIGNAL (" + id + ")"); 561 562 boolean posted = signals.contains(id); 563 while (posted == false) 564 { 565 checkTestMarkedForExit(); 566 567 log.info(tid() + " Signal not posted, waiting..."); 568 569 synchronized (signals) 570 { 571 try 572 { 573 signals.wait(100); 574 } 575 catch (InterruptedException ignore) {} 576 } 577 posted = signals.contains(id); 578 } 579 log.info(tid() + " Got it!"); 580 } 581 582 private String tid() 583 { 584 return Thread.currentThread().getName(); 585 } 586 587 } 588 | Popular Tags |