package org.objectweb.celtix.bindings;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.wsdl.Port;
import javax.wsdl.WSDLException;
import javax.wsdl.extensions.ExtensibilityElement;
import javax.xml.namespace.QName;
import javax.xml.ws.handler.MessageContext;

import org.objectweb.celtix.Bus;
import org.objectweb.celtix.BusException;
import org.objectweb.celtix.common.injection.ResourceInjector;
import org.objectweb.celtix.common.logging.LogUtils;
import org.objectweb.celtix.context.InputStreamMessageContext;
import org.objectweb.celtix.context.ObjectMessageContext;
import org.objectweb.celtix.context.OutputStreamMessageContext;
import org.objectweb.celtix.resource.DefaultResourceManager;
import org.objectweb.celtix.resource.ResourceManager;
import org.objectweb.celtix.resource.ResourceResolver;
import org.objectweb.celtix.transports.ServerTransport;
import org.objectweb.celtix.transports.ServerTransportCallback;
import org.objectweb.celtix.transports.TransportFactory;
import org.objectweb.celtix.ws.addressing.EndpointReferenceType;
import org.objectweb.celtix.wsdl.EndpointReferenceUtils;

import static org.objectweb.celtix.bindings.JAXWSConstants.BUS_PROPERTY;
import static org.objectweb.celtix.bindings.JAXWSConstants.SERVER_BINDING_PROPERTY;
import static org.objectweb.celtix.bindings.JAXWSConstants.SERVER_TRANSPORT_PROPERTY;

/**
 * Base class for server-side bindings.
 * <p>
 * It creates and activates the server transport for an endpoint reference,
 * receives inbound messages from that transport through a
 * {@link ServerTransportCallback}, and drives each message through a
 * {@link ServerRequest} (inbound processing, invocation, outbound processing).
 */
public abstract class AbstractServerBinding extends AbstractBindingBase implements ServerBinding {

    private static final Logger LOG = LogUtils.getL7dLogger(AbstractServerBinding.class);

    /** Callback into the endpoint; used here to obtain an executor and stored on inbound contexts. */
    protected ServerBindingEndpointCallback sbeCallback;

    /**
     * Creates the binding.
     *
     * @param b the bus, passed to the superclass
     * @param ref the endpoint reference, passed to the superclass
     * @param sbcb the endpoint callback kept in {@link #sbeCallback}
     */
    public AbstractServerBinding(Bus b, EndpointReferenceType ref,
                                 ServerBindingEndpointCallback sbcb) {
        super(b, ref);
        sbeCallback = sbcb;
    }

    /**
     * Creates the server transport, activates it with a callback that routes
     * incoming messages to {@link #dispatch(InputStreamMessageContext, ServerTransport)},
     * and then injects resources into the system handlers.
     *
     * @throws WSDLException if thrown while creating the transport
     * @throws IOException if no server transport is available, or if thrown while
     *         creating or activating the transport
     */
    public void activate() throws WSDLException, IOException {
        transport = createTransport(reference);

        ServerTransport activeTransport = serverTransport();
        if (activeTransport == null) {
            // createTransport() returns null when no transport factory could be resolved;
            // report that explicitly instead of failing with a NullPointerException.
            throw new IOException("No server transport could be created for the endpoint reference");
        }

        ServerTransportCallback tc = new ServerTransportCallback() {

            public void dispatch(InputStreamMessageContext ctx, ServerTransport t) {
                AbstractServerBinding.this.dispatch(ctx, t);
            }

            public Executor getExecutor() {
                return sbeCallback.getExecutor();
            }
        };
        activeTransport.activate(tc);

        injectSystemHandlers();
    }

    /**
     * Deactivates the server transport.
     *
     * @throws IOException if thrown by the transport
     */
    public void deactivate() throws IOException {
        serverTransport().deactivate();
    }

