1 package org.objectweb.celtix.bus.ws.rm; 2 3 import java.io.IOException ; 4 import java.math.BigInteger ; 5 import java.util.ArrayList ; 6 import java.util.Collection ; 7 import java.util.HashMap ; 8 import java.util.Iterator ; 9 import java.util.List ; 10 import java.util.Map ; 11 import java.util.Timer ; 12 import java.util.TimerTask ; 13 import java.util.logging.Level ; 14 import java.util.logging.Logger ; 15 16 import javax.xml.namespace.QName ; 17 import javax.xml.soap.SOAPMessage ; 18 import javax.xml.ws.handler.Handler; 19 import javax.xml.ws.handler.MessageContext; 20 21 import org.objectweb.celtix.bindings.AbstractBindingBase; 22 import org.objectweb.celtix.bindings.AbstractBindingImpl; 23 import org.objectweb.celtix.bindings.Request; 24 import org.objectweb.celtix.bindings.Response; 25 import org.objectweb.celtix.bindings.ServerRequest; 26 import org.objectweb.celtix.bus.ws.addressing.ContextUtils; 27 import org.objectweb.celtix.bus.ws.addressing.soap.MAPCodec; 28 import org.objectweb.celtix.bus.ws.rm.soap.RMSoapHandler; 29 import org.objectweb.celtix.common.logging.LogUtils; 30 import org.objectweb.celtix.context.InputStreamMessageContext; 31 import org.objectweb.celtix.context.ObjectMessageContext; 32 import org.objectweb.celtix.context.ObjectMessageContextImpl; 33 import org.objectweb.celtix.context.OutputStreamMessageContext; 34 import org.objectweb.celtix.transports.ClientTransport; 35 import org.objectweb.celtix.transports.ServerTransport; 36 import org.objectweb.celtix.transports.Transport; 37 import org.objectweb.celtix.workqueue.WorkQueue; 38 import org.objectweb.celtix.ws.addressing.AddressingProperties; 39 import org.objectweb.celtix.ws.rm.AckRequestedType; 40 import org.objectweb.celtix.ws.rm.Identifier; 41 import org.objectweb.celtix.ws.rm.RMProperties; 42 import org.objectweb.celtix.ws.rm.SequenceType; 43 import org.objectweb.celtix.ws.rm.persistence.RMMessage; 44 import org.objectweb.celtix.ws.rm.persistence.RMStore; 45 import org.objectweb.celtix.ws.rm.policy.RMAssertionType; 46 47 public class RetransmissionQueue { 48 public static final QName EXPONENTIAL_BACKOFF_BASE_ATTR = 49 new QName (RMHandler.RM_CONFIGURATION_URI, "exponentialBackoffBase"); 50 public static final String DEFAULT_BASE_RETRANSMISSION_INTERVAL = "3000"; 51 public static final String DEFAULT_EXPONENTIAL_BACKOFF = "2"; 52 private static final String SOAP_MSG_KEY = "org.objectweb.celtix.bindings.soap.message"; 53 private static final Logger LOG = 54 LogUtils.getL7dLogger(RetransmissionQueue.class); 55 56 57 private RMHandler handler; 58 private RMSoapHandler rmSOAPHandler; 59 private MAPCodec wsaSOAPHandler; 60 private WorkQueue workQueue; 61 private long baseRetransmissionInterval; 62 private int exponentialBackoff; 63 private Map <String , List <ResendCandidate>> candidates; 64 private Runnable resendInitiator; 65 private boolean shutdown; 66 private Resender resender; 67 private Timer timer; 68 69 72 public RetransmissionQueue(RMHandler h) { 73 this(h, Long.parseLong(DEFAULT_BASE_RETRANSMISSION_INTERVAL), 74 Integer.parseInt(DEFAULT_EXPONENTIAL_BACKOFF)); 75 } 76 77 80 public RetransmissionQueue(RMHandler h, RMAssertionType rma) { 81 this(h, rma.getBaseRetransmissionInterval().getMilliseconds().longValue(), 82 Integer.parseInt(rma.getExponentialBackoff().getOtherAttributes() 83 .get(EXPONENTIAL_BACKOFF_BASE_ATTR))); 84 } 85 86 87 93 public RetransmissionQueue(RMHandler h, long base, 94 int backoff) { 95 handler = h; 96 baseRetransmissionInterval = base; 97 exponentialBackoff = backoff; 98 candidates = new HashMap <String , List <ResendCandidate>>(); 99 resender = getDefaultResender(); 100 } 101 102 107 protected final Resender getDefaultResender() { 108 return new Resender() { 109 public void resend(ObjectMessageContext context, 110 boolean requestAcknowledge) { 111 RMProperties properties = 112 RMContextUtils.retrieveRMProperties(context, true); 113 SequenceType st = properties.getSequence(); 114 if (st != null) { 115 LOG.log(Level.INFO, "RESEND_MSG", st.getMessageNumber()); 116 } 117 try { 118 refreshMAPs(context); 119 refreshRMProperties(context, requestAcknowledge); 120 if (ContextUtils.isRequestor(context)) { 121 clientResend(context); 122 } else { 123 serverResend(context); 124 } 125 } catch (Exception e) { 126 LOG.log(Level.WARNING, "RESEND_FAILED_MSG", e); 127 } 128 } 129 }; 130 }; 131 132 138 private void refreshMAPs(MessageContext context) { 139 AddressingProperties maps = 140 ContextUtils.retrieveMAPs(context, false, true); 141 String uuid = ContextUtils.generateUUID(); 142 maps.setMessageID(ContextUtils.getAttributedURI(uuid)); 143 } 144 145 154 private void refreshRMProperties(MessageContext context, boolean requestAcknowledge) { 155 RMProperties properties = 156 RMContextUtils.retrieveRMProperties(context, true); 157 List <AckRequestedType> requests = null; 158 if (requestAcknowledge) { 159 requests = new ArrayList <AckRequestedType>(); 160 requests.add(RMUtils.getWSRMFactory().createAckRequestedType()); 161 Identifier id = properties.getSequence().getIdentifier(); 162 requests.get(0).setIdentifier(id); 163 } 164 properties.setAcksRequested(requests); 165 } 166 167 173 private Request createClientRequest(ObjectMessageContext context) { 174 AbstractBindingBase binding = handler.getBinding(); 175 Transport transport = handler.getClientTransport(); 176 Request request = new Request(binding, transport, context); 177 request.setOneway(ContextUtils.isOneway(context)); 178 return request; 179 } 180 181 186 private void clientResend(ObjectMessageContext context) throws IOException { 187 Request request = createClientRequest(context); 188 OutputStreamMessageContext outputStreamContext = 189 request.process(null, true, true); 190 ClientTransport transport = handler.getClientTransport(); 191 if (transport != null) { 192 invokePartial(request, transport, outputStreamContext); 195 } else { 196 LOG.log(Level.WARNING, "NO_TRANSPORT_FOR_RESEND_MSG"); 197 } 198 } 199 200 206 private ServerRequest createServerRequest(ObjectMessageContext context) { 207 AbstractBindingBase binding = handler.getBinding(); 208 ServerRequest request = new ServerRequest(binding, context); 209 request.setOneway(false); 211 return request; 212 } 213 214 215 220 private void serverResend(ObjectMessageContext context) throws IOException { 221 ServerTransport transport = handler.getServerTransport(); 222 if (transport != null) { 223 ServerRequest serverRequest = createServerRequest(context); 224 serverRequest.processOutbound(transport, null, true); 225 } else { 226 LOG.log(Level.WARNING, "NO_TRANSPORT_FOR_RESEND_MSG"); 227 } 228 } 229 230 237 private void invokePartial(Request request, 238 ClientTransport transport, 239 OutputStreamMessageContext outputStreamContext) 240 throws IOException { 241 InputStreamMessageContext inputStreamContext = 242 transport.invoke(outputStreamContext); 243 Response response = new Response(request); 244 response.processProtocol(inputStreamContext); 245 response.processLogical(null); 246 } 247 248 253 protected void populate(Collection <SourceSequence> seqs) { 254 LOG.fine(seqs.size() + " active sequences"); 255 RMStore store = handler.getStore(); 256 for (SourceSequence seq : seqs) { 257 Collection <RMMessage> msgs = store.getMessages(seq.getIdentifier(), true); 258 LOG.fine("Recovered " + msgs.size() + " messages for this sequence"); 259 for (RMMessage msg : msgs) { 260 ObjectMessageContext objCtx = new ObjectMessageContextImpl(); 261 objCtx.putAll(msg.getContext()); 262 cacheUnacknowledged(objCtx); 263 LOG.fine("cached unacknowledged message nr: " + msg.getMessageNr()); 264 } 265 } 266 } 267 268 protected RMSoapHandler getRMSoapHandler() { 269 if (null == rmSOAPHandler) { 270 AbstractBindingImpl abi = handler.getBinding().getBindingImpl(); 271 List <Handler> handlerChain = abi.getPostProtocolSystemHandlers(); 272 for (Handler h : handlerChain) { 273 if (h instanceof RMSoapHandler) { 274 rmSOAPHandler = (RMSoapHandler)h; 275 } 276 } 277 } 278 return rmSOAPHandler; 279 } 280 281 protected MAPCodec getWsaSOAPHandler() { 282 if (null == wsaSOAPHandler) { 283 AbstractBindingImpl abi = handler.getBinding().getBindingImpl(); 284 List <Handler> handlerChain = abi.getPostProtocolSystemHandlers(); 285 for (Handler h : handlerChain) { 286 if (h instanceof MAPCodec) { 287 wsaSOAPHandler = (MAPCodec)h; 288 } 289 } 290 } 291 return wsaSOAPHandler; 292 } 293 294 299 protected void replaceResender(Resender replacement) { 300 resender = replacement; 301 } 302 303 308 protected void start(WorkQueue queue) { 309 if (null == workQueue) { 310 LOG.fine("Starting retransmission queue"); 311 workQueue = queue; 312 314 TimerTask task = new TimerTask () { 315 public void run() { 316 getResendInitiator().run(); 317 } 318 }; 319 timer = new Timer (); 320 timer.schedule(task, getBaseRetransmissionInterval(), getBaseRetransmissionInterval()); 321 } 322 } 323 324 protected void stop() { 325 if (null != timer) { 326 LOG.fine("Stopping retransmission queue"); 327 timer.cancel(); 328 } 329 } 330 331 337 protected ResendCandidate cacheUnacknowledged(ObjectMessageContext ctx) { 338 ResendCandidate candidate = null; 339 RMProperties rmps = RMContextUtils.retrieveRMProperties(ctx, true); 340 if (null == rmps) { 341 SOAPMessage message = (SOAPMessage )ctx.get(SOAP_MSG_KEY); 342 rmps = getRMSoapHandler().unmarshalRMProperties(message); 343 RMContextUtils.storeRMProperties(ctx, rmps, true); 344 } 345 AddressingProperties maps = ContextUtils.retrieveMAPs(ctx, false, true); 346 if (null == maps) { 347 SOAPMessage message = (SOAPMessage )ctx.get(SOAP_MSG_KEY); 348 try { 349 maps = getWsaSOAPHandler().unmarshalMAPs(message); 350 ContextUtils.storeMAPs(maps, ctx, true); 351 } catch (Exception ex) { 352 ex.printStackTrace(); 353 } 354 } 355 356 SequenceType st = rmps.getSequence(); 357 Identifier sid = st.getIdentifier(); 358 synchronized (this) { 359 String key = sid.getValue(); 360 List <ResendCandidate> sequenceCandidates = 361 getSequenceCandidates(key); 362 if (null == sequenceCandidates) { 363 sequenceCandidates = new ArrayList <ResendCandidate>(); 364 candidates.put(key, sequenceCandidates); 365 } 366 candidate = new ResendCandidate(ctx); 367 sequenceCandidates.add(candidate); 368 } 369 return candidate; 370 } 371 372 378 protected void purgeAcknowledged(SourceSequence seq) { 379 Collection <BigInteger > purged = new ArrayList <BigInteger >(); 380 synchronized (this) { 381 List <ResendCandidate> sequenceCandidates = getSequenceCandidates(seq); 382 if (null != sequenceCandidates) { 383 for (int i = sequenceCandidates.size() - 1; i >= 0; i--) { 384 ResendCandidate candidate = sequenceCandidates.get(i); 385 RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getContext(), 386 true); 387 SequenceType st = properties.getSequence(); 388 BigInteger m = st.getMessageNumber(); 389 if (seq.isAcknowledged(m)) { 390 sequenceCandidates.remove(i); 391 candidate.resolved(); 392 purged.add(m); 393 } 394 } 395 } 396 } 397 if (purged.size() > 0) { 398 handler.getStore().removeMessages(seq.getIdentifier(), purged, true); 399 } 400 } 401 402 406 protected synchronized int countUnacknowledged(SourceSequence seq) { 407 List <ResendCandidate> sequenceCandidates = getSequenceCandidates(seq); 408 return sequenceCandidates == null ? 0 : sequenceCandidates.size(); 409 } 410 411 415 protected Map <String , List <ResendCandidate>> getUnacknowledged() { 416 return candidates; 417 } 418 419 424 protected List <ResendCandidate> getSequenceCandidates(SourceSequence seq) { 425 return getSequenceCandidates(seq.getIdentifier().getValue()); 426 } 427 428 433 protected List <ResendCandidate> getSequenceCandidates(String key) { 434 return candidates.get(key); 435 } 436 437 440 protected long getBaseRetransmissionInterval() { 441 return baseRetransmissionInterval; 442 } 443 444 447 protected int getExponentialBackoff() { 448 return exponentialBackoff; 449 } 450 451 454 protected synchronized void shutdown() { 455 shutdown = true; 456 } 457 458 461 protected synchronized boolean isShutdown() { 462 return shutdown; 463 } 464 465 468 protected Runnable getResendInitiator() { 469 if (resendInitiator == null) { 470 resendInitiator = new ResendInitiator(); 471 } 472 return resendInitiator; 473 } 474 475 479 protected ResendCandidate createResendCandidate(ObjectMessageContext context) { 480 return new ResendCandidate(context); 481 } 482 483 488 protected class ResendInitiator implements Runnable { 489 public void run() { 490 synchronized (RetransmissionQueue.this) { 492 Iterator <Map.Entry <String , List <ResendCandidate>>> sequences = 493 candidates.entrySet().iterator(); 494 while (sequences.hasNext()) { 495 Iterator <ResendCandidate> sequenceCandidates = 496 sequences.next().getValue().iterator(); 497 boolean requestAck = true; 498 while (sequenceCandidates.hasNext()) { 499 ResendCandidate candidate = sequenceCandidates.next(); 500 if (candidate.isDue()) { 501 candidate.initiate(requestAck); 502 requestAck = false; 503 } 504 } 505 } 506 } 507 515 } 516 } 517 518 523 protected class ResendCandidate implements Runnable { 524 private ObjectMessageContext context; 525 private int skips; 526 private int skipped; 527 private boolean pending; 528 private boolean includeAckRequested; 529 530 533 protected ResendCandidate(ObjectMessageContext ctx) { 534 context = ctx; 535 skipped = -1; 536 skips = 1; 537 } 538 539 542 public void run() { 543 try { 544 if (isPending()) { 547 resender.resend(context, includeAckRequested); 548 includeAckRequested = false; 549 } 550 } finally { 551 attempted(); 552 } 553 } 554 555 559 protected synchronized boolean isDue() { 560 boolean due = false; 561 if (!pending && ++skipped == skips) { 564 skips *= getExponentialBackoff(); 565 skipped = 0; 566 due = true; 567 } 568 return due; 569 } 570 571 574 protected synchronized boolean isPending() { 575 return pending; 576 } 577 578 584 protected synchronized void initiate(boolean requestAcknowledge) { 585 includeAckRequested = requestAcknowledge; 586 pending = true; 587 workQueue.execute(this); 588 } 589 590 593 protected synchronized void resolved() { 594 pending = false; 595 skips = Integer.MAX_VALUE; 596 } 597 598 601 protected MessageContext getContext() { 602 return context; 603 } 604 605 608 private synchronized void attempted() { 609 pending = false; 610 } 611 } 612 613 616 public interface Resender { 617 623 void resend(ObjectMessageContext context, boolean requestAcknowledge); 624 } 625 } 626 | Popular Tags |