KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > tool > AcidTestTool


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.tool;
19
20 import java.io.IOException JavaDoc;
21 import java.io.PrintWriter JavaDoc;
22 import java.util.ArrayList JavaDoc;
23 import java.util.Collections JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.List JavaDoc;
26 import java.util.Random JavaDoc;
27
28 import javax.jms.BytesMessage JavaDoc;
29 import javax.jms.Connection JavaDoc;
30 import javax.jms.DeliveryMode JavaDoc;
31 import javax.jms.Destination JavaDoc;
32 import javax.jms.JMSException JavaDoc;
33 import javax.jms.Message JavaDoc;
34 import javax.jms.MessageConsumer JavaDoc;
35 import javax.jms.MessageProducer JavaDoc;
36 import javax.jms.Session JavaDoc;
37
38 import junit.framework.TestCase;
39
40 import org.apache.activemq.ActiveMQConnectionFactory;
41 import org.apache.activemq.command.ActiveMQQueue;
42
43 import java.util.concurrent.CountDownLatch JavaDoc;
44 import java.util.concurrent.TimeUnit JavaDoc;
45 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
46 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
47
48 /**
49  * @version $Revision$
50  */

51 public class AcidTestTool extends TestCase {
52
53     private Random JavaDoc random = new Random JavaDoc();
54     private byte data[];
55     private int workerCount = 10;
56     private PrintWriter JavaDoc statWriter;
57
58     // Worker configuration.
59
protected int recordSize = 1024;
60     protected int batchSize = 5;
61     protected int workerThinkTime = 500;
62     AtomicBoolean JavaDoc ignoreJMSErrors = new AtomicBoolean JavaDoc(false);
63
64     protected Destination JavaDoc target;
65     private ActiveMQConnectionFactory factory;
66     private Connection JavaDoc connection;
67     
68     AtomicInteger JavaDoc publishedBatches = new AtomicInteger JavaDoc(0);
69     AtomicInteger JavaDoc consumedBatches = new AtomicInteger JavaDoc(0);
70     
71     List JavaDoc errors = Collections.synchronizedList(new ArrayList JavaDoc());
72
73     private interface Worker extends Runnable JavaDoc {
74         public boolean waitForExit(long i) throws InterruptedException JavaDoc;
75     }
76     
77     private final class ProducerWorker implements Worker {
78
79         Session JavaDoc session;
80         private MessageProducer JavaDoc producer;
81         private BytesMessage JavaDoc message;
82         CountDownLatch JavaDoc doneLatch = new CountDownLatch JavaDoc(1);
83         private final String JavaDoc workerId;
84
85         ProducerWorker(Session JavaDoc session, String JavaDoc workerId) throws JMSException JavaDoc {
86             this.session = session;
87             this.workerId = workerId;
88             producer = session.createProducer(target);
89             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
90             message = session.createBytesMessage();
91             message.setStringProperty("workerId", workerId);
92             message.writeBytes(data);
93         }
94
95         public void run() {
96             try {
97                 for( int batchId=0; true; batchId++ ) {
98 // System.out.println("Sending batch: "+workerId+" "+batchId);
99
for( int msgId=0; msgId < batchSize; msgId++ ) {
100                         // Sleep some random amount of time less than workerThinkTime
101
try {
102                             Thread.sleep(random.nextInt(workerThinkTime));
103                         } catch (InterruptedException JavaDoc e1) {
104                             return;
105                         }
106                         
107                         message.setIntProperty("batch-id",batchId);
108                         message.setIntProperty("msg-id",msgId);
109     
110                         
111                         producer.send(message);
112                     }
113                     session.commit();
114                     publishedBatches.incrementAndGet();
115 // System.out.println("Commited send batch: "+workerId+" "+batchId);
116
}
117             } catch (JMSException JavaDoc e) {
118                 if( !ignoreJMSErrors.get() ) {
119                     e.printStackTrace();
120                     errors.add(e);
121                 }
122                 return;
123             } catch (Throwable JavaDoc e) {
124                 e.printStackTrace();
125                 errors.add(e);
126                 return;
127             } finally {
128                 System.out.println("Producer exiting.");
129                 doneLatch.countDown();
130             }
131         }
132
133         public boolean waitForExit(long i) throws InterruptedException JavaDoc {
134             return doneLatch.await(i, TimeUnit.MILLISECONDS);
135         }
136     }
137     
138     private final class ConsumerWorker implements Worker {
139
140         Session JavaDoc session;
141         private MessageConsumer JavaDoc consumer;
142         private final long timeout;
143         CountDownLatch JavaDoc doneLatch = new CountDownLatch JavaDoc(1);
144         private final String JavaDoc workerId;
145         
146         ConsumerWorker(Session JavaDoc session, String JavaDoc workerId, long timeout) throws JMSException JavaDoc {
147             this.session = session;
148             this.workerId = workerId;
149             this.timeout = timeout;
150             consumer = session.createConsumer(target,"workerId='"+workerId+"'");
151         }
152
153         public void run() {
154             
155             try {
156                 int batchId=0;
157                 while( true ) {
158                     for( int msgId=0; msgId < batchSize; msgId++ ) {
159
160                         // Sleep some random amount of time less than workerThinkTime
161
try {
162                             Thread.sleep(random.nextInt(workerThinkTime));
163                         } catch (InterruptedException JavaDoc e1) {
164                             return;
165                         }
166                         
167                         Message JavaDoc message = consumer.receive(timeout);
168                         if( msgId > 0 ) {
169                             assertNotNull(message);
170                             assertEquals(message.getIntProperty("batch-id"), batchId);
171                             assertEquals(message.getIntProperty("msg-id"), msgId);
172                         } else {
173                             if( message==null ) {
174                                 System.out.println("At end of batch an don't have a next batch to process. done.");
175                                 return;
176                             }
177                             assertEquals(msgId, message.getIntProperty("msg-id") );
178                             batchId = message.getIntProperty("batch-id");
179 // System.out.println("Receiving batch: "+workerId+" "+batchId);
180
}
181                         
182                     }
183                     session.commit();
184                     consumedBatches.incrementAndGet();
185 // System.out.println("Commited receive batch: "+workerId+" "+batchId);
186
}
187             } catch (JMSException JavaDoc e) {
188                 if( !ignoreJMSErrors.get() ) {
189                     e.printStackTrace();
190                     errors.add(e);
191                 }
192                 return;
193             } catch (Throwable JavaDoc e) {
194                 e.printStackTrace();
195                 errors.add(e);
196                 return;
197             } finally {
198                 System.out.println("Consumer exiting.");
199                 doneLatch.countDown();
200             }
201         }
202
203         public boolean waitForExit(long i) throws InterruptedException JavaDoc {
204             return doneLatch.await(i, TimeUnit.MILLISECONDS);
205         }
206     }
207     
208     /**
209      * @see junit.framework.TestCase#setUp()
210      */

211     protected void setUp() throws Exception JavaDoc {
212         factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
213         this.target = new ActiveMQQueue(getClass().getName());
214     }
215
216     protected void tearDown() throws Exception JavaDoc {
217         if( connection!=null ) {
218             try { connection.close(); } catch (Throwable JavaDoc ignore) {}
219             connection = null;
220         }
221     }
222     
223     /**
224      * @throws InterruptedException
225      * @throws JMSException
226      * @throws JMSException
227      *
228      */

229     private void reconnect() throws InterruptedException JavaDoc, JMSException JavaDoc {
230         if( connection!=null ) {
231             try { connection.close(); } catch (Throwable JavaDoc ignore) {}
232             connection = null;
233         }
234         
235         long reconnectDelay=1000;
236         JMSException JavaDoc lastError=null;
237         
238         while( connection == null) {
239             if( reconnectDelay > 1000*10 ) {
240                 reconnectDelay = 1000*10;
241             }
242             try {
243                 connection = factory.createConnection();
244                 connection.start();
245             } catch (JMSException JavaDoc e) {
246                 lastError = e;
247                 Thread.sleep(reconnectDelay);
248                 reconnectDelay*=2;
249             }
250         }
251     }
252
253     /**
254      * @throws Throwable
255      * @throws IOException
256      *
257      */

258     public void testAcidTransactions() throws Throwable JavaDoc {
259
260         System.out.println("Client threads write records using: Record Size: " + recordSize + ", Batch Size: "
261                 + batchSize + ", Worker Think Time: " + workerThinkTime);
262
263         // Create the record and fill it with some values.
264
data = new byte[recordSize];
265         for (int i = 0; i < data.length; i++) {
266             data[i] = (byte) i;
267         }
268
269         System.out.println("==============================================");
270         System.out.println("===> Start the server now.");
271         System.out.println("==============================================");
272         reconnect();
273         
274         System.out.println("Starting " + workerCount + " Workers...");
275         ArrayList JavaDoc workers = new ArrayList JavaDoc();
276         for( int i=0; i< workerCount; i++ ){
277             String JavaDoc workerId = "worker-"+i;
278             
279             Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 1000*5);
280             workers.add(w);
281             new Thread JavaDoc(w,"Consumer:"+workerId).start();
282
283             w = new ProducerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId);
284             workers.add(w);
285             new Thread JavaDoc(w,"Producer:"+workerId).start();
286         }
287
288         System.out.println("Waiting for "+(workerCount*10)+" batches to be delivered.");
289
290         //
291
// Wait for about 5 batches of messages per worker to be consumed before restart.
292
//
293
while( publishedBatches.get() < workerCount*5) {
294             System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
295             Thread.sleep(1000);
296         }
297         
298         System.out.println("==============================================");
299         System.out.println("===> Server is under load now. Kill it!");
300         System.out.println("==============================================");
301         ignoreJMSErrors.set(true);
302
303         // Wait for all the workers to finish.
304
System.out.println("Waiting for all workers to exit due to server shutdown.");
305         for (Iterator JavaDoc iter = workers.iterator(); iter.hasNext();) {
306             Worker worker = (Worker) iter.next();
307             while( !worker.waitForExit(1000) ) {
308                 System.out.println("==============================================");
309                 System.out.println("===> Server is under load now. Kill it!");
310                 System.out.println("==============================================");
311                 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
312             }
313         }
314         workers.clear();
315         
316         // No errors should have occured so far.
317
if( errors.size()>0 )
318             throw (Throwable JavaDoc) errors.get(0);
319         
320         System.out.println("==============================================");
321         System.out.println("===> Start the server now.");
322         System.out.println("==============================================");
323         reconnect();
324
325         System.out.println("Restarted.");
326         
327         // Validate the all transactions were commited as a uow. Looking for partial commits.
328
for( int i=0; i< workerCount; i++ ){
329             String JavaDoc workerId = "worker-"+i;
330             Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 5*1000);
331             workers.add(w);
332             new Thread JavaDoc(w, "Consumer:"+workerId).start();
333         }
334
335         System.out.println("Waiting for restarted consumers to finish consuming all messages..");
336         for (Iterator JavaDoc iter = workers.iterator(); iter.hasNext();) {
337             Worker worker = (Worker) iter.next();
338             while( !worker.waitForExit(1000*5) ) {
339                 System.out.println("Waiting for restarted consumers to finish consuming all messages..");
340                 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
341             }
342         }
343         workers.clear();
344
345         System.out.println("Workers finished..");
346         System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
347         
348         if( errors.size()>0 )
349             throw (Throwable JavaDoc) errors.get(0);
350         
351     }
352     
353     public static void main(String JavaDoc[] args) {
354         try {
355             AcidTestTool tool = new AcidTestTool();
356             tool.setUp();
357             tool.testAcidTransactions();
358             tool.tearDown();
359         } catch (Throwable JavaDoc e) {
360             System.out.println("Test Failed: "+e.getMessage());
361             e.printStackTrace();
362         }
363     }
364 }
365
Popular Tags