KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > celtix > bus > ws > rm > RMSource


1 package org.objectweb.celtix.bus.ws.rm;
2
3 import java.io.IOException JavaDoc;
4 import java.util.ArrayList JavaDoc;
5 import java.util.Collection JavaDoc;
6 import java.util.HashMap JavaDoc;
7 import java.util.Map JavaDoc;
8 import java.util.concurrent.locks.Condition JavaDoc;
9 import java.util.concurrent.locks.Lock JavaDoc;
10 import java.util.concurrent.locks.ReentrantLock JavaDoc;
11 import java.util.logging.Level JavaDoc;
12 import java.util.logging.Logger JavaDoc;
13
14
15 import org.objectweb.celtix.Bus;
16 import org.objectweb.celtix.bus.configuration.wsrm.SequenceTerminationPolicyType;
17 import org.objectweb.celtix.bus.configuration.wsrm.SourcePolicyType;
18 import org.objectweb.celtix.buslifecycle.BusLifeCycleListener;
19 import org.objectweb.celtix.common.i18n.Message;
20 import org.objectweb.celtix.common.logging.LogUtils;
21 import org.objectweb.celtix.context.ObjectMessageContext;
22 import org.objectweb.celtix.ws.rm.Identifier;
23 import org.objectweb.celtix.ws.rm.SequenceAcknowledgement;
24 import org.objectweb.celtix.ws.rm.persistence.RMMessage;
25 import org.objectweb.celtix.ws.rm.persistence.RMSourceSequence;
26 import org.objectweb.celtix.ws.rm.persistence.RMStore;
27
28 public class RMSource extends RMEndpoint {
29
30     private static final Logger JavaDoc LOG = LogUtils.getL7dLogger(RMSource.class);
31     private static final String JavaDoc SOURCE_POLICIES_PROPERTY_NAME = "sourcePolicies";
32     private static final String JavaDoc REQUESTOR_SEQUENCE_ID = "";
33     
34     private Map JavaDoc<String JavaDoc, SourceSequence> map;
35     private Map JavaDoc<String JavaDoc, SourceSequence> current;
36     private final RetransmissionQueue retransmissionQueue;
37     private Lock JavaDoc sequenceCreationLock;
38     private Condition JavaDoc sequenceCreationCondition;
39     private boolean sequenceCreationNotified;
40
41
42     RMSource(RMHandler h) {
43         super(h);
44         map = new HashMap JavaDoc<String JavaDoc, SourceSequence>();
45         Bus bus = h.getBus();
46         bus.getLifeCycleManager().registerLifeCycleListener(new BusLifeCycleListener() {
47             public void initComplete() {
48             }
49             public void postShutdown() {
50             }
51             public void preShutdown() {
52                 shutdown();
53             }
54         });
55         current = new HashMap JavaDoc<String JavaDoc, SourceSequence>();
56         
57         retransmissionQueue = new RetransmissionQueue(h, getRMAssertion());
58         sequenceCreationLock = new ReentrantLock JavaDoc();
59         sequenceCreationCondition = sequenceCreationLock.newCondition();
60     }
61     
62     public SourceSequence getSequence(Identifier id) {
63         return map.get(id.getValue());
64     }
65     
66     public void addSequence(SourceSequence seq) {
67         addSequence(seq, true);
68     }
69     
70     public void addSequence(SourceSequence seq, boolean persist) {
71         LOG.fine("Adding source sequence: " + seq);
72         seq.setSource(this);
73         map.put(seq.getIdentifier().getValue(), seq);
74         if (persist) {
75             getHandler().getStore().createSourceSequence(seq);
76         }
77     }
78     
79     public void removeSequence(SourceSequence seq) {
80         map.remove(seq.getIdentifier().getValue());
81         getHandler().getStore().removeSourceSequence(seq.getIdentifier());
82     }
83     
84     public final Collection JavaDoc<SourceSequence> getAllSequences() {
85         return map.values();
86     }
87     
88
89     public SourcePolicyType getSourcePolicies() {
90         SourcePolicyType sp = (SourcePolicyType)getHandler().getConfiguration()
91             .getObject(SourcePolicyType.class, SOURCE_POLICIES_PROPERTY_NAME);
92         if (null == sp) {
93             sp = RMUtils.getWSRMConfFactory().createSourcePolicyType();
94         }
95         return sp;
96     }
97
98     public SequenceTerminationPolicyType getSequenceTerminationPolicy() {
99         SourcePolicyType sp = getSourcePolicies();
100         assert null != sp;
101         SequenceTerminationPolicyType stp = sp.getSequenceTerminationPolicy();
102         if (null == stp) {
103             stp = RMUtils.getWSRMConfFactory().createSequenceTerminationPolicyType();
104         }
105         return stp;
106     }
107
108     public RetransmissionQueue getRetransmissionQueue() {
109         return retransmissionQueue;
110     }
111
112     /**
113      * Returns the current sequence used by a client side source.
114      *
115      * @return the current sequence.
116      */

117     SourceSequence getCurrent() {
118         return getCurrent(null);
119     }
120     
121     /**
122      * Sets the current sequence used by a client side source.
123      * @param s the current sequence.
124      */

125     void setCurrent(SourceSequence s) {
126         setCurrent(null, s);
127     }
128     
129     /**
130      * Returns the current sequence used by a server side source for responses to a message
131      * sent as part of the inbound sequence with the specified identifier.
132      *
133      * @return the current sequence.
134      */

135     SourceSequence getCurrent(Identifier i) {
136         sequenceCreationLock.lock();
137         try {
138             return getAssociatedSequence(i);
139         } finally {
140             sequenceCreationLock.unlock();
141         }
142     }
143
144     /**
145      * Returns the sequence associated with the given identifier.
146      *
147      * @param i the corresponding sequence identifier
148      * @return the associated sequence
149      * @pre the sequenceCreationLock is already held
150      */

151     SourceSequence getAssociatedSequence(Identifier i) {
152         return current.get(i == null ? REQUESTOR_SEQUENCE_ID : i.getValue());
153     }
154     
155     /**
156      * Await the avilability of a sequence corresponding to the given identifier.
157      *
158      * @param i the sequnce identifier
159      * @return
160      */

161     SourceSequence awaitCurrent(Identifier i) {
162         sequenceCreationLock.lock();
163         try {
164             SourceSequence seq = getAssociatedSequence(i);
165             while (seq == null) {
166                 while (!sequenceCreationNotified) {
167                     try {
168                         sequenceCreationCondition.await();
169                     } catch (InterruptedException JavaDoc ie) {
170                         // ignore
171
}
172                 }
173                 seq = getAssociatedSequence(i);
174             }
175             return seq;
176         } finally {
177             sequenceCreationLock.unlock();
178         }
179     }
180     
181     /**
182      * Sets the current sequence used by a server side source for responses to a message
183      * sent as part of the inbound sequence with the specified identifier.
184      * @param s the current sequence.
185      */

186     void setCurrent(Identifier i, SourceSequence s) {
187         sequenceCreationLock.lock();
188         try {
189             current.put(i == null ? REQUESTOR_SEQUENCE_ID : i.getValue(), s);
190             sequenceCreationNotified = true;
191             sequenceCreationCondition.signal();
192         } finally {
193             sequenceCreationLock.unlock();
194         }
195     }
196
197     /**
198      * Create a copy of the message, store it in the retransmission queue and
199      * schedule the next transmission
200      *
201      * @param context
202      */

203     public void addUnacknowledged(SourceSequence seq, RMMessage msg) {
204         ObjectMessageContext clone = getHandler().getBinding().createObjectContext();
205         clone.putAll(msg.getContext());
206         getRetransmissionQueue().cacheUnacknowledged(clone);
207         getHandler().getStore().persistOutgoing(seq, msg);
208     }
209
210     /**
211      * Stores the received acknowledgment in the Sequence object identified in
212      * the <code>SequenceAcknowldgement</code> parameter. Then purges any
213      * acknowledged messages from the retransmission queue and requests sequence
214      * termination if necessary.
215      *
216      * @param acknowledgment
217      */

218     public void setAcknowledged(SequenceAcknowledgement acknowledgment) {
219         Identifier sid = acknowledgment.getIdentifier();
220         SourceSequence seq = getSequence(sid);
221         if (null != seq) {
222             seq.setAcknowledged(acknowledgment);
223             retransmissionQueue.purgeAcknowledged(seq);
224             if (seq.allAcknowledged()) {
225                 try {
226                     getHandler().getProxy().terminateSequence(seq);
227                 } catch (IOException JavaDoc ex) {
228                     Message msg = new Message("SEQ_TERMINATION_FAILURE", LOG, seq.getIdentifier());
229                     LOG.log(Level.SEVERE, msg.toString(), ex);
230                 }
231             }
232         }
233     }
234     
235     public void shutdown() {
236         retransmissionQueue.shutdown();
237     }
238     
239     /**
240      * Returns a collection of all sequences for which have not yet been
241      * completely acknowledged.
242      *
243      * @return the collection of unacknowledged sequences.
244      */

245     public Collection JavaDoc<SourceSequence> getAllUnacknowledgedSequences() {
246         Collection JavaDoc<SourceSequence> seqs = new ArrayList JavaDoc<SourceSequence>();
247         for (SourceSequence seq : map.values()) {
248             if (!seq.allAcknowledged()) {
249                 seqs.add(seq);
250             }
251         }
252         return seqs;
253     }
254     
255     void restore() {
256         RMStore store = getHandler().getStore();
257         
258         Collection JavaDoc<RMSourceSequence> dss = store.getSourceSequences(getEndpointId());
259         // Don't make any of these sequences the current sequence, thus forcing
260
// termination of the recovered sequences as soon as possible
261
for (RMSourceSequence ds : dss) {
262             addSequence((SourceSequence)ds, false);
263         }
264         
265         retransmissionQueue.populate(getAllSequences());
266         int n = retransmissionQueue.getUnacknowledged().size();
267         if (n > 0) {
268             LOG.fine("Recovered " + n + " messages, start retransmission queue now");
269             retransmissionQueue.start(getHandler().getBus().getWorkQueueManager().getAutomaticWorkQueue());
270         } else {
271             LOG.fine("No outgoing messages recovered");
272         }
273         
274         
275     }
276 }
277
Popular Tags