    /**
     * Sends a partial response on the given output context.
     * <p>
     * A fresh object context is populated from {@code outputContext} and the
     * data binding callback is stored on it. When {@code callback} is non-null
     * a oneway {@link Request} is processed against the output context;
     * otherwise the transport is only asked to finalize the output context.
     * In both cases the output stream is flushed and closed afterwards.
     *
     * @param outputContext the context to write the partial response to
     * @param callback the data binding callback, may be {@code null}
     * @throws IOException if writing, flushing or closing fails
     */
    public void partialResponse(OutputStreamMessageContext outputContext,
                                DataBindingCallback callback) throws IOException {
        ObjectMessageContext objectMessageContext = createObjectContext();
        objectMessageContext.putAll(outputContext);
        BindingContextUtils.storeDataBindingCallback(objectMessageContext, callback);

        if (callback != null) {
            Request request = new Request(this, transport, objectMessageContext);
            request.setOneway(true);

            try {
                request.process(outputContext);
                terminateOutputContext(outputContext);
            } finally {
                request.complete();
            }
        } else {
            transport.finalPrepareOutputStreamContext(outputContext);
            terminateOutputContext(outputContext);
        }
    }

    /**
     * Returns the binding implementation used by this server binding.
     *
     * @return the binding implementation
     */
    public abstract AbstractBindingImpl getBindingImpl();

    /**
     * Returns the operation name for the given message context.
     *
     * @param ctx the message context
     * @return the operation name
     */
    public abstract QName getOperationName(MessageContext ctx);

    /**
     * Asks the given transport to finalize the output stream context.
     * This implementation does not use {@code bindingContext}.
     *
     * @param t the transport to delegate to
     * @param bindingContext the binding-level message context (unused here)
     * @param ostreamContext the output stream context to finalize
     * @throws IOException if thrown by the transport
     */
    protected void finalPrepareOutputStreamContext(ServerTransport t, MessageContext bindingContext,
                                                   OutputStreamMessageContext ostreamContext)
        throws IOException {
        t.finalPrepareOutputStreamContext(ostreamContext);
    }

    /**
     * Tells whether the current exchange carries a fault.
     *
     * @param objCtx the object message context
     * @param bindingCtx the binding-level message context
     * @return {@code true} if the binding implementation reports a fault for
     *         {@code bindingCtx} or {@code objCtx} holds an exception
     */
    protected boolean isFault(ObjectMessageContext objCtx, MessageContext bindingCtx) {
        if (getBindingImpl().hasFault(bindingCtx)) {
            return true;
        }
        return objCtx.getException() != null;
    }

    /**
     * Handles one inbound message delivered by the transport.
     * <p>
     * The message is first run through inbound processing. If that fails, or
     * the request reports that it must not be dispatched, outbound processing
     * is triggered right away and the method returns. Oneway requests also get
     * their outbound processing before the invocation. The invocation itself
     * runs either on the calling thread or, when the async-oneway-dispatch flag
     * is set on the input context and the request is oneway or has a decoupled
     * response, through {@link #executeAsync(Runnable)}.
     *
     * @param istreamCtx the inbound message context
     * @param t the transport the message arrived on
     */
    protected void dispatch(InputStreamMessageContext istreamCtx, final ServerTransport t) {
        LOG.info("Dispatched to binding on thread : " + Thread.currentThread());
        BindingContextUtils.storeServerBindingEndpointCallback(istreamCtx, sbeCallback);

        final ServerRequest inMsg = new ServerRequest(this, istreamCtx);

        Exception inboundException = null;

        try {
            inMsg.processInbound();
            if (!inMsg.doDispatch()) {
                LOG.log(Level.INFO,
                        "handlers have halted inbound message processing or specifically prevent dispatch");
            }
        } catch (Exception ex) {
            inboundException = ex;
            LOG.log(Level.INFO, "inbound message processing resulted in exception: ", ex);
        }

        // Dispatch only when inbound processing succeeded and the request allows it.
        boolean doDispatch = null == inboundException && inMsg.doDispatch();

        if (!doDispatch || inMsg.isOneway()) {

            inMsg.processOutbound(t, inboundException);

            if (!doDispatch) {
                return;
            }
        }

        Runnable invoker = new Runnable() {
            public void run() {
                LOG.log(Level.INFO, "Before invoking on implementor");
                assert null != inMsg.getObjectCtx();
                inMsg.doInvocation();
                LOG.log(Level.INFO, "After invoking on implementor");
                if (!inMsg.isOneway()) {
                    // Oneway requests already had their outbound processing above.
                    inMsg.processOutbound(t, null);
                }
            }
        };

        if ((BindingContextUtils.retrieveDecoupledResponse(inMsg.getObjectCtx())
             || inMsg.isOneway())
            && BindingContextUtils.retrieveAsyncOnewayDispatch(istreamCtx)) {
            executeAsync(invoker);
        } else {
            invoker.run();
        }
    }

