KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > LargeMessageTestSupport


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18
19 package org.apache.activemq;
20
21 import javax.jms.BytesMessage JavaDoc;
22 import javax.jms.Connection JavaDoc;
23 import javax.jms.DeliveryMode JavaDoc;
24 import javax.jms.Destination JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Message JavaDoc;
27 import javax.jms.MessageConsumer JavaDoc;
28 import javax.jms.MessageListener JavaDoc;
29 import javax.jms.MessageProducer JavaDoc;
30 import javax.jms.Session JavaDoc;
31 import javax.jms.Topic JavaDoc;
32
33 import org.apache.activemq.ActiveMQConnection;
34 import org.apache.activemq.ActiveMQConnectionFactory;
35 import org.apache.activemq.command.ActiveMQMessage;
36 import org.apache.activemq.command.ActiveMQQueue;
37 import org.apache.activemq.command.ActiveMQTopic;
38 import org.apache.activemq.util.IdGenerator;
39
40 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
41
42 /**
43  * @version $Revision: 1.4 $
44  */

45 public class LargeMessageTestSupport extends ClientTestSupport implements MessageListener JavaDoc {
46
47     private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LargeMessageTestSupport.class);
48
49     protected static final int LARGE_MESSAGE_SIZE = 128 * 1024;
50     protected static final int MESSAGE_COUNT = 100;
51     protected Connection JavaDoc producerConnection;
52     protected Connection JavaDoc consumerConnection;
53     protected MessageConsumer JavaDoc consumer;
54     protected MessageProducer JavaDoc producer;
55     protected Session JavaDoc producerSession;
56     protected Session JavaDoc consumerSession;
57     protected byte[] largeMessageData;
58     protected Destination JavaDoc destination;
59     protected boolean isTopic = true;
60     protected boolean isDurable = true;
61     protected int deliveryMode = DeliveryMode.PERSISTENT;
62     protected IdGenerator idGen = new IdGenerator();
63     protected boolean validMessageConsumption = true;
64     protected AtomicInteger JavaDoc messageCount = new AtomicInteger JavaDoc(0);
65
66     protected int prefetchValue = 10000000;
67
68     protected Destination JavaDoc createDestination() {
69         String JavaDoc subject = getClass().getName();
70         if (isTopic) {
71             return new ActiveMQTopic(subject);
72         }
73         else {
74             return new ActiveMQQueue(subject);
75         }
76     }
77
78     protected MessageConsumer JavaDoc createConsumer() throws JMSException JavaDoc {
79         if (isTopic && isDurable) {
80             return consumerSession.createDurableSubscriber((Topic JavaDoc) destination, idGen.generateId());
81         }
82         else {
83             return consumerSession.createConsumer(destination);
84         }
85     }
86
87     public void setUp() throws Exception JavaDoc {
88         super.setUp();
89         ClientTestSupport.removeMessageStore();
90         log.info("Setting up . . . . . ");
91         messageCount.set(0);
92
93         destination = createDestination();
94         largeMessageData = new byte[LARGE_MESSAGE_SIZE];
95         for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
96             if (i % 2 == 0) {
97                 largeMessageData[i] = 'a';
98             }
99             else {
100                 largeMessageData[i] = 'z';
101             }
102         }
103
104         try {
105             Thread.sleep(1000);// allow the broker to start
106
}
107         catch (InterruptedException JavaDoc e) {
108             throw new JMSException JavaDoc(e.getMessage());
109         }
110
111         ActiveMQConnectionFactory fac = getConnectionFactory();
112         producerConnection = fac.createConnection();
113         setPrefetchPolicy((ActiveMQConnection) producerConnection);
114         producerConnection.start();
115
116         consumerConnection = fac.createConnection();
117         setPrefetchPolicy((ActiveMQConnection) consumerConnection);
118         consumerConnection.setClientID(idGen.generateId());
119         consumerConnection.start();
120         producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
121         producer = producerSession.createProducer(createDestination());
122         producer.setDeliveryMode(deliveryMode);
123         consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
124         consumer = createConsumer();
125         consumer.setMessageListener(this);
126         log.info("Setup complete");
127     }
128
129     protected void setPrefetchPolicy(ActiveMQConnection activeMQConnection) {
130         activeMQConnection.getPrefetchPolicy().setTopicPrefetch(prefetchValue);
131         activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue);
132         activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue);
133         activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue);
134         activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(prefetchValue);
135     }
136
137     public void tearDown() throws Exception JavaDoc {
138         Thread.sleep(1000);
139         producerConnection.close();
140         consumerConnection.close();
141
142         super.tearDown();
143
144         largeMessageData = null;
145     }
146
147     protected boolean isSame(BytesMessage JavaDoc msg1) throws Exception JavaDoc {
148         boolean result = false;
149         ((ActiveMQMessage) msg1).setReadOnlyBody(true);
150
151         for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
152             result = msg1.readByte() == largeMessageData[i];
153             if (!result)
154                 break;
155         }
156
157         return result;
158     }
159
160     public void onMessage(Message msg) {
161         try {
162             BytesMessage JavaDoc ba = (BytesMessage JavaDoc) msg;
163             validMessageConsumption &= isSame(ba);
164             assertTrue(ba.getBodyLength() == LARGE_MESSAGE_SIZE);
165             if (messageCount.incrementAndGet() >= MESSAGE_COUNT) {
166                 synchronized (messageCount) {
167                     messageCount.notify();
168                 }
169             }
170             log.info("got message = " + messageCount);
171             if (messageCount.get() % 50 == 0) {
172                 log.info("count = " + messageCount);
173             }
174         }
175         catch (Exception JavaDoc e) {
176             e.printStackTrace();
177         }
178     }
179
180     public void testLargeMessages() throws Exception JavaDoc {
181         for (int i = 0; i < MESSAGE_COUNT; i++) {
182             log.info("Sending message: " + i);
183             BytesMessage JavaDoc msg = producerSession.createBytesMessage();
184             msg.writeBytes(largeMessageData);
185             producer.send(msg);
186         }
187         long now = System.currentTimeMillis();
188         while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) {
189             log.info("message count = " + messageCount);
190             synchronized (messageCount) {
191                 messageCount.wait(1000);
192             }
193         }
194         log.info("Finished count = " + messageCount);
195         assertTrue("Not enough messages - expected " + MESSAGE_COUNT + " but got " + messageCount, messageCount.get() == MESSAGE_COUNT);
196         assertTrue("received messages are not valid", validMessageConsumption);
197         Thread.sleep(1000);
198         log.info("FINAL count = " + messageCount);
199     }
200 }
201
Popular Tags