KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > sandesha > client > ClientStorageManager


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */

17 package org.apache.sandesha.client;
18
import org.apache.axis.components.logger.LogFactory;
import org.apache.axis.message.addressing.RelatesTo;
import org.apache.commons.logging.Log;
import org.apache.sandesha.Constants;
import org.apache.sandesha.IStorageManager;
import org.apache.sandesha.RMMessageContext;
import org.apache.sandesha.storage.Callback;
import org.apache.sandesha.storage.CallbackData;
import org.apache.sandesha.storage.dao.ISandeshaDAO;
import org.apache.sandesha.storage.dao.SandeshaDAOFactory;
import org.apache.sandesha.ws.rm.RMHeaders;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
35
36 /**
37  * This is the storage manager for Client side in Sandesha
38  * Provides the access points for the SandeshaQueue.
39  *
40  * @author Chamikara Jayalath
41  * @author Jaliya Ekanayake
42  */

43 public class ClientStorageManager implements IStorageManager {
44
45     protected static Log log = LogFactory.getLog(ClientStorageManager.class.getName());
46
47     private ISandeshaDAO accessor;
48     private static Callback callBack;
49
50     public void init() {
51     }
52
53     public ClientStorageManager() {
54         accessor = SandeshaDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR,
55                 Constants.CLIENT);
56     }
57
58     public boolean isSequenceExist(String JavaDoc sequenceID) {
59         return accessor.isOutgoingSequenceExists(sequenceID);
60     }
61
62     public boolean isResponseSequenceExist(String JavaDoc sequenceID) {
63         return accessor.isIncomingSequenceExists(sequenceID);
64     }
65
66     public Object JavaDoc getNextSeqToProcess() {
67         return null;
68     }
69
70     public RMMessageContext getNextMessageToProcess(Object JavaDoc seq) {
71         return null;
72     }
73
74     public void setAcknowledged(String JavaDoc seqID, long msgNumber) {
75         accessor.markOutgoingMessageToDelete(seqID, new Long JavaDoc(msgNumber));
76
77     }
78
79     public void addSequence(String JavaDoc sequenceID) {
80         boolean result = accessor.addOutgoingSequence(sequenceID);
81         if (!result)
82             log.error("Sequence was not created correctly in the in the queue");
83     }
84
85     /**
86      * This will be used both by the Sender and the SimpleAxisServer to set the
87      * create sequence responses.
88      */

89     public void addCreateSequenceResponse(RMMessageContext rmMessageContext) {
90         addPriorityMessage(rmMessageContext);
91     }
92
93     /**
94      * This will be used by the RMSender to add the create sequence request.
95      */

96     public void addCreateSequenceRequest(RMMessageContext rmMessageContext) {
97         addPriorityMessage(rmMessageContext);
98     }
99
100     /**
101      * SimpleAxisServer will use this method to add acks for the application
102      * responses received from the server side.
103      */

104     public void addAcknowledgement(RMMessageContext rmMessageContext) {
105         String JavaDoc sequenceID = rmMessageContext.getSequenceID();
106         if (sequenceID != null)
107             accessor.removeAllAcks(sequenceID);
108
109         addPriorityMessage(rmMessageContext);
110     }
111
112     //private method
113
private void addPriorityMessage(RMMessageContext msg) {
114         accessor.addPriorityMessage(msg);
115     }
116
117     /**
118      * Check the existance of a message.
119      */

120     public boolean isMessageExist(String JavaDoc sequenceID, long messageNumber) {
121         return accessor.isIncomingMessageExists(sequenceID, new Long JavaDoc(messageNumber));
122     }
123
124     /**
125      * Get a Map of messages.
126      */

127     public Map JavaDoc getListOfMessageNumbers(String JavaDoc sequenceID) {
128         String JavaDoc seq = sequenceID;
129         Set JavaDoc st = accessor.getAllReceivedMsgNumsOfIncomingSeq(seq);
130         Iterator JavaDoc it = st.iterator();
131         //To find the largest id present
132
long largest = 0;
133         while (it.hasNext()) {
134             Long JavaDoc key = (Long JavaDoc) it.next();
135             if (null == key)
136                 continue;
137
138             long l = key.longValue();
139             if (l > largest)
140                 largest = l;
141         }
142
143         HashMap JavaDoc results = new HashMap JavaDoc();
144         //Add Keys to the results in order.
145
long currentPosition = 1;
146         for (long l = 1; l <= largest; l++) {
147             boolean present = st.contains(new Long JavaDoc(l));
148             if (present) {
149                 results.put(new Long JavaDoc(currentPosition), new Long JavaDoc(l));
150                 currentPosition++;
151             }
152         }
153         return results;
154     }
155
156     /**
157      * This will be used by the sender.
158      */

