// KickJava - Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > org > objectweb > celtix > bus > ws > rm > SourceSequence


package org.objectweb.celtix.bus.ws.rm;

import java.math.BigInteger;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;

import org.objectweb.celtix.bus.configuration.wsrm.SequenceTerminationPolicyType;
import org.objectweb.celtix.bus.ws.addressing.ContextUtils;
import org.objectweb.celtix.common.logging.LogUtils;
import org.objectweb.celtix.ws.rm.Expires;
import org.objectweb.celtix.ws.rm.Identifier;
import org.objectweb.celtix.ws.rm.SequenceAcknowledgement;
import org.objectweb.celtix.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
import org.objectweb.celtix.ws.rm.persistence.RMSourceSequence;
20
21 public class SourceSequence extends AbstractSequenceImpl implements RMSourceSequence {
22
23     public static final Duration JavaDoc PT0S;
24     private static final Logger JavaDoc LOG = LogUtils.getL7dLogger(SourceSequence.class);
25     
26     private SequenceAcknowledgement acked;
27     
28     private Date JavaDoc expires;
29     private RMSource source;
30     private BigInteger JavaDoc currentMessageNumber;
31     private boolean lastMessage;
32     private Identifier offeringId;
33     private org.objectweb.celtix.ws.addressing.EndpointReferenceType target;
34     
35     static {
36         Duration JavaDoc pt0s = null;
37         try {
38             DatatypeFactory JavaDoc df = DatatypeFactory.newInstance();
39             pt0s = df.newDuration("PT0S");
40         } catch (DatatypeConfigurationException JavaDoc ex) {
41             LOG.log(Level.INFO, "Could not create Duration object.", ex);
42         }
43         PT0S = pt0s;
44     }
45     
46     public SourceSequence(Identifier i) {
47         this(i, null, null);
48     }
49     
50     public SourceSequence(Identifier i, Date JavaDoc e, Identifier oi) {
51         this(i, e, oi, BigInteger.ZERO, false);
52     }
53    
54     
55     public SourceSequence(Identifier i, Date JavaDoc e, Identifier oi, BigInteger JavaDoc cmn, boolean lm) {
56         super(i);
57         expires = e;
58
59         offeringId = oi;
60
61         currentMessageNumber = cmn;
62         lastMessage = lm;
63         acked = RMUtils.getWSRMFactory().createSequenceAcknowledgement();
64         acked.setIdentifier(id);
65     }
66     
67     // begin RMSourceSequence interface
68

69     public BigInteger JavaDoc getCurrentMessageNr() {
70         return currentMessageNumber;
71     }
72
73     /**
74      * @return the identifier of the rm source
75      */

76     public String JavaDoc getEndpointIdentifier() {
77         if (null != source) {
78             return source.getEndpointId();
79         }
80         return null;
81     }
82     
83     public Date JavaDoc getExpiry() {
84         return expires;
85     }
86
87     public boolean isLastMessage() {
88         return lastMessage;
89     }
90
91     public Identifier getOfferingSequenceIdentifier() {
92         return offeringId;
93     }
94     
95     // end RMSourceSequence interface
96

97     void setSource(RMSource s) {
98         source = s;
99     }
100     
101     void setLastMessage(boolean lm) {
102         lastMessage = lm;
103     }
104     
105     /**
106      * Returns true if this sequence was constructed from an offer for an inbound sequence
107      * includes in the CreateSequenceRequest in response to which the sequence with
108      * the specified identifier was created.
109      *
110      * @param id the sequence identifier
111      * @return true if the sequence was constructed from an offer.
112      */

113     boolean offeredBy(Identifier sid) {
114         return null != offeringId && offeringId.getValue().equals(sid.getValue());
115     }
116     
117     /**
118      * Returns true if the sequence is expired.
119      *
120      * @return true if the sequence is expired.
121      */

122
123     boolean isExpired() {
124         return expires == null ? false : new Date JavaDoc().after(expires);
125     }
126     
127     void setExpires(Expires ex) {
128         Duration JavaDoc d = null;
129         if (null != ex) {
130             d = ex.getValue();
131         }
132
133         if (null != d && (null == PT0S || !PT0S.equals(d))) {
134             Date JavaDoc now = new Date JavaDoc();
135             expires = new Date JavaDoc(now.getTime() + ex.getValue().getTimeInMillis(now));
136         }
137     }
138     
139     /**
140      * Returns the next message number and increases the message number.
141      *
142      * @return the next message number.
143      */

144     BigInteger JavaDoc nextMessageNumber() {
145         return nextMessageNumber(null, null);
146     }
147
148     /**
149      * Returns the next message number and increases the message number.
150      * The parameters, if not null, indicate that this message is being sent as a response
151      * to the message with the specified message number in the sequence specified by the
152      * by the identifier, and are used to decide if this message should be the last in
153      * this sequence.
154      *
155      * @return the next message number.
156      */

157     BigInteger JavaDoc nextMessageNumber(Identifier inSeqId, BigInteger JavaDoc inMsgNumber) {
158
159         assert !lastMessage;
160         
161         BigInteger JavaDoc result = null;
162         synchronized (this) {
163             currentMessageNumber = currentMessageNumber.add(BigInteger.ONE);
164             checkLastMessage(inSeqId, inMsgNumber);
165             result = currentMessageNumber;
166         }
167         return result;
168     }
169     
170     void nextAndLastMessageNumber() {
171         assert !lastMessage;
172         
173         synchronized (this) {
174             currentMessageNumber = currentMessageNumber.add(BigInteger.ONE);
175             lastMessage = true;
176         }
177     }
178     
179     /**
180      * Used by the RM source to cache received acknowledgements for this
181      * sequence.
182      *
183      * @param acknowledgement an acknowledgement for this sequence
184      */

185     void setAcknowledged(SequenceAcknowledgement acknowledgement) {
186         acked = acknowledgement;
187     }
188     
189     
190     SequenceAcknowledgement getAcknowledgement() {
191         return acked;
192     }
193     
194     /**
195      * Checks if the message with the given number has been acknowledged.
196      *
197      * @param m the message number
198      * @return true of the message with the given number has been acknowledged.
199      */

200     boolean isAcknowledged(BigInteger JavaDoc m) {
201         for (AcknowledgementRange r : acked.getAcknowledgementRange()) {
202             if (m.subtract(r.getLower()).signum() >= 0 && r.getUpper().subtract(m).signum() >= 0) {
203                 return true;
204             }
205         }
206         return false;
207     }
208     
209     /**
210      * Returns true if a last message had been sent for this sequence and if all
211      * messages for this sequence have been acknowledged.
212      *
213      * @return true if all messages have been acknowledged.
214      */

215     boolean allAcknowledged() {
216         if (!lastMessage) {
217             return false;
218         }
219
220         if (acked.getAcknowledgementRange().size() == 1) {
221             AcknowledgementRange r = acked.getAcknowledgementRange().get(0);
222             return r.getLower().equals(BigInteger.ONE) && r.getUpper().equals(currentMessageNumber);
223         }
224         return false;
225     }
226     
227     /**
228      * The target for the sequence is the first non-anonymous address that
229      * a message is sent to as part of this sequence. It is subsequently used
230      * for as the target of out-of-band protocol messages related to that
231      * sequence that originate from the sequnce source (i.e. TerminateSequence
232      * and LastMessage, but not AckRequested or SequenceAcknowledgement as these
233      * are orignate from the sequence destination).
234      *
235      * @param to
236      */

237     synchronized void setTarget(org.objectweb.celtix.ws.addressing.EndpointReferenceType to) {
238         if (target == null && !ContextUtils.isGenericAddress(to)) {
239             target = to;
240         }
241     }
242     
243     synchronized org.objectweb.celtix.ws.addressing.EndpointReferenceType getTarget() {
244         return target;
245     }
246    
247     /**
248      * Checks if the current message should be the last message in this sequence
249      * and if so sets the lastMessageNumber property.
250      */

251     private void checkLastMessage(Identifier inSeqId, BigInteger JavaDoc inMsgNumber) {
252
253         assert null != source;
254         
255         // check if this is a response to a message that was is the last message in the sequence
256
// that included this sequence as an offer
257

258         if (null != inSeqId && null != inMsgNumber) {
259             DestinationSequence inSeq = source.getHandler().getDestination().getSequence(inSeqId);
260             if (null != inSeq && offeredBy(inSeqId) && inMsgNumber.equals(inSeq.getLastMessageNr())) {
261                 lastMessage = true;
262             }
263         }
264         
265         if (!lastMessage) {
266             SequenceTerminationPolicyType stp = source.getSequenceTerminationPolicy();
267             assert null != stp;
268
269             if ((!stp.getMaxLength().equals(BigInteger.ZERO) && stp.getMaxLength()
270                 .compareTo(currentMessageNumber) <= 0)
271                 || (stp.getMaxRanges() > 0 && acked.getAcknowledgementRange().size() >= stp.getMaxRanges())
272                 || (stp.getMaxUnacknowledged() > 0 && source.getRetransmissionQueue()
273                     .countUnacknowledged(this) >= stp.getMaxUnacknowledged())) {
274                 lastMessage = true;
275             }
276         }
277         
278         if (LOG.isLoggable(Level.FINE) && lastMessage) {
279             LOG.fine(currentMessageNumber + " should be the last message in this sequence.");
280         }
281     }
282
283 }
284
// Popular Tags