KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > sandesha > client > RMSender


1 /*
2 * Copyright 1999-2004 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
6 * the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
15 *
16 */

17 package org.apache.sandesha.client;
18
19 import org.apache.axis.AxisFault;
20 import org.apache.axis.Message;
21 import org.apache.axis.MessageContext;
22 import org.apache.axis.client.Call;
23 import org.apache.axis.components.logger.LogFactory;
24 import org.apache.axis.components.uuid.UUIDGen;
25 import org.apache.axis.components.uuid.UUIDGenFactory;
26 import org.apache.axis.handlers.BasicHandler;
27 import org.apache.axis.message.addressing.AddressingHeaders;
28 import org.apache.commons.logging.Log;
29 import org.apache.sandesha.Constants;
30 import org.apache.sandesha.IStorageManager;
31 import org.apache.sandesha.RMMessageContext;
32 import org.apache.sandesha.RMReport;
33 import org.apache.sandesha.util.PolicyLoader;
34 import org.apache.sandesha.util.RMMessageCreator;
35 import org.apache.sandesha.ws.rm.RMHeaders;
36
37 /**
38  * In the client side of axis there is a flexibility of using custom sender to send SOAP messages.
39  * However axis's use of senders are mainly to handle transport related funtionalites.
40  * <code>RMSender</code>has to be used by the users who wish to use WS-ReliableMessaging capability
41  * in their clients.<P>
42  * The main funtionality of <code>RMSender</code> is to insert the messages coming from client and
43  * also the generated messages to the <code>SandeshaQueue</code>.
44  * If the message coming in from the client is request/response in nature then <code>RMSender</code>
45  * will wait polling <code>SandeshaQueue</code> till it gets an appropriate response.
46  * Due to the above reason, if the client is sending several messages
47  * (of request/response in nature) to be sent reliably, they will be sent reliably by
48  * <b>Sandesha</b> however the client will wait at each message till it gets the response, to send
49  * the next. To avoid this client can use callbacks provided by axis
50  * in <code>org.apache.axis.client.async</code> package.
51  */

