1 17 package org.apache.servicemix.jbi.messaging; 18 19 import java.sql.Connection ; 20 21 import javax.jbi.JBIException; 22 import javax.jbi.messaging.DeliveryChannel; 23 import javax.jbi.messaging.ExchangeStatus; 24 import javax.jbi.messaging.InOnly; 25 import javax.jbi.messaging.InOut; 26 import javax.jbi.messaging.MessageExchange; 27 import javax.jbi.messaging.MessagingException; 28 import javax.jbi.messaging.NormalizedMessage; 29 import javax.resource.spi.ConnectionManager ; 30 import javax.resource.spi.ManagedConnectionFactory ; 31 import javax.sql.DataSource ; 32 import javax.sql.XADataSource ; 33 import javax.transaction.Status ; 34 import javax.transaction.TransactionManager ; 35 import javax.xml.namespace.QName ; 36 37 import junit.framework.TestCase; 38 39 import org.apache.activemq.broker.BrokerService; 40 import org.apache.derby.jdbc.EmbeddedXADataSource; 41 import org.apache.geronimo.connector.outbound.GenericConnectionManager; 42 import org.apache.geronimo.connector.outbound.connectionmanagerconfig.NoPool; 43 import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions; 44 import org.apache.geronimo.transaction.context.GeronimoTransactionManager; 45 import org.apache.geronimo.transaction.context.TransactionContextManager; 46 import org.apache.geronimo.transaction.manager.TransactionManagerImpl; 47 import org.apache.geronimo.transaction.manager.XidFactoryImpl; 48 import org.apache.servicemix.MessageExchangeListener; 49 import org.apache.servicemix.client.DefaultServiceMixClient; 50 import org.apache.servicemix.client.ServiceMixClient; 51 import org.apache.servicemix.components.util.ComponentSupport; 52 import org.apache.servicemix.jbi.container.JBIContainer; 53 import org.apache.servicemix.jbi.jaxp.StringSource; 54 import org.apache.servicemix.jbi.nmr.flow.Flow; 55 import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow; 56 import org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow; 57 import org.apache.servicemix.store.Store; 58 import org.apache.servicemix.store.jdbc.JdbcStoreFactory; 59 import org.apache.servicemix.tck.ExchangeCompletedListener; 60 import org.tranql.connector.AllExceptionsAreFatalSorter; 61 import org.tranql.connector.jdbc.AbstractXADataSourceMCF; 62 63 public class TransactionsTest extends TestCase { 64 65 public static final long TIMEOUT = 1000; 66 67 private JBIContainer jbi; 68 private BrokerService broker; 69 private TransactionManager tm; 70 private ServiceMixClient client; 71 private DataSource dataSource; 72 private Connection connection; 73 private Store store; 74 private ExchangeCompletedListener listener; 75 76 protected void setUp() throws Exception { 77 78 broker = new BrokerService(); 80 broker.setUseJmx(false); 81 broker.setPersistent(false); 82 broker.addConnector("tcp://localhost:61616"); 83 broker.start(); 84 85 TransactionManagerImpl exTransactionManager = new TransactionManagerImpl(600, new XidFactoryImpl(), null, null); 86 TransactionContextManager transactionContextManager = new TransactionContextManager(exTransactionManager, exTransactionManager); 87 tm = (TransactionManager ) new GeronimoTransactionManager(transactionContextManager); 88 89 ConnectionManager cm = new GenericConnectionManager( 91 new XATransactions(true, true), 92 new NoPool(), 93 false, 94 null, 95 transactionContextManager, 96 "connectionManager", 97 GenericConnectionManager.class.getClassLoader()); 98 ManagedConnectionFactory mcf = new DerbyDataSourceMCF("target/testdb"); 99 dataSource = (DataSource ) mcf.createConnectionFactory(cm); 100 101 connection = dataSource.getConnection(); 102 103 JdbcStoreFactory storeFactory = new JdbcStoreFactory(); 104 storeFactory.setDataSource(dataSource); 105 storeFactory.setTransactional(true); 106 store = storeFactory.open("store"); 107 108 JCAFlow jcaFlow = new JCAFlow(); 109 jcaFlow.setTransactionContextManager(transactionContextManager); 110 111 jbi = new JBIContainer(); 112 jbi.setFlows(new Flow[] { new SedaFlow(), jcaFlow }); 113 jbi.setEmbedded(true); 114 jbi.setUseMBeanServer(false); 115 jbi.setCreateMBeanServer(false); 116 jbi.setTransactionManager(tm); 117 jbi.setAutoEnlistInTransaction(true); 118 listener = new ExchangeCompletedListener(); 119 jbi.addListener(listener); 120 jbi.init(); 121 jbi.start(); 122 123 client = new DefaultServiceMixClient(jbi); 124 } 125 126 protected void tearDown() throws Exception { 127 listener.assertExchangeCompleted(); 128 jbi.shutDown(); 129 Thread.sleep(100); 130 broker.stop(); 131 connection.close(); 132 } 133 134 protected InOnly createInOnly() throws Exception { 135 InOnly me = client.createInOnlyExchange(); 136 me.getInMessage().setContent(new StringSource("<hello>world</hello>")); 137 me.setService(new QName ("service")); 138 return me; 139 } 140 141 protected InOut createInOut() throws Exception { 142 InOut me = client.createInOutExchange(); 143 me.getInMessage().setContent(new StringSource("<hello>world</hello>")); 144 me.setService(new QName ("service")); 145 return me; 146 } 147 148 public void testInOnlyAsyncSendAndListener() throws Exception { 149 jbi.activateComponent(new Listener(false, false), "target"); 150 151 MessageExchange me = createInOnly(); 152 tm.begin(); 153 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 154 client.send(me); 155 assertNull(client.receive(TIMEOUT)); 156 tm.commit(); 157 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 158 159 me = client.receive(TIMEOUT); 160 assertNotNull(me); 161 assertEquals(ExchangeStatus.DONE, me.getStatus()); 162 assertTrue(me.isTransacted()); 163 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 164 165 assertNotNull(store.load(me.getExchangeId())); 166 } 167 168 public void testInOnlyAsyncSendAndListenerWithRollback() throws Exception { 169 jbi.activateComponent(new Listener(false, true), "target"); 170 171 MessageExchange me = createInOnly(); 172 tm.begin(); 173 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 174 client.send(me); 175 assertNull(client.receive(TIMEOUT)); 176 tm.commit(); 177 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 178 179 assertNull(client.receive(TIMEOUT)); 180 181 assertNull(store.load(me.getExchangeId())); 182 } 183 184 public void testInOnlySyncSendAndListener() throws Exception { 185 jbi.activateComponent(new Listener(false, false), "target"); 186 187 MessageExchange me = createInOnly(); 188 tm.begin(); 189 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 190 client.sendSync(me, TIMEOUT); 191 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 192 assertEquals(ExchangeStatus.DONE, me.getStatus()); 193 assertTrue(me.isTransacted()); 194 tm.commit(); 195 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 196 197 assertNotNull(store.load(me.getExchangeId())); 198 } 199 200 public void testInOnlySyncSendAndListenerWithProviderRollback() throws Exception { 201 jbi.activateComponent(new Listener(false, true), "target"); 202 203 MessageExchange me = createInOnly(); 204 tm.begin(); 205 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 206 client.sendSync(me, TIMEOUT); 207 assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus()); 208 assertEquals(ExchangeStatus.DONE, me.getStatus()); 209 assertTrue(me.isTransacted()); 210 tm.rollback(); 211 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 212 213 assertNull(store.load(me.getExchangeId())); 214 } 215 216 public void testInOnlySyncSendAndListenerWithConsumerRollback() throws Exception { 217 jbi.activateComponent(new Listener(false, false), "target"); 218 219 MessageExchange me = createInOnly(); 220 tm.begin(); 221 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 222 client.sendSync(me, TIMEOUT); 223 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 224 tm.setRollbackOnly(); 225 assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus()); 226 assertEquals(ExchangeStatus.DONE, me.getStatus()); 227 assertTrue(me.isTransacted()); 228 tm.rollback(); 229 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 230 231 assertNull(store.load(me.getExchangeId())); 232 } 233 234 public void testInOnlyAsyncSendAndPoll() throws Exception { 235 jbi.activateComponent(new Async(false, false), "target"); 236 237 MessageExchange me = createInOnly(); 238 tm.begin(); 239 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 240 client.send(me); 241 assertNull(client.receive(TIMEOUT)); 242 tm.commit(); 243 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 244 245 me = client.receive(TIMEOUT); 246 assertNotNull(me); 247 assertEquals(ExchangeStatus.DONE, me.getStatus()); 248 assertTrue(me.isTransacted()); 249 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 250 251 assertNotNull(store.load(me.getExchangeId())); 252 } 253 254 public void testInOnlyAsyncSendAndPollWithRollback() throws Exception { 255 jbi.activateComponent(new Async(false, true), "target"); 256 257 MessageExchange me = createInOnly(); 258 tm.begin(); 259 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 260 client.send(me); 261 assertNull(client.receive(TIMEOUT)); 262 tm.commit(); 263 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 264 265 assertNull(client.receive(TIMEOUT)); 266 267 assertNull(store.load(me.getExchangeId())); 268 } 269 270 public void testInOnlySyncSendAndPoll() throws Exception { 271 jbi.activateComponent(new Async(false, false), "target"); 272 273 MessageExchange me = createInOnly(); 274 tm.begin(); 275 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 276 client.sendSync(me, TIMEOUT); 277 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 278 assertEquals(ExchangeStatus.DONE, me.getStatus()); 279 assertTrue(me.isTransacted()); 280 tm.commit(); 281 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 282 283 assertNotNull(store.load(me.getExchangeId())); 284 } 285 286 public void testInOnlySyncSendAndPollWithProviderRollback() throws Exception { 287 jbi.activateComponent(new Async(false, true), "target"); 288 289 MessageExchange me = createInOnly(); 290 tm.begin(); 291 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 292 client.sendSync(me, TIMEOUT); 293 assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus()); 294 assertEquals(ExchangeStatus.DONE, me.getStatus()); 295 assertTrue(me.isTransacted()); 296 tm.rollback(); 297 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 298 299 assertNull(store.load(me.getExchangeId())); 300 } 301 302 public void testInOnlySyncSendAndPollWithConsumerRollback() throws Exception { 303 jbi.activateComponent(new Async(false, false), "target"); 304 305 MessageExchange me = createInOnly(); 306 tm.begin(); 307 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 308 client.sendSync(me, TIMEOUT); 309 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 310 tm.setRollbackOnly(); 311 assertEquals(Status.STATUS_MARKED_ROLLBACK, tm.getStatus()); 312 assertEquals(ExchangeStatus.DONE, me.getStatus()); 313 assertTrue(me.isTransacted()); 314 tm.rollback(); 315 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 316 317 assertNull(store.load(me.getExchangeId())); 318 } 319 320 public void testInOutAsyncSendAndAsyncSendAndListener() throws Exception { 321 jbi.activateComponent(new Listener(false, false), "target"); 322 323 MessageExchange me = createInOut(); 324 tm.begin(); 325 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 326 client.send(me); 327 assertNull(client.receive(TIMEOUT)); 328 tm.commit(); 329 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 330 331 me = client.receive(TIMEOUT); 332 assertNotNull(me); 333 assertEquals(ExchangeStatus.ACTIVE, me.getStatus()); 334 assertTrue(me.isTransacted()); 335 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 336 client.done(me); 337 338 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 339 assertNotNull(store.load(me.getExchangeId())); 340 } 341 342 367 368 389 390 public void testInOutSyncSendAndSyncSendAndListener() throws Exception { 391 jbi.activateComponent(new Listener(true, false), "target"); 392 393 MessageExchange me = createInOut(); 394 tm.begin(); 395 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 396 client.sendSync(me, TIMEOUT); 397 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 398 assertEquals(ExchangeStatus.ACTIVE, me.getStatus()); 399 assertTrue(me.isTransacted()); 400 client.done(me); 401 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 402 tm.commit(); 403 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 404 405 assertNotNull(store.load(me.getExchangeId())); 406 } 407 408 public void testInOutAsyncSendAndAsyncSendAndPoll() throws Exception { 409 jbi.activateComponent(new Async(false, false), "target"); 410 411 MessageExchange me = createInOut(); 412 tm.begin(); 413 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 414 client.send(me); 415 assertNull(client.receive(TIMEOUT)); 416 tm.commit(); 417 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 418 419 me = client.receive(TIMEOUT); 420 assertNotNull(me); 421 assertEquals(ExchangeStatus.ACTIVE, me.getStatus()); 422 assertTrue(me.isTransacted()); 423 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 424 client.done(me); 425 426 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 427 assertNotNull(store.load(me.getExchangeId())); 428 } 429 430 public void testInOutSyncSendAndSyncSendAndPoll() throws Exception { 431 jbi.activateComponent(new Async(true, false), "target"); 432 433 MessageExchange me = createInOut(); 434 tm.begin(); 435 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 436 client.sendSync(me, TIMEOUT); 437 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 438 assertEquals(ExchangeStatus.ACTIVE, me.getStatus()); 439 assertTrue(me.isTransacted()); 440 client.done(me); 441 assertEquals(Status.STATUS_ACTIVE, tm.getStatus()); 442 tm.commit(); 443 assertEquals(Status.STATUS_NO_TRANSACTION, tm.getStatus()); 444 445 assertNotNull(store.load(me.getExchangeId())); 446 } 447 448 protected class Async extends ComponentSupport implements Runnable { 449 private boolean sync; 450 private boolean rollback; 451 private Thread runner; 452 private boolean running; 453 public Async(boolean sync, boolean rollback) { 454 this.sync = sync; 455 this.rollback = rollback; 456 setService(new QName ("service")); 457 setEndpoint("endpoint"); 458 } 459 public synchronized void start() throws JBIException { 460 if (!running) { 461 running = true; 462 runner = new Thread (this); 463 runner.start(); 464 } 465 } 466 public void run() { 467 while (running) { 468 try { 469 DeliveryChannel deliveryChannel = getContext().getDeliveryChannel(); 470 MessageExchange messageExchange = deliveryChannel.accept(); 471 process(messageExchange); 472 } 473 catch (Exception e) { 474 e.printStackTrace(); 475 } 476 } 477 } 478 public synchronized void stop() throws JBIException { 479 running = false; 480 } 481 protected void process(MessageExchange exchange) throws Exception { 482 if (exchange.getStatus() != ExchangeStatus.ACTIVE) { 483 return; 484 } 485 try { 486 store.store(exchange.getExchangeId(), exchange); 487 } catch (Exception e) { 488 throw new MessagingException(e); 489 } 490 if (rollback) { 491 try { 492 tm.setRollbackOnly(); 493 } catch (Exception e) { 494 throw new MessagingException(e); 495 } 496 } 497 if (exchange instanceof InOnly) { 498 exchange.setStatus(ExchangeStatus.DONE); 499 getDeliveryChannel().send(exchange); 500 } else { 501 NormalizedMessage msg = exchange.createMessage(); 502 msg.setContent(exchange.getMessage("in").getContent()); 503 exchange.setMessage(msg, "out"); 504 if (sync) { 505 getDeliveryChannel().sendSync(exchange); 506 } else { 507 getDeliveryChannel().send(exchange); 508 } 509 } 510 } 511 } 512 513 protected class Listener extends ComponentSupport implements MessageExchangeListener { 514 private boolean sync; 515 private boolean rollback; 516 public Listener(boolean sync, boolean rollback) { 517 this.sync = sync; 518 this.rollback = rollback; 519 setService(new QName ("service")); 520 setEndpoint("endpoint"); 521 } 522 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 523 if (exchange.getStatus() != ExchangeStatus.ACTIVE) { 524 return; 525 } 526 try { 527 store.store(exchange.getExchangeId(), exchange); 528 } catch (Exception e) { 529 throw new MessagingException(e); 530 } 531 if (rollback) { 532 try { 533 tm.setRollbackOnly(); 534 } catch (Exception e) { 535 throw new MessagingException(e); 536 } 537 } 538 if (exchange instanceof InOnly) { 539 exchange.setStatus(ExchangeStatus.DONE); 540 getDeliveryChannel().send(exchange); 541 } else { 542 NormalizedMessage msg = exchange.createMessage(); 543 msg.setContent(exchange.getMessage("in").getContent()); 544 exchange.setMessage(msg, "out"); 545 if (sync) { 546 getDeliveryChannel().sendSync(exchange, TIMEOUT); 547 } else { 548 getDeliveryChannel().send(exchange); 549 } 550 } 551 } 552 553 } 554 555 public static class DerbyDataSourceMCF extends AbstractXADataSourceMCF { 556 private static final long serialVersionUID = 7971682207810098396L; 557 protected DerbyDataSourceMCF(String dbName) { 558 super(createXADS(dbName), new AllExceptionsAreFatalSorter()); 559 } 560 public String getPassword() { 561 return null; 562 } 563 public String getUserName() { 564 return null; 565 } 566 protected static XADataSource createXADS(String dbName) { 567 EmbeddedXADataSource xads = new EmbeddedXADataSource(); 568 xads.setDatabaseName(dbName); 569 xads.setCreateDatabase("create"); 570 return xads; 571 } 572 } 573 574 } 575 | Popular Tags |