package org.apache.axis2.clientapi;

import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContextFactory;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.description.OperationDescription;
import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.engine.AxisFault;
import org.apache.axis2.om.OMException;
import org.apache.axis2.soap.SOAPEnvelope;
import org.apache.axis2.transport.TransportListener;
import org.apache.axis2.util.threadpool.AxisWorker;
import org.apache.wsdl.WSDLConstants;

import javax.xml.namespace.QName;
import java.io.IOException;

/**
 * Client for the out-in (request/response) message exchange pattern.
 *
 * <p>Two modes are supported, selected through
 * {@link #setTransportInfo(String, String, boolean)}:
 * <ul>
 *   <li><b>single channel</b> ({@code useSeparateListener == false}): the request is sent
 *       and the response obtained through {@code TwoChannelBasedSender.send};</li>
 *   <li><b>separate listener</b> ({@code useSeparateListener == true}): the request is sent
 *       through an {@link AxisEngine} with a reply-to address, and the response is handed to
 *       the {@link CallbackReceiver} registered as the operation's message receiver.</li>
 * </ul>
 *
 * <p>{@code serviceContext}, {@code soapAction}, {@code verifyInvocation} and
 * {@code inferTransport} are not declared in this file; they are presumably inherited from
 * {@link MEPClient}.
 */
public class InOutMEPClient extends MEPClient {

    /** Maximum number of sleep intervals {@link #invokeBlocking} waits for an async response. */
    private static final int RESPONSE_POLL_ATTEMPTS = 20;

    /** Length of one polling sleep in {@link #invokeBlocking}, in milliseconds. */
    private static final long RESPONSE_POLL_INTERVAL_MILLIS = 100;

    /** Not referenced anywhere in this class; kept because it is visible to subclasses. */
    protected TransportListener listener;

    /** Transport used to send the request; inferred from {@link #to} when left unset. */
    protected TransportOutDescription senderTransport;

    /** Transport used to receive the response; defaults to the sender transport's name. */
    protected TransportInDescription listenerTransport;

    /** When {@code true}, the response is received through a separately started listener. */
    protected boolean useSeparateListener = false;

    /** Endpoint the request is addressed to. */
    protected EndpointReference to;

    /** Receiver with which callbacks are registered by message ID in separate-listener mode. */
    protected CallbackReceiver callbackReceiver;

    /**
     * Creates a client bound to the given service context, using the out-in MEP.
     *
     * @param serviceContext the service context the invocations run in
     */
    public InOutMEPClient(ServiceContext serviceContext) {
        super(serviceContext, WSDLConstants.MEP_URI_OUT_IN);
        callbackReceiver = new CallbackReceiver();
    }

    /**
     * Sends the request and blocks until the response is available.
     *
     * <p>In separate-listener mode this delegates to
     * {@link #invokeNonBlocking(OperationDescription, MessageContext, Callback)} and polls
     * the callback, giving up after {@code RESPONSE_POLL_ATTEMPTS} sleeps of
     * {@code RESPONSE_POLL_INTERVAL_MILLIS} milliseconds each.
     *
     * @param axisop the operation being invoked
     * @param msgctx the request message context
     * @return the message context carrying the response envelope
     * @throws AxisFault if the invocation is invalid, the wait times out or is interrupted,
     *                   the callback reports an error, or the response body contains a fault
     */
    public MessageContext invokeBlocking(OperationDescription axisop, final MessageContext msgctx)
            throws AxisFault {
        verifyInvocation(axisop, msgctx);

        if (useSeparateListener) {
            SyncCallBack callback = new SyncCallBack();
            invokeNonBlocking(axisop, msgctx, callback);
            return awaitResponse(callback);
        }

        msgctx.setTo(to);
        msgctx.setSoapAction(soapAction);
        msgctx.setServiceContext(serviceContext);

        checkTransport(msgctx);

        msgctx.setOperationContext(
                OperationContextFactory.createMEPContext(
                        WSDLConstants.MEP_CONSTANT_IN_OUT,
                        axisop,
                        serviceContext));
        MessageContext response = TwoChannelBasedSender.send(msgctx, listenerTransport);

        SOAPEnvelope resenvelope = response.getEnvelope();
        if (resenvelope.getBody().hasFault()) {
            throw new AxisFault(resenvelope.getBody().getFault().getException());
        }
        return response;
    }

