KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > sandesha > server > msgprocessors > AcknowledgementProcessor


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */

17 package org.apache.sandesha.server.msgprocessors;
18
19 import org.apache.axis.AxisFault;
20 import org.apache.axis.Message;
21 import org.apache.axis.MessageContext;
22 import org.apache.axis.components.logger.LogFactory;
23 import org.apache.commons.logging.Log;
24 import org.apache.sandesha.Constants;
25 import org.apache.sandesha.EnvelopeCreator;
26 import org.apache.sandesha.IStorageManager;
27 import org.apache.sandesha.RMMessageContext;
28 import org.apache.sandesha.storage.dao.SandeshaQueueDAO;
29 import org.apache.sandesha.ws.rm.AcknowledgementRange;
30 import org.apache.sandesha.ws.rm.SequenceAcknowledgement;
31
32 import javax.xml.soap.SOAPEnvelope JavaDoc;
33 import java.util.ArrayList JavaDoc;
34 import java.util.Iterator JavaDoc;
35 import java.util.List JavaDoc;
36 import java.util.Map JavaDoc;
37
38 /**
39  * Processor for the acknowledgements. This will handle both processing of acknowledgements
40  * and sending acknowledgements. Sending part is required as the user wants synchronous
41  * acknowldgements to be sent from the server.
42  *
43  * @author Jaliya Eknayake
44  */

