package org.apache.activemq.transport.stomp;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.*;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.transport.stomp.Stomp;

import javax.jms.*;
import javax.jms.Connection;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.regex.Pattern;
import java.util.regex.Matcher;

/**
 * Exercises the STOMP transport of a broker started inside the test.
 *
 * <p>Each test talks to the broker over a raw socket (hand-written STOMP frames)
 * and, where needed, over a JMS connection obtained from {@code vm://localhost},
 * so that what goes in on one side can be checked on the other.
 */
public class StompTest extends CombinationTestSupport {

    /** CONNECT frame used by every test that does not need extra headers. */
    private static final String CONNECT_FRAME =
        "CONNECT\n" +
        "login: brianm\n" +
        "passcode: wombats\n\n" +
        Stomp.NULL;

    /** DISCONNECT frame sent at the end of the subscription tests. */
    private static final String DISCONNECT_FRAME =
        "DISCONNECT\n" +
        "\n\n" +
        Stomp.NULL;

    private BrokerService broker;
    private TransportConnector connector;
    private Socket stompSocket;
    /** Accumulates the bytes of the frame currently being read by {@link #receiveFrame(long)}. */
    private ByteArrayOutputStream inputBuffer;
    private Connection connection;
    private Session session;
    private ActiveMQQueue queue;
    protected String bindAddress = "stomp://localhost:0";

    /**
     * Starts a non-persistent broker with a connector on {@link #bindAddress},
     * opens the raw STOMP socket to the port the connector reports, and opens a
     * started JMS connection with an auto-acknowledge session.
     */
    protected void setUp() throws Exception {
        broker = new BrokerService();
        broker.setPersistent(false);

        connector = broker.addConnector(bindAddress);
        broker.start();

        URI connectUri = connector.getConnectUri();
        stompSocket = createSocket(connectUri);
        inputBuffer = new ByteArrayOutputStream();

        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
        connection = cf.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        queue = new ActiveMQQueue(getQueueName());
        connection.start();
    }

    /**
     * Opens the client socket used to speak STOMP.
     *
     * @param connectUri the connector's URI; only its port is used
     * @return a socket connected to 127.0.0.1 on that port
     * @throws IOException if the socket cannot be opened
     */
    protected Socket createSocket(URI connectUri) throws IOException {
        return new Socket("127.0.0.1", connectUri.getPort());
    }

    /**
     * @return the queue name for the running test: the class name, a dot, and the test name
     */
    protected String getQueueName() {
        return getClass().getName() + "." + getName();
    }

