KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > advisory > ConsumerListenerTest


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.advisory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.advisory.ConsumerEvent;
import org.apache.activemq.advisory.ConsumerEventSource;
import org.apache.activemq.advisory.ConsumerListener;

36 /**
37  *
38  * @version $Revision: 475999 $
39  */

40 public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements ConsumerListener {
41
42     protected Session JavaDoc consumerSession1;
43     protected Session JavaDoc consumerSession2;
44     protected int consumerCounter;
45     protected ConsumerEventSource consumerEventSource;
46     protected BlockingQueue JavaDoc eventQueue = new ArrayBlockingQueue JavaDoc(1000);
47     private Connection JavaDoc connection;
48
49     public void testConsumerEvents() throws Exception JavaDoc {
50         consumerEventSource.start();
51
52         consumerSession1 = createConsumer();
53         assertConsumerEvent(1, true);
54
55         consumerSession2 = createConsumer();
56         assertConsumerEvent(2, true);
57
58         consumerSession1.close();
59         consumerSession1 = null;
60         assertConsumerEvent(1, false);
61
62         consumerSession2.close();
63         consumerSession2 = null;
64         assertConsumerEvent(0, false);
65     }
66
67     public void testListenWhileAlreadyConsumersActive() throws Exception JavaDoc {
68         consumerSession1 = createConsumer();
69         consumerSession2 = createConsumer();
70
71         consumerEventSource.start();
72         assertConsumerEvent(2, true);
73         assertConsumerEvent(2, true);
74
75         consumerSession1.close();
76         consumerSession1 = null;
77         assertConsumerEvent(1, false);
78
79         consumerSession2.close();
80         consumerSession2 = null;
81         assertConsumerEvent(0, false);
82     }
83
84     public void onConsumerEvent(ConsumerEvent event) {
85         eventQueue.add(event);
86     }
87
88     protected void setUp() throws Exception JavaDoc {
89         super.setUp();
90
91         connection = createConnection();
92         connection.start();
93         consumerEventSource = new ConsumerEventSource(connection, destination);
94         consumerEventSource.setConsumerListener(this);
95     }
96
97     protected void tearDown() throws Exception JavaDoc {
98         if (consumerEventSource != null) {
99             consumerEventSource.stop();
100         }
101         if (consumerSession2 != null) {
102             consumerSession2.close();
103         }
104         if (consumerSession1 != null) {
105             consumerSession1.close();
106         }
107         if (connection != null) {
108             connection.close();
109         }
110         super.tearDown();
111     }
112
113     protected void assertConsumerEvent(int count, boolean started) throws InterruptedException JavaDoc {
114         ConsumerEvent event = waitForConsumerEvent();
115         assertEquals("Consumer count", count, event.getConsumerCount());
116         assertEquals("started", started, event.isStarted());
117     }
118
119     protected Session JavaDoc createConsumer() throws JMSException JavaDoc {
120         final String JavaDoc consumerText = "Consumer: " + (++consumerCounter);
121         log.info("Creating consumer: " + consumerText + " on destination: " + destination);
122         
123         Session JavaDoc answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
124         MessageConsumer JavaDoc consumer = answer.createConsumer(destination);
125         consumer.setMessageListener(new MessageListener JavaDoc() {
126             public void onMessage(Message JavaDoc message) {
127                 log.info("Received message by: " + consumerText + " message: " + message);
128             }
129         });
130         return answer;
131     }
132
133     protected ConsumerEvent waitForConsumerEvent() throws InterruptedException JavaDoc {
134         ConsumerEvent answer = (ConsumerEvent) eventQueue.poll(100000, TimeUnit.MILLISECONDS);
135         assertTrue("Should have received a consumer event!", answer != null);
136         return answer;
137     }
138
139 }
140
Popular Tags