KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > ws > rm > RetransmissionQueue


1 package org.objectweb.celtix.bus.ws.rm;
2
3 import java.io.IOException JavaDoc;
4 import java.math.BigInteger JavaDoc;
5 import java.util.ArrayList JavaDoc;
6 import java.util.Collection JavaDoc;
7 import java.util.HashMap JavaDoc;
8 import java.util.Iterator JavaDoc;
9 import java.util.List JavaDoc;
10 import java.util.Map JavaDoc;
11 import java.util.Timer JavaDoc;
12 import java.util.TimerTask JavaDoc;
13 import java.util.logging.Level JavaDoc;
14 import java.util.logging.Logger JavaDoc;
15
16 import javax.xml.namespace.QName JavaDoc;
17 import javax.xml.soap.SOAPMessage JavaDoc;
18 import javax.xml.ws.handler.Handler;
19 import javax.xml.ws.handler.MessageContext;
20
21 import org.objectweb.celtix.bindings.AbstractBindingBase;
22 import org.objectweb.celtix.bindings.AbstractBindingImpl;
23 import org.objectweb.celtix.bindings.Request;
24 import org.objectweb.celtix.bindings.Response;
25 import org.objectweb.celtix.bindings.ServerRequest;
26 import org.objectweb.celtix.bus.ws.addressing.ContextUtils;
27 import org.objectweb.celtix.bus.ws.addressing.soap.MAPCodec;
28 import org.objectweb.celtix.bus.ws.rm.soap.RMSoapHandler;
29 import org.objectweb.celtix.common.logging.LogUtils;
30 import org.objectweb.celtix.context.InputStreamMessageContext;
31 import org.objectweb.celtix.context.ObjectMessageContext;
32 import org.objectweb.celtix.context.ObjectMessageContextImpl;
33 import org.objectweb.celtix.context.OutputStreamMessageContext;
34 import org.objectweb.celtix.transports.ClientTransport;
35 import org.objectweb.celtix.transports.ServerTransport;
36 import org.objectweb.celtix.transports.Transport;
37 import org.objectweb.celtix.workqueue.WorkQueue;
38 import org.objectweb.celtix.ws.addressing.AddressingProperties;
39 import org.objectweb.celtix.ws.rm.AckRequestedType;
40 import org.objectweb.celtix.ws.rm.Identifier;
41 import org.objectweb.celtix.ws.rm.RMProperties;
42 import org.objectweb.celtix.ws.rm.SequenceType;
43 import org.objectweb.celtix.ws.rm.persistence.RMMessage;
44 import org.objectweb.celtix.ws.rm.persistence.RMStore;
45 import org.objectweb.celtix.ws.rm.policy.RMAssertionType;
46
47 public class RetransmissionQueue {
48     public static final QName JavaDoc EXPONENTIAL_BACKOFF_BASE_ATTR =
49         new QName JavaDoc(RMHandler.RM_CONFIGURATION_URI, "exponentialBackoffBase");
50     public static final String JavaDoc DEFAULT_BASE_RETRANSMISSION_INTERVAL = "3000";
51     public static final String JavaDoc DEFAULT_EXPONENTIAL_BACKOFF = "2";
52     private static final String JavaDoc SOAP_MSG_KEY = "org.objectweb.celtix.bindings.soap.message";
53     private static final Logger JavaDoc LOG =
54         LogUtils.getL7dLogger(RetransmissionQueue.class);
55     
56
57     private RMHandler handler;
58     private RMSoapHandler rmSOAPHandler;
59     private MAPCodec wsaSOAPHandler;
60     private WorkQueue workQueue;
61     private long baseRetransmissionInterval;
62     private int exponentialBackoff;
63     private Map JavaDoc<String JavaDoc, List JavaDoc<ResendCandidate>> candidates;
64     private Runnable JavaDoc resendInitiator;
65     private boolean shutdown;
66     private Resender resender;
67     private Timer JavaDoc timer;
68     
/**
 * Creates a queue that uses the default base retransmission interval
 * and the default exponential backoff.
 *
 * @param h the RM handler this queue works with
 */
public RetransmissionQueue(RMHandler h) {
    this(h,
         Long.parseLong(DEFAULT_BASE_RETRANSMISSION_INTERVAL),
         Integer.parseInt(DEFAULT_EXPONENTIAL_BACKOFF));
}
76     
/**
 * Creates a queue configured from an RM assertion: the base interval is
 * taken from the assertion's base retransmission interval (milliseconds)
 * and the backoff from the {@link #EXPONENTIAL_BACKOFF_BASE_ATTR}
 * attribute of its exponential backoff element.
 *
 * @param h the RM handler this queue works with
 * @param rma the RM assertion supplying interval and backoff
 */
public RetransmissionQueue(RMHandler h, RMAssertionType rma) {
    this(h,
         rma.getBaseRetransmissionInterval().getMilliseconds().longValue(),
         Integer.parseInt(
             rma.getExponentialBackoff().getOtherAttributes().get(EXPONENTIAL_BACKOFF_BASE_ATTR)));
}
85     
86
/**
 * Creates a queue with explicit timing parameters. The candidate map
 * starts out empty and the default resend logic is installed.
 *
 * @param h the RM handler this queue works with
 * @param base the base retransmission interval
 * @param backoff the exponential backoff
 */
public RetransmissionQueue(RMHandler h, long base, int backoff) {
    this.handler = h;
    this.baseRetransmissionInterval = base;
    this.exponentialBackoff = backoff;
    this.candidates = new HashMap<String, List<ResendCandidate>>();
    this.resender = getDefaultResender();
}
101     
102     /**
103      * Create default Resender logic.
104      *
105      * @return default Resender
106      */

107     protected final Resender getDefaultResender() {
108         return new Resender() {
109             public void resend(ObjectMessageContext context,
110                                boolean requestAcknowledge) {
111                 RMProperties properties =
112                     RMContextUtils.retrieveRMProperties(context, true);
113                 SequenceType st = properties.getSequence();
114                 if (st != null) {
115                     LOG.log(Level.INFO, "RESEND_MSG", st.getMessageNumber());
116                 }
117                 try {
118                     refreshMAPs(context);
119                     refreshRMProperties(context, requestAcknowledge);
120                     if (ContextUtils.isRequestor(context)) {
121                         clientResend(context);
122                     } else {
123                         serverResend(context);
124                     }
125                 } catch (Exception JavaDoc e) {
126                     LOG.log(Level.WARNING, "RESEND_FAILED_MSG", e);
127                 }
128             }
129         };
130     };
131     
132     /**
133      * Refresh the MAPs with a new message ID (to avoid the resend being
134      * rejected by the receiver-side WS-Addressing layer as a duplicate).
135      *
136      * @param context the message context
137      */

138     private void refreshMAPs(MessageContext context) {
139         AddressingProperties maps =
140             ContextUtils.retrieveMAPs(context, false, true);
141         String JavaDoc uuid = ContextUtils.generateUUID();
142         maps.setMessageID(ContextUtils.getAttributedURI(uuid));
143     }
144     
145     /**
146      * Refresh the RM Properties with an AckRequested if necessary.
147      * Currently the first resend for each sequence on each initiator iteration
148      * includes an AckRequested. The idea is that a timely ACK may cause some of
149      * of the resend to be avoided.
150      *
151      * @param context the message context
152      * @param requestAcknowledge true if an AckRequested header should be included
153      */

154     private void refreshRMProperties(MessageContext context, boolean requestAcknowledge) {
155         RMProperties properties =
156             RMContextUtils.retrieveRMProperties(context, true);
157         List JavaDoc<AckRequestedType> requests = null;
158         if (requestAcknowledge) {
159             requests = new ArrayList JavaDoc<AckRequestedType>();
160             requests.add(RMUtils.getWSRMFactory().createAckRequestedType());
161             Identifier id = properties.getSequence().getIdentifier();
162             requests.get(0).setIdentifier(id);
163         }
164         properties.setAcksRequested(requests);
165     }
166     
167     /**
168      * Create a client request for retransmission.
169      *
170      * @param context the message context
171      * @return an appropriate Request for the context
172      */

173     private Request createClientRequest(ObjectMessageContext context) {
174         AbstractBindingBase binding = handler.getBinding();
175         Transport transport = handler.getClientTransport();
176         Request request = new Request(binding, transport, context);
177         request.setOneway(ContextUtils.isOneway(context));
178         return request;
179     }
180
181     /**
182      * Client-side resend.
183      *
184      * @param context the message context
185      */

186     private void clientResend(ObjectMessageContext context) throws IOException JavaDoc {
187         Request request = createClientRequest(context);
188         OutputStreamMessageContext outputStreamContext =
189             request.process(null, true, true);
190         ClientTransport transport = handler.getClientTransport();
191         if (transport != null) {
192             // decoupled response channel always being used with RM,
193
// hence a partial response must be processed
194
invokePartial(request, transport, outputStreamContext);
195         } else {
196             LOG.log(Level.WARNING, "NO_TRANSPORT_FOR_RESEND_MSG");
197         }
198     }
199     
200     /**
201      * Create a server request for retransmission.
202      *
203      * @param context the message context
204      * @return an appropriate ServerRequest for the context
205      */

206     private ServerRequest createServerRequest(ObjectMessageContext context) {
207         AbstractBindingBase binding = handler.getBinding();
208         ServerRequest request = new ServerRequest(binding, context);
209         // a server-originated resend implies a response, hence non-oneway
210
request.setOneway(false);
211         return request;
212     }
213
214
215     /**
216      * Server-side resend.
217      *
218      * @param context the message context
219      */

220     private void serverResend(ObjectMessageContext context) throws IOException JavaDoc {
221         ServerTransport transport = handler.getServerTransport();
222         if (transport != null) {
223             ServerRequest serverRequest = createServerRequest(context);
224             serverRequest.processOutbound(transport, null, true);
225         } else {
226             LOG.log(Level.WARNING, "NO_TRANSPORT_FOR_RESEND_MSG");
227         }
228     }
229     
230     /**
231      * Invoke a oneway operation, allowing for a partial response.
232      *
233      * @param request the request
234      * @param transport the client transport
235      * @param outputStreamContext the output stream message context
236      */

237     private void invokePartial(Request request,
238                                ClientTransport transport,
239                                OutputStreamMessageContext outputStreamContext)
240         throws IOException JavaDoc {
241         InputStreamMessageContext inputStreamContext =
242             transport.invoke(outputStreamContext);
243         Response response = new Response(request);
244         response.processProtocol(inputStreamContext);
245         response.processLogical(null);
246     }
247
248     /**
249      * Populates the retransmission queue with messages recovered from persistent
250      * store.
251      *
252      */

253     protected void populate(Collection JavaDoc<SourceSequence> seqs) {
254         LOG.fine(seqs.size() + " active sequences");
255         RMStore store = handler.getStore();
256         for (SourceSequence seq : seqs) {
257             Collection JavaDoc<RMMessage> msgs = store.getMessages(seq.getIdentifier(), true);
258             LOG.fine("Recovered " + msgs.size() + " messages for this sequence");
259             for (RMMessage msg : msgs) {
260                 ObjectMessageContext objCtx = new ObjectMessageContextImpl();
261                 objCtx.putAll(msg.getContext());
262                 cacheUnacknowledged(objCtx);
263                 LOG.fine("cached unacknowledged message nr: " + msg.getMessageNr());
264             }
265         }
266     }
267     
268     protected RMSoapHandler getRMSoapHandler() {
269         if (null == rmSOAPHandler) {
270             AbstractBindingImpl abi = handler.getBinding().getBindingImpl();
271             List JavaDoc<Handler> handlerChain = abi.getPostProtocolSystemHandlers();
272             for (Handler h : handlerChain) {
273                 if (h instanceof RMSoapHandler) {
274                     rmSOAPHandler = (RMSoapHandler)h;
275                 }
276             }
277         }
278         return rmSOAPHandler;
279     }
280     
281     protected MAPCodec getWsaSOAPHandler() {
282         if (null == wsaSOAPHandler) {
283             AbstractBindingImpl abi = handler.getBinding().getBindingImpl();
284             List JavaDoc<Handler> handlerChain = abi.getPostProtocolSystemHandlers();
285             for (Handler h : handlerChain) {
286                 if (h instanceof MAPCodec) {
287                     wsaSOAPHandler = (MAPCodec)h;
288                 }
289             }
290         }
291         return wsaSOAPHandler;
292     }
293
294     /**
295      * Plug in replacement resend logic (facilitates unit testing).
296      *
297      * @param replacement resend logic
298      */

299     protected void replaceResender(Resender replacement) {
300         resender = replacement;
301     }
302     
303     /**
304      * Initiate resends.
305      *
306      * @param queue the work queue providing async execution
307      */

308     protected void start(WorkQueue queue) {
309         if (null == workQueue) {
310             LOG.fine("Starting retransmission queue");
311             workQueue = queue;
312             // workQueue.schedule(getResendInitiator(), baseRetransmissionInterval);
313

314             TimerTask JavaDoc task = new TimerTask JavaDoc() {
315                 public void run() {
316                     getResendInitiator().run();
317                 }
318             };
319             timer = new Timer JavaDoc();
320             timer.schedule(task, getBaseRetransmissionInterval(), getBaseRetransmissionInterval());
321         }
322     }
323     
324     protected void stop() {
325         if (null != timer) {
326             LOG.fine("Stopping retransmission queue");
327             timer.cancel();
328         }
329     }
330     
331     /**
332      * Accepts a new resend candidate.
333      *
334      * @param ctx the message context.
335      * @return ResendCandidate
336      */

337     protected ResendCandidate cacheUnacknowledged(ObjectMessageContext ctx) {
338         ResendCandidate candidate = null;
339         RMProperties rmps = RMContextUtils.retrieveRMProperties(ctx, true);
340         if (null == rmps) {
341             SOAPMessage JavaDoc message = (SOAPMessage JavaDoc)ctx.get(SOAP_MSG_KEY);
342             rmps = getRMSoapHandler().unmarshalRMProperties(message);
343             RMContextUtils.storeRMProperties(ctx, rmps, true);
344         }
345         AddressingProperties maps = ContextUtils.retrieveMAPs(ctx, false, true);
346         if (null == maps) {
347             SOAPMessage JavaDoc message = (SOAPMessage JavaDoc)ctx.get(SOAP_MSG_KEY);
348             try {
349                 maps = getWsaSOAPHandler().unmarshalMAPs(message);
350                 ContextUtils.storeMAPs(maps, ctx, true);
351             } catch (Exception JavaDoc ex) {
352                 ex.printStackTrace();
353             }
354         }
355         
356         SequenceType st = rmps.getSequence();
357         Identifier sid = st.getIdentifier();
358         synchronized (this) {
359             String JavaDoc key = sid.getValue();
360             List JavaDoc<ResendCandidate> sequenceCandidates =
361                 getSequenceCandidates(key);
362             if (null == sequenceCandidates) {
363                 sequenceCandidates = new ArrayList JavaDoc<ResendCandidate>();
364                 candidates.put(key, sequenceCandidates);
365             }
366             candidate = new ResendCandidate(ctx);
367             sequenceCandidates.add(candidate);
368         }
369         return candidate;
370     }
371
372     /**
373      * Purge all candidates for the given sequence that
374      * have been acknowledged.
375      *
376      * @param seq the sequence object.
377      */

378     protected void purgeAcknowledged(SourceSequence seq) {
379         Collection JavaDoc<BigInteger JavaDoc> purged = new ArrayList JavaDoc<BigInteger JavaDoc>();
380         synchronized (this) {
381             List JavaDoc<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
382             if (null != sequenceCandidates) {
383                 for (int i = sequenceCandidates.size() - 1; i >= 0; i--) {
384                     ResendCandidate candidate = sequenceCandidates.get(i);
385                     RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getContext(),
386                                                                                   true);
387                     SequenceType st = properties.getSequence();
388                     BigInteger JavaDoc m = st.getMessageNumber();
389                     if (seq.isAcknowledged(m)) {
390                         sequenceCandidates.remove(i);
391                         candidate.resolved();
392                         purged.add(m);
393                     }
394                 }
395             }
396         }
397         if (purged.size() > 0) {
398             handler.getStore().removeMessages(seq.getIdentifier(), purged, true);
399         }
400     }
401
402     /**
403      * @param seq the sequence under consideration
404      * @return the number of unacknowledged messages for that sequence
405      */

