1 18 package org.apache.activemq.broker.ft; 19 20 import java.io.File ; 21 import java.net.URISyntaxException ; 22 import org.apache.activemq.ActiveMQConnectionFactory; 23 import org.apache.activemq.JmsTopicTransactionTest; 24 import org.apache.activemq.broker.BrokerService; 25 import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; 26 import org.apache.activemq.test.JmsResourceProvider; 27 31 public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest{ 32 protected BrokerService slave; 33 protected int inflightMessageCount=0; 34 protected int failureCount=50; 35 protected String uriString="failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false"; 36 37 protected void setUp() throws Exception { 38 failureCount=super.batchCount/2; 39 broker=createBroker(); 41 broker.start(); 42 KahaPersistenceAdapter adaptor=new KahaPersistenceAdapter(); 43 slave = new BrokerService(); 44 slave.setBrokerName("slave"); 45 slave.setPersistenceAdapter(adaptor); 46 slave.addConnector("tcp://localhost:62002"); 47 slave.setDeleteAllMessagesOnStartup(true); 48 slave.setMasterConnectorURI("tcp://localhost:62001"); 49 slave.start(); 50 Thread.sleep(1000); 52 resourceProvider=getJmsResourceProvider(); 53 topic=resourceProvider.isTopic(); 54 resourceProvider.setTransacted(true); 56 connectionFactory=resourceProvider.createConnectionFactory(); 57 reconnect(); 58 } 59 60 protected void tearDown() throws Exception { 61 slave.stop(); 62 slave=null; 63 super.tearDown(); 64 } 65 66 protected BrokerService createBroker() throws Exception ,URISyntaxException { 67 BrokerService broker=new BrokerService(); 68 broker.setBrokerName("master"); 69 KahaPersistenceAdapter adaptor=new KahaPersistenceAdapter(); 70 broker.setPersistenceAdapter(adaptor); 71 broker.addConnector("tcp://localhost:62001"); 72 broker.setDeleteAllMessagesOnStartup(true); 73 return broker; 74 } 75 76 protected JmsResourceProvider getJmsResourceProvider(){ 77 JmsResourceProvider p=super.getJmsResourceProvider(); 78 p.setServerUri(uriString); 79 return p; 80 } 81 82 protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { 83 return new ActiveMQConnectionFactory(uriString); 84 } 85 86 protected void messageSent() throws Exception { 87 if(true) 88 return; 89 if(++inflightMessageCount>=failureCount){ 90 inflightMessageCount=0; 91 Thread.sleep(1000); 92 broker.stop(); 93 } 94 } 95 } 96 | Popular Tags |