45 public final class AcknowledgementProcessor implements IRMMessageProcessor {
46     private IStorageManager storageManager = null;
47     private static final Log log = LogFactory.getLog(SandeshaQueueDAO.class.getName());
48
49     public AcknowledgementProcessor(IStorageManager storageManager) {
50         this.storageManager = storageManager;
51     }
52
53     public final boolean processMessage(RMMessageContext rmMessageContext) throws AxisFault {
54         SequenceAcknowledgement seqAcknowledgement = rmMessageContext.getRMHeaders()
55                 .getSequenceAcknowledgement();
56         String JavaDoc seqID = seqAcknowledgement.getIdentifier().getIdentifier();
57         List JavaDoc ackRanges = seqAcknowledgement.getAckRanges();
58         Iterator JavaDoc ite = ackRanges.iterator();
59
60         while (ite.hasNext()) {
61             AcknowledgementRange ackRange = (AcknowledgementRange) ite.next();
62             long msgNumber = ackRange.getMinValue();
63             while (ackRange.getMaxValue() >= msgNumber) {
64                 if (!storageManager.isSentMsg(seqID, msgNumber)) {
65                     throw new AxisFault(new javax.xml.namespace.QName JavaDoc(Constants.FaultCodes.WSRM_FAULT_INVALID_ACKNOWLEDGEMENT),
66                             Constants.FaultMessages.INVALID_ACKNOWLEDGEMENT, null, null);
67                 }
68                 storageManager.setAckReceived(seqID, msgNumber);
69                 storageManager.setAcknowledged(seqID, msgNumber);
70                 msgNumber++;
71             }
72         }
73         return false;
74     }
75
76
77     public boolean sendAcknowledgement(RMMessageContext rmMessageContext) throws AxisFault {
78         String JavaDoc seqID = rmMessageContext.getSequenceID();
79
80         long messageNumber = rmMessageContext.getRMHeaders().getSequence().getMessageNumber()
81                 .getMessageNumber();
82         String JavaDoc seq = storageManager.getOutgoingSeqenceIdOfIncomingMsg(rmMessageContext);
83         Map JavaDoc listOfMsgNumbers = storageManager.getListOfMessageNumbers(seq);
84
85         ArrayList JavaDoc ackRangeList = null;
86         if (listOfMsgNumbers != null) {
87             ackRangeList = getAckRangesVector(listOfMsgNumbers);
88         } else {
89             ackRangeList = new ArrayList JavaDoc();
90             AcknowledgementRange ackRange = new AcknowledgementRange();
91             ackRange.setMaxValue(messageNumber);
92             ackRange.setMinValue(messageNumber);
93             ackRangeList.add(ackRange);
94         }
95         RMMessageContext rmMsgContext = getAckRMMsgCtx(rmMessageContext, ackRangeList);
96
97         if (true ==
98                 (storageManager.getAcksTo(seqID).equals(Constants.WSA.NS_ADDRESSING_ANONYMOUS))) {
99             try {
100                 String JavaDoc soapMsg = rmMsgContext.getMsgContext().getResponseMessage().getSOAPEnvelope()
101                         .toString();
102                 rmMessageContext.getMsgContext().setResponseMessage(new Message(soapMsg));
103             } catch (AxisFault af) {
104                 af.setFaultCodeAsString(Constants.FaultCodes.WSRM_SERVER_INTERNAL_ERROR);
105                 throw af;
106             }
107             return true;
108         } else {
109             //Store the asynchronize ack in the queue. The name for this queue is not yet fixed.
110
storageManager.addAcknowledgement(rmMsgContext);
111             return false;
112         }
113     }
114
115     private RMMessageContext getAckRMMsgCtx(RMMessageContext rmMessageContext,
116                                             List JavaDoc ackRangeList) {
117         RMMessageContext rmMsgContext = new RMMessageContext();
118         try {
119
120             String JavaDoc to = storageManager.getAcksTo(rmMessageContext.getRMHeaders().getSequence().getIdentifier().getIdentifier());
121
122             SOAPEnvelope JavaDoc ackEnvelope = EnvelopeCreator.createAcknowledgementEnvelope(rmMessageContext, to, ackRangeList);
123
124             Message resMsg = new Message(ackEnvelope);
125             MessageContext msgContext = new MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
126             rmMessageContext.copyContents(rmMsgContext);
127             msgContext.setResponseMessage(resMsg);
128             rmMsgContext.setMsgContext(msgContext);
129
130             //Get the from address to send the Ack. Doesn't matter whether we have Sync or
131
// ASync messages. If we have Sync them this property is not used.
132
rmMsgContext.setOutGoingAddress(to);
133             rmMsgContext.setMessageType(Constants.MSG_TYPE_ACKNOWLEDGEMENT);
134         } catch (Exception JavaDoc e) {
135             log.error(e);
136         }
137         return rmMsgContext;
138     }
139
140     /**
141      * This method will split the input map with messages numbers to respectable
142      * message ranges and will return a vector of AcknowledgementRange.
143      *
144      * @param listOfMsgNumbers
145      * @return
146      */

147     private ArrayList JavaDoc getAckRangesVector(Map JavaDoc listOfMsgNumbers) {
148         long min;
149         long max;
150         long size = listOfMsgNumbers.size();
151         ArrayList JavaDoc list = new ArrayList JavaDoc();
152         boolean found = false;
153
154         min = ((Long JavaDoc) listOfMsgNumbers.get(new Long JavaDoc(1))).longValue();
155         max = min;
156
157         if (size > 1) {
158             for (long i = 1; i <= size; i++) {
159
160                 if (i + 1 > size) {
161                     found = true;
162                     max = ((Long JavaDoc) listOfMsgNumbers.get(new Long JavaDoc(i))).longValue();
163                 } else {
164
165                     if (1 == (((Long JavaDoc) listOfMsgNumbers.get(new Long JavaDoc(i + 1))).longValue() - ((Long JavaDoc) listOfMsgNumbers.get(new Long JavaDoc(i))).longValue())) {
166                         max = ((Long JavaDoc) listOfMsgNumbers.get(new Long JavaDoc(i + 1))).longValue();
167                         found = true;
168                     } else {
169                         found = false;
170                         max = ((Long JavaDoc) listOfMsgNumbers.get(new Long JavaDoc(i))).longValue();
171                         AcknowledgementRange ackRange = new AcknowledgementRange();
172                         ackRange.setMaxValue(max);
173                         ackRange.setMinValue(min);
174                         list.add(ackRange);
175
176                         min = ((Long JavaDoc) listOfMsgNumbers.get(new Long JavaDoc(i + 1))).longValue();
177                     }
178
179                 }
180             }
181             if (found) {
182                 AcknowledgementRange ackRange = new AcknowledgementRange();
183                 ackRange.setMaxValue(max);
184                 ackRange.setMinValue(min);
185                 list.add(ackRange);
186             }
187         } else {
188             AcknowledgementRange ackRange = new AcknowledgementRange();
189             ackRange.setMaxValue(max);
190             ackRange.setMinValue(min);
191             list.add(ackRange);
192         }
193         return list;
194     }
195 }
Popular Tags