406     protected synchronized int countUnacknowledged(SourceSequence seq) {
407         List JavaDoc<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
408         return sequenceCandidates == null ? 0 : sequenceCandidates.size();
409     }
410     
411     /**
412      * @return a map relating sequence ID to a lists of un-acknowledged
413      * messages for that sequence
414      */

415     protected Map JavaDoc<String JavaDoc, List JavaDoc<ResendCandidate>> getUnacknowledged() {
416         return candidates;
417     }
418     
419     /**
420      * @param seq the sequence under consideration
421      * @return the list of resend candidates for that sequence
422      * @pre called with mutex held
423      */

424     protected List JavaDoc<ResendCandidate> getSequenceCandidates(SourceSequence seq) {
425         return getSequenceCandidates(seq.getIdentifier().getValue());
426     }
427     
428     /**
429      * @param key the sequence identifier under consideration
430      * @return the list of resend candidates for that sequence
431      * @pre called with mutex held
432      */

433     protected List JavaDoc<ResendCandidate> getSequenceCandidates(String JavaDoc key) {
434         return candidates.get(key);
435     }
436    
437     /**
438      * @return the base retransmission interval
439      */

440     protected long getBaseRetransmissionInterval() {
441         return baseRetransmissionInterval;
442     }
443     
444     /**
445      * @return the exponential backoff
446      */