159     public synchronized RMMessageContext getNextMessageToSend() {
160         RMMessageContext msg;
161         msg = accessor.getNextPriorityMessageContextToSend();
162         if (msg == null)
163             msg = accessor.getNextOutgoingMsgContextToSend();
164
165         if (null == msg) {
166             msg = accessor.getNextLowPriorityMessageContextToSend();
167
168             // checks whether all the request messages have been acked
169
}
170         if (null != callBack && null != msg)
171             informOutgoingMessage(msg);
172
173         if (msg != null && !msg.isLocked()) {
174             msg.setLocked(true);
175             return msg;
176         } else {
177             return null;
178         }
179     }
180
181     /**
182      * This will be used by the RMSender when adding messages to the Queue.
183      * RMSender will also add a createSequenceRequest message to the prioriy
184      * queue using this temporary ID as the messageID.
185      */

186     public void setTemporaryOutSequence(String JavaDoc sequenceId, String JavaDoc outSequenceId) {
187         synchronized (this) {
188             accessor.setOutSequence(sequenceId, outSequenceId);
189             accessor.setOutSequenceApproved(sequenceId, false);
190         }
191     }
192
193     /**
194      * This will be used by the Client Listener and the Sender to set the
195      * proper sequenceID
196      */

197     public boolean setApprovedOutSequence(String JavaDoc oldSeqId, String JavaDoc newSeqId) {
198         if (oldSeqId == null) {
199             return false;
200         }
201         String JavaDoc sequenceID = accessor.getSequenceOfOutSequence(oldSeqId);
202         if (null == sequenceID) {
203             log.error(Constants.ErrorMessages.SET_APPROVED_OUT_SEQ);
204             return false;
205         }
206         accessor.setOutSequence(sequenceID, newSeqId);
207         accessor.setOutSequenceApproved(sequenceID, true);
208         accessor.removeCreateSequenceMsg(oldSeqId);
209         return true;
210
211     }
212
213     /**
214      * This will be used by the RMSender when adding messages. Initially it
215      * should return 1.
216      */

