1 17 package org.apache.servicemix.jbi.nmr.flow.jca; 18 19 import org.apache.activemq.broker.BrokerService; 20 import org.apache.activemq.xbean.BrokerFactoryBean; 21 import org.apache.geronimo.transaction.ExtendedTransactionManager; 22 import org.apache.geronimo.transaction.context.TransactionContextManager; 23 import org.apache.servicemix.jbi.RuntimeJBIException; 24 import org.apache.servicemix.jbi.container.ActivationSpec; 25 import org.apache.servicemix.jbi.container.JBIContainer; 26 import org.apache.servicemix.jbi.nmr.flow.jca.JCAFlow; 27 import org.apache.servicemix.jbi.resolver.ServiceNameEndpointResolver; 28 import org.apache.servicemix.tck.ReceiverComponent; 29 import org.apache.servicemix.tck.SenderComponent; 30 import org.jencks.factory.GeronimoTransactionManagerFactoryBean; 31 import org.jencks.factory.TransactionContextManagerFactoryBean; 32 import org.jencks.factory.TransactionManagerFactoryBean; 33 import org.springframework.core.io.ClassPathResource; 34 import org.springframework.transaction.TransactionStatus; 35 import org.springframework.transaction.jta.JtaTransactionManager; 36 import org.springframework.transaction.support.TransactionCallback; 37 import org.springframework.transaction.support.TransactionTemplate; 38 39 import javax.jbi.JBIException; 40 import javax.transaction.TransactionManager ; 41 import javax.transaction.UserTransaction ; 42 43 import junit.framework.TestCase; 44 45 48 public class JCAFlowTest extends TestCase { 49 private JBIContainer senderContainer = new JBIContainer(); 50 private JBIContainer receiverContainer = new JBIContainer(); 51 private BrokerService broker; 52 private TransactionTemplate tt; 53 private static final int NUM_MESSAGES = 10; 54 55 58 protected void setUp() throws Exception { 59 super.setUp(); 60 61 TransactionManagerFactoryBean tmcf = new TransactionManagerFactoryBean(); 62 tmcf.afterPropertiesSet(); 63 ExtendedTransactionManager etm = (ExtendedTransactionManager) tmcf.getObject(); 64 TransactionContextManagerFactoryBean tcmfb = new TransactionContextManagerFactoryBean(); 65 tcmfb.setTransactionManager(etm); 66 tcmfb.afterPropertiesSet(); 67 TransactionContextManager tcm = (TransactionContextManager) tcmfb.getObject(); 68 GeronimoTransactionManagerFactoryBean gtmfb = new GeronimoTransactionManagerFactoryBean(); 69 gtmfb.setTransactionContextManager(tcm); 70 gtmfb.afterPropertiesSet(); 71 TransactionManager tm = (TransactionManager) gtmfb.getObject(); 72 tt = new TransactionTemplate(new JtaTransactionManager((UserTransaction ) tm)); 73 74 BrokerFactoryBean bfb = new BrokerFactoryBean(new ClassPathResource("org/apache/servicemix/jbi/nmr/flow/jca/broker.xml")); 75 bfb.afterPropertiesSet(); 76 broker = bfb.getBroker(); 77 broker.start(); 78 79 JCAFlow senderFlow = new JCAFlow(); 80 senderFlow.setJmsURL("tcp://localhost:61216"); 81 senderFlow.setTransactionContextManager(tcm); 82 senderContainer.setTransactionManager(tm); 83 senderContainer.setEmbedded(true); 84 senderContainer.setName("senderContainer"); 85 senderContainer.setFlow(senderFlow); 86 senderContainer.setMonitorInstallationDirectory(false); 87 senderContainer.init(); 88 senderContainer.start(); 89 90 91 JCAFlow receiverFlow = new JCAFlow(); 92 receiverFlow.setJmsURL("tcp://localhost:61216"); 93 receiverFlow.setTransactionContextManager(tcm); 94 receiverContainer.setTransactionManager(tm); 95 receiverContainer.setEmbedded(true); 96 receiverContainer.setName("receiverContainer"); 97 receiverContainer.setFlow(receiverFlow); 98 receiverContainer.setMonitorInstallationDirectory(false); 99 receiverContainer.init(); 100 receiverContainer.start(); 101 } 102 103 protected void tearDown() throws Exception { 104 super.tearDown(); 105 senderContainer.shutDown(); 106 receiverContainer.shutDown(); 107 broker.stop(); 108 } 109 110 public void testInOnly() throws Exception { 111 final SenderComponent sender = new SenderComponent(); 112 final ReceiverComponent receiver = new ReceiverComponent(); 113 sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE)); 114 115 senderContainer.activateComponent(new ActivationSpec("sender", sender)); 116 receiverContainer.activateComponent(new ActivationSpec("receiver", receiver)); 117 118 Thread.sleep(5000); 119 120 sender.sendMessages(NUM_MESSAGES); 121 Thread.sleep(3000); 122 receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES); 123 } 124 125 public void testTxInOnly() throws Exception { 126 final SenderComponent sender = new SenderComponent(); 127 final ReceiverComponent receiver = new ReceiverComponent(); 128 sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE)); 129 130 senderContainer.activateComponent(new ActivationSpec("sender", sender)); 131 receiverContainer.activateComponent(new ActivationSpec("receiver", receiver)); 132 133 Thread.sleep(5000); 134 135 senderContainer.setAutoEnlistInTransaction(true); 136 tt.execute(new TransactionCallback() { 137 public Object doInTransaction(TransactionStatus status) { 138 try { 139 sender.sendMessages(NUM_MESSAGES); 140 } catch (JBIException e) { 141 throw new RuntimeJBIException(e); 142 } 143 return null; 144 } 145 }); 146 Thread.sleep(3000); 147 receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES); 148 } 149 150 public void testClusteredInOnly() throws Exception { 151 final SenderComponent sender = new SenderComponent(); 152 final ReceiverComponent receiver1 = new ReceiverComponent(); 153 final ReceiverComponent receiver2 = new ReceiverComponent(); 154 sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE)); 155 156 senderContainer.activateComponent(new ActivationSpec("sender", sender)); 157 senderContainer.activateComponent(new ActivationSpec("receiver", receiver1)); 158 receiverContainer.activateComponent(new ActivationSpec("receiver", receiver2)); 159 Thread.sleep(1000); 160 161 sender.sendMessages(NUM_MESSAGES); 162 Thread.sleep(3000); 163 assertTrue(receiver1.getMessageList().hasReceivedMessage()); 164 assertTrue(receiver2.getMessageList().hasReceivedMessage()); 165 receiver1.getMessageList().flushMessages(); 166 receiver2.getMessageList().flushMessages(); 167 168 senderContainer.deactivateComponent("receiver"); 169 Thread.sleep(1000); 170 171 sender.sendMessages(NUM_MESSAGES); 172 Thread.sleep(3000); 173 assertFalse(receiver1.getMessageList().hasReceivedMessage()); 174 assertTrue(receiver2.getMessageList().hasReceivedMessage()); 175 receiver1.getMessageList().flushMessages(); 176 receiver2.getMessageList().flushMessages(); 177 178 senderContainer.activateComponent(new ActivationSpec("receiver", receiver1)); 179 receiverContainer.deactivateComponent("receiver"); 180 Thread.sleep(1000); 181 182 sender.sendMessages(NUM_MESSAGES); 183 Thread.sleep(3000); 184 assertTrue(receiver1.getMessageList().hasReceivedMessage()); 185 assertFalse(receiver2.getMessageList().hasReceivedMessage()); 186 receiver1.getMessageList().flushMessages(); 187 receiver2.getMessageList().flushMessages(); 188 } 189 190 } 191 | Popular Tags |