KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > ProducerFlowControlTest


1 package org.apache.activemq;
2
3 import java.io.IOException JavaDoc;
4 import java.util.concurrent.CountDownLatch JavaDoc;
5 import java.util.concurrent.TimeUnit JavaDoc;
6 import java.util.concurrent.atomic.AtomicBoolean JavaDoc;
7
8 import javax.jms.ConnectionFactory JavaDoc;
9 import javax.jms.DeliveryMode JavaDoc;
10 import javax.jms.JMSException JavaDoc;
11 import javax.jms.MessageConsumer JavaDoc;
12 import javax.jms.MessageProducer JavaDoc;
13 import javax.jms.Session JavaDoc;
14 import javax.jms.TextMessage JavaDoc;
15
16 import org.apache.activemq.broker.BrokerService;
17 import org.apache.activemq.broker.TransportConnector;
18 import org.apache.activemq.broker.region.policy.PolicyEntry;
19 import org.apache.activemq.broker.region.policy.PolicyMap;
20 import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
21 import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
22 import org.apache.activemq.command.ActiveMQQueue;
23 import org.apache.activemq.transport.tcp.TcpTransport;
24
25
26 public class ProducerFlowControlTest extends JmsTestSupport {
27     
28     ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
29     ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
30     private TransportConnector connector;
31     private ActiveMQConnection connection;
32
33     public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception JavaDoc {
34         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
35         factory.setProducerWindowSize(1024*64);
36         connection = (ActiveMQConnection) factory.createConnection();
37         connections.add(connection);
38         connection.start();
39
40         Session JavaDoc session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
41         MessageConsumer JavaDoc consumer = session.createConsumer(queueB);
42
43         // Test sending to Queue A
44
// 1 few sends should not block until the producer window is used up.
45
fillQueue(queueA);
46
47         // Test sending to Queue B it should not block since the connection should not be blocked.
48
CountDownLatch JavaDoc pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
49         assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
50         
51         TextMessage JavaDoc msg = (TextMessage JavaDoc) consumer.receive();
52         assertEquals("Message 1", msg.getText());
53         msg.acknowledge();
54         
55         pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
56         assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
57         
58         msg = (TextMessage JavaDoc) consumer.receive();
59         assertEquals("Message 2", msg.getText());
60         msg.acknowledge();
61     }
62
63     public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception JavaDoc {
64         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
65         factory.setAlwaysSyncSend(true);
66         connection = (ActiveMQConnection) factory.createConnection();
67         connections.add(connection);
68         connection.start();
69
70         Session JavaDoc session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
71         MessageConsumer JavaDoc consumer = session.createConsumer(queueB);
72
73         // Test sending to Queue A
74
// 1st send should not block. But the rest will.
75
fillQueue(queueA);
76
77         // Test sending to Queue B it should not block.
78
CountDownLatch JavaDoc pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
79         assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
80         
81         TextMessage JavaDoc msg = (TextMessage JavaDoc) consumer.receive();
82         assertEquals("Message 1", msg.getText());
83         msg.acknowledge();
84         
85         pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2");
86         assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
87         
88         msg = (TextMessage JavaDoc) consumer.receive();
89         assertEquals("Message 2", msg.getText());
90         msg.acknowledge();
91     }
92
93     public void testSimpleSendReceive() throws Exception JavaDoc {
94         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
95         factory.setAlwaysSyncSend(true);
96         connection = (ActiveMQConnection) factory.createConnection();
97         connections.add(connection);
98         connection.start();
99
100         Session JavaDoc session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
101         MessageConsumer JavaDoc consumer = session.createConsumer(queueA);
102
103         // Test sending to Queue B it should not block.
104
CountDownLatch JavaDoc pubishDoneToQeueuA = asyncSendTo(queueA, "Message 1");
105         assertTrue( pubishDoneToQeueuA.await(2, TimeUnit.SECONDS) );
106         
107         TextMessage JavaDoc msg = (TextMessage JavaDoc) consumer.receive();
108         assertEquals("Message 1", msg.getText());
109         msg.acknowledge();
110         
111         pubishDoneToQeueuA = asyncSendTo(queueA, "Message 2");
112         assertTrue( pubishDoneToQeueuA.await(2, TimeUnit.SECONDS) );
113         
114         msg = (TextMessage JavaDoc) consumer.receive();
115         assertEquals("Message 2", msg.getText());
116         msg.acknowledge();
117     }
118
119     public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception JavaDoc {
120         ConnectionFactory JavaDoc factory = createConnectionFactory();
121         connection = (ActiveMQConnection) factory.createConnection();
122         connections.add(connection);
123         connection.start();
124
125         // Test sending to Queue A
126
// 1st send should not block.
127
fillQueue(queueA);
128
129         // Test sending to Queue B it should block.
130
// Since even though the it's queue limits have not been reached, the connection
131
// is blocked.
132
CountDownLatch JavaDoc pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
133         assertFalse( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
134     }
135
136
137     private void fillQueue(final ActiveMQQueue queue) throws JMSException JavaDoc, InterruptedException JavaDoc {
138         final AtomicBoolean JavaDoc done = new AtomicBoolean JavaDoc(true);
139         final AtomicBoolean JavaDoc keepGoing = new AtomicBoolean JavaDoc(true);
140         
141         // Starts an async thread that every time it publishes it sets the done flag to false.
142
// Once the send starts to block it will not reset the done flag anymore.
143
new Thread JavaDoc("Fill thread.") {
144             public void run() {
145                 Session JavaDoc session=null;
146                 try {
147                     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
148                     MessageProducer JavaDoc producer = session.createProducer(queue);
149                     producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
150                     while( keepGoing.get() ) {
151                         done.set(false);
152                         producer.send(session.createTextMessage("Hello World"));
153                     }
154                 } catch (JMSException JavaDoc e) {
155                 } finally {
156                     safeClose(session);
157                 }
158             }
159         }.start();
160         
161         while( true ) {
162             Thread.sleep(1000);
163             // the producer is blocked once the done flag stays true.
164
if( done.get() )
165                 break;
166             done.set(true);
167         }
168         keepGoing.set(false);
169     }
170
171     private CountDownLatch JavaDoc asyncSendTo(final ActiveMQQueue queue, final String JavaDoc message) throws JMSException JavaDoc {
172         final CountDownLatch JavaDoc done = new CountDownLatch JavaDoc(1);
173         new Thread JavaDoc("Send thread.") {
174             public void run() {
175                 Session JavaDoc session=null;
176                 try {
177                     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
178                     MessageProducer JavaDoc producer = session.createProducer(queue);
179                     producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
180                     producer.send(session.createTextMessage(message));
181                     done.countDown();
182                 } catch (JMSException JavaDoc e) {
183                 } finally {
184                     safeClose(session);
185                 }
186             }
187         }.start();
188         return done;
189     }
190
191     protected BrokerService createBroker() throws Exception JavaDoc {
192         BrokerService service = new BrokerService();
193         service.setPersistent(false);
194         service.setUseJmx(false);
195         
196         // Setup a destination policy where it takes only 1 message at a time.
197
PolicyMap policyMap = new PolicyMap();
198         PolicyEntry policy = new PolicyEntry();
199         policy.setMemoryLimit(1);
200         policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
201         policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
202         policyMap.setDefaultEntry(policy);
203         service.setDestinationPolicy(policyMap);
204         
205         connector = service.addConnector("tcp://localhost:0");
206         return service;
207     }
208     
209     protected void tearDown() throws Exception JavaDoc {
210         TcpTransport t = (TcpTransport) connection.getTransport().narrow(TcpTransport.class);
211         t.getTransportListener().onException(new IOException JavaDoc("Disposed."));
212         connection.getTransport().stop();
213         super.tearDown();
214     }
215     
216     protected ConnectionFactory JavaDoc createConnectionFactory() throws Exception JavaDoc {
217         return new ActiveMQConnectionFactory(connector.getConnectUri());
218     }
219 }
220
Popular Tags