KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > transport > stomp > StompTest


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.activemq.transport.stomp;
19
20 import org.apache.activemq.ActiveMQConnectionFactory;
21 import org.apache.activemq.CombinationTestSupport;
22 import org.apache.activemq.broker.*;
23 import org.apache.activemq.command.ActiveMQQueue;
24 import org.apache.activemq.command.ActiveMQTextMessage;
25 import org.apache.activemq.transport.stomp.Stomp;
26
27 import javax.jms.*;
28 import javax.jms.Connection JavaDoc;
29 import java.io.ByteArrayOutputStream JavaDoc;
30 import java.io.IOException JavaDoc;
31 import java.io.InputStream JavaDoc;
32 import java.io.OutputStream JavaDoc;
33 import java.net.Socket JavaDoc;
34 import java.net.SocketTimeoutException JavaDoc;
35 import java.net.URI JavaDoc;
36 import java.util.regex.Pattern JavaDoc;
37 import java.util.regex.Matcher JavaDoc;
38
39 public class StompTest extends CombinationTestSupport {
40
41     private BrokerService broker;
42     private TransportConnector connector;
43     private Socket JavaDoc stompSocket;
44     private ByteArrayOutputStream JavaDoc inputBuffer;
45     private Connection JavaDoc connection;
46     private Session session;
47     private ActiveMQQueue queue;
48     protected String JavaDoc bindAddress = "stomp://localhost:0";
49
50     protected void setUp() throws Exception JavaDoc {
51         broker = new BrokerService();
52         broker.setPersistent(false);
53
54         connector = broker.addConnector(bindAddress);
55         broker.start();
56
57         URI JavaDoc connectUri = connector.getConnectUri();
58         stompSocket = createSocket(connectUri);
59         inputBuffer = new ByteArrayOutputStream JavaDoc();
60
61         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
62         connection = cf.createConnection();
63         session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
64         queue = new ActiveMQQueue(getQueueName());
65         connection.start();
66     }
67
68     protected Socket JavaDoc createSocket(URI JavaDoc connectUri) throws IOException JavaDoc {
69         return new Socket JavaDoc("127.0.0.1", connectUri.getPort());
70     }
71
72     protected String JavaDoc getQueueName() {
73         return getClass().getName() + "." + getName();
74     }
75
76     protected void tearDown() throws Exception JavaDoc {
77         connection.close();
78         if (stompSocket != null) {
79             stompSocket.close();
80         }
81         broker.stop();
82     }
83
84     public void sendFrame(String JavaDoc data) throws Exception JavaDoc {
85         byte[] bytes = data.getBytes("UTF-8");
86         OutputStream JavaDoc outputStream = stompSocket.getOutputStream();
87         for (int i = 0; i < bytes.length; i++) {
88             outputStream.write(bytes[i]);
89         }
90         outputStream.flush();
91     }
92
93     public String JavaDoc receiveFrame(long timeOut) throws Exception JavaDoc {
94         stompSocket.setSoTimeout((int) timeOut);
95         InputStream JavaDoc is = stompSocket.getInputStream();
96         int c=0;
97         for(;;) {
98             c = is.read();
99             if( c < 0 ) {
100                 throw new IOException JavaDoc("socket closed.");
101             } else if( c == 0 ) {
102                 c = is.read();
103                 assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
104                 byte[] ba = inputBuffer.toByteArray();
105                 inputBuffer.reset();
106                 return new String JavaDoc(ba, "UTF-8");
107             } else {
108                 inputBuffer.write(c);
109             }
110         }
111     }
112
113
114
115     public void sendMessage(String JavaDoc msg) throws Exception JavaDoc {
116         sendMessage(msg, "foo", "xyz");
117     }
118
119     public void sendMessage(String JavaDoc msg, String JavaDoc propertyName, String JavaDoc propertyValue) throws JMSException {
120         MessageProducer producer = session.createProducer(queue);
121         TextMessage message = session.createTextMessage(msg);
122         message.setStringProperty(propertyName, propertyValue);
123         producer.send(message);
124     }
125
126     public void sendBytesMessage(byte[] msg) throws Exception JavaDoc{
127         MessageProducer producer = session.createProducer(queue);
128         BytesMessage message = session.createBytesMessage();
129         message.writeBytes(msg);
130         producer.send(message);
131
132     }
133
134     public void testConnect() throws Exception JavaDoc {
135
136         String JavaDoc connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
137         sendFrame(connect_frame);
138
139         String JavaDoc f = receiveFrame(10000);
140         assertTrue(f.startsWith("CONNECTED"));
141         assertTrue(f.indexOf("response-id:1") >= 0);
142
143     }
144
145     public void testSendMessage() throws Exception JavaDoc {
146
147         MessageConsumer consumer = session.createConsumer(queue);
148
149         String JavaDoc frame =
150             "CONNECT\n" +
151             "login: brianm\n" +
152             "passcode: wombats\n\n"+
153             Stomp.NULL;
154         sendFrame(frame);
155
156         frame = receiveFrame(10000);
157         assertTrue(frame.startsWith("CONNECTED"));
158
159         frame =
160             "SEND\n" +
161             "destination:/queue/" + getQueueName() + "\n\n" +
162             "Hello World" +
163             Stomp.NULL;
164
165         sendFrame(frame);
166
167         TextMessage message = (TextMessage) consumer.receive(1000);
168         assertNotNull(message);
169         assertEquals("Hello World", message.getText());
170
171         // Make sure that the timestamp is valid - should
172
// be very close to the current time.
173
long tnow = System.currentTimeMillis();
174         long tmsg = message.getJMSTimestamp();
175         assertTrue( Math.abs(tnow - tmsg) < 1000 );
176     }
177
178     public void testJMSXGroupIdCanBeSet() throws Exception JavaDoc {
179
180         MessageConsumer consumer = session.createConsumer(queue);
181
182         String JavaDoc frame =
183             "CONNECT\n" +
184             "login: brianm\n" +
185             "passcode: wombats\n\n"+
186             Stomp.NULL;
187         sendFrame(frame);
188
189         frame = receiveFrame(10000);
190         assertTrue(frame.startsWith("CONNECTED"));
191
192         frame =
193             "SEND\n" +
194             "destination:/queue/" + getQueueName() + "\n" +
195             "JMSXGroupID: TEST\n\n" +
196             "Hello World" +
197             Stomp.NULL;
198
199         sendFrame(frame);
200
201         TextMessage message = (TextMessage) consumer.receive(1000);
202         assertNotNull(message);
203         assertEquals("TEST", ((ActiveMQTextMessage)message).getGroupID());
204     }
205
206
207     public void testSendMessageWithCustomHeadersAndSelector() throws Exception JavaDoc {
208
209         MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
210
211         String JavaDoc frame =
212             "CONNECT\n" +
213             "login: brianm\n" +
214             "passcode: wombats\n\n"+
215             Stomp.NULL;
216         sendFrame(frame);
217
218         frame = receiveFrame(10000);
219         assertTrue(frame.startsWith("CONNECTED"));
220
221         frame =
222             "SEND\n" +
223             "foo:abc\n" +
224             "bar:123\n" +
225             "destination:/queue/" + getQueueName() + "\n\n" +
226             "Hello World" +
227             Stomp.NULL;
228
229         sendFrame(frame);
230
231         TextMessage message = (TextMessage) consumer.receive(1000);
232         assertNotNull(message);
233         assertEquals("Hello World", message.getText());
234         assertEquals("foo", "abc", message.getStringProperty("foo"));
235         assertEquals("bar", "123", message.getStringProperty("bar"));
236     }
237
238     public void testSendMessageWithStandardHeaders() throws Exception JavaDoc {
239
240         MessageConsumer consumer = session.createConsumer(queue);
241
242         String JavaDoc frame =
243             "CONNECT\n" +
244             "login: brianm\n" +
245             "passcode: wombats\n\n"+
246             Stomp.NULL;
247         sendFrame(frame);
248
249         frame = receiveFrame(10000);
250         assertTrue(frame.startsWith("CONNECTED"));
251
252         frame =
253             "SEND\n" +
254             "correlation-id:c123\n" +
255             "priority:3\n" +
256             "type:t345\n" +
257             "JMSXGroupID:abc\n" +
258             "foo:abc\n" +
259             "bar:123\n" +
260             "destination:/queue/" + getQueueName() + "\n\n" +
261             "Hello World" +
262             Stomp.NULL;
263
264         sendFrame(frame);
265
266         TextMessage message = (TextMessage) consumer.receive(1000);
267         assertNotNull(message);
268         assertEquals("Hello World", message.getText());
269         assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
270         assertEquals("getJMSType", "t345", message.getJMSType());
271         assertEquals("getJMSPriority", 3, message.getJMSPriority());
272         assertEquals("foo", "abc", message.getStringProperty("foo"));
273         assertEquals("bar", "123", message.getStringProperty("bar"));
274
275         assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
276         ActiveMQTextMessage amqMessage = (ActiveMQTextMessage) message;
277         assertEquals("GroupID", "abc", amqMessage.getGroupID());
278     }
279
280     public void testSubscribeWithAutoAck() throws Exception JavaDoc {
281
282         String JavaDoc frame =
283             "CONNECT\n" +
284             "login: brianm\n" +
285             "passcode: wombats\n\n"+
286             Stomp.NULL;
287         sendFrame(frame);
288
289         frame = receiveFrame(100000);
290         assertTrue(frame.startsWith("CONNECTED"));
291
292         frame =
293             "SUBSCRIBE\n" +
294             "destination:/queue/" + getQueueName() + "\n" +
295             "ack:auto\n\n" +
296             Stomp.NULL;
297         sendFrame(frame);
298
299         sendMessage(getName());
300
301         frame = receiveFrame(10000);
302         assertTrue(frame.startsWith("MESSAGE"));
303
304         frame =
305             "DISCONNECT\n" +
306             "\n\n"+
307             Stomp.NULL;
308         sendFrame(frame);
309     }
310
311         public void testSubscribeWithAutoAckAndBytesMessage() throws Exception JavaDoc {
312
313         String JavaDoc frame =
314             "CONNECT\n" +
315             "login: brianm\n" +
316             "passcode: wombats\n\n"+
317             Stomp.NULL;
318         sendFrame(frame);
319
320         frame = receiveFrame(100000);
321         assertTrue(frame.startsWith("CONNECTED"));
322
323         frame =
324             "SUBSCRIBE\n" +
325             "destination:/queue/" + getQueueName() + "\n" +
326             "ack:auto\n\n" +
327             Stomp.NULL;
328         sendFrame(frame);
329
330         sendBytesMessage(new byte[] {1,2,3,4,5});
331
332         frame = receiveFrame(10000);
333         assertTrue(frame.startsWith("MESSAGE"));
334
335         Pattern JavaDoc cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
336         Matcher JavaDoc cl_matcher = cl.matcher(frame);
337         assertTrue(cl_matcher.find());
338         assertEquals("5", cl_matcher.group(1));
339
340         assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
341
342         frame =
343             "DISCONNECT\n" +
344             "\n\n"+
345             Stomp.NULL;
346         sendFrame(frame);
347     }
348
349     public void testSubscribeWithMessageSentWithProperties() throws Exception JavaDoc {
350
351         String JavaDoc frame =
352             "CONNECT\n" +
353             "login: brianm\n" +
354             "passcode: wombats\n\n"+
355             Stomp.NULL;
356         sendFrame(frame);
357
358         frame = receiveFrame(100000);
359         assertTrue(frame.startsWith("CONNECTED"));
360
361         frame =
362             "SUBSCRIBE\n" +
363             "destination:/queue/" + getQueueName() + "\n" +
364             "ack:auto\n\n" +
365             Stomp.NULL;
366         sendFrame(frame);
367
368
369         MessageProducer producer = session.createProducer(queue);
370         TextMessage message = session.createTextMessage("Hello World");
371         message.setStringProperty("s", "value");
372         message.setBooleanProperty("n", false);
373         message.setByteProperty("byte", (byte) 9);
374         message.setDoubleProperty("d", 2.0);
375         message.setFloatProperty("f", (float) 6.0);
376         message.setIntProperty("i", 10);
377         message.setLongProperty("l", 121);
378         message.setShortProperty("s", (short) 12);
379         producer.send(message);
380
381         frame = receiveFrame(10000);
382         assertTrue(frame.startsWith("MESSAGE"));
383
384 // System.out.println("out: "+frame);
385

386         frame =
387             "DISCONNECT\n" +
388             "\n\n"+
389             Stomp.NULL;
390         sendFrame(frame);
391     }
392
393     public void testMessagesAreInOrder() throws Exception JavaDoc {
394         int ctr = 10;
395         String JavaDoc[] data = new String JavaDoc[ctr];
396
397         String JavaDoc frame =
398                 "CONNECT\n" +
399                 "login: brianm\n" +
400                 "passcode: wombats\n\n" +
401                 Stomp.NULL;
402         sendFrame(frame);
403
404         frame = receiveFrame(100000);
405         assertTrue(frame.startsWith("CONNECTED"));
406
407         frame =
408                 "SUBSCRIBE\n" +
409                 "destination:/queue/" + getQueueName() + "\n" +
410                 "ack:auto\n\n" +
411                 Stomp.NULL;
412         sendFrame(frame);
413
414         for (int i = 0; i < ctr; ++i) {
415             data[i] = getName() + i;
416             sendMessage(data[i]);
417         }
418
419         for (int i = 0; i < ctr; ++i) {
420             frame = receiveFrame(1000);
421             assertTrue("Message not in order", frame.indexOf(data[i]) >=0 );
422         }
423
424         // sleep a while before publishing another set of messages
425
waitForFrameToTakeEffect();
426
427         for (int i = 0; i < ctr; ++i) {
428             data[i] = getName() + ":second:" + i;
429             sendMessage(data[i]);
430         }
431
432         for (int i = 0; i < ctr; ++i) {
433             frame = receiveFrame(1000);
434             assertTrue("Message not in order", frame.indexOf(data[i]) >=0 );
435         }
436
437         frame =
438                 "DISCONNECT\n" +
439                 "\n\n" +
440                 Stomp.NULL;
441         sendFrame(frame);
442     }
443
444
445     public void testSubscribeWithAutoAckAndSelector() throws Exception JavaDoc {
446
447         String JavaDoc frame =
448             "CONNECT\n" +
449             "login: brianm\n" +
450             "passcode: wombats\n\n"+
451             Stomp.NULL;
452         sendFrame(frame);
453
454         frame = receiveFrame(100000);
455         assertTrue(frame.startsWith("CONNECTED"));
456
457         frame =
458             "SUBSCRIBE\n" +
459             "destination:/queue/" + getQueueName() + "\n" +
460             "selector: foo = 'zzz'\n" +
461             "ack:auto\n\n" +
462             Stomp.NULL;
463         sendFrame(frame);
464
465         sendMessage("Ignored message", "foo", "1234");
466         sendMessage("Real message", "foo", "zzz");
467
468         frame = receiveFrame(10000);
469         assertTrue(frame.startsWith("MESSAGE"));
470         assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
471
472        frame =
473             "DISCONNECT\n" +
474             "\n\n"+
475             Stomp.NULL;
476        sendFrame(frame);
477     }
478
479
480     public void testSubscribeWithClientAck() throws Exception JavaDoc {
481
482        String JavaDoc frame =
483             "CONNECT\n" +
484             "login: brianm\n" +
485             "passcode: wombats\n\n"+
486             Stomp.NULL;
487        sendFrame(frame);
488
489        frame = receiveFrame(10000);
490        assertTrue(frame.startsWith("CONNECTED"));
491
492
493        frame =
494             "SUBSCRIBE\n" +
495             "destination:/queue/" + getQueueName() + "\n" +
496             "ack:client\n\n"+
497             Stomp.NULL;
498
499
500        sendFrame(frame);
501        sendMessage(getName());
502        frame = receiveFrame(10000);
503        assertTrue(frame.startsWith("MESSAGE"));
504
505        frame =
506             "DISCONNECT\n" +
507             "\n\n"+
508             Stomp.NULL;
509        sendFrame(frame);
510
511        // message should be received since message was not acknowledged
512
MessageConsumer consumer = session.createConsumer(queue);
513        TextMessage message = (TextMessage) consumer.receive(1000);
514        assertNotNull(message);
515        assertTrue(message.getJMSRedelivered());
516
517
518
519     }
520
521     public void testUnsubscribe() throws Exception JavaDoc {
522
523         String JavaDoc frame =
524             "CONNECT\n" +
525             "login: brianm\n" +
526             "passcode: wombats\n\n"+
527             Stomp.NULL;
528         sendFrame(frame);
529         frame = receiveFrame(100000);
530         assertTrue(frame.startsWith("CONNECTED"));
531
532         frame =
533             "SUBSCRIBE\n" +
534             "destination:/queue/" + getQueueName() + "\n" +
535             "ack:auto\n\n" +
536             Stomp.NULL;
537         sendFrame(frame);
538
539         //send a message to our queue
540
sendMessage("first message");
541
542
543         //receive message from socket
544
frame = receiveFrame(1000);
545         assertTrue(frame.startsWith("MESSAGE"));
546
547         //remove suscription
548
frame =
549             "UNSUBSCRIBE\n" +
550             "destination:/queue/" + getQueueName() + "\n" +
551             "\n\n" +
552             Stomp.NULL;
553         sendFrame(frame);
554
555         waitForFrameToTakeEffect();
556
557         //send a message to our queue
558
sendMessage("second message");
559
560
561         try {
562             frame = receiveFrame(1000);
563             log.info("Received frame: " + frame);
564             fail("No message should have been received since subscription was removed");
565         }catch (SocketTimeoutException JavaDoc e){
566
567         }
568
569     }
570
571
572     public void testTransactionCommit() throws Exception JavaDoc {
573         MessageConsumer consumer = session.createConsumer(queue);
574
575         String JavaDoc frame =
576             "CONNECT\n" +
577             "login: brianm\n" +
578             "passcode: wombats\n\n"+
579             Stomp.NULL;
580         sendFrame(frame);
581
582         String JavaDoc f = receiveFrame(1000);
583         assertTrue(f.startsWith("CONNECTED"));
584
585         frame =
586             "BEGIN\n" +
587             "transaction: tx1\n" +
588             "\n\n" +
589             Stomp.NULL;
590         sendFrame(frame);
591
592         frame =
593             "SEND\n" +
594             "destination:/queue/" + getQueueName() + "\n" +
595             "transaction: tx1\n" +
596             "\n\n" +
597             "Hello World" +
598             Stomp.NULL;
599         sendFrame(frame);
600
601         frame =
602             "COMMIT\n" +
603             "transaction: tx1\n" +
604             "\n\n" +
605             Stomp.NULL;
606         sendFrame(frame);
607
608         waitForFrameToTakeEffect();
609
610         TextMessage message = (TextMessage) consumer.receive(1000);
611         assertNotNull("Should have received a message", message);
612     }
613
614     public void testTransactionRollback() throws Exception JavaDoc {
615         MessageConsumer consumer = session.createConsumer(queue);
616
617         String JavaDoc frame =
618             "CONNECT\n" +
619             "login: brianm\n" +
620             "passcode: wombats\n\n"+
621             Stomp.NULL;
622         sendFrame(frame);
623
624         String JavaDoc f = receiveFrame(1000);
625         assertTrue(f.startsWith("CONNECTED"));
626
627         frame =
628             "BEGIN\n" +
629             "transaction: tx1\n" +
630             "\n\n" +
631             Stomp.NULL;
632         sendFrame(frame);
633
634         frame =
635             "SEND\n" +
636             "destination:/queue/" + getQueueName() + "\n" +
637             "transaction: tx1\n" +
638             "\n" +
639             "first message" +
640             Stomp.NULL;
641         sendFrame(frame);
642
643         //rollback first message
644
frame =
645             "ABORT\n" +
646             "transaction: tx1\n" +
647             "\n\n" +
648             Stomp.NULL;
649         sendFrame(frame);
650
651         frame =
652             "BEGIN\n" +
653             "transaction: tx1\n" +
654             "\n\n" +
655             Stomp.NULL;
656         sendFrame(frame);
657
658         frame =
659             "SEND\n" +
660             "destination:/queue/" + getQueueName() + "\n" +
661             "transaction: tx1\n" +
662             "\n" +
663             "second message" +
664             Stomp.NULL;
665         sendFrame(frame);
666
667         frame =
668             "COMMIT\n" +
669             "transaction: tx1\n" +
670             "\n\n" +
671             Stomp.NULL;
672         sendFrame(frame);
673
674         // This test case is currently failing
675
waitForFrameToTakeEffect();
676
677         //only second msg should be received since first msg was rolled back
678
TextMessage message = (TextMessage) consumer.receive(1000);
679         assertNotNull(message);
680         assertEquals("second message", message.getText().trim());
681     }
682
683     public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception JavaDoc {
684         assertClients(1);
685         String JavaDoc frame =
686             "CONNECT\n" +
687             "login: brianm\n" +
688             "passcode: wombats\n\n"+
689             Stomp.NULL;
690
691         sendFrame(frame);
692
693         // This test case is currently failing
694
waitForFrameToTakeEffect();
695
696         assertClients(2);
697
698         // now lets kill the socket
699
stompSocket.close();
700         stompSocket = null;
701
702         Thread.sleep(2000);
703
704         assertClients(1);
705     }
706
707     protected void assertClients(int expected) throws Exception JavaDoc {
708         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
709         int actual = clients.length;
710
711         assertEquals("Number of clients", expected, actual);
712     }
713
714     protected void waitForFrameToTakeEffect() throws InterruptedException JavaDoc {
715         // bit of a dirty hack :)
716
// another option would be to force some kind of receipt to be returned
717
// from the frame
718
Thread.sleep(2000);
719     }
720 }
721
Popular Tags