447     protected int getExponentialBackoff() {
448         return exponentialBackoff;
449     }
450     
451     /**
452      * Shutdown.
453      */

454     protected synchronized void shutdown() {
455         shutdown = true;
456     }
457
458     /**
459      * @return true if shutdown
460      */

461     protected synchronized boolean isShutdown() {
462         return shutdown;
463     }
464     
465     /**
466      * @return the ResendInitiator
467      */

468     protected Runnable JavaDoc getResendInitiator() {
469         if (resendInitiator == null) {
470             resendInitiator = new ResendInitiator();
471         }
472         return resendInitiator;
473     }
474
475     /**
476      * @param context the message context
477      * @return a ResendCandidate
478      */

479     protected ResendCandidate createResendCandidate(ObjectMessageContext context) {
480         return new ResendCandidate(context);
481     }
482     
483     /**
484      * Manages scheduling of resend attempts.
485      * A single task runs every base transmission interval,
486      * determining which resend candidates are due a resend attempt.
487      */

488     protected class ResendInitiator implements Runnable JavaDoc {
489         public void run() {
490             // iterate over resend candidates, resending any that are due
491
synchronized (RetransmissionQueue.this) {
492                 Iterator JavaDoc<Map.Entry JavaDoc<String JavaDoc, List JavaDoc<ResendCandidate>>> sequences =
493                     candidates.entrySet().iterator();
494                 while (sequences.hasNext()) {
495                     Iterator JavaDoc<ResendCandidate> sequenceCandidates =
496                         sequences.next().getValue().iterator();
497                     boolean requestAck = true;
498                     while (sequenceCandidates.hasNext()) {
499                         ResendCandidate candidate = sequenceCandidates.next();
500                         if (candidate.isDue()) {
501                             candidate.initiate(requestAck);
502                             requestAck = false;
503                         }
504                     }
505                 }
506             }
507             /*
508             if (!isShutdown()) {
509                 // schedule next resend initiation task (rescheduling each time,
510                 // as opposed to scheduling a periodic task, eliminates the
511                 // potential for simultaneous execution)
512                 workQueue.schedule(this, getBaseRetransmissionInterval());
513             }
514             */

515         }
516     }
517     
518     /**
519      * Represents a candidate for resend, i.e. an unacked outgoing message.
520      * When this is determined as due another resend attempt, an asynchronous
521      * task is scheduled for this purpose.
522      */

