1 17 package org.apache.servicemix.common; 18 19 import javax.jbi.component.Component; 20 import javax.jbi.component.ComponentContext; 21 import javax.jbi.messaging.ExchangeStatus; 22 import javax.jbi.messaging.InOnly; 23 import javax.jbi.messaging.MessageExchange; 24 import javax.jbi.messaging.MessageExchange.Role; 25 import javax.jbi.servicedesc.ServiceEndpoint; 26 import javax.transaction.Status ; 27 import javax.transaction.TransactionManager ; 28 import javax.xml.namespace.QName ; 29 30 import junit.framework.TestCase; 31 32 import org.apache.activemq.broker.BrokerService; 33 import org.apache.geronimo.transaction.context.GeronimoTransactionManager; 34 import org.apache.geronimo.transaction.context.TransactionContextManager; 35 import org.apache.geronimo.transaction.manager.TransactionManagerImpl; 36 import org.apache.geronimo.transaction.manager.XidFactoryImpl; 37 import org.apache.servicemix.client.DefaultServiceMixClient; 38 import org.apache.servicemix.client.ServiceMixClient; 39 import org.apache.servicemix.jbi.container.JBIContainer; 40 import org.apache.servicemix.jbi.jaxp.StringSource; 41 import org.apache.servicemix.jbi.nmr.flow.Flow; 42 import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow; 43 import org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow; 44 45 public class TransactionsTest extends TestCase { 46 47 private JBIContainer jbi; 48 private BrokerService broker; 49 private TransactionManagerImpl exTransactionManager; 50 private TransactionContextManager transactionContextManager; 51 private TransactionManager txManager; 52 private Component component; 53 private ServiceMixClient client; 54 private Exception exceptionToThrow; 55 private boolean exceptionShouldRollback; 56 57 protected void setUp() throws Exception { 58 exceptionToThrow = null; 59 exceptionShouldRollback = false; 60 61 broker = new BrokerService(); 62 broker.setPersistent(false); 63 broker.addConnector("tcp://localhost:61616"); 64 broker.start(); 65 66 exTransactionManager = new TransactionManagerImpl(600, new XidFactoryImpl(), null, null); 67 transactionContextManager = new TransactionContextManager(exTransactionManager, exTransactionManager); 68 txManager = (TransactionManager ) new GeronimoTransactionManager(transactionContextManager); 69 70 JCAFlow jcaFlow = new JCAFlow(); 71 jcaFlow.setTransactionContextManager(transactionContextManager); 72 73 jbi = new JBIContainer(); 74 jbi.setFlows(new Flow[] { new SedaFlow(), jcaFlow }); 75 jbi.setEmbedded(true); 76 jbi.setUseMBeanServer(false); 77 jbi.setTransactionManager(txManager); 78 jbi.setAutoEnlistInTransaction(true); 79 jbi.init(); 80 jbi.start(); 81 component = new TestComponent(); 82 jbi.activateComponent(component, "test"); 83 client = new DefaultServiceMixClient(jbi); 84 } 85 86 protected void tearDown() throws Exception { 87 jbi.shutDown(); 88 broker.stop(); 89 } 90 91 public void testTxOkAsync() throws Exception { 92 txManager.begin(); 93 InOnly me = client.createInOnlyExchange(); 94 me.setService(new QName ("service")); 95 me.getInMessage().setContent(new StringSource("<hello>world</hello>")); 96 client.send(me); 97 assertEquals(Status.STATUS_ACTIVE, txManager.getStatus()); 98 assertEquals(ExchangeStatus.ACTIVE, me.getStatus()); 99 txManager.commit(); 100 me = (InOnly) client.receive(1000); 101 assertNotNull(me); 102 assertEquals(ExchangeStatus.DONE, me.getStatus()); 103 } 104 105 public void testTxOkSync() throws Exception { 106 txManager.begin(); 107 InOnly me = client.createInOnlyExchange(); 108 me.setService(new QName ("service")); 109 me.getInMessage().setContent(new StringSource("<hello>world</hello>")); 110 boolean ok = client.sendSync(me, 1000); 111 assertTrue(ok); 112 assertEquals(Status.STATUS_ACTIVE, txManager.getStatus()); 113 assertEquals(ExchangeStatus.DONE, me.getStatus()); 114 txManager.commit(); 115 } 116 117 public void testTxExceptionAsync() throws Exception { 118 exceptionToThrow = new Exception ("Business exception"); 119 txManager.begin(); 120 InOnly me = client.createInOnlyExchange(); 121 me.setService(new QName ("service")); 122 me.getInMessage().setContent(new StringSource("<hello>world</hello>")); 123 client.send(me); 124 assertEquals(Status.STATUS_ACTIVE, txManager.getStatus()); 125 assertEquals(ExchangeStatus.ACTIVE, me.getStatus()); 126 txManager.commit(); 127 me = (InOnly) client.receive(1000); 128 assertNotNull(me); 129 assertEquals(ExchangeStatus.ERROR, me.getStatus()); 130 } 131 132 public void testTxExceptionSync() throws Exception { 133 exceptionToThrow = new Exception ("Business exception"); 134 txManager.begin(); 135 InOnly me = client.createInOnlyExchange(); 136 me.setService(new QName ("service")); 137 me.getInMessage().setContent(new StringSource("<hello>world</hello>")); 138 boolean ok = client.sendSync(me, 1000); 139 assertTrue(ok); 140 assertEquals(Status.STATUS_ACTIVE, txManager.getStatus()); 141 assertEquals(ExchangeStatus.ERROR, me.getStatus()); 142 txManager.commit(); 143 } 144 145 public void testTxExceptionRollbackAsync() throws Exception { 146 exceptionToThrow = new Exception ("Business exception"); 147 exceptionShouldRollback = true; 148 txManager.begin(); 149 InOnly me = client.createInOnlyExchange(); 150 me.setService(new QName ("service")); 151 me.getInMessage().setContent(new StringSource("<hello>world</hello>")); 152 client.send(me); 153 assertEquals(Status.STATUS_ACTIVE, txManager.getStatus()); 154 assertEquals(ExchangeStatus.ACTIVE, me.getStatus()); 155 txManager.commit(); 156 me = (InOnly) client.receive(1000); 161 assertNull(me); 162 } 163 164 public void testTxExceptionRollbackSync() throws Exception { 165 exceptionToThrow = new RuntimeException ("Runtime exception"); 166 exceptionShouldRollback = true; 167 txManager.begin(); 168 InOnly me = client.createInOnlyExchange(); 169 me.setService(new QName ("service")); 170 me.getInMessage().setContent(new StringSource("<hello>world</hello>")); 171 boolean ok = client.sendSync(me, 1000); 172 assertTrue(ok); 173 assertEquals(Status.STATUS_MARKED_ROLLBACK, txManager.getStatus()); 174 assertEquals(ExchangeStatus.ERROR, me.getStatus()); 175 txManager.rollback(); 176 } 177 178 private class TestComponent extends BaseComponent { 179 public TestComponent() { 180 super(); 181 } 182 protected BaseLifeCycle createLifeCycle() { 183 return new TestLifeCycle(); 184 } 185 186 protected class TestLifeCycle extends BaseLifeCycle { 187 protected ServiceUnit su; 188 public TestLifeCycle() { 189 super(TestComponent.this); 190 } 191 protected void doInit() throws Exception { 192 super.doInit(); 193 su = new ServiceUnit(); 194 su.setComponent(component); 195 TestEndpoint ep = new TestEndpoint(); 196 ep.setService(new QName ("service")); 197 ep.setEndpoint("endpoint"); 198 ep.setServiceUnit(su); 199 su.addEndpoint(ep); 200 getRegistry().registerServiceUnit(su); 201 } 202 protected void doStart() throws Exception { 203 super.doStart(); 204 su.start(); 205 } 206 protected void doStop() throws Exception { 207 super.doStop(); 208 su.stop(); 209 } 210 protected boolean exceptionShouldRollbackTx(Exception e) { 211 return exceptionShouldRollback; 212 } 213 } 214 215 protected class TestEndpoint extends Endpoint implements ExchangeProcessor { 216 protected ServiceEndpoint activated; 217 public void activate() throws Exception { 218 ComponentContext ctx = this.serviceUnit.getComponent().getComponentContext(); 219 activated = ctx.activateEndpoint(service, endpoint); 220 } 221 public void deactivate() throws Exception { 222 ComponentContext ctx = this.serviceUnit.getComponent().getComponentContext(); 223 ctx.deactivateEndpoint(activated); 224 activated = null; 225 } 226 public ExchangeProcessor getProcessor() { 227 return this; 228 } 229 public Role getRole() { 230 return Role.PROVIDER; 231 } 232 public void process(MessageExchange exchange) throws Exception { 233 if (exceptionToThrow != null) { 234 throw exceptionToThrow; 235 } 236 exchange.setStatus(ExchangeStatus.DONE); 237 getComponentContext().getDeliveryChannel().send(exchange); 238 } 239 public void start() throws Exception { 240 } 241 public void stop() throws Exception { 242 } 243 } 244 } 245 246 247 } 248 | Popular Tags |