KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > advisory > ProducerListenerTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.advisory;
19
20 import java.util.concurrent.ArrayBlockingQueue JavaDoc;
21 import java.util.concurrent.BlockingQueue JavaDoc;
22 import java.util.concurrent.TimeUnit JavaDoc;
23
24 import org.apache.activemq.EmbeddedBrokerTestSupport;
25 import org.apache.activemq.advisory.ConsumerEvent;
26 import org.apache.activemq.advisory.ConsumerEventSource;
27 import org.apache.activemq.advisory.ConsumerListener;
28
29 import javax.jms.Connection JavaDoc;
30 import javax.jms.JMSException JavaDoc;
31 import javax.jms.Message JavaDoc;
32 import javax.jms.MessageConsumer JavaDoc;
33 import javax.jms.MessageListener JavaDoc;
34 import javax.jms.MessageProducer JavaDoc;
35 import javax.jms.Session JavaDoc;
36
37 /**
38  *
39  * @version $Revision: 359679 $
40  */

41 public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements ProducerListener {
42
43     protected Session JavaDoc consumerSession1;
44     protected Session JavaDoc consumerSession2;
45     protected int consumerCounter;
46     protected ProducerEventSource producerEventSource;
47     protected BlockingQueue JavaDoc eventQueue = new ArrayBlockingQueue JavaDoc(1000);
48     private Connection JavaDoc connection;
49
50     public void testProducerEvents() throws Exception JavaDoc {
51         producerEventSource.start();
52
53         consumerSession1 = createProducer();
54         assertConsumerEvent(1, true);
55
56         consumerSession2 = createProducer();
57         assertConsumerEvent(2, true);
58
59         consumerSession1.close();
60         consumerSession1 = null;
61         assertConsumerEvent(1, false);
62
63         consumerSession2.close();
64         consumerSession2 = null;
65         assertConsumerEvent(0, false);
66     }
67
68     public void testListenWhileAlreadyConsumersActive() throws Exception JavaDoc {
69         consumerSession1 = createProducer();
70         consumerSession2 = createProducer();
71
72         producerEventSource.start();
73         assertConsumerEvent(2, true);
74         assertConsumerEvent(2, true);
75
76         consumerSession1.close();
77         consumerSession1 = null;
78         assertConsumerEvent(1, false);
79
80         consumerSession2.close();
81         consumerSession2 = null;
82         assertConsumerEvent(0, false);
83     }
84
85     public void onProducerEvent(ProducerEvent event) {
86         eventQueue.add(event);
87     }
88
89     protected void setUp() throws Exception JavaDoc {
90         super.setUp();
91
92         connection = createConnection();
93         connection.start();
94         producerEventSource = new ProducerEventSource(connection, destination);
95         producerEventSource.setProducerListener(this);
96     }
97
98     protected void tearDown() throws Exception JavaDoc {
99         if (producerEventSource != null) {
100             producerEventSource.stop();
101         }
102         if (consumerSession2 != null) {
103             consumerSession2.close();
104         }
105         if (consumerSession1 != null) {
106             consumerSession1.close();
107         }
108         if (connection != null) {
109             connection.close();
110         }
111         super.tearDown();
112     }
113
114     protected void assertConsumerEvent(int count, boolean started) throws InterruptedException JavaDoc {
115         ProducerEvent event = waitForProducerEvent();
116         assertEquals("Producer count", count, event.getProducerCount());
117         assertEquals("started", started, event.isStarted());
118     }
119
120     protected Session JavaDoc createProducer() throws JMSException JavaDoc {
121         final String JavaDoc consumerText = "Consumer: " + (++consumerCounter);
122         log.info("Creating consumer: " + consumerText + " on destination: " + destination);
123         
124         Session JavaDoc answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
125         MessageProducer JavaDoc producer = answer.createProducer(destination);
126         return answer;
127     }
128
129     protected ProducerEvent waitForProducerEvent() throws InterruptedException JavaDoc {
130         ProducerEvent answer = (ProducerEvent) eventQueue.poll(100000, TimeUnit.MILLISECONDS);
131         assertTrue("Should have received a consumer event!", answer != null);
132         return answer;
133     }
134
135 }
136
Popular Tags