1 package org.objectweb.celtix.bus.ws.rm; 2 3 import java.io.IOException ; 4 import java.math.BigInteger ; 5 import java.util.Collection ; 6 import java.util.HashMap ; 7 import java.util.Map ; 8 import java.util.Timer ; 9 import java.util.logging.Level ; 10 import java.util.logging.Logger ; 11 12 import javax.annotation.PostConstruct; 13 import javax.annotation.PreDestroy; 14 import javax.annotation.Resource; 15 import javax.xml.ws.handler.LogicalHandler; 16 import javax.xml.ws.handler.LogicalMessageContext; 17 import javax.xml.ws.handler.MessageContext; 18 19 import org.objectweb.celtix.Bus; 20 import org.objectweb.celtix.bindings.AbstractBindingBase; 21 import org.objectweb.celtix.bindings.BindingBase; 22 import org.objectweb.celtix.bindings.BindingContextUtils; 23 import org.objectweb.celtix.bindings.ClientBinding; 24 import org.objectweb.celtix.bindings.JAXWSConstants; 25 import org.objectweb.celtix.bindings.ServerBinding; 26 27 import org.objectweb.celtix.bus.jaxws.EndpointImpl; 28 import org.objectweb.celtix.bus.jaxws.ServiceImpl; 29 import org.objectweb.celtix.bus.ws.addressing.AddressingPropertiesImpl; 30 import org.objectweb.celtix.bus.ws.addressing.ContextUtils; 31 import org.objectweb.celtix.bus.ws.addressing.VersionTransformer; 32 import org.objectweb.celtix.bus.ws.rm.persistence.RMStoreFactory; 33 import org.objectweb.celtix.common.logging.LogUtils; 34 import org.objectweb.celtix.configuration.Configuration; 35 import org.objectweb.celtix.configuration.ConfigurationBuilder; 36 import org.objectweb.celtix.configuration.ConfigurationBuilderFactory; 37 import org.objectweb.celtix.configuration.ConfigurationProvider; 38 import org.objectweb.celtix.context.ObjectMessageContext; 39 import org.objectweb.celtix.context.OutputStreamMessageContext; 40 import org.objectweb.celtix.handlers.SystemHandler; 41 import org.objectweb.celtix.transports.ClientTransport; 42 import org.objectweb.celtix.transports.ServerTransport; 43 import org.objectweb.celtix.transports.Transport; 44 import org.objectweb.celtix.ws.addressing.AddressingProperties; 45 import org.objectweb.celtix.ws.addressing.RelatesToType; 46 import org.objectweb.celtix.ws.addressing.v200408.AttributedURI; 47 import org.objectweb.celtix.ws.addressing.v200408.EndpointReferenceType; 48 import org.objectweb.celtix.ws.rm.AckRequestedType; 49 import org.objectweb.celtix.ws.rm.CreateSequenceResponseType; 50 import org.objectweb.celtix.ws.rm.CreateSequenceType; 51 import org.objectweb.celtix.ws.rm.Identifier; 52 import org.objectweb.celtix.ws.rm.RMProperties; 53 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement; 54 import org.objectweb.celtix.ws.rm.SequenceType; 55 import org.objectweb.celtix.ws.rm.TerminateSequenceType; 56 import org.objectweb.celtix.ws.rm.persistence.RMStore; 57 import org.objectweb.celtix.ws.rm.wsdl.SequenceFault; 58 import org.objectweb.celtix.wsdl.EndpointReferenceUtils; 59 60 public class RMHandler implements LogicalHandler<LogicalMessageContext>, SystemHandler { 61 62 public static final String RM_CONFIGURATION_URI = "http://celtix.objectweb.org/bus/ws/rm/rm-config"; 63 public static final String RM_CONFIGURATION_ID = "rm-handler"; 64 65 private static final Logger LOG = LogUtils.getL7dLogger(RMHandler.class); 66 private static Map <BindingBase, RMHandler> handlers; 67 68 private RMSource source; 69 private RMDestination destination; 70 private RMProxy proxy; 71 private RMServant servant; 72 private Configuration configuration; 73 private RMStore store; 74 private Timer timer; 75 private boolean busLifeCycleListenerRegistered; 76 77 @Resource(name = JAXWSConstants.BUS_PROPERTY) private Bus bus; 78 @Resource(name = JAXWSConstants.CLIENT_BINDING_PROPERTY) private ClientBinding clientBinding; 79 @Resource(name = JAXWSConstants.SERVER_BINDING_PROPERTY) private ServerBinding serverBinding; 80 @Resource(name = JAXWSConstants.CLIENT_TRANSPORT_PROPERTY) private ClientTransport clientTransport; 81 @Resource(name = JAXWSConstants.SERVER_TRANSPORT_PROPERTY) private ServerTransport serverTransport; 82 83 public RMHandler() { 84 proxy = new RMProxy(this); 85 servant = new RMServant(); 86 } 87 88 @PostConstruct 89 protected synchronized void initialise() { 90 if (null == handlers) { 91 handlers = new HashMap <BindingBase, RMHandler>(); 92 } 93 handlers.put(getBinding(), this); 94 95 if (null == configuration) { 96 configuration = createConfiguration(); 97 } 98 99 if (null == store) { 100 store = new RMStoreFactory().getStore(configuration); 101 } 102 103 if (null == getSource()) { 104 source = new RMSource(this); 105 source.restore(); 106 } 107 if (null == destination) { 108 destination = new RMDestination(this); 109 destination.restore(); 110 } 111 112 if (null == timer) { 113 timer = new Timer (); 114 } 115 116 if (!busLifeCycleListenerRegistered) { 117 getBinding().getBus().getLifeCycleManager() 118 .registerLifeCycleListener(new RMBusLifeCycleListener(getSource())); 119 busLifeCycleListenerRegistered = true; 120 } 121 } 122 123 public static RMHandler getHandler(BindingBase binding) { 124 return handlers.get(binding); 125 } 126 127 public void close(MessageContext context) { 128 } 130 131 public boolean handleFault(LogicalMessageContext context) { 132 133 open(context); 134 return false; 135 } 136 137 public boolean handleMessage(LogicalMessageContext context) { 138 139 open(context); 140 141 try { 142 if (ContextUtils.isOutbound(context)) { 143 handleOutbound(context); 144 } else { 145 handleInbound(context); 146 } 147 } catch (SequenceFault sf) { 148 sf.printStackTrace(); 149 LOG.log(Level.SEVERE, "SequenceFault", sf); 150 } 151 return true; 152 } 153 154 @PreDestroy 155 public void shutdown() { 156 if (null != getSource()) { 157 getSource().shutdown(); 158 } 159 } 160 161 public Configuration getConfiguration() { 162 return configuration; 163 } 164 165 public RMStore getStore() { 166 return store; 167 } 168 169 public Timer getTimer() { 170 return timer; 171 } 172 173 public Bus getBus() { 174 return bus; 175 } 176 177 public Transport getTransport() { 178 return null == clientTransport ? serverTransport : clientTransport; 179 } 180 181 public ClientTransport getClientTransport() { 182 return clientTransport; 183 } 184 185 public ServerTransport getServerTransport() { 186 return serverTransport; 187 } 188 189 public ClientBinding getClientBinding() { 190 return clientBinding; 191 } 192 193 public ServerBinding getServerBinding() { 194 return serverBinding; 195 } 196 197 public boolean isServerSide() { 198 return null != serverBinding; 199 } 200 201 public AbstractBindingBase getBinding() { 202 if (null != clientBinding) { 203 return (AbstractBindingBase)clientBinding; 204 } 205 return (AbstractBindingBase)serverBinding; 206 } 207 208 public RMProxy getProxy() { 209 return proxy; 210 } 211 212 public RMServant getServant() { 213 return servant; 214 } 215 216 protected RMSource getSource() { 217 return source; 218 } 219 220 protected RMDestination getDestination() { 221 return destination; 222 } 223 224 protected void open(LogicalMessageContext context) { 225 getSource().getRetransmissionQueue().start(getBus().getWorkQueueManager() 227 .getAutomaticWorkQueue()); 228 } 229 230 protected Configuration createConfiguration() { 231 232 Configuration busCfg = getBinding().getBus().getConfiguration(); 233 ConfigurationBuilder builder = ConfigurationBuilderFactory.getBuilder(); 234 Configuration parent; 235 org.objectweb.celtix.ws.addressing.EndpointReferenceType ref = getBinding().getEndpointReference(); 236 237 if (null != clientBinding) { 238 String id = EndpointReferenceUtils.getServiceName(ref).toString() 239 + "/" + EndpointReferenceUtils.getPortName(ref); 240 parent = builder.getConfiguration(ServiceImpl.PORT_CONFIGURATION_URI, 241 id, busCfg); 242 } else { 243 parent = builder.getConfiguration(EndpointImpl.ENDPOINT_CONFIGURATION_URI, EndpointReferenceUtils 244 .getServiceName(ref).toString(), busCfg); 245 } 246 247 Configuration cfg = builder.getConfiguration(RM_CONFIGURATION_URI, RM_CONFIGURATION_ID, parent); 248 if (null == cfg) { 249 cfg = builder.buildConfiguration(RM_CONFIGURATION_URI, RM_CONFIGURATION_ID, parent); 250 251 } 252 boolean policyProviderRegistered = false; 253 for (ConfigurationProvider p : cfg.getProviders()) { 254 if (p instanceof RMPolicyProvider) { 255 policyProviderRegistered = true; 256 break; 257 } 258 } 259 if (!policyProviderRegistered) { 260 cfg.getProviders().add(new RMPolicyProvider(getBinding().getBus(), 261 getBinding().getEndpointReference())); 262 } 263 264 265 return cfg; 266 267 } 268 269 protected void handleOutbound(LogicalMessageContext context) throws SequenceFault { 270 LOG.entering(getClass().getName(), "handleOutbound"); 271 AddressingPropertiesImpl maps = 272 ContextUtils.retrieveMAPs(context, false, true); 273 274 maps.exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME); 276 277 String action = null; 278 if (maps != null && null != maps.getAction()) { 279 action = maps.getAction().getValue(); 280 } 281 282 285 if (LOG.isLoggable(Level.FINE)) { 286 LOG.fine("Action: " + action); 287 } 288 289 boolean isApplicationMessage = true; 290 291 if (RMUtils.getRMConstants().getCreateSequenceAction().equals(action) 292 || RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action) 293 || RMUtils.getRMConstants().getTerminateSequenceAction().equals(action) 294 || RMUtils.getRMConstants().getLastMessageAction().equals(action) 295 || RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action) 296 || RMUtils.getRMConstants().getSequenceInfoAction().equals(action)) { 297 isApplicationMessage = false; 298 } 299 300 RMPropertiesImpl rmpsOut = (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(context, true); 301 if (null == rmpsOut) { 302 rmpsOut = new RMPropertiesImpl(); 303 RMContextUtils.storeRMProperties(context, rmpsOut, true); 304 } 305 306 RMPropertiesImpl rmpsIn = null; 307 Identifier inSeqId = null; 308 BigInteger inMessageNumber = null; 309 310 if (isApplicationMessage) { 311 312 rmpsIn = (RMPropertiesImpl)RMContextUtils.retrieveRMProperties(context, false); 313 314 if (null != rmpsIn && null != rmpsIn.getSequence()) { 315 inSeqId = rmpsIn.getSequence().getIdentifier(); 316 inMessageNumber = rmpsIn.getSequence().getMessageNumber(); 317 } 318 LOG.fine("inbound sequence: " + (null == inSeqId ? "null" : inSeqId.getValue())); 319 320 322 if (!(isServerSide() && BindingContextUtils.isOnewayTransport(context))) { 323 324 if (!ContextUtils.isRequestor(context)) { 325 assert null != inSeqId; 326 } 327 328 330 SourceSequence seq = getSequence(inSeqId, context, maps); 331 assert null != seq; 332 333 336 seq.nextMessageNumber(inSeqId, inMessageNumber); 337 rmpsOut.setSequence(seq); 338 339 343 if (seq.isLastMessage()) { 344 source.setCurrent(null); 345 } 346 } 347 } 348 349 352 if (isApplicationMessage 353 || RMUtils.getRMConstants().getSequenceAcknowledgmentAction().equals(action)) { 354 AttributedURI to = VersionTransformer.convert(maps.getTo()); 355 assert null != to; 356 addAcknowledgements(rmpsOut, inSeqId, to); 357 } 358 359 362 if (BindingContextUtils.isOnewayMethod(context) 363 || RMUtils.getRMConstants().getLastMessageAction().equals(action)) { 364 context.put(OutputStreamMessageContext.ONEWAY_MESSAGE_TF, Boolean.FALSE); 365 } 366 } 367 368 protected void handleInbound(LogicalMessageContext context) throws SequenceFault { 369 370 LOG.entering(getClass().getName(), "handleInbound"); 371 RMProperties rmps = RMContextUtils.retrieveRMProperties(context, false); 372 373 final AddressingPropertiesImpl maps = ContextUtils.retrieveMAPs(context, false, false); 374 assert null != maps; 375 376 String action = null; 377 if (null != maps.getAction()) { 378 action = maps.getAction().getValue(); 379 } 380 381 if (LOG.isLoggable(Level.FINE)) { 382 LOG.fine("Action: " + action); 383 } 384 385 if (RMUtils.getRMConstants().getCreateSequenceResponseAction().equals(action)) { 386 Object [] parameters = (Object [])context.get(ObjectMessageContext.METHOD_PARAMETERS); 387 CreateSequenceResponseType csr = (CreateSequenceResponseType)parameters[0]; 388 getServant().createSequenceResponse(getSource(), 389 csr, 390 getProxy().getOfferedIdentifier()); 391 392 return; 393 } else if (RMUtils.getRMConstants().getCreateSequenceAction().equals(action)) { 394 Object [] parameters = (Object [])context.get(ObjectMessageContext.METHOD_PARAMETERS); 395 CreateSequenceType cs = (CreateSequenceType)parameters[0]; 396 397 final CreateSequenceResponseType csr = 398 getServant().createSequence(getDestination(), cs, maps); 399 400 Runnable response = new Runnable () { 401 public void run() { 402 try { 403 getProxy().createSequenceResponse(maps, csr); 404 } catch (IOException ex) { 405 ex.printStackTrace(); 406 } catch (SequenceFault sf) { 407 sf.printStackTrace(); 408 } 409 } 410 }; 411 getBinding().getBus().getWorkQueueManager().getAutomaticWorkQueue().execute(response); 412 413 return; 414 } else if (RMUtils.getRMConstants().getTerminateSequenceAction().equals(action)) { 415 Object [] parameters = (Object [])context.get(ObjectMessageContext.METHOD_PARAMETERS); 416 TerminateSequenceType cs = (TerminateSequenceType)parameters[0]; 417 418 getServant().terminateSequence(getDestination(), cs.getIdentifier()); 419 } 420 421 423 if (null != rmps) { 424 425 processAcknowledgments(rmps); 426 427 processAcknowledgmentRequests(rmps); 428 429 processSequence(rmps, maps); 430 } 431 } 432 433 private void processAcknowledgments(RMProperties rmps) { 434 Collection <SequenceAcknowledgement> acks = rmps.getAcks(); 435 if (null != acks) { 436 for (SequenceAcknowledgement ack : acks) { 437 getSource().setAcknowledged(ack); 438 } 439 } 440 } 441 442 private void processSequence(RMProperties rmps, AddressingProperties maps) throws SequenceFault { 443 SequenceType s = rmps.getSequence(); 444 if (null == s) { 445 return; 446 } 447 getDestination().acknowledge(s, 448 null == maps.getReplyTo() ? null : maps.getReplyTo().getAddress().getValue()); 449 } 450 451 private void processAcknowledgmentRequests(RMProperties rmps) { 452 Collection <AckRequestedType> requested = rmps.getAcksRequested(); 453 if (null != requested) { 454 for (AckRequestedType ar : requested) { 455 DestinationSequence seq = getDestination().getSequence(ar.getIdentifier()); 456 if (null != seq) { 457 seq.scheduleImmediateAcknowledgement(); 458 } else { 459 LOG.severe("No such sequence."); 460 } 461 } 462 } 463 } 464 465 private void addAcknowledgements(RMPropertiesImpl rmpsOut, Identifier inSeqId, AttributedURI to) { 466 467 for (DestinationSequence seq : getDestination().getAllSequences()) { 468 if (seq.sendAcknowledgement() 469 && ((seq.getAcksTo().getAddress().getValue().equals(RMUtils.getAddressingConstants() 470 .getAnonymousURI()) && AbstractSequenceImpl.identifierEquals(seq.getIdentifier(), 471 inSeqId)) 472 || to.getValue().equals(seq.getAcksTo().getAddress().getValue()))) { 473 rmpsOut.addAck(seq); 474 } else if (LOG.isLoggable(Level.FINE)) { 475 if (!seq.sendAcknowledgement()) { 476 LOG.fine("no need to add an acknowledgements for sequence " 477 + seq.getIdentifier().getValue()); 478 } else { 479 LOG.fine("sequences acksTo (" + seq.getAcksTo().getAddress().getValue() 480 + ") does not match to (" + to.getValue() + ")"); 481 } 482 } 483 } 484 485 if (LOG.isLoggable(Level.FINE)) { 486 Collection <SequenceAcknowledgement> acks = rmpsOut.getAcks(); 487 if (null == acks) { 488 LOG.fine("No acknowledgements added"); 489 } else { 490 LOG.fine("Added " + acks.size() + " acknowledgements."); 491 } 492 } 493 } 494 495 private SourceSequence getSequence(Identifier inSeqId, 496 LogicalMessageContext context, 497 AddressingPropertiesImpl maps) throws SequenceFault { 498 SourceSequence seq = getSource().getCurrent(inSeqId); 499 500 if (null == seq) { 501 org.objectweb.celtix.ws.addressing.EndpointReferenceType to = null; 503 try { 504 EndpointReferenceType acksTo = null; 505 RelatesToType relatesTo = null; 506 if (isServerSide()) { 507 AddressingPropertiesImpl inMaps = ContextUtils 508 .retrieveMAPs(context, false, false); 509 inMaps.exposeAs(VersionTransformer.Names200408.WSA_NAMESPACE_NAME); 510 acksTo = RMUtils.createReference(inMaps.getTo().getValue()); 511 to = inMaps.getReplyTo(); 512 getServant().setUnattachedIdentifier(inSeqId); 513 relatesTo = ContextUtils.WSA_OBJECT_FACTORY.createRelatesToType(); 514 DestinationSequence inSeq = getDestination().getSequence(inSeqId); 515 relatesTo.setValue(inSeq != null ? inSeq.getCorrelationID() : null); 516 } else { 517 acksTo = VersionTransformer.convert(maps.getReplyTo()); 518 if (Names.WSA_NONE_ADDRESS.equals(acksTo.getAddress().getValue())) { 520 acksTo = RMUtils.createReference(Names.WSA_ANONYMOUS_ADDRESS); 521 } 522 } 523 524 getProxy().createSequence(getSource(), to, acksTo, relatesTo); 525 } catch (IOException ex) { 526 ex.printStackTrace(); 527 } 528 529 seq = getSource().awaitCurrent(inSeqId); 530 seq.setTarget(to); 531 } 532 533 return seq; 534 } 535 536 public void destroy() { 537 getSource().getRetransmissionQueue().stop(); 538 } 539 540 541 542 } 543 | Popular Tags |