52 public class RMSender extends BasicHandler {
53
54     private IStorageManager storageManager;
55     private static final Log log = LogFactory.getLog(RMSender.class.getName());
56     private final UUIDGen uuidGen = UUIDGenFactory.getUUIDGen();
57     private static Boolean JavaDoc lock = new Boolean JavaDoc(false);
58
59     /**
60      * This is the main method that is invoked by the axis engine. This method will add the reqest
61      * messages from the client to <code>SandeshaQueue</code> with the generated messages such as
62      * Create Sequence message and Terminate Sequence message.
63      *
64      * @param msgContext
65      * @throws AxisFault
66      */

67
68     public void invoke(MessageContext msgContext) throws AxisFault {
69
70         storageManager = new ClientStorageManager();
71
72         try {
73             RMMessageContext reqMsgCtx = null;
74             String JavaDoc tempSeqID = null;
75
76             reqMsgCtx = getRMMessageContext(msgContext);
77
78             tempSeqID = reqMsgCtx.getSequenceID();
79
80             reqMsgCtx = processRequestMessage(reqMsgCtx, reqMsgCtx.getSync());
81
82             if (reqMsgCtx.isHasResponse()) {
83                 RMMessageContext responseMessageContext = null;
84                 long startingTime = System.currentTimeMillis();
85                 long inactivityTimeOut = PolicyLoader.getInstance().getInactivityTimeout();
86
87                 while (responseMessageContext == null) {
88                     synchronized (lock) {
89                         responseMessageContext =
90                                 checkTheQueueForResponse(tempSeqID, reqMsgCtx.getMessageID());
91                         if ((System.currentTimeMillis() - startingTime) >= inactivityTimeOut) {
92                             reqMsgCtx.getCtx().stopClientByForce();
93                         }
94                         Thread.sleep(Constants.CLIENT_RESPONSE_CHECKING_INTERVAL);
95                     }
96                 }
97
98                 //setting RMReport;
99
if (responseMessageContext != null) {
100                     String JavaDoc oldSeqId = reqMsgCtx.getOldSequenceID();
101                     if (oldSeqId != null) {
102                         Call call = (Call) reqMsgCtx.getCtx().getCallMap().get(reqMsgCtx.getOldSequenceID());
103
104                         if (call != null) {
105                             RMReport report = (RMReport) call.getProperty(Constants.ClientProperties.REPORT);
106                             report.incrementReturnedMsgCount();
107                         }
108                     }
109                 }
110
111                 //We need these steps to filter all addressing and rm related headers.
112
Message resMsg = responseMessageContext.getMsgContext().getRequestMessage();
113                 RMHeaders.removeHeaders(resMsg.getSOAPEnvelope());
114                 AddressingHeaders addHeaders = new AddressingHeaders(resMsg.getSOAPEnvelope(),
115                         null, true, false, false, null);
116
117                 msgContext.setResponseMessage(resMsg);
118             } else {
119                 msgContext.setResponseMessage(null);
120             }
121
122         } catch (Exception JavaDoc ex) {
123             log.error(ex);
124
125             throw new AxisFault(ex.getLocalizedMessage());
126
127         }
128     }
129
130     /**
131      * This method will process the first request message.
132      *
133      * @param reqRMMsgContext
134      * @param sync
135      * @return
136      * @throws Exception
137      */

138     private RMMessageContext processRequestMessage(RMMessageContext reqRMMsgContext,
139                                                    boolean sync) throws Exception JavaDoc {
140         synchronized (lock) {
141
142             if (!storageManager.isSequenceExist(reqRMMsgContext.getSequenceID())) {
143                 String JavaDoc msgID = Constants.UUID + uuidGen.nextUUID();
144                 String JavaDoc offerID = null;
145                 if (reqRMMsgContext.isHasResponse() && reqRMMsgContext.isSendOffer()) {
146                     offerID = Constants.UUID + uuidGen.nextUUID();
147                     storageManager.addRequestedSequence(offerID);
148                     storageManager.addOffer(msgID, offerID);
149                 }
150
151                 RMMessageContext createSeqRMMsgContext = RMMessageCreator.createCreateSeqMsg(reqRMMsgContext, Constants.CLIENT, msgID, offerID);
152                 storageManager.addOutgoingSequence(reqRMMsgContext.getSequenceID());
153                 storageManager.setTemporaryOutSequence(reqRMMsgContext.getSequenceID(),
154                         createSeqRMMsgContext.getMessageID());
155
156                 createSeqRMMsgContext.setSync(sync);
157                 storageManager.addCreateSequenceRequest(createSeqRMMsgContext);
158                 processMessage(reqRMMsgContext);
159
160             } else {
161                 processMessage(reqRMMsgContext);
162             }
163
164         }
165
166
167         return reqRMMsgContext;
168     }
169
170     private RMMessageContext processMessage(RMMessageContext reqRMMsgContext)
171             throws Exception JavaDoc {
172         if (reqRMMsgContext.isLastMessage()) {
173             storageManager.insertTerminateSeqMessage(RMMessageCreator.createTerminateSeqMsg(reqRMMsgContext, Constants.CLIENT));
174         }
175         RMMessageContext serviceRequestMsg = RMMessageCreator.createServiceRequestMessage(reqRMMsgContext);
176         storageManager.insertOutgoingMessage(serviceRequestMsg);
177         return reqRMMsgContext;
178     }
179
180     private RMMessageContext checkTheQueueForResponse(String JavaDoc sequenceId, String JavaDoc reqMessageID) {
181         return storageManager.checkForResponseMessage(sequenceId, reqMessageID);
182     }
183
184     private RMMessageContext getRMMessageContext(MessageContext msgCtx) throws Exception JavaDoc {
185         //Get a copy of the MessageContext. This is required when sending multiple messages from
186
//one call object
187
MessageContext newMsgContext = RMMessageCreator.cloneMsgContext(msgCtx);
188         RMMessageContext requestMesssageContext = new RMMessageContext();
189         Call call = (Call) newMsgContext.getProperty(MessageContext.CALL);
190
191         requestMesssageContext = ClientPropertyValidator.validate(call);
192         requestMesssageContext.setOutGoingAddress((String JavaDoc) msgCtx.getProperty(MessageContext.TRANS_URL));
193         requestMesssageContext.setMsgContext(newMsgContext);
194         return requestMesssageContext;
195     }
196
197
198 }
199
200
201
202
Popular Tags