1 17 package org.apache.servicemix.eip; 18 19 import java.io.ByteArrayInputStream ; 20 import java.io.ByteArrayOutputStream ; 21 import java.io.IOException ; 22 import java.io.ObjectOutputStream ; 23 24 import javax.jbi.messaging.ExchangeStatus; 25 import javax.jbi.messaging.Fault; 26 import javax.jbi.messaging.MessageExchange; 27 import javax.jbi.messaging.MessagingException; 28 import javax.jbi.messaging.NormalizedMessage; 29 import javax.xml.namespace.QName ; 30 import javax.xml.transform.Source ; 31 import javax.xml.transform.stream.StreamSource ; 32 33 import junit.framework.TestCase; 34 35 import org.apache.servicemix.JbiConstants; 36 import org.apache.servicemix.MessageExchangeListener; 37 import org.apache.servicemix.client.DefaultServiceMixClient; 38 import org.apache.servicemix.components.util.ComponentSupport; 39 import org.apache.servicemix.eip.support.ExchangeTarget; 40 import org.apache.servicemix.id.IdGenerator; 41 import org.apache.servicemix.jbi.container.JBIContainer; 42 import org.apache.servicemix.store.memory.MemoryStore; 43 import org.apache.servicemix.tck.ExchangeCompletedListener; 44 import org.apache.servicemix.tck.ReceiverComponent; 45 46 public abstract class AbstractEIPTest extends TestCase { 47 48 protected JBIContainer jbi; 49 protected DefaultServiceMixClient client; 50 protected ExchangeCompletedListener listener; 51 52 protected void setUp() throws Exception { 53 jbi = new JBIContainer(); 54 jbi.setEmbedded(true); 55 jbi.setUseMBeanServer(false); 56 jbi.setCreateMBeanServer(false); 57 configureContainer(); 58 listener = new ExchangeCompletedListener(); 59 jbi.addListener(listener); 60 61 jbi.init(); 62 jbi.start(); 63 64 client = new DefaultServiceMixClient(jbi); 65 66 } 68 69 protected void tearDown() throws Exception { 70 listener.assertExchangeCompleted(); 71 jbi.shutDown(); 72 } 73 74 protected void configureContainer() throws Exception { 75 jbi.setFlowName("st"); 76 } 77 78 protected void configurePattern(EIPEndpoint endpoint) { 79 endpoint.setStore(new MemoryStore(new IdGenerator()) { 80 public void store(String id, Object exchange) throws IOException { 81 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 82 new ObjectOutputStream (baos).writeObject(exchange); 83 super.store(id, exchange); 84 } 85 }); 86 } 87 88 protected ExchangeTarget createServiceExchangeTarget(QName name) { 89 ExchangeTarget target = new ExchangeTarget(); 90 target.setService(name); 91 return target; 92 } 93 94 protected ReceiverComponent activateReceiver(String name) throws Exception { 95 ReceiverComponent receiver = new ReceiverComponent(); 96 activateComponent(receiver, name); 97 return receiver; 98 } 99 100 protected void activateComponent(EIPEndpoint endpoint, String name) throws Exception { 101 EIPSpringComponent eip = new EIPSpringComponent(); 102 endpoint.setService(new QName (name)); 103 endpoint.setEndpoint("ep"); 104 eip.setEndpoints(new EIPEndpoint[] { endpoint }); 105 jbi.activateComponent(eip, name); 106 } 107 108 protected void activateComponent(ComponentSupport component, String name) throws Exception { 109 component.setService(new QName (name)); 110 component.setEndpoint("ep"); 111 jbi.activateComponent(component, name); 112 } 113 114 protected static Source createSource(String msg) { 115 return new StreamSource (new ByteArrayInputStream (msg.getBytes())); 116 } 117 118 protected static class ReturnOutComponent extends ComponentSupport implements MessageExchangeListener { 119 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 120 if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 121 boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC)); 122 if (exchange.getMessage("out") == null) { 123 NormalizedMessage out = exchange.createMessage(); 124 out.setContent(createSource("<outMsg/>")); 125 exchange.setMessage(out, "out"); 126 if (txSync) { 127 sendSync(exchange); 128 } else { 129 send(exchange); 130 } 131 } else if (exchange.getFault() == null) { 132 Fault fault = exchange.createFault(); 133 fault.setContent(createSource("<fault/>")); 134 exchange.setMessage(fault, "fault"); 135 if (txSync) { 136 sendSync(exchange); 137 } else { 138 send(exchange); 139 } 140 } else { 141 done(exchange); 142 } 143 } 144 } 145 } 146 147 protected static class ReturnMockComponent extends ComponentSupport implements MessageExchangeListener { 148 private String response; 149 public ReturnMockComponent(String response) { 150 this.response = response; 151 } 152 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 153 if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 154 boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC)); 155 NormalizedMessage out = exchange.createMessage(); 156 out.setContent(createSource(response)); 157 exchange.setMessage(out, "out"); 158 if (txSync) { 159 sendSync(exchange); 160 } else { 161 send(exchange); 162 } 163 } 164 } 165 } 166 167 protected static class ReturnOutAndErrorComponent extends ComponentSupport implements MessageExchangeListener { 168 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 169 if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 170 if (exchange.getMessage("out") == null) { 171 boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC)); 172 NormalizedMessage out = exchange.createMessage(); 173 out.setContent(createSource("<outMsg/>")); 174 exchange.setMessage(out, "out"); 175 if (txSync) { 176 sendSync(exchange); 177 } else { 178 send(exchange); 179 } 180 } else { 181 fail(exchange, new Exception ("Dummy error")); 182 } 183 } 184 } 185 } 186 187 protected static class ReturnErrorComponent extends ComponentSupport implements MessageExchangeListener { 188 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 189 if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 190 fail(exchange, new Exception ("Dummy error")); 191 } 192 } 193 } 194 195 protected static class ReturnFaultComponent extends ComponentSupport implements MessageExchangeListener { 196 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 197 if (exchange.getStatus() == ExchangeStatus.ACTIVE) { 198 Fault fault = exchange.createFault(); 199 fault.setContent(createSource("<fault/>")); 200 fail(exchange, fault); 201 } 202 } 203 } 204 205 } 206 | Popular Tags |