1 package org.objectweb.celtix.bus.ws.rm; 2 3 import java.io.IOException ; 4 import java.io.InputStream ; 5 import java.math.BigInteger ; 6 import java.util.ArrayList ; 7 import java.util.List ; 8 import java.util.TimerTask ; 9 import java.util.logging.Level ; 10 import java.util.logging.Logger ; 11 12 import org.objectweb.celtix.bus.configuration.wsrm.AcksPolicyType; 13 import org.objectweb.celtix.common.i18n.Message; 14 import org.objectweb.celtix.common.logging.LogUtils; 15 import org.objectweb.celtix.ws.addressing.v200408.EndpointReferenceType; 16 import org.objectweb.celtix.ws.rm.Identifier; 17 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement; 18 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement.AcknowledgementRange; 19 import org.objectweb.celtix.ws.rm.SequenceFaultType; 20 import org.objectweb.celtix.ws.rm.persistence.RMDestinationSequence; 21 import org.objectweb.celtix.ws.rm.policy.RMAssertionType; 22 import org.objectweb.celtix.ws.rm.wsdl.SequenceFault; 23 24 public class DestinationSequence extends AbstractSequenceImpl implements RMDestinationSequence { 25 26 private static final Logger LOG = LogUtils.getL7dLogger(DestinationSequence.class); 27 28 private SequenceAcknowledgement acked; 29 30 private RMDestination destination; 31 private EndpointReferenceType acksTo; 32 private BigInteger lastMessageNumber; 33 private SequenceMonitor monitor; 34 private boolean acknowledgeOnNextOccasion; 35 private List <DeferredAcknowledgment> deferredAcknowledgments; 36 private String correlationID; 37 38 public DestinationSequence(Identifier i, EndpointReferenceType a, RMDestination d) { 39 this(i, a, null, null); 40 setDestination(d); 41 } 42 43 public DestinationSequence(Identifier i, EndpointReferenceType a, 44 BigInteger lmn, SequenceAcknowledgement ac) { 45 super(i); 46 acksTo = a; 47 lastMessageNumber = lmn; 48 acked = ac; 49 if (null == acked) { 50 acked = RMUtils.getWSRMFactory().createSequenceAcknowledgement(); 51 acked.setIdentifier(id); 52 } 53 monitor = new SequenceMonitor(); 54 } 55 56 57 59 60 63 public EndpointReferenceType getAcksTo() { 64 return acksTo; 65 } 66 67 70 public BigInteger getLastMessageNr() { 71 return lastMessageNumber; 72 } 73 74 77 public SequenceAcknowledgement getAcknowledgment() { 78 return acked; 79 } 80 81 85 public InputStream getAcknowledgmentAsStream() { 86 return RMUtils.getPersistenceUtils().getAcknowledgementAsInputStream(acked); 87 } 88 89 92 public String getEndpointIdentifier() { 93 if (null != destination) { 94 return destination.getEndpointId(); 95 } 96 return null; 97 } 98 99 101 final void setDestination(RMDestination d) { 102 destination = d; 103 } 104 105 RMDestination getDestination() { 106 return destination; 107 } 108 109 void setLastMessageNumber(BigInteger lmn) { 110 lastMessageNumber = lmn; 111 } 112 113 114 115 120 SequenceMonitor getMonitor() { 121 return monitor; 122 } 123 124 125 132 void acknowledge(BigInteger messageNumber) throws SequenceFault { 133 134 if (null != lastMessageNumber && messageNumber.compareTo(lastMessageNumber) > 0) { 135 SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType(); 136 sf.setFaultCode(RMUtils.getRMConstants().getLastMessageNumberExceededFaultCode()); 137 Message msg = new Message("LAST_MESSAGE_NUMBER_EXCEEDED_EXC", LOG, this); 138 throw new SequenceFault(msg.toString(), sf); 139 } 140 141 monitor.acknowledgeMessage(); 142 143 boolean done = false; 144 int i = 0; 145 for (; i < acked.getAcknowledgementRange().size(); i++) { 146 AcknowledgementRange r = acked.getAcknowledgementRange().get(i); 147 if (r.getLower().compareTo(messageNumber) <= 0 148 && r.getUpper().compareTo(messageNumber) >= 0) { 149 done = true; 150 break; 151 } else { 152 BigInteger diff = r.getLower().subtract(messageNumber); 153 if (diff.signum() == 1) { 154 if (diff.equals(BigInteger.ONE)) { 155 r.setLower(messageNumber); 156 done = true; 157 } 158 break; 159 } else if (messageNumber.subtract(r.getUpper()).equals(BigInteger.ONE)) { 160 r.setUpper(messageNumber); 161 done = true; 162 break; 163 } 164 } 165 } 166 167 if (!done) { 168 AcknowledgementRange range = RMUtils.getWSRMFactory() 169 .createSequenceAcknowledgementAcknowledgementRange(); 170 range.setLower(messageNumber); 171 range.setUpper(messageNumber); 172 acked.getAcknowledgementRange().add(i, range); 173 } 174 175 scheduleAcknowledgement(); 176 } 177 178 179 182 void acknowledgmentSent() { 183 acknowledgeOnNextOccasion = false; 184 } 185 186 boolean sendAcknowledgement() { 187 return acknowledgeOnNextOccasion; 188 } 189 190 195 void setCorrelationID(String cid) { 196 correlationID = cid; 197 } 198 199 String getCorrelationID() { 200 return correlationID; 201 } 202 203 boolean canPiggybackAckOnPartialResponse() { 204 return getAcksTo().getAddress().getValue().equals(Names.WSA_ANONYMOUS_ADDRESS); 207 } 208 209 static SequenceFault createUnknownSequenceFault(Identifier sid) { 210 SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType(); 211 sf.setFaultCode(RMUtils.getRMConstants().getUnknownSequenceFaultCode()); 212 Message msg = new Message("UNKNOWN_SEQUENCE_EXC", LOG, sid.getValue()); 213 return new SequenceFault(msg.toString(), sf); 214 } 215 216 private void scheduleAcknowledgement() { 217 RMAssertionType rma = destination.getRMAssertion(); 218 int delay = 0; 219 if (null != rma.getAcknowledgementInterval()) { 220 delay = rma.getAcknowledgementInterval().getMilliseconds().intValue(); 221 } 222 AcksPolicyType ap = destination.getAcksPolicy(); 223 if (delay > 0 && getMonitor().getMPM() >= ap.getIntraMessageThreshold()) { 224 scheduleDeferredAcknowledgement(delay); 225 } else { 226 scheduleImmediateAcknowledgement(); 227 } 228 } 229 230 231 void scheduleImmediateAcknowledgement() { 232 acknowledgeOnNextOccasion = true; 233 } 234 235 private void scheduleDeferredAcknowledgement(int delay) { 236 if (null == deferredAcknowledgments) { 237 deferredAcknowledgments = new ArrayList <DeferredAcknowledgment>(); 238 } 239 long now = System.currentTimeMillis(); 240 long expectedExecutionTime = now + delay; 241 for (DeferredAcknowledgment da : deferredAcknowledgments) { 242 if (da.scheduledExecutionTime() <= expectedExecutionTime) { 243 return; 244 } 245 } 246 DeferredAcknowledgment da = new DeferredAcknowledgment(); 247 deferredAcknowledgments.add(da); 248 destination.getHandler().getTimer().schedule(da, delay); 249 } 250 251 final class DeferredAcknowledgment extends TimerTask { 252 253 public void run() { 254 DestinationSequence.this.scheduleImmediateAcknowledgement(); 255 try { 256 destination.getHandler().getProxy().acknowledge(DestinationSequence.this); 257 } catch (IOException ex) { 258 Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, DestinationSequence.this); 259 LOG.log(Level.SEVERE, msg.toString(), ex); 260 } 261 } 262 } 263 } 264 | Popular Tags |