1 17 package org.apache.sandesha.server; 18 19 import org.apache.axis.components.logger.LogFactory; 20 import org.apache.commons.logging.Log; 21 import org.apache.sandesha.Constants; 22 import org.apache.sandesha.IStorageManager; 23 import org.apache.sandesha.RMMessageContext; 24 import org.apache.sandesha.storage.Callback; 25 import org.apache.sandesha.storage.dao.ISandeshaDAO; 26 import org.apache.sandesha.storage.dao.SandeshaDAOFactory; 27 import org.apache.sandesha.ws.rm.RMHeaders; 28 29 import java.util.HashMap ; 30 import java.util.Iterator ; 31 import java.util.Map ; 32 import java.util.Set ; 33 34 40 41 public class ServerStorageManager implements IStorageManager { 42 43 public void setTerminateSend(String seqId) { 44 45 } 46 47 public void setTerminateReceived(String seqId) { 48 49 } 50 51 protected static Log log = LogFactory.getLog(ServerStorageManager.class.getName()); 52 private ISandeshaDAO accessor; 53 54 public ServerStorageManager() { 55 accessor = 56 SandeshaDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR, 57 Constants.SERVER); 58 } 59 60 61 69 public RMMessageContext getNextMessageToProcess(Object seq) { 70 71 if (seq == null) 72 return null; 73 74 RMMessageContext nextMsg = accessor.getNextMsgContextToProcess(seq); 75 return nextMsg; 76 } 77 78 public void setAcknowledged(String seqID, long msgNumber) { 79 accessor.markOutgoingMessageToDelete(seqID, new Long (msgNumber)); 80 } 81 82 public void init() { 83 } 84 85 89 public boolean isSequenceExist(String sequenceID) { 90 return accessor.isIncomingSequenceExists(sequenceID); 91 } 92 93 public boolean isResponseSequenceExist(String sequenceID) { 94 return accessor.isOutgoingSequenceExists(sequenceID); 95 } 96 97 public Object getNextSeqToProcess() { 98 return accessor.getRandomSeqToProcess(); 99 } 100 101 102 106 public synchronized RMMessageContext getNextMessageToSend() { 107 RMMessageContext msg; 108 msg = accessor.getNextPriorityMessageContextToSend(); 109 if (msg == null) 110 msg = accessor.getNextOutgoingMsgContextToSend(); 111 if (msg == null) 112 msg = accessor.getNextLowPriorityMessageContextToSend(); 113 114 if (msg != null && !msg.isLocked()) { 115 msg.setLocked(true); 116 return msg; 117 } else { 118 return null; 119 } 120 121 } 122 123 126 public void addSequence(String sequenceId) { 127 boolean result = accessor.addIncomingSequence(sequenceId); 128 if (!result) 129 ServerStorageManager.log.error(Constants.ErrorMessages.SEQ_IS_NOT_CREATED); 130 } 131 132 136 public Map getListOfMessageNumbers(String sequenceID) { 137 Set st = accessor.getAllReceivedMsgNumsOfIncomingSeq(sequenceID); 138 Iterator it = st.iterator(); 139 long largest = 0; 141 while (it.hasNext()) { 142 Long key = (Long ) it.next(); 143 if (key == null) 144 continue; 145 146 long l = key.longValue(); 147 if (l > largest) 148 largest = l; 149 } 150 151 HashMap results = new HashMap (); 152 long currentPosition = 1; 154 for (long l = 1; l <= largest; l++) { 155 boolean present = st.contains(new Long (l)); 156 if (present) { 157 results.put(new Long (currentPosition), new Long (l)); 158 currentPosition++; 159 } 160 } 161 return results; 162 } 163 164 public boolean isMessageExist(String sequenceID, long messageNumber) { 165 synchronized (accessor) { 166 return accessor.isIncomingMessageExists(sequenceID, new Long (messageNumber)); 167 } 168 } 169 170 171 public void addCreateSequenceResponse(RMMessageContext rmMessageContext) { 172 addPriorityMessage(rmMessageContext); 173 } 174 175 public void addCreateSequenceRequest(RMMessageContext rmMessageContext) { 176 addPriorityMessage(rmMessageContext); 177 } 178 179 public void addAcknowledgement(RMMessageContext rmMessageContext) { 180 String sequenceID = rmMessageContext.getSequenceID(); 181 if (sequenceID != null) 182 accessor.removeAllAcks(sequenceID); 183 addPriorityMessage(rmMessageContext); 184 } 185 186 private void addPriorityMessage(RMMessageContext msg) { 187 accessor.addPriorityMessage(msg); 188 } 189 190 public void setTemporaryOutSequence(String sequenceId, String outSequenceId) { 191 accessor.setOutSequence(sequenceId, outSequenceId); 192 accessor.setOutSequenceApproved(sequenceId, false); 193 } 194 195 public boolean setApprovedOutSequence(String createSeqId, String newOutSequenceId) { 196 197 String tempOutSeq = createSeqId; 198 if (tempOutSeq == null) 199 tempOutSeq = createSeqId; 200 String sequenceID = accessor.getSequenceOfOutSequence(tempOutSeq); 201 202 if (sequenceID == null) { 203 ServerStorageManager.log.error(Constants.ErrorMessages.SET_APPROVED_OUT_SEQ); 204 return false; 205 } 206 accessor.setOutSequence(sequenceID, newOutSequenceId); 207 accessor.setOutSequenceApproved(sequenceID, true); 208 accessor.removeCreateSequenceMsg(tempOutSeq); 209 return true; 210 } 211 212 public long getNextMessageNumber(String sequenceID) { 213 long l = accessor.getNextOutgoingMessageNumber(sequenceID); 214 return l; 215 } 216 217 public void insertOutgoingMessage(RMMessageContext msg) { 218 String sequenceId = msg.getSequenceID(); 219 220 boolean exists = accessor.isOutgoingSequenceExists(sequenceId); 221 if (!exists) 222 accessor.addOutgoingSequence(sequenceId); 223 accessor.addMessageToOutgoingSequence(sequenceId, msg); 224 225 } 226 227 public void insertIncomingMessage(RMMessageContext rmMessageContext) { 228 RMHeaders rmHeaders = rmMessageContext.getRMHeaders(); 229 String sequenceId = rmHeaders.getSequence().getIdentifier().getIdentifier(); 230 boolean exists = accessor.isIncomingSequenceExists(sequenceId); 231 if (!exists) 232 addSequence(sequenceId); 234 long messageNumber = rmHeaders.getSequence().getMessageNumber().getMessageNumber(); 236 237 if (messageNumber <= 0) 238 return; 239 240 Long msgNo = new Long (messageNumber); 241 accessor.addMessageToIncomingSequence(sequenceId, msgNo, rmMessageContext); 242 accessor.updateFinalMessageArrivedTime(sequenceId); 243 244 } 245 246 public RMMessageContext checkForResponseMessage(String sequenceId, String requestMsgId) { 247 return null; 248 } 249 250 public void insertTerminateSeqMessage(RMMessageContext terminateSeqMessage) { 251 accessor.addLowPriorityMessage(terminateSeqMessage); 252 } 253 254 public void setAckReceived(String seqId, long msgNo) { 255 accessor.setAckReceived(seqId, msgNo); 256 257 } 258 259 public void insertFault(RMMessageContext rmMsgCtx) { 260 } 261 262 263 public void addSendMsgNo(String seqId, long msgNo) { 264 accessor.addSendMsgNo(accessor.getSequenceOfOutSequence(seqId), msgNo); 265 } 266 267 public boolean isSentMsg(String seqId, long msgNo) { 268 return accessor.isSentMsg(accessor.getSequenceOfOutSequence(seqId), msgNo); 269 } 270 271 272 public void addOutgoingSequence(String sequenceId) { 273 accessor.addOutgoingSequence(sequenceId); 274 } 275 276 public void addIncomingSequence(String sequenceId) { 277 accessor.addIncomingSequence(sequenceId); 278 } 279 280 public String getOutgoingSeqOfMsg(String msgId) { 281 return null; 282 } 283 284 public void addRequestedSequence(String seqId) { 285 accessor.addRequestedSequence(seqId); 286 } 287 288 public boolean isRequestedSeqPresent(String seqId) { 289 return accessor.isRequestedSeqPresent(seqId); 290 } 291 292 public String getOutgoingSeqenceIdOfIncomingMsg(RMMessageContext msg) { 293 294 return msg.getSequenceID(); 295 } 296 297 public long getLastIncomingMsgNo(String seqId) { 298 return accessor.getLastIncomingMsgNo(seqId); 299 } 300 301 public boolean hasLastIncomingMsgReceived(String seqId) { 302 return accessor.hasLastIncomingMsgReceived(seqId); 303 } 304 305 public String getKeyFromOutgoingSeqId(String seqId) { 306 return null; 307 } 308 309 public void setAcksTo(String seqId, String acksTo) { 310 accessor.setAcksTo(seqId, acksTo); 311 } 312 313 public String getAcksTo(String seqId) { 314 return accessor.getAcksTo(seqId); 315 } 316 317 public void setCallback(Callback cb) { 318 } 319 320 public void removeCallback() { 321 } 322 323 public void addOffer(String msgID, String offerID) { 324 325 } 326 327 public String getOffer(String msgID) { 328 return null; 329 } 330 331 public void clearStorage() { 332 accessor.clear(); 333 } 334 335 public boolean isSequenceComplete(String seqId) { 336 return false; 337 } 338 339 public void sendAck(String sequenceId) { 340 accessor.sendAck(sequenceId); 341 } 342 } | Popular Tags |