    /**
     * Creates the server transport for an endpoint reference.
     * <p>
     * The transport factory is looked up by the namespace URI of the first
     * extensibility element of the WSDL port resolved from {@code ref}.
     *
     * @param ref the endpoint reference
     * @return the new server transport, or {@code null} if the port has no
     *         extensibility elements or a {@link BusException} occurred
     * @throws WSDLException if thrown while resolving the port or creating the transport
     * @throws IOException if thrown while creating the transport
     */
    protected ServerTransport createTransport(EndpointReferenceType ref) throws WSDLException, IOException {

        try {
            Port port = EndpointReferenceUtils.getPort(bus.getWSDLManager(), ref);
            List<?> exts = port.getExtensibilityElements();
            if (!exts.isEmpty()) {
                ExtensibilityElement el = (ExtensibilityElement)exts.get(0);
                TransportFactory tf =
                    bus.getTransportFactoryManager().
                        getTransportFactory(el.getElementType().getNamespaceURI());
                return tf.createServerTransport(ref);
            }
        } catch (BusException ex) {
            // Keep the cause in the log record so the failure can be diagnosed.
            LOG.log(Level.SEVERE, "TRANSPORT_FACTORY_RETRIEVAL_FAILURE_MSG", ex);
        }
        return null;
    }

    /**
     * Returns the inherited {@code transport} field cast to {@link ServerTransport}.
     *
     * @return the server transport, or {@code null} if none has been set
     */
    protected ServerTransport serverTransport() {
        return (ServerTransport)transport;
    }

    /**
     * Injects resources into the binding implementation's system handlers.
     * The resolver exposes the bus, this binding and the transport under the
     * {@code BUS_PROPERTY}, {@code SERVER_BINDING_PROPERTY} and
     * {@code SERVER_TRANSPORT_PROPERTY} resource names respectively.
     */
    private void injectSystemHandlers() {
        ResourceManager rm = new DefaultResourceManager();
        rm.addResourceResolver(new ResourceResolver() {
            @SuppressWarnings("unchecked")
            public <T> T resolve(String resourceName, Class<T> resourceType) {
                if (BUS_PROPERTY.equals(resourceName)) {
                    return (T)AbstractServerBinding.this.getBus();
                } else if (SERVER_BINDING_PROPERTY.equals(resourceName)) {
                    return (T)AbstractServerBinding.this;
                } else if (SERVER_TRANSPORT_PROPERTY.equals(resourceName)) {
                    return (T)transport;
                }
                return null;
            }

            public InputStream getAsStream(String name) {
                return null;
            }
        });
        ResourceInjector injector = new ResourceInjector(rm);

        getBindingImpl().injectSystemHandlers(injector);
    }

    /**
     * Flushes and closes the output stream of the given context.
     * The stream is closed even if the flush fails.
     *
     * @param outputContext the context whose output stream is terminated
     * @throws IOException if flushing or closing fails
     */
    private void terminateOutputContext(OutputStreamMessageContext outputContext)
        throws IOException {
        try {
            outputContext.getOutputStream().flush();
        } finally {
            outputContext.getOutputStream().close();
        }
    }

    /**
     * Runs the command on the endpoint's executor, or on the bus's automatic
     * work queue when the endpoint supplies none. If the executor rejects the
     * command it is run directly on the calling thread.
     *
     * @param command the work to execute
     */
    private void executeAsync(Runnable command) {
        // Read the endpoint executor once so the null check and the use see the same value.
        Executor executor = sbeCallback.getExecutor();
        if (executor == null) {
            executor = getBus().getWorkQueueManager().getAutomaticWorkQueue();
        }
        try {
            executor.execute(command);
        } catch (RejectedExecutionException ree) {
            LOG.log(Level.WARNING, "ONEWAY_FALLBACK_TO_DIRECT_MSG", ree);
            command.run();
        }
    }
}