KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > stomp > StompSubscriptionRemoveTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.transport.stomp;
19
20 import java.io.ByteArrayInputStream JavaDoc;
21 import java.io.ByteArrayOutputStream JavaDoc;
22 import java.io.DataInput JavaDoc;
23 import java.io.DataInputStream JavaDoc;
24 import java.io.IOException JavaDoc;
25 import java.io.InputStream JavaDoc;
26 import java.io.OutputStream JavaDoc;
27 import java.net.Socket JavaDoc;
28
29 import javax.jms.Connection JavaDoc;
30 import javax.jms.Message JavaDoc;
31 import javax.jms.MessageProducer JavaDoc;
32 import javax.jms.Session JavaDoc;
33
34 import junit.framework.TestCase;
35
36 import org.apache.activemq.ActiveMQConnectionFactory;
37 import org.apache.activemq.broker.BrokerService;
38 import org.apache.activemq.command.ActiveMQQueue;
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41
42 /**
43  *
44  * @version $Revision: 453497 $
45  */

46 public class StompSubscriptionRemoveTest extends TestCase {
47     private static final Log log = LogFactory.getLog(StompSubscriptionRemoveTest.class);
48
49     private Socket JavaDoc stompSocket;
50     private ByteArrayOutputStream JavaDoc inputBuffer;
51
52     
53     public void testRemoveSubscriber() throws Exception JavaDoc {
54         BrokerService broker = new BrokerService();
55         broker.setPersistent(false);
56
57         broker.addConnector("stomp://localhost:61613").setName("Stomp");
58         broker.addConnector("tcp://localhost:61616").setName("Default");
59         broker.start();
60
61         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
62         Connection JavaDoc connection = factory.createConnection();
63         Session JavaDoc session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
64         MessageProducer JavaDoc producer = session.createProducer(new ActiveMQQueue(getDestinationName()));
65         Message JavaDoc message = session.createTextMessage("Testas");
66         for (int idx = 0; idx < 2000; ++idx) {
67             producer.send(message);
68             log.debug("Sending: " + idx);
69         }
70         producer.close();
71         // consumer.close();
72
session.close();
73         connection.close();
74
75         stompSocket = new Socket JavaDoc("localhost", 61613);
76         inputBuffer = new ByteArrayOutputStream JavaDoc();
77
78         String JavaDoc connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n";
79         sendFrame(connect_frame);
80
81         String JavaDoc f = receiveFrame(100000);
82         String JavaDoc frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n";
83         sendFrame(frame);
84         int messagesCount = 0;
85         int count = 0;
86         while (count < 2) {
87             String JavaDoc receiveFrame = receiveFrame(10000);
88             DataInput JavaDoc input = new DataInputStream JavaDoc(new ByteArrayInputStream JavaDoc(receiveFrame.getBytes()));
89             String JavaDoc line;
90             while (true) {
91                 line = input.readLine();
92                 if (line == null) {
93                     throw new IOException JavaDoc("connection was closed");
94                 }
95                 else {
96                     line = line.trim();
97                     if (line.length() > 0) {
98                         break;
99                     }
100                 }
101             }
102             line = input.readLine();
103             if (line == null) {
104                 throw new IOException JavaDoc("connection was closed");
105             }
106             String JavaDoc messageId = line.substring(line.indexOf(':') + 1);
107             messageId = messageId.trim();
108             String JavaDoc ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n";
109             sendFrame(ackmessage);
110             log.debug(receiveFrame);
111             //Thread.sleep(1000);
112
++messagesCount;
113             ++count;
114         }
115
116         sendFrame("DISCONNECT\n\n");
117         Thread.sleep(1000);
118         stompSocket.close();
119
120         stompSocket = new Socket JavaDoc("localhost", 61613);
121         inputBuffer = new ByteArrayOutputStream JavaDoc();
122
123         connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n";
124         sendFrame(connect_frame);
125
126         f = receiveFrame(5000);
127
128         frame = "SUBSCRIBE\n" + "destination:/queue/" + getDestinationName() + "\n" + "ack:client\n\n";
129         sendFrame(frame);
130         try {
131             while (count != 2000) {
132                 String JavaDoc receiveFrame = receiveFrame(5000);
133                 DataInput JavaDoc input = new DataInputStream JavaDoc(new ByteArrayInputStream JavaDoc(receiveFrame.getBytes()));
134                 String JavaDoc line;
135                 while (true) {
136                     line = input.readLine();
137                     if (line == null) {
138                         throw new IOException JavaDoc("connection was closed");
139                     }
140                     else {
141                         line = line.trim();
142                         if (line.length() > 0) {
143                             break;
144                         }
145                     }
146                 }
147
148                 line = input.readLine();
149                 if (line == null) {
150                     throw new IOException JavaDoc("connection was closed");
151                 }
152                 String JavaDoc messageId = line.substring(line.indexOf(':') + 1);
153                 messageId = messageId.trim();
154                 String JavaDoc ackmessage = "ACK\n" + "message-id:" + messageId + "\n\n";
155                 sendFrame(ackmessage);
156                 log.debug("Received: " + receiveFrame);
157                 //Thread.sleep(1000);
158
++messagesCount;
159                 ++count;
160             }
161
162         }
163         catch (IOException JavaDoc ex) {
164             ex.printStackTrace();
165         }
166
167         sendFrame("DISCONNECT\n\n");
168         stompSocket.close();
169         broker.stop();
170
171         log.info("Total messages received: " + messagesCount);
172         assertTrue("Messages received after connection loss: " + messagesCount, messagesCount >= 2000);
173
174         // The first ack messages has no chance complete, so we receiving more messages
175

176         // Don't know how to list subscriptions for the broker. Currently you
177
// can check using JMX console. You'll see
178
// Subscription without any connections
179
}
180
181     public void sendFrame(String JavaDoc data) throws Exception JavaDoc {
182         byte[] bytes = data.getBytes("UTF-8");
183         OutputStream JavaDoc outputStream = stompSocket.getOutputStream();
184         outputStream.write(bytes);
185         outputStream.write(0);
186         outputStream.flush();
187     }
188
189     public String JavaDoc receiveFrame(long timeOut) throws Exception JavaDoc {
190         stompSocket.setSoTimeout((int) timeOut);
191         InputStream JavaDoc is = stompSocket.getInputStream();
192         int c = 0;
193         for (;;) {
194             c = is.read();
195             if (c < 0) {
196                 throw new IOException JavaDoc("socket closed.");
197             }
198             else if (c == 0) {
199                 c = is.read();
200                 byte[] ba = inputBuffer.toByteArray();
201                 inputBuffer.reset();
202                 return new String JavaDoc(ba, "UTF-8");
203             }
204             else {
205                 inputBuffer.write(c);
206             }
207         }
208     }
209
210     protected String JavaDoc getDestinationName() {
211         return getClass().getName() + "." + getName();
212     }
213 }
214
Popular Tags