KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > nmr > flow > jms > StatelessJmsFlowTest


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.servicemix.jbi.nmr.flow.jms;
18
19 import java.util.HashSet JavaDoc;
20 import java.util.List JavaDoc;
21 import java.util.Set JavaDoc;
22
23 import javax.jbi.messaging.ExchangeStatus;
24 import javax.jbi.messaging.InOut;
25 import javax.jbi.messaging.MessageExchange;
26 import javax.jbi.messaging.MessageExchangeFactory;
27 import javax.jbi.messaging.MessagingException;
28 import javax.jbi.messaging.NormalizedMessage;
29 import javax.xml.namespace.QName JavaDoc;
30
31 import junit.framework.TestCase;
32
33 import org.apache.activemq.broker.BrokerService;
34 import org.apache.servicemix.JbiConstants;
35 import org.apache.servicemix.MessageExchangeListener;
36 import org.apache.servicemix.components.util.ComponentSupport;
37 import org.apache.servicemix.jbi.container.JBIContainer;
38 import org.apache.servicemix.jbi.jaxp.StringSource;
39
40 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
41
42 public class StatelessJmsFlowTest extends TestCase {
43
44     protected JBIContainer jbi1;
45     protected JBIContainer jbi2;
46     protected BrokerService broker;
47     
48     protected void setUp() throws Exception JavaDoc {
49         broker = new BrokerService();
50         broker.setPersistent(false);
51         broker.setUseJmx(false);
52         broker.addConnector("tcp://localhost:61616");
53         broker.start();
54         
55         jbi1 = createContainer("jbi1");
56         jbi2 = createContainer("jbi2");
57     }
58     
59     protected void tearDown() throws Exception JavaDoc {
60         jbi1.shutDown();
61         jbi2.shutDown();
62         broker.stop();
63     }
64     
65     protected JBIContainer createContainer(String JavaDoc name) throws Exception JavaDoc {
66         JBIContainer container = new JBIContainer();
67         container.setName(name);
68         container.setFlowName("jms?jmsURL=tcp://localhost:61616");
69         container.setUseMBeanServer(false);
70         container.setEmbedded(true);
71         container.init();
72         container.start();
73         return container;
74     }
75     
76     protected StatelessEcho activateProvider(JBIContainer container, boolean stateless) throws Exception JavaDoc {
77         StatelessEcho echo = new StatelessEcho(stateless);
78         container.activateComponent(echo, "echo");
79         return echo;
80     }
81     
82     protected StatelessSender activateConsumer(JBIContainer container) throws Exception JavaDoc {
83         StatelessSender sender = new StatelessSender();
84         container.activateComponent(sender, "sender");
85         return sender;
86     }
87     
88     public void testStatelessConsumer() throws Exception JavaDoc {
89         StatelessEcho echo1 = activateProvider(jbi1, false);
90         StatelessEcho echo2 = activateProvider(jbi2, false);
91         StatelessSender sender1 = activateConsumer(jbi1);
92         StatelessSender sender2 = activateConsumer(jbi2);
93         
94         sender1.sendMessages(100, true);
95         
96         int n1 = 0;
97         int n2 = 0;
98         for (int i = 0; i < 10; i++) {
99             Thread.sleep(1000);
100             n1 = sender1.outIds.size();
101             n2 = sender2.outIds.size();
102             if (n1 + n2 == 100) {
103                 break;
104             }
105         }
106         assertTrue(n1 != 0);
107         assertTrue(n2 != 0);
108         assertTrue(n1 + n2 == 100);
109     }
110     
111     public void testStatefullConsumer() throws Exception JavaDoc {
112         StatelessEcho echo1 = activateProvider(jbi1, false);
113         StatelessEcho echo2 = activateProvider(jbi2, false);
114         StatelessSender sender1 = activateConsumer(jbi1);
115         StatelessSender sender2 = activateConsumer(jbi2);
116         
117         sender1.sendMessages(100, false);
118         
119         int n1 = 0;
120         int n2 = 0;
121         for (int i = 0; i < 10; i++) {
122             Thread.sleep(1000);
123             n1 = sender1.outIds.size();
124             n2 = sender2.outIds.size();
125             if (n1 + n2 == 100) {
126                 break;
127             }
128         }
129         assertTrue(n1 != 0);
130         assertTrue(n2 == 0);
131         assertTrue(n1 + n2 == 100);
132     }
133     
134     public void testStatelessProvider() throws Exception JavaDoc {
135         StatelessEcho echo1 = activateProvider(jbi1, true);
136         StatelessEcho echo2 = activateProvider(jbi2, true);
137         StatelessSender sender1 = activateConsumer(jbi1);
138         StatelessSender sender2 = activateConsumer(jbi2);
139         
140         sender1.sendMessages(100, false);
141
142         for (int i = 0; i < 10; i++) {
143             Thread.sleep(1000);
144             if (echo1.doneIds.size() + echo2.doneIds.size() == 100) {
145                 break;
146             }
147         }
148         assertTrue(echo1.doneIds.size() + echo2.doneIds.size() == 100);
149         
150         // Check that the echo1 component received
151
// DONE status for exchanges it did not handle
152
// the first time.
153
// Do not bother testing for echo2, as it will
154
// be automatically true.
155
Set JavaDoc doneIds1 = new HashSet JavaDoc();
156         doneIds1.addAll(echo1.doneIds);
157         doneIds1.removeAll(echo1.inIds);
158         assertTrue(doneIds1.size() > 0);
159     }
160     
161     public void testStatefullProvider() throws Exception JavaDoc {
162         StatelessEcho echo1 = activateProvider(jbi1, false);
163         StatelessEcho echo2 = activateProvider(jbi2, false);
164         StatelessSender sender1 = activateConsumer(jbi1);
165         StatelessSender sender2 = activateConsumer(jbi2);
166         
167         sender1.sendMessages(100, false);
168
169         for (int i = 0; i < 10; i++) {
170             Thread.sleep(1000);
171             if (echo1.doneIds.size() + echo2.doneIds.size() == 100) {
172                 break;
173             }
174         }
175         assertTrue(echo1.doneIds.size() + echo2.doneIds.size() == 100);
176         
177         // Check that the echo1 component received
178
// DONE status for exchanges it did not handle
179
// the first time.
180
// Do not bother testing for echo2, as it will
181
// be automatically true.
182
Set JavaDoc doneIds1 = new HashSet JavaDoc();
183         doneIds1.addAll(echo1.doneIds);
184         doneIds1.removeAll(echo1.inIds);
185         assertTrue(doneIds1.size() == 0);
186     }
187     
188     public static class StatelessSender extends ComponentSupport implements MessageExchangeListener {
189         public static final QName JavaDoc SERVICE = new QName JavaDoc("sender");
190         public static final String JavaDoc ENDPOINT = "ep";
191         public List JavaDoc outIds = new CopyOnWriteArrayList();
192         public StatelessSender() {
193             super(SERVICE, ENDPOINT);
194         }
195         public void sendMessages(int nb, boolean stateless) throws Exception JavaDoc {
196             for (int i = 0; i < nb; i++) {
197                 MessageExchangeFactory mef = getDeliveryChannel().createExchangeFactory();
198                 InOut me = mef.createInOutExchange();
199                 me.setService(new QName JavaDoc("echo"));
200                 if (stateless) {
201                     me.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
202                 }
203                 me.setInMessage(me.createMessage());
204                 me.getInMessage().setContent(new StringSource("<hello/>"));
205                 getDeliveryChannel().send(me);
206                 
207             }
208         }
209         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
210             outIds.add(exchange.getExchangeId());
211             done(exchange);
212         }
213     }
214     
215     public static class StatelessEcho extends ComponentSupport implements MessageExchangeListener {
216         private boolean stateless;
217         public List JavaDoc inIds = new CopyOnWriteArrayList();
218         public List JavaDoc doneIds = new CopyOnWriteArrayList();
219         public StatelessEcho(boolean stateless) {
220             setService(new QName JavaDoc("echo"));
221             setEndpoint("ep");
222             this.stateless = stateless;
223         }
224         public void onMessageExchange(MessageExchange exchange) throws MessagingException {
225             if (exchange.getStatus() == ExchangeStatus.DONE) {
226                 doneIds.add(exchange.getExchangeId());
227             } else {
228                 inIds.add(exchange.getExchangeId());
229                 if (stateless) {
230                     exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
231                 }
232                 NormalizedMessage out = exchange.createMessage();
233                 out.setContent(new StringSource("<world/>"));
234                 answer(exchange, out);
235             }
236         }
237     }
238
239 }
240
Popular Tags