KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > sandesha > server > ServerStorageManager


1 /*
2  * Copyright 1999-2004 The Apache Software Foundation.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */

17 package org.apache.sandesha.server;
18
19 import org.apache.axis.components.logger.LogFactory;
20 import org.apache.commons.logging.Log;
21 import org.apache.sandesha.Constants;
22 import org.apache.sandesha.IStorageManager;
23 import org.apache.sandesha.RMMessageContext;
24 import org.apache.sandesha.storage.Callback;
25 import org.apache.sandesha.storage.dao.ISandeshaDAO;
26 import org.apache.sandesha.storage.dao.SandeshaDAOFactory;
27 import org.apache.sandesha.ws.rm.RMHeaders;
28
29 import java.util.HashMap JavaDoc;
30 import java.util.Iterator JavaDoc;
31 import java.util.Map JavaDoc;
32 import java.util.Set JavaDoc;
33
34 /**
35  * ServerStorageManager is the access point for the SandeshaQueue from server side.
36  *
37  * @author Chamikara Jayalath
38  * @author Jaliya Ekanayaka
39  */

40
41 public class ServerStorageManager implements IStorageManager {
42
43     public void setTerminateSend(String JavaDoc seqId) {
44
45     }
46
47     public void setTerminateReceived(String JavaDoc seqId) {
48
49     }
50
51     protected static Log log = LogFactory.getLog(ServerStorageManager.class.getName());
52     private ISandeshaDAO accessor;
53
54     public ServerStorageManager() {
55         accessor =
56                 SandeshaDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR,
57                         Constants.SERVER);
58     }
59
60
61     /**
62      * A very important method. Makes life easy for the thread or thread pool
63      * that is using this. Every thread just have to create an instance of
64      * ServerStorageManager and keep calling getNextMessageToProcess() and
65      * processing messages. The method will try to give the messages from the
66      * same sequence id. But if that doesnt hv processable messages it will go for
67      * a new sequence.
68      */

69     public RMMessageContext getNextMessageToProcess(Object JavaDoc seq) {
70
71         if (seq == null)
72             return null;
73
74         RMMessageContext nextMsg = accessor.getNextMsgContextToProcess(seq);
75         return nextMsg;
76     }
77
78     public void setAcknowledged(String JavaDoc seqID, long msgNumber) {
79         accessor.markOutgoingMessageToDelete(seqID, new Long JavaDoc(msgNumber));
80     }
81
82     public void init() {
83     }
84
85     /**
86      * Used to find out weather the sequence with this id has already been
87      * created.
88      */

89     public boolean isSequenceExist(String JavaDoc sequenceID) {
90         return accessor.isIncomingSequenceExists(sequenceID);
91     }
92
93     public boolean isResponseSequenceExist(String JavaDoc sequenceID) {
94         return accessor.isOutgoingSequenceExists(sequenceID);
95     }
96
97     public Object JavaDoc getNextSeqToProcess() {
98         return accessor.getRandomSeqToProcess();
99     }
100
101
102     /**
103      * This is used to get a random message from the out queue Basically server
104      * sender will use this.
105      */

106     public synchronized RMMessageContext getNextMessageToSend() {
107         RMMessageContext msg;
108         msg = accessor.getNextPriorityMessageContextToSend();
109         if (msg == null)
110             msg = accessor.getNextOutgoingMsgContextToSend();
111         if (msg == null)
112             msg = accessor.getNextLowPriorityMessageContextToSend();
113
114         if (msg != null && !msg.isLocked()) {
115             msg.setLocked(true);
116             return msg;
117         } else {
118             return null;
119         }
120
121     }
122
123     /**
124      * Will be used to add a new Sequence Hash to the In Queue.
125      */

126     public void addSequence(String JavaDoc sequenceId) {
127         boolean result = accessor.addIncomingSequence(sequenceId);
128         if (!result)
129             ServerStorageManager.log.error(Constants.ErrorMessages.SEQ_IS_NOT_CREATED);
130     }
131
132     /**
133      * This gives a sorted(by keys) map of messageIds present for a sequence.
134      * This will be used to send Acks.
135      */

