1 17 package org.apache.sandesha.client; 18 19 import org.apache.axis.AxisFault; 20 import org.apache.axis.Message; 21 import org.apache.axis.MessageContext; 22 import org.apache.axis.client.Call; 23 import org.apache.axis.components.logger.LogFactory; 24 import org.apache.axis.components.uuid.UUIDGen; 25 import org.apache.axis.components.uuid.UUIDGenFactory; 26 import org.apache.axis.handlers.BasicHandler; 27 import org.apache.axis.message.addressing.AddressingHeaders; 28 import org.apache.commons.logging.Log; 29 import org.apache.sandesha.Constants; 30 import org.apache.sandesha.IStorageManager; 31 import org.apache.sandesha.RMMessageContext; 32 import org.apache.sandesha.RMReport; 33 import org.apache.sandesha.util.PolicyLoader; 34 import org.apache.sandesha.util.RMMessageCreator; 35 import org.apache.sandesha.ws.rm.RMHeaders; 36 37 52 public class RMSender extends BasicHandler { 53 54 private IStorageManager storageManager; 55 private static final Log log = LogFactory.getLog(RMSender.class.getName()); 56 private final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen(); 57 private static Boolean lock = new Boolean (false); 58 59 67 68 public void invoke(MessageContext msgContext) throws AxisFault { 69 70 storageManager = new ClientStorageManager(); 71 72 try { 73 RMMessageContext reqMsgCtx = null; 74 String tempSeqID = null; 75 76 reqMsgCtx = getRMMessageContext(msgContext); 77 78 tempSeqID = reqMsgCtx.getSequenceID(); 79 80 reqMsgCtx = processRequestMessage(reqMsgCtx, reqMsgCtx.getSync()); 81 82 if (reqMsgCtx.isHasResponse()) { 83 RMMessageContext responseMessageContext = null; 84 long startingTime = System.currentTimeMillis(); 85 long inactivityTimeOut = PolicyLoader.getInstance().getInactivityTimeout(); 86 87 while (responseMessageContext == null) { 88 synchronized (lock) { 89 responseMessageContext = 90 checkTheQueueForResponse(tempSeqID, reqMsgCtx.getMessageID()); 91 if ((System.currentTimeMillis() - startingTime) >= inactivityTimeOut) { 92 reqMsgCtx.getCtx().stopClientByForce(); 93 } 94 Thread.sleep(Constants.CLIENT_RESPONSE_CHECKING_INTERVAL); 95 } 96 } 97 98 if (responseMessageContext != null) { 100 String oldSeqId = reqMsgCtx.getOldSequenceID(); 101 if (oldSeqId != null) { 102 Call call = (Call) reqMsgCtx.getCtx().getCallMap().get(reqMsgCtx.getOldSequenceID()); 103 104 if (call != null) { 105 RMReport report = (RMReport) call.getProperty(Constants.ClientProperties.REPORT); 106 report.incrementReturnedMsgCount(); 107 } 108 } 109 } 110 111 Message resMsg = responseMessageContext.getMsgContext().getRequestMessage(); 113 RMHeaders.removeHeaders(resMsg.getSOAPEnvelope()); 114 AddressingHeaders addHeaders = new AddressingHeaders(resMsg.getSOAPEnvelope(), 115 null, true, false, false, null); 116 117 msgContext.setResponseMessage(resMsg); 118 } else { 119 msgContext.setResponseMessage(null); 120 } 121 122 } catch (Exception ex) { 123 log.error(ex); 124 125 throw new AxisFault(ex.getLocalizedMessage()); 126 127 } 128 } 129 130 138 private RMMessageContext processRequestMessage(RMMessageContext reqRMMsgContext, 139 boolean sync) throws Exception { 140 synchronized (lock) { 141 142 if (!storageManager.isSequenceExist(reqRMMsgContext.getSequenceID())) { 143 String msgID = Constants.UUID + uuidGen.nextUUID(); 144 String offerID = null; 145 if (reqRMMsgContext.isHasResponse() && reqRMMsgContext.isSendOffer()) { 146 offerID = Constants.UUID + uuidGen.nextUUID(); 147 storageManager.addRequestedSequence(offerID); 148 storageManager.addOffer(msgID, offerID); 149 } 150 151 RMMessageContext createSeqRMMsgContext = RMMessageCreator.createCreateSeqMsg(reqRMMsgContext, Constants.CLIENT, msgID, offerID); 152 storageManager.addOutgoingSequence(reqRMMsgContext.getSequenceID()); 153 storageManager.setTemporaryOutSequence(reqRMMsgContext.getSequenceID(), 154 createSeqRMMsgContext.getMessageID()); 155 156 createSeqRMMsgContext.setSync(sync); 157 storageManager.addCreateSequenceRequest(createSeqRMMsgContext); 158 processMessage(reqRMMsgContext); 159 160 } else { 161 processMessage(reqRMMsgContext); 162 } 163 164 } 165 166 167 return reqRMMsgContext; 168 } 169 170 private RMMessageContext processMessage(RMMessageContext reqRMMsgContext) 171 throws Exception { 172 if (reqRMMsgContext.isLastMessage()) { 173 storageManager.insertTerminateSeqMessage(RMMessageCreator.createTerminateSeqMsg(reqRMMsgContext, Constants.CLIENT)); 174 } 175 RMMessageContext serviceRequestMsg = RMMessageCreator.createServiceRequestMessage(reqRMMsgContext); 176 storageManager.insertOutgoingMessage(serviceRequestMsg); 177 return reqRMMsgContext; 178 } 179 180 private RMMessageContext checkTheQueueForResponse(String sequenceId, String reqMessageID) { 181 return storageManager.checkForResponseMessage(sequenceId, reqMessageID); 182 } 183 184 private RMMessageContext getRMMessageContext(MessageContext msgCtx) throws Exception { 185 MessageContext newMsgContext = RMMessageCreator.cloneMsgContext(msgCtx); 188 RMMessageContext requestMesssageContext = new RMMessageContext(); 189 Call call = (Call) newMsgContext.getProperty(MessageContext.CALL); 190 191 requestMesssageContext = ClientPropertyValidator.validate(call); 192 requestMesssageContext.setOutGoingAddress((String ) msgCtx.getProperty(MessageContext.TRANS_URL)); 193 requestMesssageContext.setMsgContext(newMsgContext); 194 return requestMesssageContext; 195 } 196 197 198 } 199 200 201 202 | Popular Tags |