KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > extensions > wsrm > AckService


1 /*
2  * The Apache Software License, Version 1.1
3  *
4  * Copyright (c) 2002 The Apache Software Foundation. All rights
5  * reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  *
14  * 2. Redistributions in binary form must reproduce the above copyright
15  * notice, this list of conditions and the following disclaimer in
16  * the documentation and/or other materials provided with the
17  * distribution.
18  *
19  * 3. The end-user documentation included with the redistribution,
20  * if any, must include the following acknowledgment:
21  * "This product includes software developed by the
22  * Apache Software Foundation (http://www.apache.org/)."
23  * Alternately, this acknowledgment may appear in the software itself,
24  * if and wherever such third-party acknowledgments normally appear.
25  *
26  * 4. The names "Axis" and "Apache Software Foundation" must
27  * not be used to endorse or promote products derived from this
28  * software without prior written permission. For written
29  * permission, please contact apache@apache.org.
30  *
31  * 5. Products derived from this software may not be called "Apache",
32  * nor may "Apache" appear in their name, without prior written
33  * permission of the Apache Software Foundation.
34  *
35  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
36  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
37  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
38  * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
39  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
40  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
41  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
42  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
43  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
44  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
45  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
46  * SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Apache Software Foundation. For more
51  * information on the Apache Software Foundation, please see
52  * <http://www.apache.org/>.
53  */

