1 18 package org.apache.activemq.bugs; 19 20 import java.io.ByteArrayOutputStream ; 21 import java.io.IOException ; 22 import java.io.InputStream ; 23 import java.io.OutputStream ; 24 import java.net.Socket ; 25 26 import javax.jms.Connection ; 27 import javax.jms.Message ; 28 import javax.jms.MessageConsumer ; 29 import javax.jms.MessageProducer ; 30 import javax.jms.Session ; 31 import javax.jms.TextMessage ; 32 33 import junit.framework.TestCase; 34 35 import org.apache.activemq.ActiveMQConnectionFactory; 36 import org.apache.activemq.broker.BrokerService; 37 import org.apache.activemq.command.ActiveMQQueue; 38 import org.apache.commons.logging.Log; 39 import org.apache.commons.logging.LogFactory; 40 41 public class SlowConsumerTest extends TestCase { 42 private static final Log log = LogFactory.getLog(SlowConsumerTest.class); 43 private Socket stompSocket; 44 private ByteArrayOutputStream inputBuffer; 45 46 private static final int MESSAGES_COUNT = 10000; 47 private int messagesCount; 48 protected int messageLogFrequency = 2500; 49 protected long messageReceiveTimeout = 10000L; 50 51 55 public void testRemoveSubscriber() throws Exception { 56 final BrokerService broker = new BrokerService(); 57 broker.setPersistent(true); 58 broker.setUseJmx(true); 59 broker.setDeleteAllMessagesOnStartup(true); 60 61 broker.addConnector("tcp://localhost:61616").setName("Default"); 62 broker.start(); 63 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 64 final Connection connection = factory.createConnection(); 65 connection.start(); 66 67 Thread producingThread = new Thread ("Producing thread") { 68 public void run() { 69 try { 70 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 71 MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName())); 72 for (int idx = 0; idx < MESSAGES_COUNT; ++idx) { 73 Message message = session.createTextMessage("" + idx); 74 producer.send(message); 75 log.debug("Sending: " + idx); 76 } 77 producer.close(); 78 session.close(); 79 } 80 catch (Throwable ex) { 81 ex.printStackTrace(); 82 } 83 } 84 }; 85 producingThread.setPriority(Thread.MAX_PRIORITY); 86 producingThread.start(); 87 Thread.sleep(1000); 88 89 Thread consumingThread = new Thread ("Consuming thread") { 90 91 public void run() { 92 try { 93 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 94 MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationName())); 95 int diff = 0; 96 while (messagesCount != MESSAGES_COUNT) { 97 Message msg = consumer.receive(messageReceiveTimeout ); 98 if (msg == null) { 99 log.warn("Got null message at count: " + messagesCount + ". Continuing..."); 100 break; 101 } 102 String text = ((TextMessage ) msg).getText(); 103 int currentMsgIdx = Integer.parseInt(text); 104 log.debug("Received: " + text + " messageCount: " + messagesCount); 105 msg.acknowledge(); 106 if ((messagesCount + diff) != currentMsgIdx) { 107 log.debug("Message(s) skipped!! Should be message no.: " + messagesCount + " but got: " + currentMsgIdx); 108 diff = currentMsgIdx - messagesCount; 109 } 110 ++messagesCount; 111 if (messagesCount % messageLogFrequency == 0) { 112 log.info("Received: " + messagesCount + " messages so far"); 113 } 114 } 116 } 117 catch (Throwable ex) { 118 ex.printStackTrace(); 119 } 120 } 121 }; 122 consumingThread.start(); 123 consumingThread.join(); 124 125 assertEquals(MESSAGES_COUNT, messagesCount); 126 127 } 128 129 public void sendFrame(String data) throws Exception { 130 byte[] bytes = data.getBytes("UTF-8"); 131 OutputStream outputStream = stompSocket.getOutputStream(); 132 for (int i = 0; i < bytes.length; i++) { 133 outputStream.write(bytes[i]); 134 } 135 outputStream.flush(); 136 } 137 138 public String receiveFrame(long timeOut) throws Exception { 139 stompSocket.setSoTimeout((int) timeOut); 140 InputStream is = stompSocket.getInputStream(); 141 int c = 0; 142 for (;;) { 143 c = is.read(); 144 if (c < 0) { 145 throw new IOException ("socket closed."); 146 } 147 else if (c == 0) { 148 c = is.read(); 149 byte[] ba = inputBuffer.toByteArray(); 150 inputBuffer.reset(); 151 return new String (ba, "UTF-8"); 152 } 153 else { 154 inputBuffer.write(c); 155 } 156 } 157 } 158 159 protected String getDestinationName() { 160 return getClass().getName() + "." + getName(); 161 } 162 } 163 | Popular Tags |