package org.apache.sandesha.storage.queue;

import org.apache.axis.components.logger.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.sandesha.RMMessageContext;
import org.apache.sandesha.util.PolicyLoader;

import java.util.*;

/**
 * In-memory store for the messages of a single outgoing sequence.
 * <p>
 * Messages are kept in a map keyed by an auto-incremented <code>Long</code>
 * (starting at 1). Alongside the map the class tracks which message numbers
 * have been marked as deleted, which have been added to the "sent" list, the
 * number of the last message, and a few per-sequence flags.
 * <p>
 * The class performs no synchronization of its own.
 */
public class OutgoingSequence extends AbstractSequence {

    private String outSequenceId;
    private boolean outSeqApproved;
    /** Stored messages: <code>Long</code> auto number -&gt; <code>RMMessageContext</code>. */
    private HashMap hash;
    /** <code>Long</code> numbers that {@link #getNextMessageToSend()} must skip. */
    private ArrayList markedAsDelete;
    /** <code>Long</code> numbers recorded through {@link #addMsgToSendList(long)}. */
    private ArrayList sendMsgNoList;
    /** Number of the last message, or a non-positive value while unknown. */
    private long lastMsgNo = -1;
    /** Key that the next call to {@link #putNewMessage(RMMessageContext)} will use. */
    private long nextAutoNumber;
    private static final Log log = LogFactory.getLog(OutgoingSequence.class.getName());
    // Kept public for backward compatibility; prefer isTerminateSent()/setTerminateSent().
    public boolean terminateSent = false;
    private boolean hasResponse = false;

    /**
     * Creates an empty sequence whose auto numbering starts at 1.
     *
     * @param sequenceId identifier stored in the inherited <code>sequenceId</code> field
     */
    public OutgoingSequence(String sequenceId) {
        this.sequenceId = sequenceId;
        hash = new HashMap();
        markedAsDelete = new ArrayList();
        nextAutoNumber = 1;
        outSeqApproved = false;
        sendMsgNoList = new ArrayList();
    }

    /** @return the value last passed to {@link #setHasResponse(boolean)} (initially false) */
    public boolean hasResponse() {
        return hasResponse;
    }

    public void setHasResponse(boolean hasResponse) {
        this.hasResponse = hasResponse;
    }

    /** @return the current value of the <code>terminateSent</code> flag */
    public boolean isTerminateSent() {
        return terminateSent;
    }

    public void setTerminateSent(boolean terminateSent) {
        this.terminateSent = terminateSent;
    }

    public boolean isOutSeqApproved() {
        return outSeqApproved;
    }

    public void setOutSeqApproved(boolean b) {
        outSeqApproved = b;
    }

    public String getOutSequenceId() {
        return outSequenceId;
    }

    public void setOutSequenceId(String string) {
        outSequenceId = string;
    }

    /**
     * Stores a message under the next auto number and advances the counter.
     *
     * @param msg message to store
     * @return the value previously stored under that key, or null if there was none
     */
    public Object putNewMessage(RMMessageContext msg) {
        Long key = new Long(nextAutoNumber);
        Object previous = hash.put(key, msg);
        increaseAutoNo();
        return previous;
    }

    /** Advances the auto number used as the key for the next stored message. */
    private void increaseAutoNo() {
        nextAutoNumber++;
    }

    /**
     * Picks the message with the smallest message number among those that are
     * not marked as deleted and whose last-sent time is at least one base
     * retransmission interval in the past. The chosen message gets its
     * last-sent time set to the current time before it is returned.
     *
     * @return the selected message, or null when no message is currently due
     */
    public RMMessageContext getNextMessageToSend() {
        RMMessageContext minMsg = null;

        Iterator messages = hash.values().iterator();
        while (messages.hasNext()) {
            RMMessageContext candidate = (RMMessageContext) messages.next();

            // NOTE(review): delete marks are recorded by map key in
            // markMessageDeleted() but checked here by the message's own
            // number; this assumes both numberings agree - confirm with callers.
            Long msgNo = new Long(candidate.getMsgNumber());
            if (markedAsDelete.contains(msgNo)) {
                continue;
            }

            long currentTime = System.currentTimeMillis();
            long retransmissionInterval =
                    PolicyLoader.getInstance().getBaseRetransmissionInterval();
            boolean due = currentTime >= candidate.getLastSentTime() + retransmissionInterval;
            if (!due) {
                continue;
            }

            if (minMsg == null || candidate.getMsgNumber() < minMsg.getMsgNumber()) {
                minMsg = candidate;
            }
        }

        if (minMsg != null) {
            minMsg.setLastSentTime(System.currentTimeMillis());
        }
        return minMsg;
    }

    /**
     * @param key auto number to look up
     * @return true if a non-null message is stored under the key
     */
    public boolean hasMessage(Long key) {
        return hash.get(key) != null;
    }

    /**
     * Resets the sequence to an empty, unapproved state when asked to.
     * <p>
     * Because numbering restarts at 1, all bookkeeping indexed by message
     * number (delete marks, sent list, last message number) is reset as well;
     * otherwise stale entries would apply to new messages reusing the numbers.
     * The <code>terminateSent</code> and <code>hasResponse</code> flags are
     * left untouched.
     *
     * @param yes when false the call is a no-op
     */
    public void clearSequence(boolean yes) {
        if (!yes) {
            return;
        }
        hash.clear();
        markedAsDelete.clear();
        sendMsgNoList.clear();
        lastMsgNo = -1;
        nextAutoNumber = 1;
        outSeqApproved = false;
        outSequenceId = null;
        sequenceId = null;
    }

