KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > sandesha > server > SenderWorker


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */

17
18 package org.apache.sandesha.server;
19
20 import org.apache.axis.AxisFault;
21 import org.apache.axis.Message;
22 import org.apache.axis.SimpleChain;
23 import org.apache.axis.client.Call;
24 import org.apache.axis.client.Service;
25 import org.apache.axis.components.logger.LogFactory;
26 import org.apache.axis.components.uuid.UUIDGen;
27 import org.apache.axis.components.uuid.UUIDGenFactory;
28 import org.apache.axis.message.addressing.AddressingHeaders;
29 import org.apache.commons.logging.Log;
30 import org.apache.sandesha.Constants;
31 import org.apache.sandesha.EnvelopeCreator;
32 import org.apache.sandesha.IStorageManager;
33 import org.apache.sandesha.RMMessageContext;
34 import org.apache.sandesha.server.msgprocessors.IRMMessageProcessor;
35 import org.apache.sandesha.storage.Callback;
36 import org.apache.sandesha.storage.CallbackData;
37 import org.apache.sandesha.util.PolicyLoader;
38 import org.apache.sandesha.ws.rm.RMHeaders;
39
40 import javax.xml.rpc.ServiceException JavaDoc;
41 import javax.xml.soap.SOAPEnvelope JavaDoc;
42 import javax.xml.soap.SOAPException JavaDoc;
43
44 /**
45  * This is the worker for the Sender. Sender will start several workers depending on the
46  * Constants value SENDER_THREADS in the Constants file.
47  *
48  * @author Jaliya Ekanayake
49  * @author Chamikara Jayalath
50  */

