package org.apache.sandesha.server.msgprocessors;

import org.apache.axis.AxisFault;
import org.apache.axis.Message;
import org.apache.axis.MessageContext;
import org.apache.axis.components.logger.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.sandesha.Constants;
import org.apache.sandesha.EnvelopeCreator;
import org.apache.sandesha.IStorageManager;
import org.apache.sandesha.RMMessageContext;
import org.apache.sandesha.storage.dao.SandeshaQueueDAO;
import org.apache.sandesha.ws.rm.AcknowledgementRange;
import org.apache.sandesha.ws.rm.SequenceAcknowledgement;

import javax.xml.soap.SOAPEnvelope;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Message processor for sequence acknowledgements.
 * <p>
 * {@link #processMessage(RMMessageContext)} consumes an incoming
 * <code>SequenceAcknowledgement</code> header and records each acknowledged
 * message number in the storage manager.
 * {@link #sendAcknowledgement(RMMessageContext)} builds an acknowledgement
 * for an incoming sequence message and either attaches it to the current
 * response or hands it to the storage manager.
 */
public final class AcknowledgementProcessor implements IRMMessageProcessor {

    // Log under this class's own name so that output is attributed correctly.
    private static final Log log = LogFactory.getLog(AcknowledgementProcessor.class.getName());

    private final IStorageManager storageManager;

    /**
     * @param storageManager storage used to look up and record the
     *                       acknowledgement state of sequences
     */
    public AcknowledgementProcessor(IStorageManager storageManager) {
        this.storageManager = storageManager;
    }

    /**
     * Records every message number covered by the acknowledgement ranges of
     * the incoming <code>SequenceAcknowledgement</code> header.
     * <p>
     * Numbers are processed in order; numbers visited before an invalid one
     * is found have already been recorded when the fault is thrown.
     *
     * @param rmMessageContext context carrying the RM headers of the incoming message
     * @return always <code>false</code>
     * @throws AxisFault with the invalid-acknowledgement fault code when the
     *                   storage manager reports (via <code>isSentMsg</code>)
     *                   that an acknowledged message number was not sent
     */
    public final boolean processMessage(RMMessageContext rmMessageContext) throws AxisFault {
        SequenceAcknowledgement seqAcknowledgement =
                rmMessageContext.getRMHeaders().getSequenceAcknowledgement();
        String seqID = seqAcknowledgement.getIdentifier().getIdentifier();

        Iterator rangeIterator = seqAcknowledgement.getAckRanges().iterator();
        while (rangeIterator.hasNext()) {
            AcknowledgementRange ackRange = (AcknowledgementRange) rangeIterator.next();
            for (long msgNumber = ackRange.getMinValue();
                 msgNumber <= ackRange.getMaxValue();
                 msgNumber++) {
                if (!storageManager.isSentMsg(seqID, msgNumber)) {
                    throw new AxisFault(
                            new javax.xml.namespace.QName(
                                    Constants.FaultCodes.WSRM_FAULT_INVALID_ACKNOWLEDGEMENT),
                            Constants.FaultMessages.INVALID_ACKNOWLEDGEMENT, null, null);
                }
                storageManager.setAckReceived(seqID, msgNumber);
                storageManager.setAcknowledged(seqID, msgNumber);
            }
        }
        return false;
    }

    /**
     * Builds an acknowledgement for the sequence the given message belongs to
     * and dispatches it.
     * <p>
     * The acknowledged ranges come from the message numbers the storage
     * manager returns for the sequence. When none are returned (a
     * <code>null</code> or empty map), only the number of the current message
     * is acknowledged.
     *
     * @param rmMessageContext context of the incoming sequence message
     * @return <code>true</code> when the acksTo address of the sequence is the
     *         anonymous address and the acknowledgement was set as the
     *         response message of the current context; <code>false</code> when
     *         it was passed to the storage manager instead
     * @throws AxisFault with the server-internal-error fault code when the
     *                   acknowledgement envelope cannot be set as the response
     */
    public boolean sendAcknowledgement(RMMessageContext rmMessageContext) throws AxisFault {
        String seqID = rmMessageContext.getSequenceID();
        long messageNumber = rmMessageContext.getRMHeaders().getSequence().getMessageNumber()
                .getMessageNumber();
        String seq = storageManager.getOutgoingSeqenceIdOfIncomingMsg(rmMessageContext);
        Map listOfMsgNumbers = storageManager.getListOfMessageNumbers(seq);

        List ackRangeList;
        if (listOfMsgNumbers != null && !listOfMsgNumbers.isEmpty()) {
            ackRangeList = getAckRangesVector(listOfMsgNumbers);
        } else {
            // Nothing recorded: acknowledge just the message at hand.
            ackRangeList = new ArrayList();
            ackRangeList.add(newAckRange(messageNumber, messageNumber));
        }
        RMMessageContext rmMsgContext = getAckRMMsgCtx(rmMessageContext, ackRangeList);

        if (storageManager.getAcksTo(seqID).equals(Constants.WSA.NS_ADDRESSING_ANONYMOUS)) {
            try {
                String soapMsg = rmMsgContext.getMsgContext().getResponseMessage()
                        .getSOAPEnvelope().toString();
                rmMessageContext.getMsgContext().setResponseMessage(new Message(soapMsg));
            } catch (AxisFault af) {
                af.setFaultCodeAsString(Constants.FaultCodes.WSRM_SERVER_INTERNAL_ERROR);
                throw af;
            }
            return true;
        }

        storageManager.addAcknowledgement(rmMsgContext);
        return false;
    }

    /**
     * Creates the RM message context that carries the acknowledgement envelope.
     * <p>
     * This is best effort: any exception raised while building the context is
     * logged and whatever has been populated so far is returned.
     *
     * @param rmMessageContext context of the incoming message being acknowledged
     * @param ackRangeList     list of {@link AcknowledgementRange} to include
     * @return the new context, never <code>null</code>, but possibly only
     *         partially populated if building it failed
     */
    private RMMessageContext getAckRMMsgCtx(RMMessageContext rmMessageContext,
                                            List ackRangeList) {
        RMMessageContext rmMsgContext = new RMMessageContext();
        try {
            String to = storageManager.getAcksTo(
                    rmMessageContext.getRMHeaders().getSequence().getIdentifier().getIdentifier());

            SOAPEnvelope ackEnvelope =
                    EnvelopeCreator.createAcknowledgementEnvelope(rmMessageContext, to, ackRangeList);

            Message resMsg = new Message(ackEnvelope);
            MessageContext msgContext =
                    new MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
            rmMessageContext.copyContents(rmMsgContext);
            msgContext.setResponseMessage(resMsg);
            rmMsgContext.setMsgContext(msgContext);

            rmMsgContext.setOutGoingAddress(to);
            rmMsgContext.setMessageType(Constants.MSG_TYPE_ACKNOWLEDGEMENT);
        } catch (Exception e) {
            // Pass the throwable separately so the stack trace is logged too.
            log.error("Failed to build the acknowledgement message context", e);
        }
        return rmMsgContext;
    }

    /**
     * Collapses the recorded message numbers into acknowledgement ranges of
     * consecutive numbers.
     * <p>
     * The map is read at the <code>Long</code> keys <code>1..size</code>; each
     * value is a <code>Long</code> message number. A new range starts whenever
     * two neighbouring values do not differ by exactly one.
     * NOTE(review): assumes the keys are dense from 1 to size - a missing key
     * still fails with a NullPointerException; confirm against the storage
     * manager implementation.
     *
     * @param listOfMsgNumbers map from position (1-based) to message number
     * @return list of {@link AcknowledgementRange}; empty when the map is
     *         <code>null</code> or empty
     */
    private List getAckRangesVector(Map listOfMsgNumbers) {
        List ranges = new ArrayList();
        if (listOfMsgNumbers == null || listOfMsgNumbers.isEmpty()) {
            return ranges;
        }

        long size = listOfMsgNumbers.size();
        long rangeStart = messageNumberAt(listOfMsgNumbers, 1);
        long previous = rangeStart;

        for (long position = 2; position <= size; position++) {
            long current = messageNumberAt(listOfMsgNumbers, position);
            if (current - previous != 1) {
                // Gap found: close the running range and open a new one.
                ranges.add(newAckRange(rangeStart, previous));
                rangeStart = current;
            }
            previous = current;
        }
        // The last running range is always still open here.
        ranges.add(newAckRange(rangeStart, previous));
        return ranges;
    }

    /** Reads the message number stored under the given 1-based position. */
    private static long messageNumberAt(Map listOfMsgNumbers, long position) {
        return ((Long) listOfMsgNumbers.get(new Long(position))).longValue();
    }

    /** Creates an acknowledgement range covering <code>min..max</code>. */
    private static AcknowledgementRange newAckRange(long min, long max) {
        AcknowledgementRange ackRange = new AcknowledgementRange();
        ackRange.setMaxValue(max);
        ackRange.setMinValue(min);
        return ackRange;
    }
}