136     public Map JavaDoc getListOfMessageNumbers(String JavaDoc sequenceID) {
137         Set JavaDoc st = accessor.getAllReceivedMsgNumsOfIncomingSeq(sequenceID);
138         Iterator JavaDoc it = st.iterator();
139         //To find the largest id present
140
long largest = 0;
141         while (it.hasNext()) {
142             Long JavaDoc key = (Long JavaDoc) it.next();
143             if (key == null)
144                 continue;
145
146             long l = key.longValue();
147             if (l > largest)
148                 largest = l;
149         }
150
151         HashMap JavaDoc results = new HashMap JavaDoc();
152         //Add Keys to the results in order.
153
long currentPosition = 1;
154         for (long l = 1; l <= largest; l++) {
155             boolean present = st.contains(new Long JavaDoc(l));
156             if (present) {
157                 results.put(new Long JavaDoc(currentPosition), new Long JavaDoc(l));
158                 currentPosition++;
159             }
160         }
161         return results;
162     }
163
164     public boolean isMessageExist(String JavaDoc sequenceID, long messageNumber) {
165         synchronized (accessor) {
166             return accessor.isIncomingMessageExists(sequenceID, new Long JavaDoc(messageNumber));
167         }
168     }
169
170
171     public void addCreateSequenceResponse(RMMessageContext rmMessageContext) {
172         addPriorityMessage(rmMessageContext);
173     }
174
175     public void addCreateSequenceRequest(RMMessageContext rmMessageContext) {
176         addPriorityMessage(rmMessageContext);
177     }
178
179     public void addAcknowledgement(RMMessageContext rmMessageContext) {
180         String JavaDoc sequenceID = rmMessageContext.getSequenceID();
181         if (sequenceID != null)
182             accessor.removeAllAcks(sequenceID);
183         addPriorityMessage(rmMessageContext);
184     }
185
186     private void addPriorityMessage(RMMessageContext msg) {
187         accessor.addPriorityMessage(msg);
188     }
189
190     public void setTemporaryOutSequence(String JavaDoc sequenceId, String JavaDoc outSequenceId) {
191         accessor.setOutSequence(sequenceId, outSequenceId);
192         accessor.setOutSequenceApproved(sequenceId, false);
193     }
194
195     public boolean setApprovedOutSequence(String JavaDoc createSeqId, String JavaDoc newOutSequenceId) {
196
197         String JavaDoc tempOutSeq = createSeqId;
198         if (tempOutSeq == null)
199             tempOutSeq = createSeqId;
200         String JavaDoc sequenceID = accessor.getSequenceOfOutSequence(tempOutSeq);
201
202         if (sequenceID == null) {
203             ServerStorageManager.log.error(Constants.ErrorMessages.SET_APPROVED_OUT_SEQ);
204             return false;
205         }
206         accessor.setOutSequence(sequenceID, newOutSequenceId);
207         accessor.setOutSequenceApproved(sequenceID, true);
208         accessor.removeCreateSequenceMsg(tempOutSeq);
209         return true;
210     }
211
212     public long getNextMessageNumber(String JavaDoc sequenceID) {
213         long l = accessor.getNextOutgoingMessageNumber(sequenceID);
214         return l;
215     }
216
217     public void insertOutgoingMessage(RMMessageContext msg) {
218         String JavaDoc sequenceId = msg.getSequenceID();
219
220         boolean exists = accessor.isOutgoingSequenceExists(sequenceId);
221         if (!exists)
222             accessor.addOutgoingSequence(sequenceId);
223         accessor.addMessageToOutgoingSequence(sequenceId, msg);
224
225     }
226
227     public void insertIncomingMessage(RMMessageContext rmMessageContext) {
228         RMHeaders rmHeaders = rmMessageContext.getRMHeaders();
229         String JavaDoc sequenceId = rmHeaders.getSequence().getIdentifier().getIdentifier();
230         boolean exists = accessor.isIncomingSequenceExists(sequenceId);
231         if (!exists)
232             addSequence(sequenceId); //Creating new sequence
233

234         //TODO: add getRmHeaders method to MessageContext
235
long messageNumber = rmHeaders.getSequence().getMessageNumber().getMessageNumber();
236
237         if (messageNumber <= 0)
238             return;
239
240         Long JavaDoc msgNo = new Long JavaDoc(messageNumber);
241         accessor.addMessageToIncomingSequence(sequenceId, msgNo, rmMessageContext);
242         accessor.updateFinalMessageArrivedTime(sequenceId);
243
244     }
245
246     public RMMessageContext checkForResponseMessage(String JavaDoc sequenceId, String JavaDoc requestMsgId) {
247         return null;
248     }
249
250     public void insertTerminateSeqMessage(RMMessageContext terminateSeqMessage) {
251         accessor.addLowPriorityMessage(terminateSeqMessage);
252     }
253
254     public void setAckReceived(String JavaDoc seqId, long msgNo) {
255         accessor.setAckReceived(seqId, msgNo);
256
257     }
258
259     public void insertFault(RMMessageContext rmMsgCtx) {
260     }
261
262
263     public void addSendMsgNo(String JavaDoc seqId, long msgNo) {
264         accessor.addSendMsgNo(accessor.getSequenceOfOutSequence(seqId), msgNo);
265     }
266
267     public boolean isSentMsg(String JavaDoc seqId, long msgNo) {
268         return accessor.isSentMsg(accessor.getSequenceOfOutSequence(seqId), msgNo);
269     }
270
271
272     public void addOutgoingSequence(String JavaDoc sequenceId) {
273         accessor.addOutgoingSequence(sequenceId);
274     }
275
276     public void addIncomingSequence(String JavaDoc sequenceId) {
277         accessor.addIncomingSequence(sequenceId);
278     }
279
280     public String JavaDoc getOutgoingSeqOfMsg(String JavaDoc msgId) {
281         return null;
282     }
283
284     public void addRequestedSequence(String JavaDoc seqId) {
285         accessor.addRequestedSequence(seqId);
286     }
287
288     public boolean isRequestedSeqPresent(String JavaDoc seqId) {
289         return accessor.isRequestedSeqPresent(seqId);
290     }
291
292     public String JavaDoc getOutgoingSeqenceIdOfIncomingMsg(RMMessageContext msg) {
293
294         return msg.getSequenceID();
295     }
296
297     public long getLastIncomingMsgNo(String JavaDoc seqId) {
298         return accessor.getLastIncomingMsgNo(seqId);
299     }
300
301     public boolean hasLastIncomingMsgReceived(String JavaDoc seqId) {
302         return accessor.hasLastIncomingMsgReceived(seqId);
303     }
304
305     public String JavaDoc getKeyFromOutgoingSeqId(String JavaDoc seqId) {
306         return null;
307     }
308
309     public void setAcksTo(String JavaDoc seqId, String JavaDoc acksTo) {
310         accessor.setAcksTo(seqId, acksTo);
311     }
312
313     public String JavaDoc getAcksTo(String JavaDoc seqId) {
314         return accessor.getAcksTo(seqId);
315     }
316
317     public void setCallback(Callback cb) {
318     }
319
320     public void removeCallback() {
321     }
322
323     public void addOffer(String JavaDoc msgID, String JavaDoc offerID) {
324
325     }
326
327     public String JavaDoc getOffer(String JavaDoc msgID) {
328         return null;
329     }
330
331     public void clearStorage() {
332         accessor.clear();
333     }
334
335     public boolean isSequenceComplete(String JavaDoc seqId) {
336         return false;
337     }
338
339     public void sendAck(String JavaDoc sequenceId) {
340         accessor.sendAck(sequenceId);
341     }
342 }
Popular Tags