1 18 package org.apache.activemq.tool; 19 20 import java.io.IOException ; 21 import java.io.PrintWriter ; 22 import java.util.ArrayList ; 23 import java.util.Collections ; 24 import java.util.Iterator ; 25 import java.util.List ; 26 import java.util.Random ; 27 28 import javax.jms.BytesMessage ; 29 import javax.jms.Connection ; 30 import javax.jms.DeliveryMode ; 31 import javax.jms.Destination ; 32 import javax.jms.JMSException ; 33 import javax.jms.Message ; 34 import javax.jms.MessageConsumer ; 35 import javax.jms.MessageProducer ; 36 import javax.jms.Session ; 37 38 import junit.framework.TestCase; 39 40 import org.apache.activemq.ActiveMQConnectionFactory; 41 import org.apache.activemq.command.ActiveMQQueue; 42 43 import java.util.concurrent.CountDownLatch ; 44 import java.util.concurrent.TimeUnit ; 45 import java.util.concurrent.atomic.AtomicBoolean ; 46 import java.util.concurrent.atomic.AtomicInteger ; 47 48 51 public class AcidTestTool extends TestCase { 52 53 private Random random = new Random (); 54 private byte data[]; 55 private int workerCount = 10; 56 private PrintWriter statWriter; 57 58 protected int recordSize = 1024; 60 protected int batchSize = 5; 61 protected int workerThinkTime = 500; 62 AtomicBoolean ignoreJMSErrors = new AtomicBoolean (false); 63 64 protected Destination target; 65 private ActiveMQConnectionFactory factory; 66 private Connection connection; 67 68 AtomicInteger publishedBatches = new AtomicInteger (0); 69 AtomicInteger consumedBatches = new AtomicInteger (0); 70 71 List errors = Collections.synchronizedList(new ArrayList ()); 72 73 private interface Worker extends Runnable { 74 public boolean waitForExit(long i) throws InterruptedException ; 75 } 76 77 private final class ProducerWorker implements Worker { 78 79 Session session; 80 private MessageProducer producer; 81 private BytesMessage message; 82 CountDownLatch doneLatch = new CountDownLatch (1); 83 private final String workerId; 84 85 ProducerWorker(Session session, String workerId) throws JMSException { 86 this.session = session; 87 this.workerId = workerId; 88 producer = session.createProducer(target); 89 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 90 message = session.createBytesMessage(); 91 message.setStringProperty("workerId", workerId); 92 message.writeBytes(data); 93 } 94 95 public void run() { 96 try { 97 for( int batchId=0; true; batchId++ ) { 98 for( int msgId=0; msgId < batchSize; msgId++ ) { 100 try { 102 Thread.sleep(random.nextInt(workerThinkTime)); 103 } catch (InterruptedException e1) { 104 return; 105 } 106 107 message.setIntProperty("batch-id",batchId); 108 message.setIntProperty("msg-id",msgId); 109 110 111 producer.send(message); 112 } 113 session.commit(); 114 publishedBatches.incrementAndGet(); 115 } 117 } catch (JMSException e) { 118 if( !ignoreJMSErrors.get() ) { 119 e.printStackTrace(); 120 errors.add(e); 121 } 122 return; 123 } catch (Throwable e) { 124 e.printStackTrace(); 125 errors.add(e); 126 return; 127 } finally { 128 System.out.println("Producer exiting."); 129 doneLatch.countDown(); 130 } 131 } 132 133 public boolean waitForExit(long i) throws InterruptedException { 134 return doneLatch.await(i, TimeUnit.MILLISECONDS); 135 } 136 } 137 138 private final class ConsumerWorker implements Worker { 139 140 Session session; 141 private MessageConsumer consumer; 142 private final long timeout; 143 CountDownLatch doneLatch = new CountDownLatch (1); 144 private final String workerId; 145 146 ConsumerWorker(Session session, String workerId, long timeout) throws JMSException { 147 this.session = session; 148 this.workerId = workerId; 149 this.timeout = timeout; 150 consumer = session.createConsumer(target,"workerId='"+workerId+"'"); 151 } 152 153 public void run() { 154 155 try { 156 int batchId=0; 157 while( true ) { 158 for( int msgId=0; msgId < batchSize; msgId++ ) { 159 160 try { 162 Thread.sleep(random.nextInt(workerThinkTime)); 163 } catch (InterruptedException e1) { 164 return; 165 } 166 167 Message message = consumer.receive(timeout); 168 if( msgId > 0 ) { 169 assertNotNull(message); 170 assertEquals(message.getIntProperty("batch-id"), batchId); 171 assertEquals(message.getIntProperty("msg-id"), msgId); 172 } else { 173 if( message==null ) { 174 System.out.println("At end of batch an don't have a next batch to process. done."); 175 return; 176 } 177 assertEquals(msgId, message.getIntProperty("msg-id") ); 178 batchId = message.getIntProperty("batch-id"); 179 } 181 182 } 183 session.commit(); 184 consumedBatches.incrementAndGet(); 185 } 187 } catch (JMSException e) { 188 if( !ignoreJMSErrors.get() ) { 189 e.printStackTrace(); 190 errors.add(e); 191 } 192 return; 193 } catch (Throwable e) { 194 e.printStackTrace(); 195 errors.add(e); 196 return; 197 } finally { 198 System.out.println("Consumer exiting."); 199 doneLatch.countDown(); 200 } 201 } 202 203 public boolean waitForExit(long i) throws InterruptedException { 204 return doneLatch.await(i, TimeUnit.MILLISECONDS); 205 } 206 } 207 208 211 protected void setUp() throws Exception { 212 factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 213 this.target = new ActiveMQQueue(getClass().getName()); 214 } 215 216 protected void tearDown() throws Exception { 217 if( connection!=null ) { 218 try { connection.close(); } catch (Throwable ignore) {} 219 connection = null; 220 } 221 } 222 223 229 private void reconnect() throws InterruptedException , JMSException { 230 if( connection!=null ) { 231 try { connection.close(); } catch (Throwable ignore) {} 232 connection = null; 233 } 234 235 long reconnectDelay=1000; 236 JMSException lastError=null; 237 238 while( connection == null) { 239 if( reconnectDelay > 1000*10 ) { 240 reconnectDelay = 1000*10; 241 } 242 try { 243 connection = factory.createConnection(); 244 connection.start(); 245 } catch (JMSException e) { 246 lastError = e; 247 Thread.sleep(reconnectDelay); 248 reconnectDelay*=2; 249 } 250 } 251 } 252 253 258 public void testAcidTransactions() throws Throwable { 259 260 System.out.println("Client threads write records using: Record Size: " + recordSize + ", Batch Size: " 261 + batchSize + ", Worker Think Time: " + workerThinkTime); 262 263 data = new byte[recordSize]; 265 for (int i = 0; i < data.length; i++) { 266 data[i] = (byte) i; 267 } 268 269 System.out.println("=============================================="); 270 System.out.println("===> Start the server now."); 271 System.out.println("=============================================="); 272 reconnect(); 273 274 System.out.println("Starting " + workerCount + " Workers..."); 275 ArrayList workers = new ArrayList (); 276 for( int i=0; i< workerCount; i++ ){ 277 String workerId = "worker-"+i; 278 279 Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 1000*5); 280 workers.add(w); 281 new Thread (w,"Consumer:"+workerId).start(); 282 283 w = new ProducerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId); 284 workers.add(w); 285 new Thread (w,"Producer:"+workerId).start(); 286 } 287 288 System.out.println("Waiting for "+(workerCount*10)+" batches to be delivered."); 289 290 while( publishedBatches.get() < workerCount*5) { 294 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get()); 295 Thread.sleep(1000); 296 } 297 298 System.out.println("=============================================="); 299 System.out.println("===> Server is under load now. Kill it!"); 300 System.out.println("=============================================="); 301 ignoreJMSErrors.set(true); 302 303 System.out.println("Waiting for all workers to exit due to server shutdown."); 305 for (Iterator iter = workers.iterator(); iter.hasNext();) { 306 Worker worker = (Worker) iter.next(); 307 while( !worker.waitForExit(1000) ) { 308 System.out.println("=============================================="); 309 System.out.println("===> Server is under load now. Kill it!"); 310 System.out.println("=============================================="); 311 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get()); 312 } 313 } 314 workers.clear(); 315 316 if( errors.size()>0 ) 318 throw (Throwable ) errors.get(0); 319 320 System.out.println("=============================================="); 321 System.out.println("===> Start the server now."); 322 System.out.println("=============================================="); 323 reconnect(); 324 325 System.out.println("Restarted."); 326 327 for( int i=0; i< workerCount; i++ ){ 329 String workerId = "worker-"+i; 330 Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 5*1000); 331 workers.add(w); 332 new Thread (w, "Consumer:"+workerId).start(); 333 } 334 335 System.out.println("Waiting for restarted consumers to finish consuming all messages.."); 336 for (Iterator iter = workers.iterator(); iter.hasNext();) { 337 Worker worker = (Worker) iter.next(); 338 while( !worker.waitForExit(1000*5) ) { 339 System.out.println("Waiting for restarted consumers to finish consuming all messages.."); 340 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get()); 341 } 342 } 343 workers.clear(); 344 345 System.out.println("Workers finished.."); 346 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get()); 347 348 if( errors.size()>0 ) 349 throw (Throwable ) errors.get(0); 350 351 } 352 353 public static void main(String [] args) { 354 try { 355 AcidTestTool tool = new AcidTestTool(); 356 tool.setUp(); 357 tool.testAcidTransactions(); 358 tool.tearDown(); 359 } catch (Throwable e) { 360 System.out.println("Test Failed: "+e.getMessage()); 361 e.printStackTrace(); 362 } 363 } 364 } 365 | Popular Tags |