1 17 18 package org.apache.sandesha.server; 19 20 import org.apache.axis.AxisFault; 21 import org.apache.axis.Message; 22 import org.apache.axis.SimpleChain; 23 import org.apache.axis.client.Call; 24 import org.apache.axis.client.Service; 25 import org.apache.axis.components.logger.LogFactory; 26 import org.apache.axis.components.uuid.UUIDGen; 27 import org.apache.axis.components.uuid.UUIDGenFactory; 28 import org.apache.axis.message.addressing.AddressingHeaders; 29 import org.apache.commons.logging.Log; 30 import org.apache.sandesha.Constants; 31 import org.apache.sandesha.EnvelopeCreator; 32 import org.apache.sandesha.IStorageManager; 33 import org.apache.sandesha.RMMessageContext; 34 import org.apache.sandesha.server.msgprocessors.IRMMessageProcessor; 35 import org.apache.sandesha.storage.Callback; 36 import org.apache.sandesha.storage.CallbackData; 37 import org.apache.sandesha.util.PolicyLoader; 38 import org.apache.sandesha.ws.rm.RMHeaders; 39 40 import javax.xml.rpc.ServiceException ; 41 import javax.xml.soap.SOAPEnvelope ; 42 import javax.xml.soap.SOAPException ; 43 44 51 public class SenderWorker implements Runnable { 52 private static final Log log = LogFactory.getLog(SenderWorker.class.getName()); 53 public static final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen(); 54 public static Callback callback; 55 public boolean running = true; 56 private IStorageManager storageManager; 57 58 59 public static synchronized Callback getCallback() { 60 return callback; 61 } 62 63 public static synchronized void setCallback(Callback cb) { 64 callback = cb; 65 } 66 67 private SimpleChain requestChain = null; 68 private SimpleChain responseChain = null; 69 70 public SimpleChain getRequestChain() { 71 return requestChain; 72 } 73 74 public void setRequestChain(SimpleChain requestChain) { 75 this.requestChain = requestChain; 76 } 77 78 public SimpleChain getResponseChain() { 79 return responseChain; 80 } 81 82 public void setResponseChain(SimpleChain responseChanin) { 83 this.responseChain = responseChanin; 84 } 85 86 public SenderWorker() { 87 storageManager = new ServerStorageManager(); 88 } 89 90 public SenderWorker(IStorageManager storageManager) { 91 this.storageManager = storageManager; 92 } 93 94 public boolean isRunning() { 95 return running; 96 } 97 98 public void setRunning(boolean running) { 99 this.running = running; 100 } 101 102 public void run() { 103 104 while (running) { 105 long startTime = System.currentTimeMillis(); 106 boolean hasMessages = true; 107 do { 109 110 RMMessageContext rmMessageContext = storageManager.getNextMessageToSend(); 111 if (rmMessageContext == null) { 112 hasMessages = false; 113 } else { 114 long inactivityTimeout = PolicyLoader.getInstance().getInactivityTimeout(); 115 long retransmissionInterval = PolicyLoader.getInstance() 116 .getBaseRetransmissionInterval(); 117 118 if (rmMessageContext.getFristProcessedTime() == 0) 119 rmMessageContext.setFristProcessedTime(System.currentTimeMillis()); 120 121 if ((System.currentTimeMillis() - rmMessageContext.getFristProcessedTime()) > 122 inactivityTimeout) { 123 log.error("Inactivity Time Out Reached for the message with <wsa:MessageID> " + 124 rmMessageContext.getMessageID()); 125 128 } else if (rmMessageContext.getRetransmissionTime() < 129 (System.currentTimeMillis() - rmMessageContext.getLastPrecessedTime())) { 130 try { 131 132 rmMessageContext.setLastPrecessedTime(System.currentTimeMillis()); 133 134 if (PolicyLoader.getInstance().getExponentialBackoff() != null) { 135 long newRtTime = ((long) Math.pow(retransmissionInterval / 1000, 136 rmMessageContext.getReTransmissionCount())) * 1000; 137 rmMessageContext.setRetransmissionTime(newRtTime); 138 139 } else { 140 long rtTime = rmMessageContext.getRetransmissionTime(); 142 rmMessageContext.setRetransmissionTime(2 * rtTime); 143 144 } 145 sendMessage(rmMessageContext); 146 rmMessageContext.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1); 147 148 rmMessageContext.setLocked(false); 149 150 } catch (AxisFault e) { 151 rmMessageContext.setLocked(false); 152 log.error(e); 153 } catch (SOAPException e) { 154 rmMessageContext.setLocked(false); 155 log.error(e); 156 } catch (Exception e) { 157 rmMessageContext.setLocked(false); 158 log.error(e); 159 } 160 } 161 rmMessageContext.setLocked(false); 162 163 } 164 } while (hasMessages); 165 166 long timeGap = System.currentTimeMillis() - startTime; 167 if ((timeGap - Constants.SENDER_SLEEP_TIME) <= 0) { 168 try { 169 Thread.sleep(Constants.SENDER_SLEEP_TIME - timeGap); 170 } catch (Exception ex) { 171 log.error(ex); 172 } 173 } 174 } 175 } 176 177 private void sendMessage(RMMessageContext rmMessageContext) throws Exception { 178 switch (rmMessageContext.getMessageType()) { 179 case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST: 180 { 181 if (log.isDebugEnabled()) 182 log.debug(Constants.InfomationMessage.SENDING_CREATE_SEQ); 183 sendCreateSequenceRequest(rmMessageContext); 184 break; 185 } 186 case Constants.MSG_TYPE_CREATE_SEQUENCE_RESPONSE: 187 { 188 if (log.isDebugEnabled()) 189 log.debug(Constants.InfomationMessage.SENDING_CREATE_SEQ_RES); 190 191 sendCreateSequenceResponse(rmMessageContext); 192 break; 193 } 194 case Constants.MSG_TYPE_TERMINATE_SEQUENCE: 195 { 196 if (log.isDebugEnabled()) 197 log.debug(Constants.InfomationMessage.SENDING_TERMINATE_SEQ); 198 sendTerminateSequenceRequest(rmMessageContext); 199 storageManager.setTerminateSend(storageManager.getKeyFromOutgoingSeqId(rmMessageContext.getSequenceID())); 200 break; 201 } 202 case Constants.MSG_TYPE_ACKNOWLEDGEMENT: 203 { 204 if (log.isDebugEnabled()) 205 log.debug(Constants.InfomationMessage.SENDING_ACK); 206 sendAcknowldgement(rmMessageContext); 207 break; 208 } 209 case Constants.MSG_TYPE_SERVICE_REQUEST: 210 { 211 if (log.isDebugEnabled()) 212 log.debug(Constants.InfomationMessage.SENDING_REQ); 213 sendServiceRequest(rmMessageContext); 214 break; 215 } 216 case Constants.MSG_TYPE_SERVICE_RESPONSE: 217 { 218 if (log.isDebugEnabled()) 219 log.debug(Constants.InfomationMessage.SENDING_RES); 220 sendServiceResponse(rmMessageContext); 221 break; 222 } 223 } 224 } 225 226 227 230 private void sendTerminateSequenceRequest(RMMessageContext rmMessageContext) throws Exception { 231 SOAPEnvelope terSeqEnv = EnvelopeCreator.createTerminatSeqMessage(rmMessageContext); 232 233 Message terSeqMsg = new Message(terSeqEnv); 234 rmMessageContext.getMsgContext().setRequestMessage(terSeqMsg); 235 236 Call call; 237 call = prepareCall(rmMessageContext); 238 call.invoke(); 239 240 processResponseMessage(call, rmMessageContext); 241 } 242 243 private void sendServiceResponse(RMMessageContext rmMessageContext) throws Exception { 244 SOAPEnvelope responseEnvelope = null; 245 responseEnvelope = EnvelopeCreator.createServiceResponseEnvelope(rmMessageContext); 246 247 248 rmMessageContext.getMsgContext().setRequestMessage(new Message(responseEnvelope)); 249 251 Service service = new Service(); 252 Call call = (Call) service.createCall(); 253 254 if (rmMessageContext.getAddressingHeaders().getAction() != null) { 255 call.setSOAPActionURI(rmMessageContext.getAddressingHeaders().getAction().toString()); 256 } 257 258 call.setTargetEndpointAddress(rmMessageContext.getAddressingHeaders().getReplyTo().getAddress().toString()); 259 260 String soapMsg = rmMessageContext.getMsgContext().getRequestMessage().getSOAPPartAsString(); 262 263 264 if (soapMsg != null) 265 call.setRequestMessage(new Message(soapMsg)); 266 else { 267 call.setRequestMessage(new Message(rmMessageContext.getMsgContext().getRequestMessage().getSOAPEnvelope())); 268 } 269 270 storageManager.addSendMsgNo(rmMessageContext.getSequenceID(), 274 rmMessageContext.getMsgNumber()); 275 call.invoke(); 276 277 } 278 279 private void sendCreateSequenceRequest(RMMessageContext rmMsgCtx) throws Exception { 280 Call call; 281 282 SOAPEnvelope reqEnvelope = EnvelopeCreator.createCreateSequenceEnvelope(rmMsgCtx); 283 rmMsgCtx.getMsgContext().setRequestMessage(new Message(reqEnvelope)); 284 285 call = prepareCall(rmMsgCtx); 286 call.invoke(); 287 288 processResponseMessage(call, rmMsgCtx); 289 290 } 291 292 private void sendCreateSequenceResponse(RMMessageContext rmMessageContext) throws Exception { 293 if (rmMessageContext.getMsgContext().getResponseMessage() == null) { 297 log.error(Constants.ErrorMessages.NULL_REQUEST_MSG); 299 } else { 300 Call call = prepareCall(rmMessageContext); 301 call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage()); 302 call.invoke(); 303 } 304 } 305 306 private void sendAcknowldgement(RMMessageContext rmMessageContext) throws Exception { 307 if (rmMessageContext.getMsgContext().getResponseMessage() == null) { 310 log.error(Constants.ErrorMessages.NULL_REQUEST_MSG); 311 } else { 312 Call call = prepareCall(rmMessageContext); 313 call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage()); 314 call.invoke(); 315 } 316 } 317 318 private Call prepareCall(RMMessageContext rmMessageContext) throws ServiceException , AxisFault { 319 Service service = new Service(); 320 Call call = (Call) service.createCall(); 321 call.setTargetEndpointAddress(rmMessageContext.getOutGoingAddress()); 322 323 call.setClientHandlers(requestChain, responseChain); 324 if (rmMessageContext.getMsgContext().getRequestMessage() != null) { 325 String soapMsg = rmMessageContext.getMsgContext().getRequestMessage() 326 .getSOAPPartAsString(); 327 call.setRequestMessage(new Message(soapMsg)); 328 if (rmMessageContext.getAddressingHeaders().getAction() != null) { 329 call.setSOAPActionURI(rmMessageContext.getAddressingHeaders().getAction().toString()); 330 } 331 } 332 return call; 333 } 334 335 private void sendServiceRequest(RMMessageContext rmMessageContext) throws Exception { 336 337 SOAPEnvelope requestEnvelope = null; 338 340 requestEnvelope = EnvelopeCreator.createServiceRequestEnvelope(rmMessageContext); 341 rmMessageContext.getMsgContext().setRequestMessage(new Message(requestEnvelope)); 342 if (rmMessageContext.getSync()) { 343 Call call; 344 call = prepareCall(rmMessageContext); 345 storageManager.addSendMsgNo(rmMessageContext.getSequenceID(), 347 rmMessageContext.getMsgNumber()); 348 call.invoke(); 349 processResponseMessage(call, rmMessageContext); 350 351 } else { 352 Call call = prepareCall(rmMessageContext); 353 storageManager.addSendMsgNo(rmMessageContext.getSequenceID(), 354 rmMessageContext.getMsgNumber()); 355 call.invoke(); 356 processResponseMessage(call, rmMessageContext); 357 358 } 359 } 360 361 private void processResponseMessage(Call call, RMMessageContext rmMessageContext) 362 throws Exception { 363 364 if (call.getResponseMessage() != null) { 365 RMHeaders rmHeaders = new RMHeaders(); 366 rmHeaders.fromSOAPEnvelope(call.getResponseMessage().getSOAPEnvelope()); 367 rmMessageContext.setRMHeaders(rmHeaders); 368 AddressingHeaders addrHeaders = new AddressingHeaders(call.getResponseMessage().getSOAPEnvelope()); 369 rmMessageContext.setAddressingHeaders(addrHeaders); 370 rmMessageContext.getMsgContext().setResponseMessage(call.getResponseMessage()); 371 IRMMessageProcessor messagePrcessor = RMMessageProcessorIdentifier.getMessageProcessor(rmMessageContext, storageManager); 372 messagePrcessor.processMessage(rmMessageContext); 373 } 374 375 if (getCallback() != null) { 376 CallbackData data = new CallbackData(); 377 data.setMessageId(rmMessageContext.getMessageID()); 378 data.setMessageType(rmMessageContext.getMessageType()); 379 data.setSequenceId(rmMessageContext.getSequenceID()); 380 callback.onIncomingMessage(data); 381 } 382 383 } 384 385 } 386 | Popular Tags |