1 18 19 package org.apache.activemq.soaktest; 20 21 import org.apache.activemq.ActiveMQConnectionFactory; 22 import org.apache.activemq.broker.BrokerService; 23 import org.apache.activemq.tool.Producer; 24 import org.apache.activemq.tool.Consumer; 25 import org.apache.commons.logging.Log; 26 import org.apache.commons.logging.LogFactory; 27 28 import javax.jms.*; 29 30 import junit.framework.TestCase; 31 32 import java.io.File ; 33 34 35 public class SoakTestSupport extends TestCase{ 36 37 private static final Log log = LogFactory.getLog(SoakTestSupport.class); 38 protected BrokerService broker; 39 protected String brokerURL = "tcp://localhost:61616"; 40 protected int consumerCount = 1; 41 protected int producerCount = 1; 42 protected int messageSize = 1024; 43 protected int messageCount = 1000; 44 45 protected Producer[] producers; 46 protected Consumer[] consumers; 47 protected String destinationName = "TOOL.DEFAULT"; 48 protected Message payload; 49 50 protected ConnectionFactory connectionFactory; 51 protected Destination destination; 52 protected boolean createConnectionPerClient = true; 53 protected boolean topic = false; 54 protected boolean transacted = false; 55 protected boolean durable = true; 56 protected boolean useEmbeddedBroker = true; 57 protected boolean keepOnRunning = true; 58 protected int duration = 0; protected boolean useConsumerListener = true; 60 protected Consumer allMessagesList = new Consumer(); 61 private String dataFileRoot = "activemq-data"; 62 63 64 protected void setUp() throws Exception { 65 File dataFile = new File (dataFileRoot); 67 recursiveDelete(dataFile); 68 69 if (useEmbeddedBroker) { 70 if (broker == null) { 71 broker = createBroker(); 72 } 73 } 74 75 connectionFactory = createConnectionFactory(); 76 Connection con = connectionFactory.createConnection(); 77 Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE); 78 79 if (topic) { 80 destination = session.createTopic(destinationName); 81 } else { 82 destination = session.createQueue(destinationName); 83 } 84 85 createPayload(session); 86 87 con.close(); 88 89 } 90 91 92 protected void createPayload(Session session) throws JMSException { 93 94 byte[] array = new byte[messageSize]; 95 for (int i = 0; i < array.length; i++) { 96 array[i] = (byte) i; 97 } 98 99 BytesMessage bystePayload = session.createBytesMessage(); 100 bystePayload.writeBytes(array); 101 payload = (Message) bystePayload; 102 } 103 104 105 protected void createProducers() throws JMSException { 106 producers = new Producer[producerCount]; 107 for (int i = 0; i < producerCount; i++) { 108 producers[i] = new Producer(connectionFactory, destination); 109 if (durable) { 110 producers[i].setDeliveryMode(DeliveryMode.PERSISTENT); 111 } 112 else { 113 producers[i].setDeliveryMode(DeliveryMode.NON_PERSISTENT); 114 } 115 producers[i].start(); 116 } 117 118 } 119 120 protected void createConsumers() throws JMSException { 121 consumers = new Consumer[consumerCount]; 122 for (int i = 0; i < consumerCount; i++) { 123 consumers[i] = new Consumer(connectionFactory, destination); 124 consumers[i].setParent(allMessagesList); 125 if(useConsumerListener){ 126 consumers[i].start(); 127 } 128 129 130 } 131 } 132 133 protected ActiveMQConnectionFactory createConnectionFactory() throws JMSException { 134 135 if (useEmbeddedBroker) { 136 return new ActiveMQConnectionFactory("vm://localhost"); 137 } else { 138 return new ActiveMQConnectionFactory(brokerURL); 139 } 140 } 141 142 protected BrokerService createBroker() throws Exception { 143 BrokerService broker = new BrokerService(); 144 configureBroker(broker); 145 broker.start(); 146 return broker; 147 } 148 149 protected void configureBroker(BrokerService broker) throws Exception { 150 broker.addConnector("vm://localhost"); 151 broker.setDeleteAllMessagesOnStartup(true); 152 } 153 154 public void startTimer() { 155 156 Thread timer = new Thread (new Runnable () { 157 public void run() { 158 try { 159 160 Thread.sleep(duration * 60 * 1000); 161 keepOnRunning = true; 162 } catch (InterruptedException e) { 163 164 } finally { 165 166 } 167 } 168 }, "TimerThread"); 169 170 171 log.info("Starting timer thread... Duration :" +duration + " minutes"); 172 timer.start(); 173 } 174 175 protected void recursiveDelete(File file) { 176 if( file.isDirectory() ) { 177 File [] files = file.listFiles(); 178 for (int i = 0; i < files.length; i++) { 179 recursiveDelete(files[i]); 180 } 181 } 182 file.delete(); 183 } 184 } 185 | Popular Tags |