KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > MessageExpirationTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20 import javax.jms.DeliveryMode JavaDoc;
21
22 import junit.framework.Test;
23
24 import org.apache.activemq.command.ActiveMQDestination;
25 import org.apache.activemq.command.ConnectionInfo;
26 import org.apache.activemq.command.ConsumerInfo;
27 import org.apache.activemq.command.LocalTransactionId;
28 import org.apache.activemq.command.Message;
29 import org.apache.activemq.command.MessageAck;
30 import org.apache.activemq.command.ProducerInfo;
31 import org.apache.activemq.command.SessionInfo;
32 import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
33 import org.apache.activemq.broker.region.policy.PolicyMap;
34 import org.apache.activemq.broker.region.policy.PolicyEntry;
35 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
36 import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
37
38 public class MessageExpirationTest extends BrokerTestSupport {
39     
40     public ActiveMQDestination destination;
41     public int deliveryMode;
42     public int prefetch;
43     public byte destinationType;
44     public boolean durableConsumer;
45     
46     protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode, int timeToLive) {
47         Message message = createMessage(producerInfo, destination, deliveryMode);
48         long now = System.currentTimeMillis();
49         message.setTimestamp(now);
50         message.setExpiration(now+timeToLive);
51         return message;
52     }
53     
54     
55     public void initCombosForTestMessagesWaitingForUssageDecreaseExpire() {
56         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
57                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
58                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
59         addCombinationValues( "destinationType", new Object JavaDoc[]{
60                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
61                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE),
62                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
63                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
64                 } );
65     }
66
67     @Override JavaDoc
68     protected BrokerService createBroker() throws Exception JavaDoc {
69         BrokerService broker = new BrokerService();
70         broker.setPersistent(false);
71         return broker;
72     }
73     
74     protected PolicyEntry getDefaultPolicy() {
75         PolicyEntry policy = super.getDefaultPolicy();
76         //disable spooling
77
policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
78         return policy;
79     }
80
81     public void testMessagesWaitingForUssageDecreaseExpire() throws Exception JavaDoc {
82         
83         // Start a producer
84
final StubConnection connection = createConnection();
85         ConnectionInfo connectionInfo = createConnectionInfo();
86         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
87         final ProducerInfo producerInfo = createProducerInfo(sessionInfo);
88         connection.send(connectionInfo);
89         connection.send(sessionInfo);
90         connection.send(producerInfo);
91
92         
93         // Start a consumer..
94
final StubConnection connection2 = createConnection();
95         ConnectionInfo connectionInfo2 = createConnectionInfo();
96         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
97         connection2.send(connectionInfo2);
98         connection2.send(sessionInfo2);
99         
100         destination = createDestinationInfo(connection2, connectionInfo2, destinationType);
101         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
102         consumerInfo2.setPrefetchSize(1);
103         connection2.request(consumerInfo2);
104         
105         // Reduce the limit so that only 1 message can flow through the broker at a time.
106
broker.getMemoryManager().setLimit(1);
107         
108         final Message m1 = createMessage(producerInfo, destination, deliveryMode);
109         final Message m2 = createMessage(producerInfo, destination, deliveryMode, 1000);
110         final Message m3 = createMessage(producerInfo, destination, deliveryMode);
111         final Message m4 = createMessage(producerInfo, destination, deliveryMode, 1000);
112         
113         // Produce in an async thread since the producer will be getting blocked by the usage manager..
114
new Thread JavaDoc() {
115             public void run() {
116                 // m1 and m3 should not expire.. but the others should.
117
try {
118                     connection.send(m1);
119                     connection.send(m2);
120                     connection.send(m3);
121                     connection.send(m4);
122                 } catch (Exception JavaDoc e) {
123                     e.printStackTrace();
124                 }
125             }
126         }.start();
127         
128                 
129         // Make sure only 1 message was delivered due to prefetch == 1
130
Message m = receiveMessage(connection2);
131         assertNotNull(m);
132         assertEquals(m1.getMessageId(), m.getMessageId());
133         assertNoMessagesLeft(connection);
134         
135         // Sleep before we ack so that the messages expire on the usage manager
136
Thread.sleep(1500);
137         connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE));
138         
139         // 2nd message received should be m3.. it should have expired 2nd message sent.
140
m = receiveMessage(connection2);
141         assertNotNull(m);
142         assertEquals(m3.getMessageId(), m.getMessageId());
143         
144         // Sleep before we ack so that the messages expire on the usage manager
145
Thread.sleep(1500);
146         connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE));
147         
148         // And there should be no messages left now..
149
assertNoMessagesLeft(connection2);
150         
151         connection.send(closeConnectionInfo(connectionInfo));
152         connection.send(closeConnectionInfo(connectionInfo2));
153     }
154     
155     
156     public void initCombosForTestMessagesInLongTransactionExpire() {
157         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
158                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
159                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
160         addCombinationValues( "destinationType", new Object JavaDoc[]{
161                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
162                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
163                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
164                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)
165                 } );
166     }
167     
168     public void testMessagesInLongTransactionExpire() throws Exception JavaDoc {
169         
170         // Start a producer and consumer
171
StubConnection connection = createConnection();
172         ConnectionInfo connectionInfo = createConnectionInfo();
173         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
174         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
175         connection.send(connectionInfo);
176         connection.send(sessionInfo);
177         connection.send(producerInfo);
178
179         destination = createDestinationInfo(connection, connectionInfo, destinationType);
180         
181         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
182         consumerInfo.setPrefetchSize(1000);
183         connection.send(consumerInfo);
184
185         // Start the tx..
186
LocalTransactionId txid = createLocalTransaction(sessionInfo);
187         connection.send(createBeginTransaction(connectionInfo, txid));
188         
189         // m1 and m3 should not expire.. but the others should.
190
Message m1 = createMessage(producerInfo, destination, deliveryMode);
191         m1.setTransactionId(txid);
192         connection.send(m1);
193         Message m = createMessage(producerInfo, destination, deliveryMode, 1000);
194         m.setTransactionId(txid);
195         connection.send(m);
196         Message m3 = createMessage(producerInfo, destination, deliveryMode);
197         m3.setTransactionId(txid);
198         connection.send(m3);
199         m = createMessage(producerInfo, destination, deliveryMode, 1000);
200         m.setTransactionId(txid);
201         connection.send(m);
202         
203         // Sleep before we commit so that the messages expire on the commit list..
204
Thread.sleep(1500);
205         connection.send(createCommitTransaction1Phase(connectionInfo, txid));
206                 
207         m = receiveMessage(connection);
208         assertNotNull(m);
209         assertEquals(m1.getMessageId(), m.getMessageId());
210         connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
211         
212         // 2nd message received should be m3.. it should have expired 2nd message sent.
213
m = receiveMessage(connection);
214         assertNotNull(m);
215         assertEquals(m3.getMessageId(), m.getMessageId());
216         connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
217         
218         // And there should be no messages left now..
219
assertNoMessagesLeft(connection);
220         
221         connection.send(closeConnectionInfo(connectionInfo));
222     }
223
224
225     public void TestMessagesInSubscriptionPendingListExpire() {
226         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
227                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
228                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
229         addCombinationValues( "destinationType", new Object JavaDoc[]{
230                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
231                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
232                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
233                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)
234                 } );
235     }
236     
237     public void initCombosForTestMessagesInSubscriptionPendingListExpire() {
238         addCombinationValues( "deliveryMode", new Object JavaDoc[]{
239                 new Integer JavaDoc(DeliveryMode.NON_PERSISTENT),
240                 new Integer JavaDoc(DeliveryMode.PERSISTENT)} );
241         addCombinationValues( "destinationType", new Object JavaDoc[]{
242                 new Byte JavaDoc(ActiveMQDestination.QUEUE_TYPE),
243                 new Byte JavaDoc(ActiveMQDestination.TOPIC_TYPE),
244                 new Byte JavaDoc(ActiveMQDestination.TEMP_QUEUE_TYPE),
245                 new Byte JavaDoc(ActiveMQDestination.TEMP_TOPIC_TYPE)
246                 } );
247     }
248
249     public void testMessagesInSubscriptionPendingListExpire() throws Exception JavaDoc {
250         
251         // Start a producer and consumer
252
StubConnection connection = createConnection();
253         ConnectionInfo connectionInfo = createConnectionInfo();
254         SessionInfo sessionInfo = createSessionInfo(connectionInfo);
255         ProducerInfo producerInfo = createProducerInfo(sessionInfo);
256         connection.send(connectionInfo);
257         connection.send(sessionInfo);
258         connection.send(producerInfo);
259
260         destination = createDestinationInfo(connection, connectionInfo, destinationType);
261         
262         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
263         consumerInfo.setPrefetchSize(1);
264         connection.send(consumerInfo);
265         
266         // m1 and m3 should not expire.. but the others should.
267
Message m1 = createMessage(producerInfo, destination, deliveryMode);
268         connection.send(m1);
269         connection.send(createMessage(producerInfo, destination, deliveryMode, 1000));
270         Message m3 = createMessage(producerInfo, destination, deliveryMode);
271         connection.send(m3);
272         connection.send(createMessage(producerInfo, destination, deliveryMode, 1000));
273                 
274         // Make sure only 1 message was delivered due to prefetch == 1
275
Message m = receiveMessage(connection);
276         assertNotNull(m);
277         assertEquals(m1.getMessageId(), m.getMessageId());
278         assertNoMessagesLeft(connection);
279         
280         // Sleep before we ack so that the messages expire on the pending list..
281
Thread.sleep(1500);
282         connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
283         
284         // 2nd message received should be m3.. it should have expired 2nd message sent.
285
m = receiveMessage(connection);
286         assertNotNull(m);
287         assertEquals(m3.getMessageId(), m.getMessageId());
288         connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
289         
290         // And there should be no messages left now..
291
assertNoMessagesLeft(connection);
292         
293         connection.send(closeConnectionInfo(connectionInfo));
294     }
295     
296     public static Test suite() {
297         return suite(MessageExpirationTest.class);
298     }
299     
300     public static void main(String JavaDoc[] args) {
301         junit.textui.TestRunner.run(suite());
302     }
303
304 }
305
Popular Tags