package org.apache.servicemix.jbi.nmr.flow.jms;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;

import junit.framework.TestCase;

import org.apache.activemq.broker.BrokerService;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.components.util.ComponentSupport;
import org.apache.servicemix.jbi.container.JBIContainer;
import org.apache.servicemix.jbi.jaxp.StringSource;

import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;

/**
 * Exercises the JMS flow with two embedded containers ("jbi1" and "jbi2")
 * connected to one in-VM ActiveMQ broker. Each container hosts an "echo"
 * provider and a "sender" consumer; the tests send a batch of InOut
 * exchanges from jbi1 and then check which component instance received the
 * replies (consumer side) or the DONE statuses (provider side), with and
 * without the {@link JbiConstants#STATELESS_CONSUMER} /
 * {@link JbiConstants#STATELESS_PROVIDER} exchange properties set.
 */
public class StatelessJmsFlowTest extends TestCase {

    /** Address shared by the broker connector and the containers' JMS flow. */
    private static final String BROKER_URL = "tcp://localhost:61616";

    /** Number of exchanges sent by every test. */
    private static final int MESSAGE_COUNT = 100;

    /** Maximum number of polls while waiting for the exchanges to complete. */
    private static final int MAX_POLLS = 10;

    /** Pause between two polls, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 1000L;

    protected JBIContainer jbi1;
    protected JBIContainer jbi2;
    protected BrokerService broker;

    /**
     * Starts a non-persistent broker and the two containers.
     *
     * JUnit 3 does not invoke {@link #tearDown()} when {@code setUp()} fails,
     * so a failure while creating the containers is cleaned up here;
     * otherwise the broker would keep the TCP port bound for later tests.
     */
    protected void setUp() throws Exception {
        broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.addConnector(BROKER_URL);
        broker.start();

        try {
            jbi1 = createContainer("jbi1");
            jbi2 = createContainer("jbi2");
        } catch (Exception e) {
            try {
                tearDown();
            } catch (Exception ignored) {
                // The start-up failure is the one worth reporting.
            }
            throw e;
        }
    }

    /**
     * Shuts down both containers and the broker. Every resource is released
     * even if an earlier shutdown step throws; fields that were never
     * initialised are skipped.
     */
    protected void tearDown() throws Exception {
        try {
            if (jbi1 != null) {
                jbi1.shutDown();
            }
        } finally {
            try {
                if (jbi2 != null) {
                    jbi2.shutDown();
                }
            } finally {
                if (broker != null) {
                    broker.stop();
                }
            }
        }
    }

    /**
     * Creates, initialises and starts an embedded container that uses the
     * JMS flow against the test broker.
     *
     * @param name the container name
     * @return the started container
     */
    protected JBIContainer createContainer(String name) throws Exception {
        JBIContainer container = new JBIContainer();
        container.setName(name);
        container.setFlowName("jms?jmsURL=" + BROKER_URL);
        container.setUseMBeanServer(false);
        container.setEmbedded(true);
        container.init();
        container.start();
        return container;
    }

    /**
     * Activates an echo provider under the component name "echo".
     *
     * @param container the container to activate the component in
     * @param stateless whether the provider flags its exchanges with
     *                  {@link JbiConstants#STATELESS_PROVIDER}
     * @return the activated component
     */
    protected StatelessEcho activateProvider(JBIContainer container, boolean stateless) throws Exception {
        StatelessEcho echo = new StatelessEcho(stateless);
        container.activateComponent(echo, "echo");
        return echo;
    }

    /**
     * Activates a sender under the component name "sender".
     *
     * @param container the container to activate the component in
     * @return the activated component
     */
    protected StatelessSender activateConsumer(JBIContainer container) throws Exception {
        StatelessSender sender = new StatelessSender();
        container.activateComponent(sender, "sender");
        return sender;
    }

    /**
     * With the stateless-consumer property set, replies are expected on the
     * senders of both containers.
     */
    public void testStatelessConsumer() throws Exception {
        activateProvider(jbi1, false);
        activateProvider(jbi2, false);
        StatelessSender sender1 = activateConsumer(jbi1);
        StatelessSender sender2 = activateConsumer(jbi2);

        sender1.sendMessages(MESSAGE_COUNT, true);
        awaitTotal(sender1.outIds, sender2.outIds, MESSAGE_COUNT);

        int n1 = sender1.outIds.size();
        int n2 = sender2.outIds.size();
        assertTrue("sender1 received no reply", n1 != 0);
        assertTrue("sender2 received no reply", n2 != 0);
        assertEquals("total replies received", MESSAGE_COUNT, n1 + n2);
    }

    /**
     * Without the stateless-consumer property, every reply is expected on
     * the sender that originated the exchanges.
     */
    public void testStatefullConsumer() throws Exception {
        activateProvider(jbi1, false);
        activateProvider(jbi2, false);
        StatelessSender sender1 = activateConsumer(jbi1);
        StatelessSender sender2 = activateConsumer(jbi2);

        sender1.sendMessages(MESSAGE_COUNT, false);
        awaitTotal(sender1.outIds, sender2.outIds, MESSAGE_COUNT);

        int n1 = sender1.outIds.size();
        int n2 = sender2.outIds.size();
        assertTrue("sender1 received no reply", n1 != 0);
        assertEquals("replies received by sender2", 0, n2);
        assertEquals("total replies received", MESSAGE_COUNT, n1 + n2);
    }

