1 package org.apache.activemq; 2 3 import java.io.IOException ; 4 import java.util.concurrent.CountDownLatch ; 5 import java.util.concurrent.TimeUnit ; 6 import java.util.concurrent.atomic.AtomicBoolean ; 7 8 import javax.jms.ConnectionFactory ; 9 import javax.jms.DeliveryMode ; 10 import javax.jms.JMSException ; 11 import javax.jms.MessageConsumer ; 12 import javax.jms.MessageProducer ; 13 import javax.jms.Session ; 14 import javax.jms.TextMessage ; 15 16 import org.apache.activemq.broker.BrokerService; 17 import org.apache.activemq.broker.TransportConnector; 18 import org.apache.activemq.broker.region.policy.PolicyEntry; 19 import org.apache.activemq.broker.region.policy.PolicyMap; 20 import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; 21 import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; 22 import org.apache.activemq.command.ActiveMQQueue; 23 import org.apache.activemq.transport.tcp.TcpTransport; 24 25 26 public class ProducerFlowControlTest extends JmsTestSupport { 27 28 ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A"); 29 ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B"); 30 private TransportConnector connector; 31 private ActiveMQConnection connection; 32 33 public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception { 34 ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory(); 35 factory.setProducerWindowSize(1024*64); 36 connection = (ActiveMQConnection) factory.createConnection(); 37 connections.add(connection); 38 connection.start(); 39 40 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 41 MessageConsumer consumer = session.createConsumer(queueB); 42 43 fillQueue(queueA); 46 47 CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1"); 49 assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) ); 50 51 TextMessage msg = (TextMessage ) consumer.receive(); 52 assertEquals("Message 1", msg.getText()); 53 msg.acknowledge(); 54 55 pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2"); 56 assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) ); 57 58 msg = (TextMessage ) consumer.receive(); 59 assertEquals("Message 2", msg.getText()); 60 msg.acknowledge(); 61 } 62 63 public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception { 64 ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory(); 65 factory.setAlwaysSyncSend(true); 66 connection = (ActiveMQConnection) factory.createConnection(); 67 connections.add(connection); 68 connection.start(); 69 70 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 71 MessageConsumer consumer = session.createConsumer(queueB); 72 73 fillQueue(queueA); 76 77 CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1"); 79 assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) ); 80 81 TextMessage msg = (TextMessage ) consumer.receive(); 82 assertEquals("Message 1", msg.getText()); 83 msg.acknowledge(); 84 85 pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2"); 86 assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) ); 87 88 msg = (TextMessage ) consumer.receive(); 89 assertEquals("Message 2", msg.getText()); 90 msg.acknowledge(); 91 } 92 93 public void testSimpleSendReceive() throws Exception { 94 ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory(); 95 factory.setAlwaysSyncSend(true); 96 connection = (ActiveMQConnection) factory.createConnection(); 97 connections.add(connection); 98 connection.start(); 99 100 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 101 MessageConsumer consumer = session.createConsumer(queueA); 102 103 CountDownLatch pubishDoneToQeueuA = asyncSendTo(queueA, "Message 1"); 105 assertTrue( pubishDoneToQeueuA.await(2, TimeUnit.SECONDS) ); 106 107 TextMessage msg = (TextMessage ) consumer.receive(); 108 assertEquals("Message 1", msg.getText()); 109 msg.acknowledge(); 110 111 pubishDoneToQeueuA = asyncSendTo(queueA, "Message 2"); 112 assertTrue( pubishDoneToQeueuA.await(2, TimeUnit.SECONDS) ); 113 114 msg = (TextMessage ) consumer.receive(); 115 assertEquals("Message 2", msg.getText()); 116 msg.acknowledge(); 117 } 118 119 public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception { 120 ConnectionFactory factory = createConnectionFactory(); 121 connection = (ActiveMQConnection) factory.createConnection(); 122 connections.add(connection); 123 connection.start(); 124 125 fillQueue(queueA); 128 129 CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1"); 133 assertFalse( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) ); 134 } 135 136 137 private void fillQueue(final ActiveMQQueue queue) throws JMSException , InterruptedException { 138 final AtomicBoolean done = new AtomicBoolean (true); 139 final AtomicBoolean keepGoing = new AtomicBoolean (true); 140 141 new Thread ("Fill thread.") { 144 public void run() { 145 Session session=null; 146 try { 147 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 148 MessageProducer producer = session.createProducer(queue); 149 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 150 while( keepGoing.get() ) { 151 done.set(false); 152 producer.send(session.createTextMessage("Hello World")); 153 } 154 } catch (JMSException e) { 155 } finally { 156 safeClose(session); 157 } 158 } 159 }.start(); 160 161 while( true ) { 162 Thread.sleep(1000); 163 if( done.get() ) 165 break; 166 done.set(true); 167 } 168 keepGoing.set(false); 169 } 170 171 private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException { 172 final CountDownLatch done = new CountDownLatch (1); 173 new Thread ("Send thread.") { 174 public void run() { 175 Session session=null; 176 try { 177 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 178 MessageProducer producer = session.createProducer(queue); 179 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 180 producer.send(session.createTextMessage(message)); 181 done.countDown(); 182 } catch (JMSException e) { 183 } finally { 184 safeClose(session); 185 } 186 } 187 }.start(); 188 return done; 189 } 190 191 protected BrokerService createBroker() throws Exception { 192 BrokerService service = new BrokerService(); 193 service.setPersistent(false); 194 service.setUseJmx(false); 195 196 PolicyMap policyMap = new PolicyMap(); 198 PolicyEntry policy = new PolicyEntry(); 199 policy.setMemoryLimit(1); 200 policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy()); 201 policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); 202 policyMap.setDefaultEntry(policy); 203 service.setDestinationPolicy(policyMap); 204 205 connector = service.addConnector("tcp://localhost:0"); 206 return service; 207 } 208 209 protected void tearDown() throws Exception { 210 TcpTransport t = (TcpTransport) connection.getTransport().narrow(TcpTransport.class); 211 t.getTransportListener().onException(new IOException ("Disposed.")); 212 connection.getTransport().stop(); 213 super.tearDown(); 214 } 215 216 protected ConnectionFactory createConnectionFactory() throws Exception { 217 return new ActiveMQConnectionFactory(connector.getConnectUri()); 218 } 219 } 220 | Popular Tags |