KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > test > rollback > RollbacksWhileConsumingLargeQueueTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.test.rollback;
19
20 import javax.jms.Connection JavaDoc;
21 import javax.jms.JMSException JavaDoc;
22 import javax.jms.Message JavaDoc;
23 import javax.jms.MessageConsumer JavaDoc;
24 import javax.jms.MessageListener JavaDoc;
25 import javax.jms.Session JavaDoc;
26 import javax.jms.TextMessage JavaDoc;
27
28 import org.apache.activemq.EmbeddedBrokerTestSupport;
29 import org.springframework.jms.core.MessageCreator;
30
31 import java.util.concurrent.CountDownLatch JavaDoc;
32 import java.util.concurrent.TimeUnit JavaDoc;
33 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
34
35 /**
36  * @version $Revision: 475999 $
37  */

38 public class RollbacksWhileConsumingLargeQueueTest extends
39         EmbeddedBrokerTestSupport implements MessageListener JavaDoc {
40
41     protected int numberOfMessagesOnQueue = 6500;
42     private Connection JavaDoc connection;
43     private AtomicInteger JavaDoc deliveryCounter = new AtomicInteger JavaDoc(0);
44     private AtomicInteger JavaDoc ackCounter = new AtomicInteger JavaDoc(0);
45     private CountDownLatch JavaDoc latch;
46     private Throwable JavaDoc failure;
47
48     public void testWithReciever() throws Throwable JavaDoc {
49         latch = new CountDownLatch JavaDoc(numberOfMessagesOnQueue);
50         Session JavaDoc session = connection.createSession(true, 0);
51         MessageConsumer JavaDoc consumer = session.createConsumer(destination);
52
53         long start = System.currentTimeMillis();
54         while ((System.currentTimeMillis() - start) < 1000*1000) {
55             if (getFailure() != null) {
56                 throw getFailure();
57             }
58             
59             // Are we done receiving all the messages.
60
if( ackCounter.get() == numberOfMessagesOnQueue )
61                 return;
62
63             Message JavaDoc message = consumer.receive(1000);
64             if (message == null)
65                 continue;
66
67             try {
68                 onMessage(message);
69                 session.commit();
70             } catch (Throwable JavaDoc e) {
71                 session.rollback();
72             }
73         }
74
75         fail("Did not receive all the messages.");
76     }
77
78     public void testWithMessageListener() throws Throwable JavaDoc {
79         latch = new CountDownLatch JavaDoc(numberOfMessagesOnQueue);
80         new DelegatingTransactionalMessageListener(this, connection,
81                 destination);
82
83         long start = System.currentTimeMillis();
84         while ((System.currentTimeMillis() - start) < 1000*1000) {
85
86             if (getFailure() != null) {
87                 throw getFailure();
88             }
89
90             if (latch.await(1, TimeUnit.SECONDS)) {
91                 System.out.println("Received: " + deliveryCounter.get()
92                         + " message(s)");
93                 return;
94             }
95
96         }
97
98         fail("Did not receive all the messages.");
99     }
100
101
102     protected void setUp() throws Exception JavaDoc {
103         super.setUp();
104
105         connection = createConnection();
106         connection.start();
107
108         // lets fill the queue up
109
for (int i = 0; i < numberOfMessagesOnQueue; i++) {
110             template.send(createMessageCreator(i));
111         }
112
113     }
114
115     protected void tearDown() throws Exception JavaDoc {
116         if (connection != null) {
117             connection.close();
118         }
119         super.tearDown();
120     }
121
122     protected MessageCreator createMessageCreator(final int i) {
123         return new MessageCreator() {
124             public Message JavaDoc createMessage(Session JavaDoc session) throws JMSException JavaDoc {
125                 TextMessage JavaDoc answer = session.createTextMessage("Message: " + i);
126                 answer.setIntProperty("Counter", i);
127                 return answer;
128             }
129         };
130     }
131
132     public void onMessage(Message JavaDoc message) {
133         String JavaDoc msgId = null;
134         String JavaDoc msgText = null;
135
136         try {
137             msgId = message.getJMSMessageID();
138             msgText = ((TextMessage JavaDoc) message).getText();
139         } catch (JMSException JavaDoc e) {
140             setFailure(e);
141         }
142
143         try {
144             assertEquals("Message: " + ackCounter.get(), msgText);
145         } catch (Throwable JavaDoc e) {
146             setFailure(e);
147         }
148
149         int value = deliveryCounter.incrementAndGet();
150         if (value % 2 == 0) {
151             log.info("Rolling Back message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText);
152             throw new RuntimeException JavaDoc("Dummy exception on message: " + value);
153         }
154
155         log.info("Received message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText);
156         ackCounter.incrementAndGet();
157         latch.countDown();
158     }
159
160     public synchronized Throwable JavaDoc getFailure() {
161         return failure;
162     }
163
164     public synchronized void setFailure(Throwable JavaDoc failure) {
165         this.failure = failure;
166     }
167 }
168
Popular Tags