    /**
     * Polls the callback until it completes, reports an error, or the attempt budget runs out.
     *
     * @param callback the callback registered for the pending invocation
     * @return a new message context holding the response envelope
     * @throws AxisFault on timeout, interruption, or an error reported to the callback
     */
    private MessageContext awaitResponse(SyncCallBack callback) throws AxisFault {
        int attempts = 0;
        // Also stop on a recorded error: reportError() does not mark the callback complete,
        // so waiting for isComplete() alone would hide the real failure behind a timeout.
        while (!callback.isComplete() && callback.error == null) {
            if (attempts >= RESPONSE_POLL_ATTEMPTS) {
                throw new AxisFault("Time out waiting for the response");
            }
            try {
                Thread.sleep(RESPONSE_POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                throw new AxisFault(e);
            }
            attempts++;
        }

        SOAPEnvelope responseEnvelope = callback.envelope;
        if (responseEnvelope != null) {
            MessageContext resMsgctx = new MessageContext(serviceContext.getEngineContext());
            resMsgctx.setEnvelope(responseEnvelope);
            return resMsgctx;
        }

        Exception failure = callback.error;
        if (failure instanceof AxisFault) {
            throw (AxisFault) failure;
        }
        throw new AxisFault(failure);
    }

    /**
     * Sends the request without blocking; the outcome is delivered to {@code callback}.
     *
     * <p>In separate-listener mode the callback is registered with the
     * {@link CallbackReceiver} under a generated message ID and the request is sent through
     * an {@link AxisEngine}. Otherwise a worker is queued on the engine context's thread pool.
     *
     * @param axisop   the operation being invoked
     * @param msgctx   the request message context
     * @param callback receives the response or the error
     * @throws AxisFault if the invocation is invalid or sending fails
     */
    public void invokeNonBlocking(
            final OperationDescription axisop,
            final MessageContext msgctx,
            final Callback callback)
            throws AxisFault {
        verifyInvocation(axisop, msgctx);
        msgctx.setTo(to);
        try {
            final ConfigurationContext syscontext = serviceContext.getEngineContext();

            AxisEngine engine = new AxisEngine(syscontext);
            checkTransport(msgctx);
            msgctx.setSoapAction(soapAction);

            if (useSeparateListener) {
                // NOTE(review): a millisecond timestamp is not unique when two invocations
                // start within the same millisecond - consider a stronger ID scheme.
                String messageID = String.valueOf(System.currentTimeMillis());
                msgctx.setMessageID(messageID);
                axisop.setMessageReciever(callbackReceiver);
                callbackReceiver.addCallback(messageID, callback);
                msgctx.setReplyTo(
                        ListenerManager.replyToEPR(
                                serviceContext.getServiceConfig().getName().getLocalPart()
                                        + "/"
                                        + axisop.getName().getLocalPart(),
                                listenerTransport.getName().getLocalPart()));
                msgctx.setOperationContext(axisop.findOperationContext(msgctx, serviceContext));
                msgctx.setServiceContext(serviceContext);
                engine.send(msgctx);
            } else {
                serviceContext.getEngineContext().getThreadPool().addWorker(
                        new NonBlockingInvocationWorker(callback, axisop, msgctx));
            }

        } catch (OMException e) {
            throw AxisFault.makeFault(e);
        } catch (IOException e) {
            throw AxisFault.makeFault(e);
        }
    }

    /**
     * Sets the endpoint the request is addressed to.
     *
     * @param to the target endpoint reference
     */
    public void setTo(EndpointReference to) {
        this.to = to;
    }

    /**
     * Selects the sender and listener transports and the response-channel mode.
     *
     * <p>Without a separate listener the pair must either be the same two-way transport
     * (HTTP, TCP or commons-HTTP) or commons-HTTP as sender with HTTP as listener. With a
     * separate listener the addressing module must be engaged, and the listener is started
     * through {@code ListenerManager.makeSureStarted}.
     *
     * @param senderTransport     name of the transport used to send the request
     * @param listenerTransport   name of the transport used to receive the response
     * @param useSeparateListener whether the response arrives through a separate listener
     * @throws AxisFault if the combination is unsupported, a transport is unknown, or the
     *                   addressing module is not engaged in separate-listener mode
     */
    public void setTransportInfo(
            String senderTransport,
            String listenerTransport,
            boolean useSeparateListener)
            throws AxisFault {

        if (!useSeparateListener) {
            boolean isTransportsEqual = senderTransport.equals(listenerTransport);
            boolean isATwoWaytransport = Constants.TRANSPORT_HTTP.equals(senderTransport)
                    || Constants.TRANSPORT_TCP.equals(senderTransport)
                    || Constants.TRANSPORT_COMMONS_HTTP.equals(senderTransport);
            boolean isCommonsAndHTTP = Constants.TRANSPORT_COMMONS_HTTP.equals(senderTransport)
                    && Constants.TRANSPORT_HTTP.equals(listenerTransport);
            if (!isCommonsAndHTTP && (!isTransportsEqual || !isATwoWaytransport)) {
                throw new AxisFault("useSeparateListener = false is only supports by the htpp/tcp and tcp commons transport set as the sender and receiver");
            }
        }
        // Record the mode on every call so a client can switch back from separate-listener
        // mode; assigning only when the flag is true left it stuck at true.
        this.useSeparateListener = useSeparateListener;

        AxisConfiguration axisConfig = serviceContext.getEngineContext().getAxisConfiguration();
        this.listenerTransport = axisConfig.getTransportIn(new QName(listenerTransport));
        this.senderTransport = axisConfig.getTransportOut(new QName(senderTransport));
        if (this.senderTransport == null) {
            throw new AxisFault("Unknown transport " + senderTransport);
        }

        if (this.listenerTransport == null) {
            throw new AxisFault("Unknown transport " + listenerTransport);
        }

        if (useSeparateListener) {
            if (!serviceContext
                    .getEngineContext()
                    .getAxisConfiguration()
                    .isEngaged(new QName(Constants.MODULE_ADDRESSING))) {
                throw new AxisFault("to do two Transport Channels the Addressing Modules must be engeged");
            }
            ListenerManager.makeSureStarted(listenerTransport, serviceContext.getEngineContext());
        }
    }

    /**
     * Resolves any transport that is still unset and copies the transports onto the message
     * context, leaving values already present on the message context untouched.
     *
     * @param msgctx the message context to complete
     * @throws AxisFault declared for the transport inference call
     */
    private void checkTransport(MessageContext msgctx) throws AxisFault {
        if (senderTransport == null) {
            senderTransport = inferTransport(to);
        }
        if (listenerTransport == null) {
            listenerTransport =
                    serviceContext.getEngineContext().getAxisConfiguration().getTransportIn(
                            senderTransport.getName());
        }

        if (msgctx.getTransportIn() == null) {
            msgctx.setTransportIn(listenerTransport);
        }
        if (msgctx.getTransportOut() == null) {
            msgctx.setTransportOut(senderTransport);
        }
    }

    /**
     * Callback used by {@link InOutMEPClient#invokeBlocking} to capture the result of a
     * non-blocking invocation so the calling thread can poll for it.
     */
    public class SyncCallBack extends Callback {
        // volatile: written by the thread delivering the result, read by the polling caller.
        private volatile SOAPEnvelope envelope;
        private volatile Exception error;

        /** Stores the response envelope taken from the asynchronous result. */
        public void onComplete(AsyncResult result) {
            this.envelope = result.getResponseEnvelope();
        }

        /** Stores the reported error. */
        public void reportError(Exception e) {
            error = e;
        }
    }

    /**
     * Engages the named module on the Axis configuration of this client's engine context.
     *
     * @param moduleName qualified name of the module to engage
     * @throws AxisFault if the module cannot be engaged
     */
    public void engageModule(QName moduleName) throws AxisFault {
        serviceContext.getEngineContext().getAxisConfiguration().engageModule(moduleName);
    }

    /**
     * Stops the listener registered under the listener transport's name. Does nothing when
     * no listener transport has been resolved yet.
     *
     * @throws AxisFault if stopping the listener fails
     */
    public void close() throws AxisFault {
        if (listenerTransport == null) {
            // No transport was ever set or inferred, so there is no listener name to stop.
            return;
        }
        ListenerManager.stop(listenerTransport.getName().getLocalPart());
    }

    /**
     * Thread-pool worker that performs a single-channel invocation and hands the outcome to
     * the callback.
     */
    private class NonBlockingInvocationWorker implements AxisWorker {

        private final Callback callback;
        private final OperationDescription axisop;
        private final MessageContext msgctx;

        public NonBlockingInvocationWorker(
                Callback callback,
                OperationDescription axisop,
                MessageContext msgctx) {
            this.callback = callback;
            this.axisop = axisop;
            this.msgctx = msgctx;
        }

        /**
         * Sends the request, passes the response to the callback and marks it complete.
         * Any exception is reported to the callback instead of being thrown.
         */
        public void doWork() {
            try {
                msgctx.setOperationContext(
                        OperationContextFactory.createMEPContext(
                                WSDLConstants.MEP_CONSTANT_IN_OUT,
                                axisop,
                                serviceContext));
                msgctx.setServiceContext(serviceContext);
                MessageContext response = TwoChannelBasedSender.send(msgctx, listenerTransport);
                AsyncResult asyncResult = new AsyncResult(response);
                callback.onComplete(asyncResult);
                callback.setComplete(true);
            } catch (Exception e) {
                callback.reportError(e);
            }
        }
    }

}