1 18 package org.apache.activemq.transport.stomp; 19 20 import java.io.ByteArrayInputStream ; 21 import java.io.ByteArrayOutputStream ; 22 import java.io.DataInput ; 23 import java.io.DataInputStream ; 24 import java.io.IOException ; 25 import java.io.InputStream ; 26 import java.io.OutputStream ; 27 import java.net.Socket ; 28 29 import javax.jms.Connection ; 30 import javax.jms.Message ; 31 import javax.jms.MessageProducer ; 32 import javax.jms.Session ; 33 34 import junit.framework.TestCase; 35 36 import org.apache.activemq.ActiveMQConnectionFactory; 37 import org.apache.activemq.broker.BrokerService; 38 import org.apache.activemq.command.ActiveMQQueue; 39 import org.apache.commons.logging.Log; 40 import org.apache.commons.logging.LogFactory; 41 42 46 public class StompSubscriptionRemoveTest extends TestCase { 47 private static final Log log = LogFactory.getLog(StompSubscriptionRemoveTest.class); 48 49 private Socket stompSocket; 50 private ByteArrayOutputStream inputBuffer; 51 52 53 public void testRemoveSubscriber() throws Exception { 54 BrokerService broker = new BrokerService(); 55 broker.setPersistent(false); 56 57 broker.addConnector("stomp://localhost:61613").setName("Stomp"); 58 broker.addConnector("tcp://localhost:61616").setName("Default"); 59 broker.start(); 60 61 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 62 Connection connection = factory.createConnection(); 63 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 64 MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName())); 65 Message message = session.createTextMessage("Testas"); 66 for (int idx = 0; idx < 2000; ++idx) { 67 producer.send(message); 68 log.debug("Sending: " + idx); 69 } 70 producer.close(); 71 session.close(); 73 connection.close(); 74 75 stompSocket = new Socket ("localhost", 61613); 76 inputBuffer = new ByteArrayOutputStream (); 77 78 String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n"; 79 sendFrame(connect_frame); 80 81 String f = receiveFrame(100000); 82 String frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n"; 83 sendFrame(frame); 84 int messagesCount = 0; 85 int count = 0; 86 while (count < 2) { 87 String receiveFrame = receiveFrame(10000); 88 DataInput input = new DataInputStream (new ByteArrayInputStream (receiveFrame.getBytes())); 89 String line; 90 while (true) { 91 line = input.readLine(); 92 if (line == null) { 93 throw new IOException ("connection was closed"); 94 } 95 else { 96 line = line.trim(); 97 if (line.length() > 0) { 98 break; 99 } 100 } 101 } 102 line = input.readLine(); 103 if (line == null) { 104 throw new IOException ("connection was closed"); 105 } 106 String messageId = line.substring(line.indexOf(':') + 1); 107 messageId = messageId.trim(); 108 String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n"; 109 sendFrame(ackmessage); 110 log.debug(receiveFrame); 111 ++messagesCount; 113 ++count; 114 } 115 116 sendFrame("DISCONNECT\n\n"); 117 Thread.sleep(1000); 118 stompSocket.close(); 119 120 stompSocket = new Socket ("localhost", 61613); 121 inputBuffer = new ByteArrayOutputStream (); 122 123 connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n"; 124 sendFrame(connect_frame); 125 126 f = receiveFrame(5000); 127 128 frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n"; 129 sendFrame(frame); 130 try { 131 while (count != 2000) { 132 String receiveFrame = receiveFrame(5000); 133 DataInput input = new DataInputStream (new ByteArrayInputStream (receiveFrame.getBytes())); 134 String line; 135 while (true) { 136 line = input.readLine(); 137 if (line == null) { 138 throw new IOException ("connection was closed"); 139 } 140 else { 141 line = line.trim(); 142 if (line.length() > 0) { 143 break; 144 } 145 } 146 } 147 148 line = input.readLine(); 149 if (line == null) { 150 throw new IOException ("connection was closed"); 151 } 152 String messageId = line.substring(line.indexOf(':') + 1); 153 messageId = messageId.trim(); 154 String ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n"; 155 sendFrame(ackmessage); 156 log.debug("Received: " + receiveFrame); 157 ++messagesCount; 159 ++count; 160 } 161 162 } 163 catch (IOException ex) { 164 ex.printStackTrace(); 165 } 166 167 sendFrame("DISCONNECT\n\n"); 168 stompSocket.close(); 169 broker.stop(); 170 171 log.info("Total messages received: " + messagesCount); 172 assertTrue("Messages received after connection loss: " + messagesCount, messagesCount >= 2000); 173 174 176 } 180 181 public void sendFrame(String data) throws Exception { 182 byte[] bytes = data.getBytes("UTF-8"); 183 OutputStream outputStream = stompSocket.getOutputStream(); 184 outputStream.write(bytes); 185 outputStream.write(0); 186 outputStream.flush(); 187 } 188 189 public String receiveFrame(long timeOut) throws Exception { 190 stompSocket.setSoTimeout((int) timeOut); 191 InputStream is = stompSocket.getInputStream(); 192 int c = 0; 193 for (;;) { 194 c = is.read(); 195 if (c < 0) { 196 throw new IOException ("socket closed."); 197 } 198 else if (c == 0) { 199 c = is.read(); 200 byte[] ba = inputBuffer.toByteArray(); 201 inputBuffer.reset(); 202 return new String (ba, "UTF-8"); 203 } 204 else { 205 inputBuffer.write(c); 206 } 207 } 208 } 209 210 protected String getDestinationName() { 211 return getClass().getName() + "." + getName(); 212 } 213 } 214 | Popular Tags |