1 18 package org.apache.activemq.broker.policy; 19 20 import org.apache.activemq.broker.QueueSubscriptionTest; 21 import org.apache.activemq.broker.BrokerService; 22 import org.apache.activemq.broker.region.policy.PolicyEntry; 23 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; 24 import org.apache.activemq.broker.region.policy.PolicyMap; 25 26 import javax.jms.Connection ; 27 import javax.jms.Destination ; 28 import javax.jms.MessageConsumer ; 29 import javax.jms.Session ; 30 31 public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest { 32 33 protected BrokerService createBroker() throws Exception { 34 BrokerService broker = super.createBroker(); 35 36 PolicyEntry policy = new PolicyEntry(); 37 policy.setDispatchPolicy(new RoundRobinDispatchPolicy()); 38 39 PolicyMap pMap = new PolicyMap(); 40 pMap.setDefaultEntry(policy); 41 42 broker.setDestinationPolicy(pMap); 43 44 return broker; 45 } 46 47 public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { 48 super.testOneProducerTwoConsumersSmallMessagesOnePrefetch(); 49 50 assertEachConsumerReceivedAtLeastXMessages(1); 53 } 54 55 public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { 56 super.testOneProducerTwoConsumersSmallMessagesLargePrefetch(); 57 assertMessagesDividedAmongConsumers(); 58 } 59 60 public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { 61 super.testOneProducerTwoConsumersLargeMessagesOnePrefetch(); 62 63 assertEachConsumerReceivedAtLeastXMessages(1); 66 } 67 68 public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { 69 super.testOneProducerTwoConsumersLargeMessagesLargePrefetch(); 70 assertMessagesDividedAmongConsumers(); 71 } 72 73 public void testOneProducerManyConsumersFewMessages() throws Exception { 74 super.testOneProducerManyConsumersFewMessages(); 75 76 assertMessagesDividedAmongConsumers(); 78 } 79 80 public void testOneProducerManyConsumersManyMessages() throws Exception { 81 super.testOneProducerManyConsumersManyMessages(); 82 assertMessagesDividedAmongConsumers(); 83 } 84 85 public void testManyProducersManyConsumers() throws Exception { 86 super.testManyProducersManyConsumers(); 87 assertMessagesDividedAmongConsumers(); 88 } 89 90 public void testOneProducerTwoMatchingConsumersOneNotMatchingConsumer() throws Exception { 91 createMessageConsumer(createConnectionFactory().createConnection(), createDestination(), "JMSPriority<1"); 93 super.testOneProducerTwoConsumersSmallMessagesLargePrefetch(); 94 assertMessagesDividedAmongConsumers(); 95 } 96 97 protected MessageConsumer createMessageConsumer(Connection conn, Destination dest, String selector) throws Exception { 98 connections.add(conn); 99 100 Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 101 final MessageConsumer consumer = sess.createConsumer(dest, selector); 102 conn.start(); 103 104 return consumer; 105 } 106 107 public void assertMessagesDividedAmongConsumers() { 108 assertEachConsumerReceivedAtLeastXMessages((messageCount * producerCount) / consumerCount); 109 assertEachConsumerReceivedAtMostXMessages(((messageCount * producerCount) / consumerCount) + 1); 110 } 111 } 112 | Popular Tags |