1 17 package org.apache.servicemix.components.util; 18 19 import javax.jbi.JBIException; 20 import javax.jbi.messaging.DeliveryChannel; 21 import javax.jbi.messaging.ExchangeStatus; 22 import javax.jbi.messaging.Fault; 23 import javax.jbi.messaging.InOnly; 24 import javax.jbi.messaging.InOut; 25 import javax.jbi.messaging.MessageExchange; 26 import javax.jbi.messaging.MessageExchangeFactory; 27 import javax.jbi.messaging.MessagingException; 28 import javax.jbi.servicedesc.ServiceEndpoint; 29 import javax.xml.namespace.QName ; 30 31 import org.apache.servicemix.MessageExchangeListener; 32 import org.apache.servicemix.jbi.MissingPropertyException; 33 import org.apache.servicemix.jbi.NoServiceAvailableException; 34 35 41 public class PipelineComponent extends ComponentSupport implements MessageExchangeListener { 42 private ServiceEndpoint requestResponseEndpoint; 43 private ServiceEndpoint outputEndpoint; 44 private QName requestResponseServiceName; 45 private QName outputEndpointServiceName; 46 47 public PipelineComponent() { 48 } 49 50 public PipelineComponent(QName service, String endpoint) { 51 super(service, endpoint); 52 } 53 54 public void start() throws JBIException { 55 super.start(); 56 57 if (requestResponseEndpoint == null) { 58 if (requestResponseServiceName == null) { 59 throw new MissingPropertyException("requestResponseServiceName"); 60 } 61 requestResponseEndpoint = chooseEndpoint(requestResponseServiceName); 62 63 } 64 if (outputEndpoint == null) { 65 if (outputEndpointServiceName == null) { 66 throw new MissingPropertyException("outputEndpointServiceName"); 67 } 68 outputEndpoint = chooseEndpoint(outputEndpointServiceName); 69 } 70 } 71 72 public void onMessageExchange(MessageExchange exchange) throws MessagingException { 73 if (exchange.getStatus() == ExchangeStatus.DONE) { 75 return; 76 } else if (exchange.getStatus() == ExchangeStatus.ERROR) { 78 return; 79 } 80 81 DeliveryChannel deliveryChannel = getDeliveryChannel(); 83 MessageExchangeFactory rpcFactory = deliveryChannel.createExchangeFactory(requestResponseEndpoint); 84 InOut rpc = rpcFactory.createInOutExchange(); 85 rpc.setInMessage(exchange.getMessage("in")); 86 boolean answer = deliveryChannel.sendSync(rpc); 87 88 MessageExchangeFactory outputFactory = deliveryChannel.createExchangeFactory(outputEndpoint); 89 InOnly inOnly = outputFactory.createInOnlyExchange(); 90 91 if (answer) { 92 inOnly.setInMessage(rpc.getOutMessage()); 93 deliveryChannel.send(inOnly); 94 done(exchange); 95 } 96 else if (exchange instanceof InOnly == false) { 97 inOnly.setError(rpc.getError()); 98 Fault fault = rpc.getFault(); 99 fail(exchange, fault); 100 } 101 else { 102 done(exchange); 104 } 105 done(rpc); 106 } 107 108 public ServiceEndpoint getRequestResponseEndpoint() { 111 return requestResponseEndpoint; 112 } 113 114 public void setRequestResponseEndpoint(ServiceEndpoint requestResponseEndpoint) { 115 this.requestResponseEndpoint = requestResponseEndpoint; 116 } 117 118 public ServiceEndpoint getOutputEndpoint() { 119 return outputEndpoint; 120 } 121 122 public void setOutputEndpoint(ServiceEndpoint outputEndpoint) { 123 this.outputEndpoint = outputEndpoint; 124 } 125 126 public QName getRequestResponseServiceName() { 127 return requestResponseServiceName; 128 } 129 130 public void setRequestResponseServiceName(QName requestResponseServiceName) { 131 this.requestResponseServiceName = requestResponseServiceName; 132 } 133 134 public QName getOutputEndpointServiceName() { 135 return outputEndpointServiceName; 136 } 137 138 public void setOutputEndpointServiceName(QName outputEndpointServiceName) { 139 this.outputEndpointServiceName = outputEndpointServiceName; 140 } 141 142 145 152 protected ServiceEndpoint chooseEndpoint(QName serviceName) throws JBIException { 153 ServiceEndpoint[] endpoints = getContext().getEndpointsForService(serviceName); 154 if (endpoints == null || endpoints.length == 0) { 155 throw new NoServiceAvailableException(serviceName); 156 } 157 158 return endpoints[0]; 160 } 161 162 } 163 | Popular Tags |