KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > advisory > AdvisoryBrokerTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker.advisory;
19
20 import junit.framework.Test;
21
22 import org.apache.activemq.advisory.AdvisorySupport;
23 import org.apache.activemq.broker.BrokerTestSupport;
24 import org.apache.activemq.broker.StubConnection;
25 import org.apache.activemq.command.ActiveMQDestination;
26 import org.apache.activemq.command.ActiveMQQueue;
27 import org.apache.activemq.command.ConnectionInfo;
28 import org.apache.activemq.command.ConsumerInfo;
29 import org.apache.activemq.command.Message;
30 import org.apache.activemq.command.ProducerInfo;
31 import org.apache.activemq.command.RemoveInfo;
32 import org.apache.activemq.command.SessionInfo;
33
34 public class AdvisoryBrokerTest extends BrokerTestSupport {
35      
36     public void testConnectionAdvisories() throws Exception JavaDoc {
37         
38         ActiveMQDestination destination = AdvisorySupport.getConnectionAdvisoryTopic();
39         
40         // Setup a first connection
41
StubConnection connection1 = createConnection();
42         ConnectionInfo connectionInfo1 = createConnectionInfo();
43         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
44         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
45         consumerInfo1.setPrefetchSize(100);
46         
47         connection1.send(connectionInfo1);
48         connection1.send(sessionInfo1);
49         connection1.send(consumerInfo1);
50
51         // We should get an advisory of our own connection.
52
Message m1 = receiveMessage(connection1);
53         assertNotNull(m1);
54         assertNotNull(m1.getDataStructure());
55         assertEquals(((ConnectionInfo)m1.getDataStructure()).getConnectionId(), connectionInfo1.getConnectionId());
56
57         // Setup a second connection
58
StubConnection connection2 = createConnection();
59         ConnectionInfo connectionInfo2 = createConnectionInfo();
60         connection2.send(connectionInfo2);
61         
62         // We should get an advisory of the second connection.
63
m1 = receiveMessage(connection1);
64         assertNotNull(m1);
65         assertNotNull(m1.getDataStructure());
66         assertEquals(((ConnectionInfo)m1.getDataStructure()).getConnectionId(), connectionInfo2.getConnectionId());
67
68         // Close the second connection.
69
connection2.send(closeConnectionInfo(connectionInfo2));
70         connection2.stop();
71
72         // We should get an advisory of the second connection closing
73
m1 = receiveMessage(connection1);
74         assertNotNull(m1);
75         assertNotNull(m1.getDataStructure());
76         RemoveInfo r = (RemoveInfo) m1.getDataStructure();
77         assertEquals(r.getObjectId(), connectionInfo2.getConnectionId());
78         
79         assertNoMessagesLeft(connection1);
80     }
81
82     public void testConsumerAdvisories() throws Exception JavaDoc {
83
84         ActiveMQDestination queue = new ActiveMQQueue("test");
85         ActiveMQDestination destination = AdvisorySupport.getConsumerAdvisoryTopic(queue);
86         
87         // Setup a first connection
88
StubConnection connection1 = createConnection();
89         ConnectionInfo connectionInfo1 = createConnectionInfo();
90         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
91         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
92         consumerInfo1.setPrefetchSize(100);
93         
94         connection1.send(connectionInfo1);
95         connection1.send(sessionInfo1);
96         connection1.send(consumerInfo1);
97
98         // We should not see and advisory for the advisory consumer.
99
assertNoMessagesLeft(connection1);
100
101         // Setup a second consumer.
102
StubConnection connection2 = createConnection();
103         ConnectionInfo connectionInfo2 = createConnectionInfo();
104         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
105         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, queue);
106         consumerInfo1.setPrefetchSize(100);
107         
108         connection2.send(connectionInfo2);
109         connection2.send(sessionInfo2);
110         connection2.send(consumerInfo2);
111         
112         // We should get an advisory of the new consumer.
113
Message m1 = receiveMessage(connection1);
114         assertNotNull(m1);
115         assertNotNull(m1.getDataStructure());
116         assertEquals(((ConsumerInfo)m1.getDataStructure()).getConsumerId(), consumerInfo2.getConsumerId());
117
118         // Close the second connection.
119
connection2.request(closeConnectionInfo(connectionInfo2));
120         connection2.stop();
121
122         // We should get an advisory of the consumer closing
123
m1 = receiveMessage(connection1);
124         assertNotNull(m1);
125         assertNotNull(m1.getDataStructure());
126         RemoveInfo r = (RemoveInfo) m1.getDataStructure();
127         assertEquals(r.getObjectId(), consumerInfo2.getConsumerId());
128         
129         assertNoMessagesLeft(connection2);
130     }
131
132     public void testConsumerAdvisoriesReplayed() throws Exception JavaDoc {
133
134         ActiveMQDestination queue = new ActiveMQQueue("test");
135         ActiveMQDestination destination = AdvisorySupport.getConsumerAdvisoryTopic(queue);
136         
137         // Setup a first connection
138
StubConnection connection1 = createConnection();
139         ConnectionInfo connectionInfo1 = createConnectionInfo();
140         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
141         
142         connection1.send(connectionInfo1);
143         connection1.send(sessionInfo1);
144
145         // Setup a second consumer.
146
StubConnection connection2 = createConnection();
147         ConnectionInfo connectionInfo2 = createConnectionInfo();
148         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
149         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, queue);
150         consumerInfo2.setPrefetchSize(100);
151         connection2.send(connectionInfo2);
152         connection2.send(sessionInfo2);
153         connection2.send(consumerInfo2);
154         
155         // We should get an advisory of the previous consumer.
156
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
157         consumerInfo1.setPrefetchSize(100);
158         connection1.send(consumerInfo1);
159
160         Message m1 = receiveMessage(connection1);
161         assertNotNull(m1);
162         assertNotNull(m1.getDataStructure());
163         assertEquals(((ConsumerInfo)m1.getDataStructure()).getConsumerId(), consumerInfo2.getConsumerId());
164
165         // Close the second connection.
166
connection2.request(closeConnectionInfo(connectionInfo2));
167         connection2.stop();
168
169         // We should get an advisory of the consumer closing
170
m1 = receiveMessage(connection1);
171         assertNotNull(m1);
172         assertNotNull(m1.getDataStructure());
173         RemoveInfo r = (RemoveInfo) m1.getDataStructure();
174         assertEquals(r.getObjectId(), consumerInfo2.getConsumerId());
175         
176         assertNoMessagesLeft(connection2);
177     }
178
179
180     public void testProducerAdvisories() throws Exception JavaDoc {
181
182         ActiveMQDestination queue = new ActiveMQQueue("test");
183         ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);
184         
185         // Setup a first connection
186
StubConnection connection1 = createConnection();
187         ConnectionInfo connectionInfo1 = createConnectionInfo();
188         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
189         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
190         consumerInfo1.setPrefetchSize(100);
191         
192         connection1.send(connectionInfo1);
193         connection1.send(sessionInfo1);
194         connection1.send(consumerInfo1);
195
196         assertNoMessagesLeft(connection1);
197
198         // Setup a producer.
199
StubConnection connection2 = createConnection();
200         ConnectionInfo connectionInfo2 = createConnectionInfo();
201         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
202         ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
203         producerInfo2.setDestination(queue);
204         
205         connection2.send(connectionInfo2);
206         connection2.send(sessionInfo2);
207         connection2.send(producerInfo2);
208         
209         // We should get an advisory of the new produver.
210
Message m1 = receiveMessage(connection1);
211         assertNotNull(m1);
212         assertNotNull(m1.getDataStructure());
213         assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId());
214
215         // Close the second connection.
216
connection2.request(closeConnectionInfo(connectionInfo2));
217         connection2.stop();
218
219         // We should get an advisory of the producer closing
220
m1 = receiveMessage(connection1);
221         assertNotNull(m1);
222         assertNotNull(m1.getDataStructure());
223         RemoveInfo r = (RemoveInfo) m1.getDataStructure();
224         assertEquals(r.getObjectId(), producerInfo2.getProducerId());
225         
226         assertNoMessagesLeft(connection2);
227     }
228     
229     public void testProducerAdvisoriesReplayed() throws Exception JavaDoc {
230
231         ActiveMQDestination queue = new ActiveMQQueue("test");
232         ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);
233         
234         // Setup a first connection
235
StubConnection connection1 = createConnection();
236         ConnectionInfo connectionInfo1 = createConnectionInfo();
237         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
238         
239         connection1.send(connectionInfo1);
240         connection1.send(sessionInfo1);
241
242         // Setup a producer.
243
StubConnection connection2 = createConnection();
244         ConnectionInfo connectionInfo2 = createConnectionInfo();
245         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
246         ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
247         producerInfo2.setDestination(queue);
248         
249         connection2.send(connectionInfo2);
250         connection2.send(sessionInfo2);
251         connection2.send(producerInfo2);
252         
253         // Create the advisory consumer.. it should see the previous producer
254
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
255         consumerInfo1.setPrefetchSize(100);
256         connection1.send(consumerInfo1);
257
258         Message m1 = receiveMessage(connection1);
259         assertNotNull(m1);
260         assertNotNull(m1.getDataStructure());
261         assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId());
262
263         // Close the second connection.
264
connection2.request(closeConnectionInfo(connectionInfo2));
265         connection2.stop();
266
267         // We should get an advisory of the producer closing
268
m1 = receiveMessage(connection1);
269         assertNotNull(m1);
270         assertNotNull(m1.getDataStructure());
271         RemoveInfo r = (RemoveInfo) m1.getDataStructure();
272         assertEquals(r.getObjectId(), producerInfo2.getProducerId());
273         
274         assertNoMessagesLeft(connection2);
275     }
276
277     public void testProducerAdvisoriesReplayedOnlyTargetNewConsumer() throws Exception JavaDoc {
278
279         ActiveMQDestination queue = new ActiveMQQueue("test");
280         ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);
281         
282         // Setup a first connection
283
StubConnection connection1 = createConnection();
284         ConnectionInfo connectionInfo1 = createConnectionInfo();
285         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
286         connection1.send(connectionInfo1);
287         connection1.send(sessionInfo1);
288         // Create the first consumer..
289
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
290         consumerInfo1.setPrefetchSize(100);
291         connection1.send(consumerInfo1);
292
293         // Setup a producer.
294
StubConnection connection2 = createConnection();
295         ConnectionInfo connectionInfo2 = createConnectionInfo();
296         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
297         ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
298         producerInfo2.setDestination(queue);
299         connection2.send(connectionInfo2);
300         connection2.send(sessionInfo2);
301         connection2.send(producerInfo2);
302         
303         Message m1 = receiveMessage(connection1);
304         assertNotNull(m1);
305         assertNotNull(m1.getDataStructure());
306         assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId());
307         
308         // Create the 2nd consumer..
309
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
310         consumerInfo2.setPrefetchSize(100);
311         connection2.send(consumerInfo2);
312
313         // The second consumer should se a replay
314
m1 = receiveMessage(connection2);
315         assertNotNull(m1);
316         assertNotNull(m1.getDataStructure());
317         assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId());
318
319         // But the first consumer should not see the replay.
320
assertNoMessagesLeft(connection1);
321     }
322
323     public static Test suite() {
324         return suite(AdvisoryBrokerTest.class);
325     }
326     
327     public static void main(String JavaDoc[] args) {
328         junit.textui.TestRunner.run(suite());
329     }
330
331 }
332
Popular Tags