217     public long getNextMessageNumber(String JavaDoc sequenceID) {
218         long msgNo = accessor.getNextOutgoingMessageNumber(sequenceID);
219         return msgNo;
220     }
221
222     public void insertOutgoingMessage(RMMessageContext msg) {
223         String JavaDoc sequenceId = msg.getSequenceID();
224         accessor.addMessageToOutgoingSequence(sequenceId, msg);
225     }
226
227     public void insertIncomingMessage(RMMessageContext rmMessageContext) {
228         RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
229         RelatesTo relatesTo = (RelatesTo) rmMessageContext.getAddressingHeaders().getRelatesTo()
230                 .get(0);
231         String JavaDoc messageId = relatesTo.getURI().toString();
232         String JavaDoc sequenceId = null;
233
234         sequenceId = accessor.searchForSequenceId(messageId);
235
236         boolean exists = accessor.isIncomingSequenceExists(sequenceId);
237
238         if (!exists) {
239             accessor.addIncomingSequence(sequenceId);
240         }
241
242         long messageNumber = rmHeaders.getSequence().getMessageNumber().getMessageNumber();
243         if (messageNumber <= 0)
244             return;
245         Long JavaDoc msgNo = new Long JavaDoc(messageNumber);
246         accessor.addMessageToIncomingSequence(sequenceId, msgNo, rmMessageContext);
247         accessor.updateFinalMessageArrivedTime(sequenceId);
248     }
249
250     public RMMessageContext checkForResponseMessage(String JavaDoc sequenceId, String JavaDoc requestMsgId) {
251         RMMessageContext response = accessor.checkForResponseMessage(requestMsgId, sequenceId);
252         return response;
253
254     }
255
256      public void insertTerminateSeqMessage(RMMessageContext terminateSeqMessage) {
257         accessor.addLowPriorityMessage(terminateSeqMessage);
258     }
259
260     public void setAckReceived(String JavaDoc seqId, long msgNo) {
261         accessor.setAckReceived(seqId, msgNo);
262     }
263
264     public void insertFault(RMMessageContext rmMsgCtx) {
265
266     }
267
268     public void addSendMsgNo(String JavaDoc seqId, long msgNo) {
269         accessor.addSendMsgNo(accessor.getSequenceOfOutSequence(seqId), msgNo);
270     }
271
272     public void addOutgoingSequence(String JavaDoc sequenceId) {
273         accessor.addOutgoingSequence(sequenceId);
274     }
275
276     public void addIncomingSequence(String JavaDoc sequenceId) {
277         accessor.addIncomingSequence(sequenceId);
278     }
279
280     public long getLastIncomingMsgNo(String JavaDoc seqId) {
281         String JavaDoc key = accessor.getKeyFromIncomingSequenceId(seqId);
282         return accessor.getLastIncomingMsgNo(key);
283     }
284
285     public boolean hasLastIncomingMsgReceived(String JavaDoc seqId) {
286         String JavaDoc key = accessor.getKeyFromIncomingSequenceId(seqId);
287         return accessor.hasLastIncomingMsgReceived(key);
288     }
289
290     public void addRequestedSequence(String JavaDoc seqId) {
291         accessor.addRequestedSequence(seqId);
292     }
293
294     public boolean isRequestedSeqPresent(String JavaDoc seqId) {
295         return accessor.isRequestedSeqPresent(seqId);
296     }
297
298     public boolean isSentMsg(String JavaDoc seqId, long msgNo) {
299         return accessor.isSentMsg(accessor.getSequenceOfOutSequence(seqId), msgNo);
300     }
301
302     public String JavaDoc getOutgoingSeqOfMsg(String JavaDoc msgId) {
303         return accessor.searchForSequenceId(msgId);
304     }
305
306     public String JavaDoc getOutgoingSeqenceIdOfIncomingMsg(RMMessageContext msg) {
307         //String msgId = msg.getMessageID();
308
RelatesTo relatesTo = (RelatesTo) msg.getAddressingHeaders().getRelatesTo().get(0);
309         String JavaDoc msgId = relatesTo.getURI().toString();
310         return accessor.searchForSequenceId(msgId);
311     }
312
313     public void setTerminateSend(String JavaDoc seqId) {
314         accessor.setTerminateSend(seqId);
315     }
316
317     public void setTerminateReceived(String JavaDoc seqId) {
318         accessor.setTerminateReceived(seqId);
319     }
320
321     public String JavaDoc getKeyFromOutgoingSeqId(String JavaDoc seqId) {
322         return accessor.getKeyFromOutgoingSequenceId(seqId);
323     }
324
325     public void setAcksTo(String JavaDoc seqId, String JavaDoc acksTo) {
326         accessor.setAcksTo(seqId, acksTo);
327     }
328
329     public String JavaDoc getAcksTo(String JavaDoc seqId) {
330         return accessor.getAcksTo(seqId);
331     }
332
333     public void addOffer(String JavaDoc msgID, String JavaDoc offerID) {
334         accessor.addOffer(msgID, offerID);
335     }
336
337     public String JavaDoc getOffer(String JavaDoc msgID) {
338         return accessor.getOffer(msgID);
339     }
340
341     public void setCallback(Callback cb) {
342         callBack = cb;
343     }
344
345     public void removeCallback() {
346         callBack = null;
347     }
348
349     private void informOutgoingMessage(RMMessageContext rmMsgContext) {
350
351         CallbackData cbData = new CallbackData();
352
353         // setting callback data;
354
if (null != rmMsgContext) {
355             cbData.setSequenceId(rmMsgContext.getSequenceID());
356             cbData.setMessageId(rmMsgContext.getMessageID());
357             cbData.setMessageType(rmMsgContext.getMessageType());
358         }
359
360         if (null != callBack)
361             callBack.onOutgoingMessage(cbData);
362     }
363
364     public void clearStorage() {
365         accessor.clear();
366     }
367
368     public boolean isSequenceComplete(String JavaDoc seqId) {
369         boolean outTerminateSent = accessor.isOutgoingTerminateSent(seqId);
370         boolean incomingTerminateReceived = accessor.isIncommingTerminateReceived(seqId);
371         return outTerminateSent && incomingTerminateReceived;
372     }
373
374     public void sendAck(String JavaDoc sequenceId) {
375         String JavaDoc keyId = accessor.getKeyFromIncomingSequenceId(sequenceId);
376         accessor.sendAck(keyId);
377     }
378
379
380 }
Popular Tags