1 17 18 package org.apache.sandesha.storage.queue; 19 20 import org.apache.axis.message.addressing.RelatesTo; 21 import org.apache.sandesha.RMMessageContext; 22 23 import java.util.*; 24 25 28 29 33 34 public class IncomingSequence extends AbstractSequence { 35 36 private long lastProcessed; 37 private boolean hasProcessableMessages; 38 private HashMap hash; 39 private boolean beingProcessedLock = false; private long lastMsgNo = -1; 41 private long finalMsgArrivedTime = 0; private long finalAckedTime = 0; 43 private boolean sendAck = false; 44 45 public long getFinalAckedTime() { 46 return finalAckedTime; 47 } 48 49 public void setFinalAckedTime(long finalAckedTime) { 50 this.finalAckedTime = finalAckedTime; 51 } 52 53 public long getFinalMsgArrivedTime() { 54 return finalMsgArrivedTime; 55 } 56 57 public void setFinalMsgArrivedTime(long finalMsgArrivedTime) { 58 this.finalMsgArrivedTime = finalMsgArrivedTime; 59 } 60 61 public boolean isSendAck() { 62 return sendAck; 63 } 64 65 public void setSendAck(boolean sendAck) { 66 this.sendAck = sendAck; 67 } 68 69 private boolean terminateReceived = false; 70 71 public boolean isTerminateReceived() { 72 return terminateReceived; 73 } 74 75 public void setTerminateReceived(boolean terminateReceived) { 76 this.terminateReceived = terminateReceived; 77 } 78 79 public IncomingSequence(String sequenceId) { 80 lastProcessed = 0; 81 hasProcessableMessages = false; 82 this.sequenceId = sequenceId; 83 hash = new HashMap(); 84 } 86 87 public boolean hasProcessableMessages() { 88 return hasProcessableMessages; 89 } 90 91 92 95 public Object putNewMessage(Long key, RMMessageContext value) { 96 Object obj = hash.put(key, value); 97 refreshHasProcessableMessages(); 99 return obj; 100 } 101 102 public RMMessageContext getNextMessageToProcess() { 103 Long nextKey = new Long (lastProcessed + 1); 104 RMMessageContext msg = (RMMessageContext) hash.get(nextKey); 105 if (msg != null) { 106 incrementProcessedCount(); 107 refreshHasProcessableMessages(); 108 } else { 109 setProcessLock(false); 110 } 111 112 return msg; 113 114 } 115 116 public List getNextMessagesToProcess() { 117 118 boolean done = false; 119 List messages = new ArrayList(); 120 121 while (!done) { 122 Long nextKey = new Long (lastProcessed + 1); 123 Object obj = hash.get(nextKey); 124 if (obj != null) { 125 messages.add(obj); 126 incrementProcessedCount(); 127 } else { 128 setProcessLock(false); 129 done = true; } 131 } 132 refreshHasProcessableMessages(); 133 return messages; 134 } 135 136 private void incrementProcessedCount() { 137 lastProcessed++; 138 } 139 140 private void refreshHasProcessableMessages() { 141 Long nextKey = new Long (lastProcessed + 1); 142 hasProcessableMessages = hash.containsKey(nextKey); 143 144 if (!hasProcessableMessages) setProcessLock(false); 147 } 148 149 public boolean hasMessage(Long msgId) { 150 Object obj = hash.get(msgId); 151 return (!(obj == null)); 152 } 153 154 public void clearSequence(boolean yes) { 155 if (!yes) 156 return; 157 hash.clear(); 158 lastProcessed = 0; 159 hasProcessableMessages = false; 160 } 161 162 public Set getAllKeys() { 163 return hash.keySet(); 164 166 } 167 168 public void setProcessLock(boolean lock) { 169 beingProcessedLock = lock; 170 } 171 172 public boolean isSequenceLocked() { 173 return beingProcessedLock; 174 } 175 176 public String getMessageId(Long key) { 177 RMMessageContext msg = (RMMessageContext) hash.get(key); 178 if (msg == null) 179 return null; 180 181 return msg.getMessageID(); 182 } 183 184 public RMMessageContext getMessageRelatingTo(String relatesTo) { 186 187 Iterator it = hash.keySet().iterator(); 188 RMMessageContext msgToSend = null; 189 190 while (it.hasNext()) { 191 RMMessageContext msg = (RMMessageContext) hash.get(it.next()); 192 List lst = msg.getAddressingHeaders().getRelatesTo(); 193 194 if (lst != null) { 195 RelatesTo rl = (RelatesTo) lst.get(0); 196 String uri = rl.getURI().toString(); 197 if (uri.equals(relatesTo)) { 198 msgToSend = msg; 199 break; 200 } 201 } 202 } 203 204 return msgToSend; 205 } 206 207 public boolean hasLastMsgReceived() { 208 if (lastMsgNo > 0) 209 return true; 210 211 return false; 212 } 213 214 public long getLastMsgNumber() { 215 if (lastMsgNo > 0) 216 return lastMsgNo; 217 218 return -1; 219 } 220 221 public void setLastMsg(long lastMsg) { 222 lastMsgNo = lastMsg; 223 } 224 225 } | Popular Tags |