    /**
     * @return the live key set of the underlying map (not a copy); its
     *         elements are the <code>Long</code> auto numbers
     */
    public Set getAllKeys() {
        return hash.keySet();
    }

    /**
     * @param key auto number of the message
     * @return the message ID of the stored message, or null if none is stored
     */
    public String getMessageId(Long key) {
        RMMessageContext msg = (RMMessageContext) hash.get(key);
        if (msg == null) {
            return null;
        }
        return msg.getMessageID();
    }

    /**
     * Removes the message stored under the given key.
     *
     * @param msgId auto number (map key) of the message to remove
     * @return the removed message, or null if nothing was stored under the key
     */
    public RMMessageContext deleteMessage(Long msgId) {
        // Map.remove already returns the previous value (or null).
        return (RMMessageContext) hash.remove(msgId);
    }

    /**
     * Marks a stored message so that {@link #getNextMessageToSend()} skips it.
     * The message itself stays in the map.
     *
     * @param messageNo key of the message to mark
     * @return true if a message is stored under the key, false otherwise
     */
    public boolean markMessageDeleted(Long messageNo) {
        if (!hash.containsKey(messageNo)) {
            return false;
        }
        // Avoid piling up duplicate marks when the same number is marked repeatedly.
        if (!markedAsDelete.contains(messageNo)) {
            markedAsDelete.add(messageNo);
        }
        return true;
    }

    /** @return the auto number the next stored message will receive */
    public long nextMessageNumber() {
        return nextAutoNumber;
    }

    /**
     * Tells whether a message with the given message ID is stored.
     * <p>
     * The map is keyed by <code>Long</code> auto numbers, so a direct
     * <code>containsKey(String)</code> lookup can never match; the stored
     * messages are searched by ID instead.
     *
     * @param msgId message ID to look for
     * @return true if a stored message carries that ID
     */
    public boolean isMessagePresent(String msgId) {
        return hasMessageWithId(msgId);
    }

    /**
     * Scans the stored messages for one whose message ID equals the argument.
     *
     * @param msgId message ID to look for
     * @return true if found; false otherwise, including when <code>msgId</code>
     *         is null or a stored message has no ID
     */
    public boolean hasMessageWithId(String msgId) {
        Iterator messages = hash.values().iterator();
        while (messages.hasNext()) {
            RMMessageContext msg = (RMMessageContext) messages.next();
            String storedId = msg.getMessageID();
            if (storedId != null && storedId.equals(msgId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return a new list holding the message number (as <code>Long</code>) of
     *         every stored message, in map iteration order
     */
    public List getReceivedMsgNumbers() {
        List numbers = new ArrayList();
        Iterator messages = hash.values().iterator();
        while (messages.hasNext()) {
            RMMessageContext msg = (RMMessageContext) messages.next();
            numbers.add(new Long(msg.getMsgNumber()));
        }
        return numbers;
    }

    /**
     * Flags the message stored under the given number as acknowledged.
     * Logs an error if no message is stored under that number.
     *
     * @param msgNo key of the acknowledged message
     */
    public void setAckReceived(long msgNo) {
        RMMessageContext msg = (RMMessageContext) hash.get(new Long(msgNo));
        if (msg == null) {
            log.error("ERROR: MESSAGE IS NULL IN ResponseSeqHash");
            return;
        }
        msg.setAckReceived(true);
    }

    /**
     * Checks whether the sequence is fully acknowledged: the last message
     * number must be known, every number below it must be stored, and every
     * stored message must have its ack flag set.
     *
     * @return true only if all of the above hold
     */
    public boolean isAckComplete() {
        long last = getLastMsgNumber();
        if (last <= 0) {
            return false;
        }

        // NOTE(review): the bound excludes the last message number itself;
        // confirm whether it should also be required to be present.
        for (long i = 1; i < last; i++) {
            if (!hasMessage(new Long(i))) {
                return false;
            }
        }

        Iterator messages = hash.values().iterator();
        while (messages.hasNext()) {
            RMMessageContext msg = (RMMessageContext) messages.next();
            if (!msg.isAckReceived()) {
                return false;
            }
        }
        return true;
    }

    /** Records the given message number in the sent list. */
    public void addMsgToSendList(long msgNo) {
        sendMsgNoList.add(new Long(msgNo));
    }

    /** @return true if the number was recorded via {@link #addMsgToSendList(long)} */
    public boolean isMsgInSentList(long msgNo) {
        return sendMsgNoList.contains(new Long(msgNo));
    }

    /** @return true once a positive last message number has been set */
    public boolean hasLastMsgReceived() {
        return lastMsgNo > 0;
    }

    /** @return the last message number if it is positive, otherwise -1 */
    public long getLastMsgNumber() {
        if (lastMsgNo > 0) {
            return lastMsgNo;
        }
        return -1;
    }

    /** Sets the number of the last message of this sequence. */
    public void setLastMsg(long lastMsg) {
        lastMsgNo = lastMsg;
    }

}