    /**
     * With stateless providers, echo1 is expected to see at least one DONE
     * status for an exchange whose request it did not handle itself.
     */
    public void testStatelessProvider() throws Exception {
        StatelessEcho echo1 = activateProvider(jbi1, true);
        StatelessEcho echo2 = activateProvider(jbi2, true);
        StatelessSender sender1 = activateConsumer(jbi1);
        activateConsumer(jbi2);

        sender1.sendMessages(MESSAGE_COUNT, false);
        awaitTotal(echo1.doneIds, echo2.doneIds, MESSAGE_COUNT);

        assertEquals("total DONE statuses received", MESSAGE_COUNT,
                     echo1.doneIds.size() + echo2.doneIds.size());
        assertTrue("echo1 only saw DONE for its own requests",
                   doneWithoutRequest(echo1).size() > 0);
    }

    /**
     * With stateful providers, every DONE status seen by echo1 must belong
     * to an exchange whose request echo1 handled.
     */
    public void testStatefullProvider() throws Exception {
        StatelessEcho echo1 = activateProvider(jbi1, false);
        StatelessEcho echo2 = activateProvider(jbi2, false);
        StatelessSender sender1 = activateConsumer(jbi1);
        activateConsumer(jbi2);

        sender1.sendMessages(MESSAGE_COUNT, false);
        awaitTotal(echo1.doneIds, echo2.doneIds, MESSAGE_COUNT);

        assertEquals("total DONE statuses received", MESSAGE_COUNT,
                     echo1.doneIds.size() + echo2.doneIds.size());
        assertEquals("DONE statuses for requests echo1 did not handle", 0,
                     doneWithoutRequest(echo1).size());
    }

    /**
     * Polls once per {@link #POLL_INTERVAL_MS}, at most {@link #MAX_POLLS}
     * times, until the two lists together hold {@code expected} entries.
     * Returns silently on timeout; callers assert on the final sizes.
     */
    private static void awaitTotal(List first, List second, int expected) throws InterruptedException {
        for (int attempt = 0; attempt < MAX_POLLS; attempt++) {
            Thread.sleep(POLL_INTERVAL_MS);
            if (first.size() + second.size() == expected) {
                return;
            }
        }
    }

    /**
     * Returns the ids of the exchanges for which {@code echo} received a
     * DONE status without having received the corresponding request.
     */
    private static Set doneWithoutRequest(StatelessEcho echo) {
        Set ids = new HashSet(echo.doneIds);
        ids.removeAll(echo.inIds);
        return ids;
    }

    /**
     * Consumer component: sends InOut exchanges to the "echo" service and
     * records the id of every exchange delivered back to it.
     */
    public static class StatelessSender extends ComponentSupport implements MessageExchangeListener {

        public static final QName SERVICE = new QName("sender");
        public static final String ENDPOINT = "ep";

        /** Ids of the exchanges delivered to {@link #onMessageExchange}. */
        public List outIds = new CopyOnWriteArrayList();

        public StatelessSender() {
            super(SERVICE, ENDPOINT);
        }

        /**
         * Sends {@code nb} InOut exchanges carrying {@code <hello/>} to the
         * "echo" service.
         *
         * @param nb        number of exchanges to send
         * @param stateless when true, each exchange carries the
         *                  {@link JbiConstants#STATELESS_CONSUMER} property
         */
        public void sendMessages(int nb, boolean stateless) throws Exception {
            // One factory is enough for the whole batch.
            MessageExchangeFactory mef = getDeliveryChannel().createExchangeFactory();
            for (int i = 0; i < nb; i++) {
                InOut me = mef.createInOutExchange();
                me.setService(new QName("echo"));
                if (stateless) {
                    me.setProperty(JbiConstants.STATELESS_CONSUMER, Boolean.TRUE);
                }
                me.setInMessage(me.createMessage());
                me.getInMessage().setContent(new StringSource("<hello/>"));
                getDeliveryChannel().send(me);
            }
        }

        /** Records the exchange id, then completes the exchange. */
        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
            outIds.add(exchange.getExchangeId());
            done(exchange);
        }
    }

    /**
     * Provider component: answers every incoming exchange with
     * {@code <world/>} and records which exchange ids it saw as requests and
     * which it saw with DONE status.
     */
    public static class StatelessEcho extends ComponentSupport implements MessageExchangeListener {

        private boolean stateless;

        /** Ids of the exchanges received with a status other than DONE. */
        public List inIds = new CopyOnWriteArrayList();

        /** Ids of the exchanges received with DONE status. */
        public List doneIds = new CopyOnWriteArrayList();

        /**
         * @param stateless when true, answered exchanges are flagged with
         *                  {@link JbiConstants#STATELESS_PROVIDER}
         */
        public StatelessEcho(boolean stateless) {
            setService(new QName("echo"));
            setEndpoint("ep");
            this.stateless = stateless;
        }

        /**
         * Records DONE exchanges; any other exchange is recorded as a
         * request and answered with {@code <world/>}.
         */
        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
            if (exchange.getStatus() == ExchangeStatus.DONE) {
                doneIds.add(exchange.getExchangeId());
            } else {
                inIds.add(exchange.getExchangeId());
                if (stateless) {
                    exchange.setProperty(JbiConstants.STATELESS_PROVIDER, Boolean.TRUE);
                }
                NormalizedMessage out = exchange.createMessage();
                out.setContent(new StringSource("<world/>"));
                answer(exchange, out);
            }
        }
    }

}