    /**
     * Closes the JMS connection and the STOMP socket and stops the broker.
     * Each step runs even if an earlier one throws, so a failing close cannot
     * leave the broker running.
     */
    protected void tearDown() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            try {
                if (stompSocket != null) {
                    stompSocket.close();
                }
            } finally {
                if (broker != null) {
                    broker.stop();
                }
            }
        }
    }

    /**
     * Writes {@code data}, encoded as UTF-8, to the STOMP socket and flushes.
     *
     * @param data the complete frame text, including the terminating NULL
     */
    public void sendFrame(String data) throws Exception {
        byte[] bytes = data.getBytes("UTF-8");
        OutputStream outputStream = stompSocket.getOutputStream();
        // NOTE(review): bytes are written one at a time; left as-is in case the
        // per-byte writes are intentional for exercising the broker's frame parser.
        for (int i = 0; i < bytes.length; i++) {
            outputStream.write(bytes[i]);
        }
        outputStream.flush();
    }

    /**
     * Reads bytes from the STOMP socket up to the next NUL byte and returns them
     * decoded as UTF-8. The byte following the NUL must be a newline.
     *
     * @param timeOut socket read timeout in milliseconds (narrowed to {@code int})
     * @return the frame text, without the terminating NUL and newline
     * @throws IOException if the stream ends before a frame terminator is seen
     * @throws SocketTimeoutException if a read exceeds {@code timeOut}
     */
    public String receiveFrame(long timeOut) throws Exception {
        stompSocket.setSoTimeout((int) timeOut);
        InputStream is = stompSocket.getInputStream();
        for (;;) {
            int c = is.read();
            if (c < 0) {
                throw new IOException("socket closed.");
            }
            if (c == 0) {
                c = is.read();
                // Expected value first, so a failure reports the values the right way round.
                assertEquals("Expecting stomp frame to terminate with \0\n", '\n', c);
                byte[] ba = inputBuffer.toByteArray();
                inputBuffer.reset();
                return new String(ba, "UTF-8");
            }
            inputBuffer.write(c);
        }
    }

    /**
     * Sends a JMS text message to the test queue with the string property {@code foo=xyz}.
     */
    public void sendMessage(String msg) throws Exception {
        sendMessage(msg, "foo", "xyz");
    }

    /**
     * Sends a JMS text message to the test queue carrying one string property.
     *
     * @param msg the message text
     * @param propertyName name of the string property to set
     * @param propertyValue value of the string property
     */
    public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
        MessageProducer producer = session.createProducer(queue);
        TextMessage message = session.createTextMessage(msg);
        message.setStringProperty(propertyName, propertyValue);
        producer.send(message);
    }

    /**
     * Sends a JMS bytes message with the given payload to the test queue.
     */
    public void sendBytesMessage(byte[] msg) throws Exception {
        MessageProducer producer = session.createProducer(queue);
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(msg);
        producer.send(message);
    }

    /** Sends the standard CONNECT frame and asserts that the reply starts with CONNECTED. */
    private void connectStomp(long timeOut) throws Exception {
        sendFrame(CONNECT_FRAME);
        String reply = receiveFrame(timeOut);
        assertTrue("Expected a CONNECTED frame but got: " + reply, reply.startsWith("CONNECTED"));
    }

    /** Sends a SUBSCRIBE frame for the test queue with {@code ack:auto}. */
    private void subscribeWithAutoAck() throws Exception {
        String frame =
            "SUBSCRIBE\n" +
            "destination:/queue/" + getQueueName() + "\n" +
            "ack:auto\n\n" +
            Stomp.NULL;
        sendFrame(frame);
    }

    /** Reads one frame and asserts that it starts with MESSAGE; returns the frame. */
    private String receiveMessageFrame(long timeOut) throws Exception {
        String frame = receiveFrame(timeOut);
        assertTrue("Expected a MESSAGE frame but got: " + frame, frame.startsWith("MESSAGE"));
        return frame;
    }

    /** Sends the DISCONNECT frame. */
    private void disconnectStomp() throws Exception {
        sendFrame(DISCONNECT_FRAME);
    }

    /** A CONNECT carrying {@code request-id} is answered with a matching {@code response-id}. */
    public void testConnect() throws Exception {
        String connectFrame =
            "CONNECT\n" +
            "login: brianm\n" +
            "passcode: wombats\n" +
            "request-id: 1\n" +
            "\n" +
            Stomp.NULL;
        sendFrame(connectFrame);

        String f = receiveFrame(10000);
        assertTrue(f.startsWith("CONNECTED"));
        assertTrue(f.indexOf("response-id:1") >= 0);
    }

    /** A STOMP SEND arrives at a JMS consumer as a text message with a current timestamp. */
    public void testSendMessage() throws Exception {
        MessageConsumer consumer = session.createConsumer(queue);

        connectStomp(10000);

        String frame =
            "SEND\n" +
            "destination:/queue/" + getQueueName() + "\n\n" +
            "Hello World" +
            Stomp.NULL;
        sendFrame(frame);

        TextMessage message = (TextMessage) consumer.receive(1000);
        assertNotNull(message);
        assertEquals("Hello World", message.getText());

        // The JMS timestamp must be within one second of the local clock.
        long tnow = System.currentTimeMillis();
        long tmsg = message.getJMSTimestamp();
        assertTrue(Math.abs(tnow - tmsg) < 1000);
    }

    /** A {@code JMSXGroupID} header on SEND sets the group id of the delivered message. */
    public void testJMSXGroupIdCanBeSet() throws Exception {
        MessageConsumer consumer = session.createConsumer(queue);

        connectStomp(10000);

        String frame =
            "SEND\n" +
            "destination:/queue/" + getQueueName() + "\n" +
            "JMSXGroupID: TEST\n\n" +
            "Hello World" +
            Stomp.NULL;
        sendFrame(frame);

        TextMessage message = (TextMessage) consumer.receive(1000);
        assertNotNull(message);
        assertEquals("TEST", ((ActiveMQTextMessage) message).getGroupID());
    }

    /** Custom SEND headers become string properties and can be matched by a JMS selector. */
    public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
        MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");

        connectStomp(10000);

        String frame =
            "SEND\n" +
            "foo:abc\n" +
            "bar:123\n" +
            "destination:/queue/" + getQueueName() + "\n\n" +
            "Hello World" +
            Stomp.NULL;
        sendFrame(frame);

        TextMessage message = (TextMessage) consumer.receive(1000);
        assertNotNull(message);
        assertEquals("Hello World", message.getText());
        assertEquals("foo", "abc", message.getStringProperty("foo"));
        assertEquals("bar", "123", message.getStringProperty("bar"));
    }

    /** The standard STOMP headers on SEND are mapped onto the corresponding JMS fields. */
    public void testSendMessageWithStandardHeaders() throws Exception {
        MessageConsumer consumer = session.createConsumer(queue);

        connectStomp(10000);

        String frame =
            "SEND\n" +
            "correlation-id:c123\n" +
            "priority:3\n" +
            "type:t345\n" +
            "JMSXGroupID:abc\n" +
            "foo:abc\n" +
            "bar:123\n" +
            "destination:/queue/" + getQueueName() + "\n\n" +
            "Hello World" +
            Stomp.NULL;
        sendFrame(frame);

        TextMessage message = (TextMessage) consumer.receive(1000);
        assertNotNull(message);
        assertEquals("Hello World", message.getText());
        assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
        assertEquals("getJMSType", "t345", message.getJMSType());
        assertEquals("getJMSPriority", 3, message.getJMSPriority());
        assertEquals("foo", "abc", message.getStringProperty("foo"));
        assertEquals("bar", "123", message.getStringProperty("bar"));

        assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
        ActiveMQTextMessage amqMessage = (ActiveMQTextMessage) message;
        assertEquals("GroupID", "abc", amqMessage.getGroupID());
    }

    /** An auto-ack subscriber receives a MESSAGE frame for a JMS text message. */
    public void testSubscribeWithAutoAck() throws Exception {
        connectStomp(100000);
        subscribeWithAutoAck();

        sendMessage(getName());

        receiveMessageFrame(10000);

        disconnectStomp();
    }

    /**
     * A JMS bytes message is delivered with a content-length header matching the
     * payload size and without a {@code type:null} header.
     */
    public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
        connectStomp(100000);
        subscribeWithAutoAck();

        sendBytesMessage(new byte[] {1, 2, 3, 4, 5});

        String frame = receiveMessageFrame(10000);

        Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
        Matcher clMatcher = cl.matcher(frame);
        assertTrue(clMatcher.find());
        assertEquals("5", clMatcher.group(1));

        assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());

        disconnectStomp();
    }

    /** A JMS message carrying properties of every primitive kind is still delivered as a MESSAGE frame. */
    public void testSubscribeWithMessageSentWithProperties() throws Exception {
        connectStomp(100000);
        subscribeWithAutoAck();

        MessageProducer producer = session.createProducer(queue);
        TextMessage message = session.createTextMessage("Hello World");
        message.setStringProperty("s", "value");
        message.setBooleanProperty("n", false);
        message.setByteProperty("byte", (byte) 9);
        message.setDoubleProperty("d", 2.0);
        message.setFloatProperty("f", (float) 6.0);
        message.setIntProperty("i", 10);
        message.setLongProperty("l", 121);
        // Distinct name: reusing "s" would set the same property name twice, so the
        // string and the short property could not both be present on the message.
        message.setShortProperty("sh", (short) 12);
        producer.send(message);

        receiveMessageFrame(10000);

        disconnectStomp();
    }

    /** Two batches of ten messages are each delivered in the order they were sent. */
    public void testMessagesAreInOrder() throws Exception {
        int ctr = 10;
        String[] data = new String[ctr];

        connectStomp(100000);
        subscribeWithAutoAck();

        for (int i = 0; i < ctr; ++i) {
            data[i] = getName() + i;
            sendMessage(data[i]);
        }

        for (int i = 0; i < ctr; ++i) {
            String frame = receiveFrame(1000);
            assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
        }

        waitForFrameToTakeEffect();

        for (int i = 0; i < ctr; ++i) {
            data[i] = getName() + ":second:" + i;
            sendMessage(data[i]);
        }

        for (int i = 0; i < ctr; ++i) {
            String frame = receiveFrame(1000);
            assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
        }

        disconnectStomp();
    }

    /** A subscription with a selector only receives the message whose property matches. */
    public void testSubscribeWithAutoAckAndSelector() throws Exception {
        connectStomp(100000);

        String frame =
            "SUBSCRIBE\n" +
            "destination:/queue/" + getQueueName() + "\n" +
            "selector: foo = 'zzz'\n" +
            "ack:auto\n\n" +
            Stomp.NULL;
        sendFrame(frame);

        sendMessage("Ignored message", "foo", "1234");
        sendMessage("Real message", "foo", "zzz");

        frame = receiveMessageFrame(10000);
        assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);

        disconnectStomp();
    }

    /**
     * A message delivered to a client-ack subscriber that disconnects without
     * acknowledging is redelivered to a JMS consumer, flagged as redelivered.
     */
    public void testSubscribeWithClientAck() throws Exception {
        connectStomp(10000);

        String frame =
            "SUBSCRIBE\n" +
            "destination:/queue/" + getQueueName() + "\n" +
            "ack:client\n\n" +
            Stomp.NULL;
        sendFrame(frame);

        sendMessage(getName());
        receiveMessageFrame(10000);

        disconnectStomp();

        // The STOMP client never acknowledged, so the message must still be on the queue.
        MessageConsumer consumer = session.createConsumer(queue);
        TextMessage message = (TextMessage) consumer.receive(1000);
        assertNotNull(message);
        assertTrue(message.getJMSRedelivered());
    }

    /** After UNSUBSCRIBE no further MESSAGE frames are delivered on the socket. */
    public void testUnsubscribe() throws Exception {
        connectStomp(100000);
        subscribeWithAutoAck();

        // Sent while subscribed: must be delivered.
        sendMessage("first message");
        receiveMessageFrame(1000);

        String frame =
            "UNSUBSCRIBE\n" +
            "destination:/queue/" + getQueueName() + "\n" +
            "\n\n" +
            Stomp.NULL;
        sendFrame(frame);

        waitForFrameToTakeEffect();

        // Sent after the subscription was removed: must not be delivered.
        sendMessage("second message");

        try {
            frame = receiveFrame(1000);
            log.info("Received frame: " + frame);
            fail("No message should have been received since subscription was removed");
        } catch (SocketTimeoutException expected) {
            // Expected: nothing arrives on the socket once the subscription is gone.
        }
    }

    /** A message sent inside a STOMP transaction is delivered once the transaction commits. */
    public void testTransactionCommit() throws Exception {
        MessageConsumer consumer = session.createConsumer(queue);

        connectStomp(1000);

        String frame =
            "BEGIN\n" +
            "transaction: tx1\n" +
            "\n\n" +
            Stomp.NULL;
        sendFrame(frame);

        frame =
            "SEND\n" +
            "destination:/queue/" + getQueueName() + "\n" +
            "transaction: tx1\n" +
            "\n\n" +
            "Hello World" +
            Stomp.NULL;
        sendFrame(frame);

        frame =
            "COMMIT\n" +
            "transaction: tx1\n" +
            "\n\n" +
            Stomp.NULL;
        sendFrame(frame);

        waitForFrameToTakeEffect();

        TextMessage message = (TextMessage) consumer.receive(1000);
        assertNotNull("Should have received a message", message);
    }

    /**
     * A message sent in an aborted transaction is discarded; a message sent in a
     * later, committed transaction with the same id is the first one delivered.
     */
    public void testTransactionRollback() throws Exception {
        MessageConsumer consumer = session.createConsumer(queue);

        connectStomp(1000);

        String frame =
            "BEGIN\n" +
            "transaction: tx1\n" +
            "\n\n" +
            Stomp.NULL;
        sendFrame(frame);

        frame =
            "SEND\n" +
            "destination:/queue/" + getQueueName() + "\n" +
            "transaction: tx1\n" +
            "\n" +
            "first message" +
            Stomp.NULL;
        sendFrame(frame);

        // Roll back the first transaction.
        frame =
            "ABORT\n" +
            "transaction: tx1\n" +
            "\n\n" +
            Stomp.NULL;
        sendFrame(frame);

        frame =
            "BEGIN\n" +
            "transaction: tx1\n" +
            "\n\n" +
            Stomp.NULL;
        sendFrame(frame);

        frame =
            "SEND\n" +
            "destination:/queue/" + getQueueName() + "\n" +
            "transaction: tx1\n" +
            "\n" +
            "second message" +
            Stomp.NULL;
        sendFrame(frame);

        frame =
            "COMMIT\n" +
            "transaction: tx1\n" +
            "\n\n" +
            Stomp.NULL;
        sendFrame(frame);

        waitForFrameToTakeEffect();

        // Only the message from the committed transaction may arrive.
        TextMessage message = (TextMessage) consumer.receive(1000);
        assertNotNull(message);
        assertEquals("second message", message.getText().trim());
    }

    /**
     * The broker's client count rises by one when the STOMP client connects and
     * drops back after its socket is closed.
     */
    public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
        assertClients(1);

        sendFrame(CONNECT_FRAME);

        waitForFrameToTakeEffect();

        assertClients(2);

        // Drop the socket without sending DISCONNECT; null it so tearDown skips it.
        stompSocket.close();
        stompSocket = null;

        Thread.sleep(2000);

        assertClients(1);
    }

    /**
     * Asserts that the broker currently reports exactly {@code expected} clients.
     */
    protected void assertClients(int expected) throws Exception {
        org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
        int actual = clients.length;

        assertEquals("Number of clients", expected, actual);
    }

    /**
     * Pauses for two seconds so that frames already written to the socket can be
     * processed by the broker before the test continues.
     */
    protected void waitForFrameToTakeEffect() throws InterruptedException {
        Thread.sleep(2000);
    }
}