1 17 package org.apache.sandesha.server.msgprocessors; 18 19 import org.apache.axis.AxisFault; 20 import org.apache.axis.Message; 21 import org.apache.axis.MessageContext; 22 import org.apache.axis.components.logger.LogFactory; 23 import org.apache.axis.message.addressing.AddressingHeaders; 24 import org.apache.axis.message.addressing.RelatesTo; 25 import org.apache.commons.logging.Log; 26 import org.apache.sandesha.Constants; 27 import org.apache.sandesha.IStorageManager; 28 import org.apache.sandesha.RMMessageContext; 29 import org.apache.sandesha.storage.dao.SandeshaQueueDAO; 30 import org.apache.sandesha.ws.rm.RMHeaders; 31 32 import javax.xml.namespace.QName ; 33 34 39 public class CompositeProcessor implements IRMMessageProcessor { 40 41 private IStorageManager storageManager; 42 private static final Log log = LogFactory.getLog(SandeshaQueueDAO.class.getName()); 43 44 public CompositeProcessor(IStorageManager storageManger) { 45 storageManager = storageManger; 46 } 47 48 public boolean processMessage(RMMessageContext rmMessageContext) throws AxisFault { 49 50 RMHeaders rmHeaders = rmMessageContext.getRMHeaders(); 51 AddressingHeaders addrHeaders = rmMessageContext.getAddressingHeaders(); 52 AcknowledgementProcessor ackProcessor = new AcknowledgementProcessor(storageManager); 53 if (rmHeaders.getSequenceAcknowledgement() != null) { 54 ackProcessor.processMessage(rmMessageContext); 55 } 56 57 if (rmHeaders.getSequence() != null) { 58 if (rmHeaders.getSequence().getMessageNumber() != null) { 59 String sequenceUUID = rmHeaders.getSequence().getIdentifier().getIdentifier(); 60 long messageNumber = rmHeaders.getSequence().getMessageNumber().getMessageNumber(); 61 62 String seqId = storageManager.getOutgoingSeqenceIdOfIncomingMsg(rmMessageContext); 63 boolean hasSequence = storageManager.isSequenceExist(seqId); 64 65 if (addrHeaders.getRelatesTo() != null && !addrHeaders.getRelatesTo().isEmpty()) { 66 RelatesTo relatesTo = (RelatesTo) addrHeaders.getRelatesTo().get(0); 67 String messageId = relatesTo.getURI().toString(); 68 seqId = storageManager.getOutgoingSeqOfMsg(messageId); 69 } 70 if (!hasSequence) { 71 storageManager.addIncomingSequence(seqId); 72 } 73 if (!storageManager.isMessageExist(seqId, messageNumber)) { 74 RMMessageContext rmMsgContext = new RMMessageContext(); 76 rmMessageContext.copyContents(rmMsgContext); 78 rmMsgContext.setSequenceID(sequenceUUID); 79 rmMsgContext.setMsgNumber(messageNumber); 80 try { 81 MessageContext msgContext = new MessageContext(rmMessageContext.getMsgContext().getAxisEngine()); 82 RMMessageContext.copyMessageContext(rmMessageContext.getMsgContext(), 83 msgContext); 84 String soapMsg = rmMessageContext.getMsgContext().getRequestMessage() 85 .getSOAPEnvelope() 86 .toString(); 87 Message reqMsg = new Message(soapMsg); 88 89 msgContext.setRequestMessage(reqMsg); 90 rmMsgContext.setMsgContext(msgContext); 91 rmMsgContext.setMessageType(Constants.MSG_TYPE_SERVICE_REQUEST); 92 } catch (Exception e) { 93 log.error(e); 94 throw new AxisFault(new QName (Constants.FaultCodes.WSRM_SERVER_INTERNAL_ERROR), 95 Constants.FaultMessages.SERVER_INTERNAL_ERROR, null, null); 96 } 97 storageManager.insertIncomingMessage(rmMsgContext); 98 } 99 100 if (rmHeaders.getAckRequest() != null || 103 rmHeaders.getSequence().getLastMessage() != null) { 104 storageManager.sendAck(sequenceUUID); 105 return ackProcessor.sendAcknowledgement(rmMessageContext); 106 } else { 107 boolean sync = ackProcessor.sendAcknowledgement(rmMessageContext); 108 return sync; 109 } 110 } 111 } 112 return false; 113 } 114 115 } | Popular Tags |