1 18 package org.apache.activemq.transport.vm; 19 20 import java.net.URI ; 21 import java.net.URISyntaxException ; 22 23 import javax.jms.DeliveryMode ; 24 25 import org.apache.activemq.broker.BrokerRegistry; 26 import org.apache.activemq.broker.BrokerTestSupport; 27 import org.apache.activemq.broker.StubConnection; 28 import org.apache.activemq.command.ActiveMQQueue; 29 import org.apache.activemq.command.ConnectionInfo; 30 import org.apache.activemq.command.ConsumerInfo; 31 import org.apache.activemq.command.Message; 32 import org.apache.activemq.command.ProducerInfo; 33 import org.apache.activemq.command.SessionInfo; 34 import org.apache.activemq.transport.Transport; 35 import org.apache.activemq.transport.TransportFactory; 36 import org.apache.activemq.util.IOExceptionSupport; 37 38 43 public class VMTransportEmbeddedBrokerTest extends BrokerTestSupport { 44 45 public static void main(String [] args) { 46 junit.textui.TestRunner.run(VMTransportEmbeddedBrokerTest.class); 47 } 48 49 public void testConsumerPrefetchAtOne() throws Exception { 50 51 assertNull(BrokerRegistry.getInstance().lookup("localhost")); 53 StubConnection connection = createConnection(); 54 assertNotNull(BrokerRegistry.getInstance().lookup("localhost")); 55 56 ConnectionInfo connectionInfo = createConnectionInfo(); 58 SessionInfo sessionInfo = createSessionInfo(connectionInfo); 59 ProducerInfo producerInfo = createProducerInfo(sessionInfo); 60 connection.send(connectionInfo); 61 connection.send(sessionInfo); 62 connection.send(producerInfo); 63 64 ActiveMQQueue destination = new ActiveMQQueue("TEST"); 65 66 ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); 67 consumerInfo.setPrefetchSize(1); 68 connection.send(consumerInfo); 69 70 connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT)); 72 connection.send(createMessage(producerInfo, destination, DeliveryMode.NON_PERSISTENT)); 73 74 Message m = receiveMessage(connection); 76 assertNotNull(m); 77 assertNoMessagesLeft(connection); 78 79 assertNotNull(BrokerRegistry.getInstance().lookup("localhost")); 81 connection.stop(); 82 assertNull(BrokerRegistry.getInstance().lookup("localhost")); 83 } 84 85 protected void setUp() throws Exception { 86 } 88 protected void tearDown() throws Exception { 89 } 91 protected StubConnection createConnection() throws Exception { 92 try { 93 Transport transport = TransportFactory.connect(new URI ("vm://localhost?broker.persistent=false")); 94 StubConnection connection = new StubConnection(transport); 95 return connection; 96 } catch (URISyntaxException e) { 97 throw IOExceptionSupport.create(e); 98 } 99 } 100 101 } 102 | Popular Tags |