54 package extensions.wsrm;
55
56 import org.apache.axis.MessageContext;
57 import org.apache.axis.AxisFault;
58 import org.apache.axis.client.Call;
59 import org.apache.axis.message.SOAPEnvelope;
60 import org.apache.axis.message.SOAPHeaderElement;
61 import org.apache.axis.message.PrefixedQName;
62 import org.apache.axis.message.MessageElement;
63 import org.apache.axis.handlers.BasicHandler;
64
65 import javax.xml.namespace.QName JavaDoc;
66 import java.util.*;
67
68 public class AckService extends BasicHandler implements RMConstants {
69     static RetryGuy myRetryGuy = null;
70
71     /**
72      * Wake up every three seconds, and scan all active sequences to see
73      * if any messages need to be resent (no ack in 20 seconds).
74      */

75     class RetryGuy implements Runnable JavaDoc {
76         public void run() {
77             while (true) {
78                 try {
79                     Thread.sleep(3000);
80                 } catch (InterruptedException JavaDoc e) {
81                     e.printStackTrace();
82                 }
83
84                 Iterator i = sequences.values().iterator();
85                 while (i.hasNext()) {
86                     MySequence seq = (MySequence)i.next();
87                     synchronized (seq.activeMessages) {
88                         Iterator msgs = seq.activeMessages.iterator();
89                         while (msgs.hasNext()) {
90                             MessageRecord msg = (MessageRecord)msgs.next();
91                             if (msg.timestamp < (new Date().getTime() - 20000)) {
92                                 try {
93                                     resendMsg(msg, seq.destination);
94                                 } catch (Exception JavaDoc e) {
95                                     e.printStackTrace(); //To change body of catch statement use Options | File Templates.
96
}
97                             }
98                         }
99                     }
100                 }
101             }
102         }
103     }
104
105     class MySequence {
106         String JavaDoc id;
107         String JavaDoc destination;
108
109         int currentMsgNum = 1;
110
111         // My queue of active messages, indexed by sequence number
112
LinkedList activeMessages = new LinkedList();
113     }
114
115     class MessageRecord {
116         int sequenceNumber;
117         long timestamp = new Date().getTime();
118         SOAPEnvelope env;
119     }
120
121     void resendMsg(MessageRecord msg, String JavaDoc destination) throws Exception JavaDoc {
122         System.out.println("Resending : dest = " + destination + ", msg " + msg.sequenceNumber);
123         
124         Call call = new Call(destination);
125         call.setProperty("OneWay", Boolean.TRUE);
126         call.invoke(msg.env);
127         msg.timestamp = new Date().getTime();
128     }
129
130     Map sequences = new HashMap();
131
132     public void invoke(MessageContext msgContext) throws AxisFault {
133         processAck(msgContext.getRequestMessage().getSOAPEnvelope());
134     }
135
136     public void processAck(SOAPEnvelope req) throws AxisFault
137     {
138         SOAPHeaderElement header = req.getHeaderByName(NS_URI_WSRM, "SequenceAcknowledgement");
139         if (header == null)
140             return; // Fault?
141

142         Iterator i = header.getChildElements(new PrefixedQName(NS_URI_WSU, "Identifier", null));
143         if (!i.hasNext()) {
144             // return fault
145
throw new AxisFault("WSRM.Fault", "Missing identifier in Sequence", null, null);
146         }
147
148         MessageElement el = (MessageElement)i.next();
149         String JavaDoc id = el.getValue();
150         MySequence seq = (MySequence)sequences.get(id);
151         if (seq == null) {
152             // Acknowledging a sequence I don't know about...
153
throw new AxisFault("WSRM.UnknownSequence", "Don't recognize ack of sequence '" + id + "'", null, null);
154         }
155
156         i = header.getChildElements(new PrefixedQName(NS_URI_WSRM, "AcknowledgementRange", null));
157         while (i.hasNext()) {
158             el = (MessageElement)i.next();
159             String JavaDoc val = el.getAttributeValue("Upper");
160             int upper = Integer.parseInt(val);
161             val = el.getAttributeValue("Lower");
162             int lower = Integer.parseInt(val);
163             acknowledgeRange(seq, lower, upper);
164         }
165
166         header.setProcessed(true);
167
168         header = req.getHeaderByName(NS_URI_WSA, "From");
169         if (header != null) {
170             header.setProcessed(true);
171         }
172
173         header = req.getHeaderByName(NS_URI_WSA, "To");
174         if (header != null) {
175             header.setProcessed(true);
176         }
177
178         header = req.getHeaderByName(NS_URI_WSA, "MessageID");
179         if (header != null) {
180             header.setProcessed(true);
181         }
182
183         header = req.getHeaderByName(NS_URI_WSA, "Action");
184         if (header != null) {
185             header.setProcessed(true);
186         }
187     }
188
189     /**
190      * Process an acknowledgement range. Remove every matching message from
191      * our resend queue.
192      *
193      * @param seq the sequence we're dealing with
194      * @param min the lowest messageID that has been ack'ed
195      * @param max the highest messageID that has been ack'ed
196      */

197     public void acknowledgeRange(MySequence seq, int min, int max) {
198         LinkedList activeMessages = seq.activeMessages;
199         synchronized (activeMessages) {
200             for (int i = 0; i < activeMessages.size(); i++) {
201                 MessageRecord curMsg = (MessageRecord)activeMessages.get(i);
202                 if (min <= curMsg.sequenceNumber && max >= curMsg.sequenceNumber) {
203                     System.out.println("Removed msg #" + curMsg.sequenceNumber);
204                     activeMessages.remove(i);
205                     i--;
206                 }
207             }
208         }
209     }
210
211     /**
212      * Take a SOAP envelope that we want to send in a particular sequence,
213      * and decorate it appropriately with all the WS-RM headers.
214      *
215      * @param env the envelope
216      * @param toAddr the destination address
217      * @param fromAddr the origination address (can be anonymous - see spec)
218      * @param identifier the sequence identifier
219      * @param isLast true if this is the last message in the sequence
220      * @param skip cheesy extra parameter to tell the engine to skip a sequence
221      * number for testing purposes
222      * @throws Exception
223      */

224     public void doit(SOAPEnvelope env,
225                      String JavaDoc toAddr,
226                      String JavaDoc fromAddr,
227                      String JavaDoc identifier,
228                      boolean isLast,
229                      boolean skip) throws Exception JavaDoc {
230         MySequence myseq = (MySequence)sequences.get(identifier);
231         if (myseq == null) {
232             // New one
233
myseq = new MySequence();
234             myseq.id = identifier;
235             myseq.destination = toAddr;
236             sequences.put(identifier, myseq);
237         }
238
239         int curMsgNum = myseq.currentMsgNum++;
240         if (skip) {
241             curMsgNum = myseq.currentMsgNum++;
242         }
243         Integer JavaDoc seq = new Integer JavaDoc(curMsgNum);
244         String JavaDoc myAddr = fromAddr;
245
246         SOAPHeaderElement header =
247                 new SOAPHeaderElement(SEQUENCE_QNAME.getNamespaceURI(),
248                                       SEQUENCE_QNAME.getLocalPart());
249         MessageElement el;
250
251         el = new MessageElement(IDENTIFIER_QNAME, identifier);
252         header.addChild(el);
253
254         el = new MessageElement(MSGNUM_QNAME, seq);
255         header.addChild(el);
256         if (isLast) {
257             el = new MessageElement(NS_URI_WSRM, "LastMessage");
258             header.addChild(el);
259         }
260
261         env.addHeader(header);
262
263         header = new SOAPHeaderElement(NS_URI_WSA, "Action", PING_URI);
264         header.setMustUnderstand(true);
265         env.addHeader(header);
266
267         header = new SOAPHeaderElement(NS_URI_WSA, "From");
268         header.setMustUnderstand(true);
269         el = new MessageElement(new QName(NS_URI_WSA, "Address"), myAddr);
270         header.addChild(el);
271         env.addHeader(header);
272
273         header = new SOAPHeaderElement(NS_URI_WSA, "To", toAddr);
274         header.setMustUnderstand(true);
275         env.addHeader(header);
276
277         header = new SOAPHeaderElement(NS_URI_WSA, "MessageID", ReliableMessagingHandler.generateNewMsgID());
278         header.setMustUnderstand(true);
279         env.addHeader(header);
280
281         MessageRecord msgRec = new MessageRecord();
282         msgRec.env = env;
283         msgRec.sequenceNumber = curMsgNum;
284         myseq.activeMessages.add(msgRec);
285
286         // Tack on any piggybacked acks I have for this guy...
287
Iterator seqs = ReliableMessagingHandler.sequences.values().iterator();
288         while (seqs.hasNext()) {
289             Sequence s = (Sequence)seqs.next();
290             if (s.endpoint.equals(toAddr)) {
291                 ReliableMessagingHandler.generateAck(env, s);
292             }
293         }
294
295         synchronized (this) {
296             if (myRetryGuy == null) {
297                 myRetryGuy = new RetryGuy();
298                 new Thread JavaDoc(myRetryGuy).start();
299             }
300         }
301     }
302 }
303
Popular Tags