KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > usecases > TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.usecases;
19
20 import org.apache.activemq.broker.BrokerService;
21 import org.apache.activemq.broker.TransportConnector;
22 import org.apache.activemq.network.DemandForwardingBridge;
23 import org.apache.activemq.network.NetworkBridgeConfiguration;
24 import org.apache.activemq.transport.TransportFactory;
25 import org.apache.activemq.JmsMultipleBrokersTestSupport;
26 import org.apache.activemq.command.Command;
27 import org.apache.activemq.util.MessageIdList;
28
29 import javax.jms.Destination JavaDoc;
30 import javax.jms.MessageConsumer JavaDoc;
31 import java.util.List JavaDoc;
32 import java.util.ArrayList JavaDoc;
33 import java.net.URI JavaDoc;
34
35 import java.util.concurrent.atomic.AtomicInteger JavaDoc;
36
37 /**
38  * @version $Revision: 1.1.1.1 $
39  */

40 public class TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest extends JmsMultipleBrokersTestSupport {
41     protected static final int MESSAGE_COUNT = 10;
42
43     protected List bridges;
44     protected AtomicInteger JavaDoc msgDispatchCount;
45
46     /**
47      * BrokerA -> BrokerB
48      */

49     public void testRemoteBrokerHasConsumer() throws Exception JavaDoc {
50         // Setup broker networks
51
bridgeBrokers("BrokerA", "BrokerB");
52
53         startAllBrokers();
54
55         // Setup destination
56
Destination JavaDoc dest = createDestination("TEST.FOO", true);
57
58         // Setup consumers
59
MessageConsumer JavaDoc clientA = createConsumer("BrokerA", dest);
60         MessageConsumer JavaDoc clientB = createConsumer("BrokerB", dest);
61
62         // Send messages
63
sendMessages("BrokerA", dest, MESSAGE_COUNT);
64
65         // Get message count
66
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
67         MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
68
69         msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
70         msgsB.waitForMessagesToArrive(MESSAGE_COUNT);
71
72         assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
73         assertEquals(MESSAGE_COUNT, msgsB.getMessageCount());
74
75         // Check that 10 message dispatch commands are send over the network
76
assertEquals(MESSAGE_COUNT, msgDispatchCount.get());
77     }
78
79     /**
80      * BrokerA -> BrokerB
81      */

82     public void testRemoteBrokerHasNoConsumer() throws Exception JavaDoc {
83         // Setup broker networks
84
bridgeBrokers("BrokerA", "BrokerB");
85
86         startAllBrokers();
87
88         // Setup destination
89
Destination JavaDoc dest = createDestination("TEST.FOO", true);
90
91         // Setup consumers
92
MessageConsumer JavaDoc clientA = createConsumer("BrokerA", dest);
93
94         // Send messages
95
sendMessages("BrokerA", dest, MESSAGE_COUNT);
96
97         // Get message count
98
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
99
100         msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
101
102         assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
103
104         // Check that no message dispatch commands are send over the network
105
assertEquals(0, msgDispatchCount.get());
106     }
107
108     protected void bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception JavaDoc {
109         List remoteTransports = remoteBroker.getTransportConnectors();
110         List localTransports = localBroker.getTransportConnectors();
111
112         URI JavaDoc remoteURI, localURI;
113         if (!remoteTransports.isEmpty() && !localTransports.isEmpty()) {
114             remoteURI = ((TransportConnector)remoteTransports.get(0)).getConnectUri();
115             localURI = ((TransportConnector)localTransports.get(0)).getConnectUri();
116
117             // Ensure that we are connecting using tcp
118
if (remoteURI.toString().startsWith("tcp:") && localURI.toString().startsWith("tcp:")) {
119                 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
120                 config.setBrokerName(localBroker.getBrokerName());
121                 DemandForwardingBridge bridge = new DemandForwardingBridge(config,TransportFactory.connect(localURI),
122                                                                            TransportFactory.connect(remoteURI)) {
123                     protected void serviceLocalCommand(Command command) {
124                         if (command.isMessageDispatch()) {
125                             // Keep track of the number of message dispatches through the bridge
126
msgDispatchCount.incrementAndGet();
127                         }
128
129                         super.serviceLocalCommand(command);
130                     }
131                 };
132                 bridges.add(bridge);
133
134                 bridge.start();
135             } else {
136                 throw new Exception JavaDoc("Remote broker or local broker is not using tcp connectors");
137             }
138         } else {
139             throw new Exception JavaDoc("Remote broker or local broker has no registered connectors.");
140         }
141
142         MAX_SETUP_TIME = 2000;
143     }
144
145     public void setUp() throws Exception JavaDoc {
146         super.setAutoFail(true);
147         super.setUp();
148         createBroker(new URI JavaDoc("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
149         createBroker(new URI JavaDoc("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
150
151         bridges = new ArrayList JavaDoc();
152         msgDispatchCount = new AtomicInteger JavaDoc(0);
153     }
154 }
155
Popular Tags