1 17 package org.apache.sandesha.client; 18 19 import org.apache.axis.components.logger.LogFactory; 20 import org.apache.axis.message.addressing.RelatesTo; 21 import org.apache.commons.logging.Log; 22 import org.apache.sandesha.Constants; 23 import org.apache.sandesha.IStorageManager; 24 import org.apache.sandesha.RMMessageContext; 25 import org.apache.sandesha.storage.Callback; 26 import org.apache.sandesha.storage.CallbackData; 27 import org.apache.sandesha.storage.dao.ISandeshaDAO; 28 import org.apache.sandesha.storage.dao.SandeshaDAOFactory; 29 import org.apache.sandesha.ws.rm.RMHeaders; 30 31 import java.util.HashMap ; 32 import java.util.Iterator ; 33 import java.util.Map ; 34 import java.util.Set ; 35 36 43 public class ClientStorageManager implements IStorageManager { 44 45 protected static Log log = LogFactory.getLog(ClientStorageManager.class.getName()); 46 47 private ISandeshaDAO accessor; 48 private static Callback callBack; 49 50 public void init() { 51 } 52 53 public ClientStorageManager() { 54 accessor = SandeshaDAOFactory.getStorageAccessor(Constants.SERVER_QUEUE_ACCESSOR, 55 Constants.CLIENT); 56 } 57 58 public boolean isSequenceExist(String sequenceID) { 59 return accessor.isOutgoingSequenceExists(sequenceID); 60 } 61 62 public boolean isResponseSequenceExist(String sequenceID) { 63 return accessor.isIncomingSequenceExists(sequenceID); 64 } 65 66 public Object getNextSeqToProcess() { 67 return null; 68 } 69 70 public RMMessageContext getNextMessageToProcess(Object seq) { 71 return null; 72 } 73 74 public void setAcknowledged(String seqID, long msgNumber) { 75 accessor.markOutgoingMessageToDelete(seqID, new Long (msgNumber)); 76 77 } 78 79 public void addSequence(String sequenceID) { 80 boolean result = accessor.addOutgoingSequence(sequenceID); 81 if (!result) 82 log.error("Sequence was not created correctly in the in the queue"); 83 } 84 85 89 public void addCreateSequenceResponse(RMMessageContext rmMessageContext) { 90 addPriorityMessage(rmMessageContext); 91 } 92 93 96 public void addCreateSequenceRequest(RMMessageContext rmMessageContext) { 97 addPriorityMessage(rmMessageContext); 98 } 99 100 104 public void addAcknowledgement(RMMessageContext rmMessageContext) { 105 String sequenceID = rmMessageContext.getSequenceID(); 106 if (sequenceID != null) 107 accessor.removeAllAcks(sequenceID); 108 109 addPriorityMessage(rmMessageContext); 110 } 111 112 private void addPriorityMessage(RMMessageContext msg) { 114 accessor.addPriorityMessage(msg); 115 } 116 117 120 public boolean isMessageExist(String sequenceID, long messageNumber) { 121 return accessor.isIncomingMessageExists(sequenceID, new Long (messageNumber)); 122 } 123 124 127 public Map getListOfMessageNumbers(String sequenceID) { 128 String seq = sequenceID; 129 Set st = accessor.getAllReceivedMsgNumsOfIncomingSeq(seq); 130 Iterator it = st.iterator(); 131 long largest = 0; 133 while (it.hasNext()) { 134 Long key = (Long ) it.next(); 135 if (null == key) 136 continue; 137 138 long l = key.longValue(); 139 if (l > largest) 140 largest = l; 141 } 142 143 HashMap results = new HashMap (); 144 long currentPosition = 1; 146 for (long l = 1; l <= largest; l++) { 147 boolean present = st.contains(new Long (l)); 148 if (present) { 149 results.put(new Long (currentPosition), new Long (l)); 150 currentPosition++; 151 } 152 } 153 return results; 154 } 155 156 159 public synchronized RMMessageContext getNextMessageToSend() { 160 RMMessageContext msg; 161 msg = accessor.getNextPriorityMessageContextToSend(); 162 if (msg == null) 163 msg = accessor.getNextOutgoingMsgContextToSend(); 164 165 if (null == msg) { 166 msg = accessor.getNextLowPriorityMessageContextToSend(); 167 168 } 170 if (null != callBack && null != msg) 171 informOutgoingMessage(msg); 172 173 if (msg != null && !msg.isLocked()) { 174 msg.setLocked(true); 175 return msg; 176 } else { 177 return null; 178 } 179 } 180 181 186 public void setTemporaryOutSequence(String sequenceId, String outSequenceId) { 187 synchronized (this) { 188 accessor.setOutSequence(sequenceId, outSequenceId); 189 accessor.setOutSequenceApproved(sequenceId, false); 190 } 191 } 192 193 197 public boolean setApprovedOutSequence(String oldSeqId, String newSeqId) { 198 if (oldSeqId == null) { 199 return false; 200 } 201 String sequenceID = accessor.getSequenceOfOutSequence(oldSeqId); 202 if (null == sequenceID) { 203 log.error(Constants.ErrorMessages.SET_APPROVED_OUT_SEQ); 204 return false; 205 } 206 accessor.setOutSequence(sequenceID, newSeqId); 207 accessor.setOutSequenceApproved(sequenceID, true); 208 accessor.removeCreateSequenceMsg(oldSeqId); 209 return true; 210 211 } 212 213 217 public long getNextMessageNumber(String sequenceID) { 218 long msgNo = accessor.getNextOutgoingMessageNumber(sequenceID); 219 return msgNo; 220 } 221 222 public void insertOutgoingMessage(RMMessageContext msg) { 223 String sequenceId = msg.getSequenceID(); 224 accessor.addMessageToOutgoingSequence(sequenceId, msg); 225 } 226 227 public void insertIncomingMessage(RMMessageContext rmMessageContext) { 228 RMHeaders rmHeaders = rmMessageContext.getRMHeaders(); 229 RelatesTo relatesTo = (RelatesTo) rmMessageContext.getAddressingHeaders().getRelatesTo() 230 .get(0); 231 String messageId = relatesTo.getURI().toString(); 232 String sequenceId = null; 233 234 sequenceId = accessor.searchForSequenceId(messageId); 235 236 boolean exists = accessor.isIncomingSequenceExists(sequenceId); 237 238 if (!exists) { 239 accessor.addIncomingSequence(sequenceId); 240 } 241 242 long messageNumber = rmHeaders.getSequence().getMessageNumber().getMessageNumber(); 243 if (messageNumber <= 0) 244 return; 245 Long msgNo = new Long (messageNumber); 246 accessor.addMessageToIncomingSequence(sequenceId, msgNo, rmMessageContext); 247 accessor.updateFinalMessageArrivedTime(sequenceId); 248 } 249 250 public RMMessageContext checkForResponseMessage(String sequenceId, String requestMsgId) { 251 RMMessageContext response = accessor.checkForResponseMessage(requestMsgId, sequenceId); 252 return response; 253 254 } 255 256 public void insertTerminateSeqMessage(RMMessageContext terminateSeqMessage) { 257 accessor.addLowPriorityMessage(terminateSeqMessage); 258 } 259 260 public void setAckReceived(String seqId, long msgNo) { 261 accessor.setAckReceived(seqId, msgNo); 262 } 263 264 public void insertFault(RMMessageContext rmMsgCtx) { 265 266 } 267 268 public void addSendMsgNo(String seqId, long msgNo) { 269 accessor.addSendMsgNo(accessor.getSequenceOfOutSequence(seqId), msgNo); 270 } 271 272 public void addOutgoingSequence(String sequenceId) { 273 accessor.addOutgoingSequence(sequenceId); 274 } 275 276 public void addIncomingSequence(String sequenceId) { 277 accessor.addIncomingSequence(sequenceId); 278 } 279 280 public long getLastIncomingMsgNo(String seqId) { 281 String key = accessor.getKeyFromIncomingSequenceId(seqId); 282 return accessor.getLastIncomingMsgNo(key); 283 } 284 285 public boolean hasLastIncomingMsgReceived(String seqId) { 286 String key = accessor.getKeyFromIncomingSequenceId(seqId); 287 return accessor.hasLastIncomingMsgReceived(key); 288 } 289 290 public void addRequestedSequence(String seqId) { 291 accessor.addRequestedSequence(seqId); 292 } 293 294 public boolean isRequestedSeqPresent(String seqId) { 295 return accessor.isRequestedSeqPresent(seqId); 296 } 297 298 public boolean isSentMsg(String seqId, long msgNo) { 299 return accessor.isSentMsg(accessor.getSequenceOfOutSequence(seqId), msgNo); 300 } 301 302 public String getOutgoingSeqOfMsg(String msgId) { 303 return accessor.searchForSequenceId(msgId); 304 } 305 306 public String getOutgoingSeqenceIdOfIncomingMsg(RMMessageContext msg) { 307 RelatesTo relatesTo = (RelatesTo) msg.getAddressingHeaders().getRelatesTo().get(0); 309 String msgId = relatesTo.getURI().toString(); 310 return accessor.searchForSequenceId(msgId); 311 } 312 313 public void setTerminateSend(String seqId) { 314 accessor.setTerminateSend(seqId); 315 } 316 317 public void setTerminateReceived(String seqId) { 318 accessor.setTerminateReceived(seqId); 319 } 320 321 public String getKeyFromOutgoingSeqId(String seqId) { 322 return accessor.getKeyFromOutgoingSequenceId(seqId); 323 } 324 325 public void setAcksTo(String seqId, String acksTo) { 326 accessor.setAcksTo(seqId, acksTo); 327 } 328 329 public String getAcksTo(String seqId) { 330 return accessor.getAcksTo(seqId); 331 } 332 333 public void addOffer(String msgID, String offerID) { 334 accessor.addOffer(msgID, offerID); 335 } 336 337 public String getOffer(String msgID) { 338 return accessor.getOffer(msgID); 339 } 340 341 public void setCallback(Callback cb) { 342 callBack = cb; 343 } 344 345 public void removeCallback() { 346 callBack = null; 347 } 348 349 private void informOutgoingMessage(RMMessageContext rmMsgContext) { 350 351 CallbackData cbData = new CallbackData(); 352 353 if (null != rmMsgContext) { 355 cbData.setSequenceId(rmMsgContext.getSequenceID()); 356 cbData.setMessageId(rmMsgContext.getMessageID()); 357 cbData.setMessageType(rmMsgContext.getMessageType()); 358 } 359 360 if (null != callBack) 361 callBack.onOutgoingMessage(cbData); 362 } 363 364 public void clearStorage() { 365 accessor.clear(); 366 } 367 368 public boolean isSequenceComplete(String seqId) { 369 boolean outTerminateSent = accessor.isOutgoingTerminateSent(seqId); 370 boolean incomingTerminateReceived = accessor.isIncommingTerminateReceived(seqId); 371 return outTerminateSent && incomingTerminateReceived; 372 } 373 374 public void sendAck(String sequenceId) { 375 String keyId = accessor.getKeyFromIncomingSequenceId(sequenceId); 376 accessor.sendAck(keyId); 377 } 378 379 380 } | Popular Tags |