1 package org.objectweb.celtix.bus.ws.rm; 2 3 import java.io.IOException ; 4 import java.util.ArrayList ; 5 import java.util.Collection ; 6 import java.util.HashMap ; 7 import java.util.Map ; 8 import java.util.concurrent.locks.Condition ; 9 import java.util.concurrent.locks.Lock ; 10 import java.util.concurrent.locks.ReentrantLock ; 11 import java.util.logging.Level ; 12 import java.util.logging.Logger ; 13 14 15 import org.objectweb.celtix.Bus; 16 import org.objectweb.celtix.bus.configuration.wsrm.SequenceTerminationPolicyType; 17 import org.objectweb.celtix.bus.configuration.wsrm.SourcePolicyType; 18 import org.objectweb.celtix.buslifecycle.BusLifeCycleListener; 19 import org.objectweb.celtix.common.i18n.Message; 20 import org.objectweb.celtix.common.logging.LogUtils; 21 import org.objectweb.celtix.context.ObjectMessageContext; 22 import org.objectweb.celtix.ws.rm.Identifier; 23 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement; 24 import org.objectweb.celtix.ws.rm.persistence.RMMessage; 25 import org.objectweb.celtix.ws.rm.persistence.RMSourceSequence; 26 import org.objectweb.celtix.ws.rm.persistence.RMStore; 27 28 public class RMSource extends RMEndpoint { 29 30 private static final Logger LOG = LogUtils.getL7dLogger(RMSource.class); 31 private static final String SOURCE_POLICIES_PROPERTY_NAME = "sourcePolicies"; 32 private static final String REQUESTOR_SEQUENCE_ID = ""; 33 34 private Map <String , SourceSequence> map; 35 private Map <String , SourceSequence> current; 36 private final RetransmissionQueue retransmissionQueue; 37 private Lock sequenceCreationLock; 38 private Condition sequenceCreationCondition; 39 private boolean sequenceCreationNotified; 40 41 42 RMSource(RMHandler h) { 43 super(h); 44 map = new HashMap <String , SourceSequence>(); 45 Bus bus = h.getBus(); 46 bus.getLifeCycleManager().registerLifeCycleListener(new BusLifeCycleListener() { 47 public void initComplete() { 48 } 49 public void postShutdown() { 50 } 51 public void preShutdown() { 52 shutdown(); 53 } 54 }); 55 current = new HashMap <String , SourceSequence>(); 56 57 retransmissionQueue = new RetransmissionQueue(h, getRMAssertion()); 58 sequenceCreationLock = new ReentrantLock (); 59 sequenceCreationCondition = sequenceCreationLock.newCondition(); 60 } 61 62 public SourceSequence getSequence(Identifier id) { 63 return map.get(id.getValue()); 64 } 65 66 public void addSequence(SourceSequence seq) { 67 addSequence(seq, true); 68 } 69 70 public void addSequence(SourceSequence seq, boolean persist) { 71 LOG.fine("Adding source sequence: " + seq); 72 seq.setSource(this); 73 map.put(seq.getIdentifier().getValue(), seq); 74 if (persist) { 75 getHandler().getStore().createSourceSequence(seq); 76 } 77 } 78 79 public void removeSequence(SourceSequence seq) { 80 map.remove(seq.getIdentifier().getValue()); 81 getHandler().getStore().removeSourceSequence(seq.getIdentifier()); 82 } 83 84 public final Collection <SourceSequence> getAllSequences() { 85 return map.values(); 86 } 87 88 89 public SourcePolicyType getSourcePolicies() { 90 SourcePolicyType sp = (SourcePolicyType)getHandler().getConfiguration() 91 .getObject(SourcePolicyType.class, SOURCE_POLICIES_PROPERTY_NAME); 92 if (null == sp) { 93 sp = RMUtils.getWSRMConfFactory().createSourcePolicyType(); 94 } 95 return sp; 96 } 97 98 public SequenceTerminationPolicyType getSequenceTerminationPolicy() { 99 SourcePolicyType sp = getSourcePolicies(); 100 assert null != sp; 101 SequenceTerminationPolicyType stp = sp.getSequenceTerminationPolicy(); 102 if (null == stp) { 103 stp = RMUtils.getWSRMConfFactory().createSequenceTerminationPolicyType(); 104 } 105 return stp; 106 } 107 108 public RetransmissionQueue getRetransmissionQueue() { 109 return retransmissionQueue; 110 } 111 112 117 SourceSequence getCurrent() { 118 return getCurrent(null); 119 } 120 121 125 void setCurrent(SourceSequence s) { 126 setCurrent(null, s); 127 } 128 129 135 SourceSequence getCurrent(Identifier i) { 136 sequenceCreationLock.lock(); 137 try { 138 return getAssociatedSequence(i); 139 } finally { 140 sequenceCreationLock.unlock(); 141 } 142 } 143 144 151 SourceSequence getAssociatedSequence(Identifier i) { 152 return current.get(i == null ? REQUESTOR_SEQUENCE_ID : i.getValue()); 153 } 154 155 161 SourceSequence awaitCurrent(Identifier i) { 162 sequenceCreationLock.lock(); 163 try { 164 SourceSequence seq = getAssociatedSequence(i); 165 while (seq == null) { 166 while (!sequenceCreationNotified) { 167 try { 168 sequenceCreationCondition.await(); 169 } catch (InterruptedException ie) { 170 } 172 } 173 seq = getAssociatedSequence(i); 174 } 175 return seq; 176 } finally { 177 sequenceCreationLock.unlock(); 178 } 179 } 180 181 186 void setCurrent(Identifier i, SourceSequence s) { 187 sequenceCreationLock.lock(); 188 try { 189 current.put(i == null ? REQUESTOR_SEQUENCE_ID : i.getValue(), s); 190 sequenceCreationNotified = true; 191 sequenceCreationCondition.signal(); 192 } finally { 193 sequenceCreationLock.unlock(); 194 } 195 } 196 197 203 public void addUnacknowledged(SourceSequence seq, RMMessage msg) { 204 ObjectMessageContext clone = getHandler().getBinding().createObjectContext(); 205 clone.putAll(msg.getContext()); 206 getRetransmissionQueue().cacheUnacknowledged(clone); 207 getHandler().getStore().persistOutgoing(seq, msg); 208 } 209 210 218 public void setAcknowledged(SequenceAcknowledgement acknowledgment) { 219 Identifier sid = acknowledgment.getIdentifier(); 220 SourceSequence seq = getSequence(sid); 221 if (null != seq) { 222 seq.setAcknowledged(acknowledgment); 223 retransmissionQueue.purgeAcknowledged(seq); 224 if (seq.allAcknowledged()) { 225 try { 226 getHandler().getProxy().terminateSequence(seq); 227 } catch (IOException ex) { 228 Message msg = new Message("SEQ_TERMINATION_FAILURE", LOG, seq.getIdentifier()); 229 LOG.log(Level.SEVERE, msg.toString(), ex); 230 } 231 } 232 } 233 } 234 235 public void shutdown() { 236 retransmissionQueue.shutdown(); 237 } 238 239 245 public Collection <SourceSequence> getAllUnacknowledgedSequences() { 246 Collection <SourceSequence> seqs = new ArrayList <SourceSequence>(); 247 for (SourceSequence seq : map.values()) { 248 if (!seq.allAcknowledged()) { 249 seqs.add(seq); 250 } 251 } 252 return seqs; 253 } 254 255 void restore() { 256 RMStore store = getHandler().getStore(); 257 258 Collection <RMSourceSequence> dss = store.getSourceSequences(getEndpointId()); 259 for (RMSourceSequence ds : dss) { 262 addSequence((SourceSequence)ds, false); 263 } 264 265 retransmissionQueue.populate(getAllSequences()); 266 int n = retransmissionQueue.getUnacknowledged().size(); 267 if (n > 0) { 268 LOG.fine("Recovered " + n + " messages, start retransmission queue now"); 269 retransmissionQueue.start(getHandler().getBus().getWorkQueueManager().getAutomaticWorkQueue()); 270 } else { 271 LOG.fine("No outgoing messages recovered"); 272 } 273 274 275 } 276 } 277 | Popular Tags |