KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > bugs > SlowConsumerTest


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.bugs;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

41 public class SlowConsumerTest extends TestCase {
42     private static final Log log = LogFactory.getLog(SlowConsumerTest.class);
43     private Socket JavaDoc stompSocket;
44     private ByteArrayOutputStream JavaDoc inputBuffer;
45
46     private static final int MESSAGES_COUNT = 10000;
47     private int messagesCount;
48     protected int messageLogFrequency = 2500;
49     protected long messageReceiveTimeout = 10000L;
50
51     /**
52      * @param args
53      * @throws Exception
54      */

55     public void testRemoveSubscriber() throws Exception JavaDoc {
56         final BrokerService broker = new BrokerService();
57         broker.setPersistent(true);
58         broker.setUseJmx(true);
59         broker.setDeleteAllMessagesOnStartup(true);
60
61         broker.addConnector("tcp://localhost:61616").setName("Default");
62         broker.start();
63         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
64         final Connection JavaDoc connection = factory.createConnection();
65         connection.start();
66
67         Thread JavaDoc producingThread = new Thread JavaDoc("Producing thread") {
68             public void run() {
69                 try {
70                     Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
71                     MessageProducer JavaDoc producer = session.createProducer(new ActiveMQQueue(getDestinationName()));
72                     for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
73                         Message JavaDoc message = session.createTextMessage("" + idx);
74                         producer.send(message);
75                         log.debug("Sending: " + idx);
76                     }
77                     producer.close();
78                     session.close();
79                 }
80                 catch (Throwable JavaDoc ex) {
81                     ex.printStackTrace();
82                 }
83             }
84         };
85         producingThread.setPriority(Thread.MAX_PRIORITY);
86         producingThread.start();
87         Thread.sleep(1000);
88
89         Thread JavaDoc consumingThread = new Thread JavaDoc("Consuming thread") {
90
91             public void run() {
92                 try {
93                     Session JavaDoc session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
94                     MessageConsumer JavaDoc consumer = session.createConsumer(new ActiveMQQueue(getDestinationName()));
95                     int diff = 0;
96                     while (messagesCount != MESSAGES_COUNT) {
97                         Message JavaDoc msg = consumer.receive(messageReceiveTimeout );
98                         if (msg == null) {
99                             log.warn("Got null message at count: " + messagesCount + ". Continuing...");
100                             break;
101                         }
102                         String JavaDoc text = ((TextMessage JavaDoc) msg).getText();
103                         int currentMsgIdx = Integer.parseInt(text);
104                         log.debug("Received: " + text + " messageCount: " + messagesCount);
105                         msg.acknowledge();
106                         if ((messagesCount + diff) != currentMsgIdx) {
107                             log.debug("Message(s) skipped!! Should be message no.: " + messagesCount + " but got: " + currentMsgIdx);
108                             diff = currentMsgIdx - messagesCount;
109                         }
110                         ++messagesCount;
111                         if (messagesCount % messageLogFrequency == 0) {
112                             log.info("Received: " + messagesCount + " messages so far");
113                         }
114                         //Thread.sleep(70);
115
}
116                 }
117                 catch (Throwable JavaDoc ex) {
118                     ex.printStackTrace();
119                 }
120             }
121         };
122         consumingThread.start();
123         consumingThread.join();
124
125         assertEquals(MESSAGES_COUNT, messagesCount);
126
127     }
128
129     public void sendFrame(String JavaDoc data) throws Exception JavaDoc {
130         byte[] bytes = data.getBytes("UTF-8");
131         OutputStream JavaDoc outputStream = stompSocket.getOutputStream();
132         for (int i = 0; i < bytes.length; i++) {
133             outputStream.write(bytes[i]);
134         }
135         outputStream.flush();
136     }
137
138     public String JavaDoc receiveFrame(long timeOut) throws Exception JavaDoc {
139         stompSocket.setSoTimeout((int) timeOut);
140         InputStream JavaDoc is = stompSocket.getInputStream();
141         int c = 0;
142         for (;;) {
143             c = is.read();
144             if (c < 0) {
145                 throw new IOException JavaDoc("socket closed.");
146             }
147             else if (c == 0) {
148                 c = is.read();
149                 byte[] ba = inputBuffer.toByteArray();
150                 inputBuffer.reset();
151                 return new String JavaDoc(ba, "UTF-8");
152             }
153             else {
154                 inputBuffer.write(c);
155             }
156         }
157     }
158
159     protected String JavaDoc getDestinationName() {
160         return getClass().getName() + "." + getName();
161     }
162 }
163
Popular Tags