523     protected class ResendCandidate implements Runnable JavaDoc {
524         private ObjectMessageContext context;
525         private int skips;
526         private int skipped;
527         private boolean pending;
528         private boolean includeAckRequested;
529
530         /**
531          * @param ctx message context for the unacked message
532          */

533         protected ResendCandidate(ObjectMessageContext ctx) {
534             context = ctx;
535             skipped = -1;
536             skips = 1;
537         }
538         
539         /**
540          * Async resend logic.
541          */

542         public void run() {
543             try {
544                 // ensure ACK wasn't received while this task was enqueued
545
// on executor
546
if (isPending()) {
547                     resender.resend(context, includeAckRequested);
548                     includeAckRequested = false;
549                 }
550             } finally {
551                 attempted();
552             }
553         }
554         
555         /**
556          * @return true if candidate is due a resend
557          * REVISIT should bound the max number of resend attampts
558          */

559         protected synchronized boolean isDue() {
560             boolean due = false;
561             // skip count is used to model exponential backoff
562
// to avoid gratuitous time evaluation
563
if (!pending && ++skipped == skips) {
564                 skips *= getExponentialBackoff();
565                 skipped = 0;
566                 due = true;
567             }
568             return due;
569         }
570         
571         /**
572          * @return if resend attempt is pending
573          */

574         protected synchronized boolean isPending() {
575             return pending;
576         }
577        
578         /**
579          * Initiate resend asynchronsly.
580          *
581          * @param requestAcknowledge true if a AckRequest header is to be sent with
582          * resend
583          */

584         protected synchronized void initiate(boolean requestAcknowledge) {
585             includeAckRequested = requestAcknowledge;
586             pending = true;
587             workQueue.execute(this);
588         }
589         
590         /**
591          * ACK has been received for this candidate.
592          */

593         protected synchronized void resolved() {
594             pending = false;
595             skips = Integer.MAX_VALUE;
596         }
597         
598         /**
599          * @return associated message context
600          */

601         protected MessageContext getContext() {
602             return context;
603         }
604                         
605         /**
606          * A resend has been attempted.
607          */

608         private synchronized void attempted() {
609             pending = false;
610         }
611     }
612     
613     /**
614      * Encapsulates actual resend logic (pluggable to facilitate unit testing)
615      */

616     public interface Resender {
617         /**
618          * Resend mechanics.
619          *
620          * @param context the cloned message context.
621          * @param if a AckRequest should be included
622          */

623         void resend(ObjectMessageContext context, boolean requestAcknowledge);
624     }
625 }
626
Popular Tags