KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > ws > rm > DestinationSequence


1 package org.objectweb.celtix.bus.ws.rm;
2
3 import java.io.IOException JavaDoc;
4 import java.io.InputStream JavaDoc;
5 import java.math.BigInteger JavaDoc;
6 import java.util.ArrayList JavaDoc;
7 import java.util.List JavaDoc;
8 import java.util.TimerTask JavaDoc;
9 import java.util.logging.Level JavaDoc;
10 import java.util.logging.Logger JavaDoc;
11
12 import org.objectweb.celtix.bus.configuration.wsrm.AcksPolicyType;
13 import org.objectweb.celtix.common.i18n.Message;
14 import org.objectweb.celtix.common.logging.LogUtils;
15 import org.objectweb.celtix.ws.addressing.v200408.EndpointReferenceType;
16 import org.objectweb.celtix.ws.rm.Identifier;
17 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement;
18 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
19 import org.objectweb.celtix.ws.rm.SequenceFaultType;
20 import org.objectweb.celtix.ws.rm.persistence.RMDestinationSequence;
21 import org.objectweb.celtix.ws.rm.policy.RMAssertionType;
22 import org.objectweb.celtix.ws.rm.wsdl.SequenceFault;
23
24 public class DestinationSequence extends AbstractSequenceImpl implements RMDestinationSequence {
25
26     private static final Logger JavaDoc LOG = LogUtils.getL7dLogger(DestinationSequence.class);
27
28     private SequenceAcknowledgement acked;
29
30     private RMDestination destination;
31     private EndpointReferenceType acksTo;
32     private BigInteger JavaDoc lastMessageNumber;
33     private SequenceMonitor monitor;
34     private boolean acknowledgeOnNextOccasion;
35     private List JavaDoc<DeferredAcknowledgment> deferredAcknowledgments;
36     private String JavaDoc correlationID;
37     
38     public DestinationSequence(Identifier i, EndpointReferenceType a, RMDestination d) {
39         this(i, a, null, null);
40         setDestination(d);
41     }
42     
43     public DestinationSequence(Identifier i, EndpointReferenceType a,
44                               BigInteger JavaDoc lmn, SequenceAcknowledgement ac) {
45         super(i);
46         acksTo = a;
47         lastMessageNumber = lmn;
48         acked = ac;
49         if (null == acked) {
50             acked = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
51             acked.setIdentifier(id);
52         }
53         monitor = new SequenceMonitor();
54     }
55
56     
57     // RMDestinationSequence interface
58

59     
60     /**
61      * @return the acksTo address for the sequence
62      */

63     public EndpointReferenceType getAcksTo() {
64         return acksTo;
65     }
66     
67     /**
68      * @return the message number of the last message or null if the last message had not been received.
69      */

70     public BigInteger JavaDoc getLastMessageNr() {
71         return lastMessageNumber;
72     }
73     
74     /**
75      * @return the sequence acknowledgement presenting the sequences thus far received by a destination
76      */

77     public SequenceAcknowledgement getAcknowledgment() {
78         return acked;
79     }
80     
81     /**
82      * @return the sequence acknowledgement presenting the sequences thus far received by a destination
83      * as an input stream
84      */

85     public InputStream JavaDoc getAcknowledgmentAsStream() {
86         return RMUtils.getPersistenceUtils().getAcknowledgementAsInputStream(acked);
87     }
88     
89     /**
90      * @return the identifier of the rm destination
91      */

92     public String JavaDoc getEndpointIdentifier() {
93         if (null != destination) {
94             return destination.getEndpointId();
95         }
96         return null;
97     }
98     
99     // end RMDestinationSequence interface
100

101     final void setDestination(RMDestination d) {
102         destination = d;
103     }
104     
105     RMDestination getDestination() {
106         return destination;
107     }
108     
109     void setLastMessageNumber(BigInteger JavaDoc lmn) {
110         lastMessageNumber = lmn;
111     }
112    
113     
114     
115     /**
116      * Returns the monitor for this sequence.
117      *
118      * @return the sequence monitor.
119      */

120     SequenceMonitor getMonitor() {
121         return monitor;
122     }
123     
124
125     /**
126      * Called by the RM destination upon receipt of a message with the given
127      * message number for this sequence.
128      *
129      * @param messageNumber the number of the received message
130      * @param lastMessage true if this is to be the last message in the sequence
131      */

132     void acknowledge(BigInteger JavaDoc messageNumber) throws SequenceFault {
133         
134         if (null != lastMessageNumber && messageNumber.compareTo(lastMessageNumber) > 0) {
135             SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType();
136             sf.setFaultCode(RMUtils.getRMConstants().getLastMessageNumberExceededFaultCode());
137             Message msg = new Message("LAST_MESSAGE_NUMBER_EXCEEDED_EXC", LOG, this);
138             throw new SequenceFault(msg.toString(), sf);
139         }
140         
141         monitor.acknowledgeMessage();
142         
143         boolean done = false;
144         int i = 0;
145         for (; i < acked.getAcknowledgementRange().size(); i++) {
146             AcknowledgementRange r = acked.getAcknowledgementRange().get(i);
147             if (r.getLower().compareTo(messageNumber) <= 0
148                 && r.getUpper().compareTo(messageNumber) >= 0) {
149                 done = true;
150                 break;
151             } else {
152                 BigInteger JavaDoc diff = r.getLower().subtract(messageNumber);
153                 if (diff.signum() == 1) {
154                     if (diff.equals(BigInteger.ONE)) {
155                         r.setLower(messageNumber);
156                         done = true;
157                     }
158                     break;
159                 } else if (messageNumber.subtract(r.getUpper()).equals(BigInteger.ONE)) {
160                     r.setUpper(messageNumber);
161                     done = true;
162                     break;
163                 }
164             }
165         }
166
167         if (!done) {
168             AcknowledgementRange range = RMUtils.getWSRMFactory()
169                 .createSequenceAcknowledgementAcknowledgementRange();
170             range.setLower(messageNumber);
171             range.setUpper(messageNumber);
172             acked.getAcknowledgementRange().add(i, range);
173         }
174                
175         scheduleAcknowledgement();
176     }
177
178
179     /**
180      * Called after an acknowledgement header for this sequence has been added to an outgoing message.
181      */

182     void acknowledgmentSent() {
183         acknowledgeOnNextOccasion = false;
184     }
185
186     boolean sendAcknowledgement() {
187         return acknowledgeOnNextOccasion;
188     }
189     
190     /**
191      * The correlation of the incoming CreateSequence call used to create this
192      * sequence is recorded so that in the absence of an offer, the corresponding
193      * outgoing CreateSeqeunce can be correlated.
194      */

195     void setCorrelationID(String JavaDoc cid) {
196         correlationID = cid;
197     }
198    
199     String JavaDoc getCorrelationID() {
200         return correlationID;
201     }
202
203     boolean canPiggybackAckOnPartialResponse() {
204         // TODO: should also check if we allow breaking the WI Profile rule by which no headers
205
// can be included in a HTTP response
206
return getAcksTo().getAddress().getValue().equals(Names.WSA_ANONYMOUS_ADDRESS);
207     }
208     
209     static SequenceFault createUnknownSequenceFault(Identifier sid) {
210         SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType();
211         sf.setFaultCode(RMUtils.getRMConstants().getUnknownSequenceFaultCode());
212         Message msg = new Message("UNKNOWN_SEQUENCE_EXC", LOG, sid.getValue());
213         return new SequenceFault(msg.toString(), sf);
214     }
215    
216     private void scheduleAcknowledgement() {
217         RMAssertionType rma = destination.getRMAssertion();
218         int delay = 0;
219         if (null != rma.getAcknowledgementInterval()) {
220             delay = rma.getAcknowledgementInterval().getMilliseconds().intValue();
221         }
222         AcksPolicyType ap = destination.getAcksPolicy();
223         if (delay > 0 && getMonitor().getMPM() >= ap.getIntraMessageThreshold()) {
224             scheduleDeferredAcknowledgement(delay);
225         } else {
226             scheduleImmediateAcknowledgement();
227         }
228     }
229
230
231     void scheduleImmediateAcknowledgement() {
232         acknowledgeOnNextOccasion = true;
233     }
234
235     private void scheduleDeferredAcknowledgement(int delay) {
236         if (null == deferredAcknowledgments) {
237             deferredAcknowledgments = new ArrayList JavaDoc<DeferredAcknowledgment>();
238         }
239         long now = System.currentTimeMillis();
240         long expectedExecutionTime = now + delay;
241         for (DeferredAcknowledgment da : deferredAcknowledgments) {
242             if (da.scheduledExecutionTime() <= expectedExecutionTime) {
243                 return;
244             }
245         }
246         DeferredAcknowledgment da = new DeferredAcknowledgment();
247         deferredAcknowledgments.add(da);
248         destination.getHandler().getTimer().schedule(da, delay);
249     }
250
251     final class DeferredAcknowledgment extends TimerTask JavaDoc {
252
253         public void run() {
254             DestinationSequence.this.scheduleImmediateAcknowledgement();
255             try {
256                 destination.getHandler().getProxy().acknowledge(DestinationSequence.this);
257             } catch (IOException JavaDoc ex) {
258                 Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, DestinationSequence.this);
259                 LOG.log(Level.SEVERE, msg.toString(), ex);
260             }
261         }
262     }
263 }
264
Popular Tags