| 1 package org.objectweb.celtix.bus.ws.rm; 2 3 import java.math.BigInteger ; 4 import java.util.Date ; 5 import java.util.logging.Level ; 6 import java.util.logging.Logger ; 7 8 import javax.xml.datatype.DatatypeConfigurationException ; 9 import javax.xml.datatype.DatatypeFactory ; 10 import javax.xml.datatype.Duration ; 11 12 import org.objectweb.celtix.bus.configuration.wsrm.SequenceTerminationPolicyType; 13 import org.objectweb.celtix.bus.ws.addressing.ContextUtils; 14 import org.objectweb.celtix.common.logging.LogUtils; 15 import org.objectweb.celtix.ws.rm.Expires; 16 import org.objectweb.celtix.ws.rm.Identifier; 17 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement; 18 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement.AcknowledgementRange; 19 import org.objectweb.celtix.ws.rm.persistence.RMSourceSequence; 20 21 public class SourceSequence extends AbstractSequenceImpl implements RMSourceSequence { 22 23 public static final Duration PT0S; 24 private static final Logger LOG = LogUtils.getL7dLogger(SourceSequence.class); 25 26 private SequenceAcknowledgement acked; 27 28 private Date expires; 29 private RMSource source; 30 private BigInteger currentMessageNumber; 31 private boolean lastMessage; 32 private Identifier offeringId; 33 private org.objectweb.celtix.ws.addressing.EndpointReferenceType target; 34 35 static { 36 Duration pt0s = null; 37 try { 38 DatatypeFactory df = DatatypeFactory.newInstance(); 39 pt0s = df.newDuration("PT0S"); 40 } catch (DatatypeConfigurationException ex) { 41 LOG.log(Level.INFO, "Could not create Duration object.", ex); 42 } 43 PT0S = pt0s; 44 } 45 46 public SourceSequence(Identifier i) { 47 this(i, null, null); 48 } 49 50 public SourceSequence(Identifier i, Date e, Identifier oi) { 51 this(i, e, oi, BigInteger.ZERO, false); 52 } 53 54 55 public SourceSequence(Identifier i, Date e, Identifier oi, BigInteger cmn, boolean lm) { 56 super(i); 57 expires = e; 58 59 offeringId = oi; 60 61 currentMessageNumber = cmn; 62 lastMessage = lm; 63 acked = RMUtils.getWSRMFactory().createSequenceAcknowledgement(); 64 acked.setIdentifier(id); 65 } 66 67 69 public BigInteger getCurrentMessageNr() { 70 return currentMessageNumber; 71 } 72 73 76 public String getEndpointIdentifier() { 77 if (null != source) { 78 return source.getEndpointId(); 79 } 80 return null; 81 } 82 83 public Date getExpiry() { 84 return expires; 85 } 86 87 public boolean isLastMessage() { 88 return lastMessage; 89 } 90 91 public Identifier getOfferingSequenceIdentifier() { 92 return offeringId; 93 } 94 95 97 void setSource(RMSource s) { 98 source = s; 99 } 100 101 void setLastMessage(boolean lm) { 102 lastMessage = lm; 103 } 104 105 113 boolean offeredBy(Identifier sid) { 114 return null != offeringId && offeringId.getValue().equals(sid.getValue()); 115 } 116 117 122 123 boolean isExpired() { 124 return expires == null ? false : new Date ().after(expires); 125 } 126 127 void setExpires(Expires ex) { 128 Duration d = null; 129 if (null != ex) { 130 d = ex.getValue(); 131 } 132 133 if (null != d && (null == PT0S || !PT0S.equals(d))) { 134 Date now = new Date (); 135 expires = new Date (now.getTime() + ex.getValue().getTimeInMillis(now)); 136 } 137 } 138 139 144 BigInteger nextMessageNumber() { 145 return nextMessageNumber(null, null); 146 } 147 148 157 BigInteger nextMessageNumber(Identifier inSeqId, BigInteger inMsgNumber) { 158 159 assert !lastMessage; 160 161 BigInteger result = null; 162 synchronized (this) { 163 currentMessageNumber = currentMessageNumber.add(BigInteger.ONE); 164 checkLastMessage(inSeqId, inMsgNumber); 165 result = currentMessageNumber; 166 } 167 return result; 168 } 169 170 void nextAndLastMessageNumber() { 171 assert !lastMessage; 172 173 synchronized (this) { 174 currentMessageNumber = currentMessageNumber.add(BigInteger.ONE); 175 lastMessage = true; 176 } 177 } 178 179 185 void setAcknowledged(SequenceAcknowledgement acknowledgement) { 186 acked = acknowledgement; 187 } 188 189 190 SequenceAcknowledgement getAcknowledgement() { 191 return acked; 192 } 193 194 200 boolean isAcknowledged(BigInteger m) { 201 for (AcknowledgementRange r : acked.getAcknowledgementRange()) { 202 if (m.subtract(r.getLower()).signum() >= 0 && r.getUpper().subtract(m).signum() >= 0) { 203 return true; 204 } 205 } 206 return false; 207 } 208 209 215 boolean allAcknowledged() { 216 if (!lastMessage) { 217 return false; 218 } 219 220 if (acked.getAcknowledgementRange().size() == 1) { 221 AcknowledgementRange r = acked.getAcknowledgementRange().get(0); 222 return r.getLower().equals(BigInteger.ONE) && r.getUpper().equals(currentMessageNumber); 223 } 224 return false; 225 } 226 227 237 synchronized void setTarget(org.objectweb.celtix.ws.addressing.EndpointReferenceType to) { 238 if (target == null && !ContextUtils.isGenericAddress(to)) { 239 target = to; 240 } 241 } 242 243 synchronized org.objectweb.celtix.ws.addressing.EndpointReferenceType getTarget() { 244 return target; 245 } 246 247 251 private void checkLastMessage(Identifier inSeqId, BigInteger inMsgNumber) { 252 253 assert null != source; 254 255 258 if (null != inSeqId && null != inMsgNumber) { 259 DestinationSequence inSeq = source.getHandler().getDestination().getSequence(inSeqId); 260 if (null != inSeq && offeredBy(inSeqId) && inMsgNumber.equals(inSeq.getLastMessageNr())) { 261 lastMessage = true; 262 } 263 } 264 265 if (!lastMessage) { 266 SequenceTerminationPolicyType stp = source.getSequenceTerminationPolicy(); 267 assert null != stp; 268 269 if ((!stp.getMaxLength().equals(BigInteger.ZERO) && stp.getMaxLength() 270 .compareTo(currentMessageNumber) <= 0) 271 || (stp.getMaxRanges() > 0 && acked.getAcknowledgementRange().size() >= stp.getMaxRanges()) 272 || (stp.getMaxUnacknowledged() > 0 && source.getRetransmissionQueue() 273 .countUnacknowledged(this) >= stp.getMaxUnacknowledged())) { 274 lastMessage = true; 275 } 276 } 277 278 if (LOG.isLoggable(Level.FINE) && lastMessage) { 279 LOG.fine(currentMessageNumber + " should be the last message in this sequence."); 280 } 281 } 282 283 } 284 | Popular Tags |