51 public class SenderWorker implements Runnable JavaDoc {
52     private static final Log log = LogFactory.getLog(SenderWorker.class.getName());
53     public static final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
54     public static Callback callback;
55     public boolean running = true;
56     private IStorageManager storageManager;
57
58
59     public static synchronized Callback getCallback() {
60         return callback;
61     }
62
63     public static synchronized void setCallback(Callback cb) {
64         callback = cb;
65     }
66
67     private SimpleChain requestChain = null;
68     private SimpleChain responseChain = null;
69
70     public SimpleChain getRequestChain() {
71         return requestChain;
72     }
73
74     public void setRequestChain(SimpleChain requestChain) {
75         this.requestChain = requestChain;
76     }
77
78     public SimpleChain getResponseChain() {
79         return responseChain;
80     }
81
82     public void setResponseChain(SimpleChain responseChanin) {
83         this.responseChain = responseChanin;
84     }
85
86     public SenderWorker() {
87         storageManager = new ServerStorageManager();
88     }
89
90     public SenderWorker(IStorageManager storageManager) {
91         this.storageManager = storageManager;
92     }
93
94     public boolean isRunning() {
95         return running;
96     }
97
98     public void setRunning(boolean running) {
99         this.running = running;
100     }
101
102     public void run() {
103
104         while (running) {
105             long startTime = System.currentTimeMillis();
106             boolean hasMessages = true;
107             //Take a messge from the storage and check whether we can send it.
108
do {
109
110                 RMMessageContext rmMessageContext = storageManager.getNextMessageToSend();
111                 if (rmMessageContext == null) {
112                     hasMessages = false;
113                 } else {
114                     long inactivityTimeout = PolicyLoader.getInstance().getInactivityTimeout();
115                     long retransmissionInterval = PolicyLoader.getInstance()
116                             .getBaseRetransmissionInterval();
117
118                     if (rmMessageContext.getFristProcessedTime() == 0)
119                         rmMessageContext.setFristProcessedTime(System.currentTimeMillis());
120
121                     if ((System.currentTimeMillis() - rmMessageContext.getFristProcessedTime()) >
122                             inactivityTimeout) {
123                         log.error("Inactivity Time Out Reached for the message with <wsa:MessageID> " +
124                                 rmMessageContext.getMessageID());
125                         //Need to clear the storage only for this sequece.
126
// storageManager.clearStorage();
127

128                     } else if (rmMessageContext.getRetransmissionTime() <
129                             (System.currentTimeMillis() - rmMessageContext.getLastPrecessedTime())) {
130                         try {
131
132                             rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
133
134                             if (PolicyLoader.getInstance().getExponentialBackoff() != null) {
135                                 long newRtTime = ((long) Math.pow(retransmissionInterval / 1000,
136                                         rmMessageContext.getReTransmissionCount())) * 1000;
137                                 rmMessageContext.setRetransmissionTime(newRtTime);
138
139                             } else {
140                                 //Let's do Binary Back Off
141
long rtTime = rmMessageContext.getRetransmissionTime();
142                                 rmMessageContext.setRetransmissionTime(2 * rtTime);
143
144                             }
145                             sendMessage(rmMessageContext);
146                             rmMessageContext.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
147
148                             rmMessageContext.setLocked(false);
149
150                         } catch (AxisFault e) {
151                             rmMessageContext.setLocked(false);
152                             log.error(e);
153                         } catch (SOAPException JavaDoc e) {
154                             rmMessageContext.setLocked(false);
155                             log.error(e);
156                         } catch (Exception JavaDoc e) {
157                             rmMessageContext.setLocked(false);
158                             log.error(e);
159                         }
160                     }
161                     rmMessageContext.setLocked(false);
162
163                 }
164             } while (hasMessages);
165
166             long timeGap = System.currentTimeMillis() - startTime;
167             if ((timeGap - Constants.SENDER_SLEEP_TIME) <= 0) {
168                 try {
169                     Thread.sleep(Constants.SENDER_SLEEP_TIME - timeGap);
170                 } catch (Exception JavaDoc ex) {
171                     log.error(ex);
172                 }
173             }
174         }
175     }
176
177     private void sendMessage(RMMessageContext rmMessageContext) throws Exception JavaDoc {
178         switch (rmMessageContext.getMessageType()) {
179             case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
180                 {
181                     if (log.isDebugEnabled())
182                         log.debug(Constants.InfomationMessage.SENDING_CREATE_SEQ);
183                     sendCreateSequenceRequest(rmMessageContext);
184                     break;
185                 }
186             case Constants.MSG_TYPE_CREATE_SEQUENCE_RESPONSE:
187                 {
188                     if (log.isDebugEnabled())
189                         log.debug(Constants.InfomationMessage.SENDING_CREATE_SEQ_RES);
190
191                     sendCreateSequenceResponse(rmMessageContext);
192                     break;
193                 }
194             case Constants.MSG_TYPE_TERMINATE_SEQUENCE:
195                 {
196                     if (log.isDebugEnabled())
197                         log.debug(Constants.InfomationMessage.SENDING_TERMINATE_SEQ);
198                     sendTerminateSequenceRequest(rmMessageContext);
199                     storageManager.setTerminateSend(storageManager.getKeyFromOutgoingSeqId(rmMessageContext.getSequenceID()));
200                     break;
201                 }
202             case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
203                 {
204                     if (log.isDebugEnabled())
205                         log.debug(Constants.InfomationMessage.SENDING_ACK);
206                     sendAcknowldgement(rmMessageContext);
207                     break;
208                 }
209             case Constants.MSG_TYPE_SERVICE_REQUEST:
210                 {
211                     if (log.isDebugEnabled())
212                         log.debug(Constants.InfomationMessage.SENDING_REQ);
213                     sendServiceRequest(rmMessageContext);
214                     break;
215                 }
216             case Constants.MSG_TYPE_SERVICE_RESPONSE:
217                 {
218                     if (log.isDebugEnabled())
219                         log.debug(Constants.InfomationMessage.SENDING_RES);
220                     sendServiceResponse(rmMessageContext);
221                     break;
222                 }
223         }
224     }
225
226
227     /**
228      * @param rmMessageContext
229      */

230     private void sendTerminateSequenceRequest(RMMessageContext rmMessageContext) throws Exception JavaDoc {
231         SOAPEnvelope JavaDoc terSeqEnv = EnvelopeCreator.createTerminatSeqMessage(rmMessageContext);
232
233         Message terSeqMsg = new Message(terSeqEnv);
234         rmMessageContext.getMsgContext().setRequestMessage(terSeqMsg);
235
236         Call call;
237         call = prepareCall(rmMessageContext);
238         call.invoke();
239
240         processResponseMessage(call, rmMessageContext);
241     }
242
243     private void sendServiceResponse(RMMessageContext rmMessageContext) throws Exception JavaDoc {
244         SOAPEnvelope JavaDoc responseEnvelope = null;
245         responseEnvelope = EnvelopeCreator.createServiceResponseEnvelope(rmMessageContext);
246
247
248         rmMessageContext.getMsgContext().setRequestMessage(new Message(responseEnvelope));
249         //rmMessageContext.getMsgContext().setResponseMessage(new Message(responseEnvelope));
250

251         Service service = new Service();
252         Call call = (Call) service.createCall();
253
254         if (rmMessageContext.getAddressingHeaders().getAction() != null) {
255             call.setSOAPActionURI(rmMessageContext.getAddressingHeaders().getAction().toString());
256         }
257
258         call.setTargetEndpointAddress(rmMessageContext.getAddressingHeaders().getReplyTo().getAddress().toString());
259
260         //NOTE: WE USE THE REQUEST MESSAGE TO SEND THE RESPONSE.
261
String JavaDoc soapMsg = rmMessageContext.getMsgContext().getRequestMessage().getSOAPPartAsString();
262
263
264         if (soapMsg != null)
265             call.setRequestMessage(new Message(soapMsg));
266         else {
267             call.setRequestMessage(new Message(rmMessageContext.getMsgContext().getRequestMessage().getSOAPEnvelope()));
268         }
269
270         // rmMessageContext.setLastPrecessedTime(System.currentTimeMillis());
271
// rmMessageContext.setReTransmissionCount(rmMessageContext.getReTransmissionCount() + 1);
272
//We are not expecting the ack over the same connection
273
storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
274                 rmMessageContext.getMsgNumber());
275         call.invoke();
276
277     }
278
279     private void sendCreateSequenceRequest(RMMessageContext rmMsgCtx) throws Exception JavaDoc {
280         Call call;
281
282         SOAPEnvelope JavaDoc reqEnvelope = EnvelopeCreator.createCreateSequenceEnvelope(rmMsgCtx);
283         rmMsgCtx.getMsgContext().setRequestMessage(new Message(reqEnvelope));
284
285         call = prepareCall(rmMsgCtx);
286         call.invoke();
287
288         processResponseMessage(call, rmMsgCtx);
289
290     }
291
292     private void sendCreateSequenceResponse(RMMessageContext rmMessageContext) throws Exception JavaDoc {
293         //Here there is no concept of sending synchronous CreateSequenceRequest
294
// response.
295
//i.e. we are not expecting any response for this.
296
if (rmMessageContext.getMsgContext().getResponseMessage() == null) {
297             //The code should not come to this point.
298
log.error(Constants.ErrorMessages.NULL_REQUEST_MSG);
299         } else {
300             Call call = prepareCall(rmMessageContext);
301             call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage());
302             call.invoke();
303         }
304     }
305
306     private void sendAcknowldgement(RMMessageContext rmMessageContext) throws Exception JavaDoc {
307         // Here there is no concept of sending synchronous CreateSequenceRequest
308
// resposne.
309
if (rmMessageContext.getMsgContext().getResponseMessage() == null) {
310             log.error(Constants.ErrorMessages.NULL_REQUEST_MSG);
311         } else {
312             Call call = prepareCall(rmMessageContext);
313             call.setRequestMessage(rmMessageContext.getMsgContext().getResponseMessage());
314             call.invoke();
315         }
316     }
317
318     private Call prepareCall(RMMessageContext rmMessageContext) throws ServiceException JavaDoc, AxisFault {
319         Service service = new Service();
320         Call call = (Call) service.createCall();
321         call.setTargetEndpointAddress(rmMessageContext.getOutGoingAddress());
322
323         call.setClientHandlers(requestChain, responseChain);
324         if (rmMessageContext.getMsgContext().getRequestMessage() != null) {
325             String JavaDoc soapMsg = rmMessageContext.getMsgContext().getRequestMessage()
326                     .getSOAPPartAsString();
327             call.setRequestMessage(new Message(soapMsg));
328             if (rmMessageContext.getAddressingHeaders().getAction() != null) {
329                 call.setSOAPActionURI(rmMessageContext.getAddressingHeaders().getAction().toString());
330             }
331         }
332         return call;
333     }
334
335     private void sendServiceRequest(RMMessageContext rmMessageContext) throws Exception JavaDoc {
336
337         SOAPEnvelope JavaDoc requestEnvelope = null;
338         //Need to create the response envelope.
339

340         requestEnvelope = EnvelopeCreator.createServiceRequestEnvelope(rmMessageContext);
341         rmMessageContext.getMsgContext().setRequestMessage(new Message(requestEnvelope));
342         if (rmMessageContext.getSync()) {
343             Call call;
344             call = prepareCall(rmMessageContext);
345             //CHECK THIS
346
storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
347                     rmMessageContext.getMsgNumber());
348             call.invoke();
349             processResponseMessage(call, rmMessageContext);
350
351         } else {
352             Call call = prepareCall(rmMessageContext);
353             storageManager.addSendMsgNo(rmMessageContext.getSequenceID(),
354                     rmMessageContext.getMsgNumber());
355             call.invoke();
356             processResponseMessage(call, rmMessageContext);
357
358         }
359     }
360
361     private void processResponseMessage(Call call, RMMessageContext rmMessageContext)
362             throws Exception JavaDoc {
363
364         if (call.getResponseMessage() != null) {
365             RMHeaders rmHeaders = new RMHeaders();
366             rmHeaders.fromSOAPEnvelope(call.getResponseMessage().getSOAPEnvelope());
367             rmMessageContext.setRMHeaders(rmHeaders);
368             AddressingHeaders addrHeaders = new AddressingHeaders(call.getResponseMessage().getSOAPEnvelope());
369             rmMessageContext.setAddressingHeaders(addrHeaders);
370             rmMessageContext.getMsgContext().setResponseMessage(call.getResponseMessage());
371             IRMMessageProcessor messagePrcessor = RMMessageProcessorIdentifier.getMessageProcessor(rmMessageContext, storageManager);
372             messagePrcessor.processMessage(rmMessageContext);
373         }
374
375         if (getCallback() != null) {
376             CallbackData data = new CallbackData();
377             data.setMessageId(rmMessageContext.getMessageID());
378             data.setMessageType(rmMessageContext.getMessageType());
379             data.setSequenceId(rmMessageContext.getSequenceID());
380             callback.onIncomingMessage(data);
381         }
382
383     }
384
385 }
386
Popular Tags