KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > MessageListenerRedeliveryTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import javax.jms.Connection JavaDoc;
21 import javax.jms.DeliveryMode JavaDoc;
22 import javax.jms.Destination JavaDoc;
23 import javax.jms.JMSException JavaDoc;
24 import javax.jms.Message JavaDoc;
25 import javax.jms.MessageConsumer JavaDoc;
26 import javax.jms.MessageListener JavaDoc;
27 import javax.jms.MessageProducer JavaDoc;
28 import javax.jms.Queue JavaDoc;
29 import javax.jms.Session JavaDoc;
30 import javax.jms.TextMessage JavaDoc;
31
32 import junit.framework.TestCase;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36
37 public class MessageListenerRedeliveryTest extends TestCase {
38
39     private Log log = LogFactory.getLog(getClass());
40
41     private Connection JavaDoc connection;
42
43     protected void setUp() throws Exception JavaDoc {
44         connection = createConnection();
45     }
46
47     /**
48      * @see junit.framework.TestCase#tearDown()
49      */

50     protected void tearDown() throws Exception JavaDoc {
51         if (connection != null) {
52             connection.close();
53             connection = null;
54         }
55     }
56
57     protected RedeliveryPolicy getRedeliveryPolicy() {
58         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
59         redeliveryPolicy.setInitialRedeliveryDelay(1000);
60         redeliveryPolicy.setMaximumRedeliveries(3);
61         redeliveryPolicy.setBackOffMultiplier((short) 2);
62         redeliveryPolicy.setUseExponentialBackOff(true);
63         return redeliveryPolicy;
64     }
65
66     protected Connection JavaDoc createConnection() throws Exception JavaDoc {
67         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
68         factory.setRedeliveryPolicy(getRedeliveryPolicy());
69         return factory.createConnection();
70     }
71
72     private class TestMessageListener implements MessageListener JavaDoc {
73         private Session JavaDoc session;
74
75         public int counter = 0;
76
77         public TestMessageListener(Session JavaDoc session) {
78             this.session = session;
79         }
80
81         public void onMessage(Message message) {
82             try {
83                 log.info("Message Received: " + message);
84                 counter++;
85                 if (counter <= 4) {
86                     log.info("Message Rollback.");
87                     session.rollback();
88                 } else {
89                     log.info("Message Commit.");
90                     message.acknowledge();
91                     session.commit();
92                 }
93             } catch (JMSException JavaDoc e) {
94                 log.error("Error when rolling back transaction");
95             }
96         }
97     }
98
99     public void testQueueRollbackConsumerListener() throws JMSException JavaDoc {
100         connection.start();
101
102         Session JavaDoc session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
103         Queue JavaDoc queue = session.createQueue("queue-" + getName());
104         MessageProducer JavaDoc producer = createProducer(session, queue);
105         Message message = createTextMessage(session);
106         producer.send(message);
107         session.commit();
108
109         MessageConsumer JavaDoc consumer = session.createConsumer(queue);
110
111         ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
112         mc.setRedeliveryPolicy(getRedeliveryPolicy());
113
114         TestMessageListener listener = new TestMessageListener(session);
115         consumer.setMessageListener(listener);
116
117         try {
118             Thread.sleep(500);
119         } catch (InterruptedException JavaDoc e) {
120
121         }
122         
123         // first try.. should get 2 since there is no delay on the
124
// first redeliver..
125
assertEquals(2, listener.counter);
126
127         try {
128             Thread.sleep(1000);
129         } catch (InterruptedException JavaDoc e) {
130
131         }
132         // 2nd redeliver (redelivery after 1 sec)
133
assertEquals(3, listener.counter);
134
135         try {
136             Thread.sleep(2000);
137         } catch (InterruptedException JavaDoc e) {
138
139         }
140         // 3rd redeliver (redelivery after 2 seconds) - it should give up after that
141
assertEquals(4, listener.counter);
142
143         // create new message
144
producer.send(createTextMessage(session));
145         session.commit();
146
147         try {
148             Thread.sleep(500);
149         } catch (InterruptedException JavaDoc e) {
150             // ignore
151
}
152         // it should be committed, so no redelivery
153
assertEquals(5, listener.counter);
154
155         try {
156             Thread.sleep(1500);
157         } catch (InterruptedException JavaDoc e) {
158             // ignore
159
}
160         // no redelivery, counter should still be 4
161
assertEquals(5, listener.counter);
162
163         session.close();
164     }
165
166     public void testQueueRollbackSessionListener() throws JMSException JavaDoc {
167         connection.start();
168
169         Session JavaDoc session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
170         Queue JavaDoc queue = session.createQueue("queue-" + getName());
171         MessageProducer JavaDoc producer = createProducer(session, queue);
172         Message message = createTextMessage(session);
173         producer.send(message);
174         session.commit();
175
176         MessageConsumer JavaDoc consumer = session.createConsumer(queue);
177
178         ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
179         mc.setRedeliveryPolicy(getRedeliveryPolicy());
180
181         TestMessageListener listener = new TestMessageListener(session);
182         consumer.setMessageListener(listener);
183
184         try {
185             Thread.sleep(500);
186         } catch (InterruptedException JavaDoc e) {
187
188         }
189         // first try
190
assertEquals(2, listener.counter);
191
192         try {
193             Thread.sleep(1000);
194         } catch (InterruptedException JavaDoc e) {
195
196         }
197         // second try (redelivery after 1 sec)
198
assertEquals(3, listener.counter);
199
200         try {
201             Thread.sleep(2000);
202         } catch (InterruptedException JavaDoc e) {
203
204         }
205         // third try (redelivery after 2 seconds) - it should give up after that
206
assertEquals(4, listener.counter);
207
208         // create new message
209
producer.send(createTextMessage(session));
210         session.commit();
211
212         try {
213             Thread.sleep(500);
214         } catch (InterruptedException JavaDoc e) {
215             // ignore
216
}
217         // it should be committed, so no redelivery
218
assertEquals(5, listener.counter);
219
220         try {
221             Thread.sleep(1500);
222         } catch (InterruptedException JavaDoc e) {
223             // ignore
224
}
225         // no redelivery, counter should still be 4
226
assertEquals(5, listener.counter);
227
228         session.close();
229     }
230
231     private TextMessage JavaDoc createTextMessage(Session JavaDoc session) throws JMSException JavaDoc {
232         return session.createTextMessage("Hello");
233     }
234
235     private MessageProducer JavaDoc createProducer(Session JavaDoc session, Destination JavaDoc queue) throws JMSException JavaDoc {
236         MessageProducer JavaDoc producer = session.createProducer(queue);
237         producer.setDeliveryMode(getDeliveryMode());
238         return producer;
239     }
240
241     protected int getDeliveryMode() {
242         return DeliveryMode.PERSISTENT;
243     }
244 }
245
Popular Tags