KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > network > SimpleNetworkTest


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.network;
19
20 import java.net.URI JavaDoc;
21
22 import javax.jms.Connection JavaDoc;
23 import javax.jms.DeliveryMode JavaDoc;
24 import javax.jms.Destination JavaDoc;
25 import javax.jms.JMSException JavaDoc;
26 import javax.jms.Message JavaDoc;
27 import javax.jms.MessageConsumer JavaDoc;
28 import javax.jms.MessageListener JavaDoc;
29 import javax.jms.MessageProducer JavaDoc;
30 import javax.jms.Session JavaDoc;
31 import javax.jms.TextMessage JavaDoc;
32 import javax.jms.TopicRequestor JavaDoc;
33 import javax.jms.TopicSession JavaDoc;
34
35 import junit.framework.TestCase;
36
37 import org.apache.activemq.ActiveMQConnectionFactory;
38 import org.apache.activemq.broker.BrokerService;
39 import org.apache.activemq.command.ActiveMQTopic;
40 import org.apache.activemq.xbean.BrokerFactoryBean;
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.springframework.context.support.AbstractApplicationContext;
44 import org.springframework.core.io.ClassPathResource;
45 import org.springframework.core.io.Resource;
46 public class SimpleNetworkTest extends TestCase{
47     
48     protected static final Log log = LogFactory.getLog(SimpleNetworkTest.class);
49
50     protected static final int MESSAGE_COUNT=10;
51     protected AbstractApplicationContext context;
52     protected Connection JavaDoc localConnection;
53     protected Connection JavaDoc remoteConnection;
54     protected BrokerService localBroker;
55     protected BrokerService remoteBroker;
56     protected Session JavaDoc localSession;
57     protected Session JavaDoc remoteSession;
58     protected ActiveMQTopic included;
59     protected ActiveMQTopic excluded;
60     protected String JavaDoc consumerName="durableSubs";
61     
62     
63     public void testRequestReply() throws Exception JavaDoc{
64         final MessageProducer JavaDoc remoteProducer=remoteSession.createProducer(null);
65         MessageConsumer JavaDoc remoteConsumer=remoteSession.createConsumer(included);
66         remoteConsumer.setMessageListener(new MessageListener JavaDoc(){
67             public void onMessage(Message JavaDoc msg){
68                 try{
69                     TextMessage JavaDoc textMsg=(TextMessage JavaDoc) msg;
70                     String JavaDoc payload="REPLY: "+textMsg.getText();
71                     Destination JavaDoc replyTo;
72                     replyTo=msg.getJMSReplyTo();
73                     textMsg.clearBody();
74                     textMsg.setText(payload);
75                     remoteProducer.send(replyTo,textMsg);
76                 }catch(JMSException JavaDoc e){
77                     // TODO Auto-generated catch block
78
e.printStackTrace();
79                 }
80             }
81         });
82         
83         TopicRequestor JavaDoc requestor=new TopicRequestor JavaDoc((TopicSession JavaDoc) localSession,included);
84         Thread.sleep(2000);//alow for consumer infos to perculate arround
85
for (int i =0;i < MESSAGE_COUNT; i++){
86             TextMessage JavaDoc msg = localSession.createTextMessage("test msg: " +i);
87             TextMessage JavaDoc result = (TextMessage JavaDoc) requestor.request(msg);
88             assertNotNull(result);
89             log.info(result.getText());
90         }
91     }
92
93     public void testFiltering() throws Exception JavaDoc{
94         MessageConsumer JavaDoc includedConsumer=remoteSession.createConsumer(included);
95         MessageConsumer JavaDoc excludedConsumer=remoteSession.createConsumer(excluded);
96         MessageProducer JavaDoc includedProducer=localSession.createProducer(included);
97         MessageProducer JavaDoc excludedProducer=localSession.createProducer(excluded);
98         Thread.sleep(1000);
99         Message JavaDoc test=localSession.createTextMessage("test");
100         includedProducer.send(test);
101         excludedProducer.send(test);
102         assertNull(excludedConsumer.receive(500));
103         assertNotNull(includedConsumer.receive(500));
104     }
105
106     public void testConduitBridge() throws Exception JavaDoc{
107         MessageConsumer JavaDoc consumer1=remoteSession.createConsumer(included);
108         MessageConsumer JavaDoc consumer2=remoteSession.createConsumer(included);
109         MessageProducer JavaDoc producer=localSession.createProducer(included);
110         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
111         Thread.sleep(1000);
112         for(int i=0;i<MESSAGE_COUNT;i++){
113             Message JavaDoc test=localSession.createTextMessage("test-"+i);
114             producer.send(test);
115             assertNotNull(consumer1.receive(500));
116             assertNotNull(consumer2.receive(500));
117         }
118         // ensure no more messages received
119
assertNull(consumer1.receive(500));
120         assertNull(consumer2.receive(500));
121     }
122
123     public void testDurableStoreAndForward() throws Exception JavaDoc{
124         // create a remote durable consumer
125
MessageConsumer JavaDoc remoteConsumer=remoteSession.createDurableSubscriber(included,consumerName);
126         Thread.sleep(1000);
127         // now close everything down and restart
128
doTearDown();
129         doSetUp();
130         MessageProducer JavaDoc producer=localSession.createProducer(included);
131         for(int i=0;i<MESSAGE_COUNT;i++){
132             Message JavaDoc test=localSession.createTextMessage("test-"+i);
133             producer.send(test);
134         }
135         Thread.sleep(1000);
136         // close everything down and restart
137
doTearDown();
138         doSetUp();
139         remoteConsumer=remoteSession.createDurableSubscriber(included,consumerName);
140         for(int i=0;i<MESSAGE_COUNT;i++){
141             Message JavaDoc test=localSession.createTextMessage("test-"+i);
142             assertNotNull(remoteConsumer.receive(500));
143         }
144     }
145     
146     
147
148     protected void setUp() throws Exception JavaDoc{
149         super.setUp();
150         doSetUp();
151     }
152
153     protected void tearDown() throws Exception JavaDoc{
154         localBroker.deleteAllMessages();
155         remoteBroker.deleteAllMessages();
156         doTearDown();
157         super.tearDown();
158     }
159
160     protected void doTearDown() throws Exception JavaDoc{
161         localConnection.close();
162         remoteConnection.close();
163         localBroker.stop();
164         remoteBroker.stop();
165     }
166
167     protected void doSetUp() throws Exception JavaDoc{
168         Resource resource=new ClassPathResource(getRemoteBrokerURI());
169         BrokerFactoryBean factory=new BrokerFactoryBean(resource);
170         factory.afterPropertiesSet();
171         remoteBroker=factory.getBroker();
172         remoteBroker.start();
173         
174         resource=new ClassPathResource(getLocalBrokerURI());
175         factory=new BrokerFactoryBean(resource);
176         factory.afterPropertiesSet();
177         localBroker=factory.getBroker();
178         
179         localBroker.start();
180         
181         URI JavaDoc localURI=localBroker.getVmConnectorURI();
182         ActiveMQConnectionFactory fac=new ActiveMQConnectionFactory(localURI);
183         localConnection=fac.createConnection();
184         localConnection.setClientID("local");
185         localConnection.start();
186         URI JavaDoc remoteURI=remoteBroker.getVmConnectorURI();
187         fac=new ActiveMQConnectionFactory(remoteURI);
188         remoteConnection=fac.createConnection();
189         remoteConnection.setClientID("remote");
190         remoteConnection.start();
191         included=new ActiveMQTopic("include.test.bar");
192         excluded=new ActiveMQTopic("exclude.test.bar");
193         localSession=localConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
194         remoteSession=remoteConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
195     }
196
197     protected String JavaDoc getRemoteBrokerURI() {
198         return "org/apache/activemq/network/remoteBroker.xml";
199     }
200
201     protected String JavaDoc getLocalBrokerURI() {
202         return "org/apache/activemq/network/localBroker.xml";
203     }
204 }
205
Popular Tags