1 18 package org.apache.activemq.broker; 19 20 import org.apache.activemq.JmsMultipleClientsTestSupport; 21 import org.apache.activemq.ActiveMQConnectionFactory; 22 import org.apache.activemq.command.ActiveMQDestination; 23 24 public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport { 25 protected int messageCount = 1000; protected int prefetchCount = 10; 27 28 protected void setUp() throws Exception { 29 super.setUp(); 30 durable = false; 31 topic = false; 32 } 33 34 public void testManyProducersOneConsumer() throws Exception { 35 consumerCount = 1; 36 producerCount = 10; 37 messageCount = 100; 38 messageSize = 1; prefetchCount = 10; 40 41 doMultipleClientsTest(); 42 43 assertTotalMessagesReceived(messageCount * producerCount); 44 } 45 46 public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { 47 consumerCount = 2; 48 producerCount = 1; 49 messageCount = 1000; 50 messageSize = 1024; configurePrefetchOfOne(); 52 53 doMultipleClientsTest(); 54 55 assertTotalMessagesReceived(messageCount * producerCount); 56 } 57 58 public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { 59 consumerCount = 2; 60 producerCount = 1; 61 messageCount = 1000; 62 prefetchCount = messageCount * 2; 63 messageSize = 1024; 65 doMultipleClientsTest(); 66 67 assertTotalMessagesReceived(messageCount * producerCount); 68 } 69 70 public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { 71 consumerCount = 2; 72 producerCount = 1; 73 messageCount = 10; 74 messageSize = 1024 * 1024 * 1; configurePrefetchOfOne(); 76 77 doMultipleClientsTest(); 78 79 assertTotalMessagesReceived(messageCount * producerCount); 80 } 81 82 public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { 83 consumerCount = 2; 84 producerCount = 1; 85 messageCount = 10; 86 prefetchCount = messageCount * 2; 87 messageSize = 1024 * 1024 * 1; 89 doMultipleClientsTest(); 90 91 assertTotalMessagesReceived(messageCount * producerCount); 92 } 93 94 public void testOneProducerManyConsumersFewMessages() throws Exception { 95 consumerCount = 50; 96 producerCount = 1; 97 messageCount = 10; 98 messageSize = 1; prefetchCount = 10; 100 101 doMultipleClientsTest(); 102 103 assertTotalMessagesReceived(messageCount * producerCount); 104 } 105 106 public void testOneProducerManyConsumersManyMessages() throws Exception { 107 consumerCount = 50; 108 producerCount = 1; 109 messageCount = 1000; 110 messageSize = 1; prefetchCount = 10; 112 113 doMultipleClientsTest(); 114 115 assertTotalMessagesReceived(messageCount * producerCount); 116 } 117 118 public void testManyProducersManyConsumers() throws Exception { 119 consumerCount = 50; 120 producerCount = 50; 121 messageCount = 100; 122 messageSize = 1; prefetchCount = 100; 124 125 doMultipleClientsTest(); 126 127 assertTotalMessagesReceived(messageCount * producerCount); 128 } 129 130 protected void configurePrefetchOfOne() { 131 prefetchCount = 1; 132 133 allMessagesList.setMaximumDuration(allMessagesList.getMaximumDuration() * 20); 135 } 136 137 public void doMultipleClientsTest() throws Exception { 138 final ActiveMQDestination dest = createDestination(); 140 141 ActiveMQConnectionFactory consumerFactory = (ActiveMQConnectionFactory)createConnectionFactory(); 143 consumerFactory.getPrefetchPolicy().setAll(prefetchCount); 144 145 startConsumers(consumerFactory, dest); 146 147 startProducers(dest, messageCount); 148 149 int totalMessageCount = messageCount * producerCount; 151 if( dest.isTopic() ) 152 totalMessageCount *= consumerCount; 153 waitForAllMessagesToBeReceived(totalMessageCount); 154 } 155 } 156 | Popular Tags |