1 24 25 package org.objectweb.dream.queue; 26 27 import org.objectweb.dream.message.Message; 28 import org.objectweb.dream.message.manager.MessageManager; 29 import org.objectweb.dream.pool.ObjectPool; 30 import org.objectweb.dream.pool.Recyclable; 31 import org.objectweb.fractal.api.NoSuchInterfaceException; 32 import org.objectweb.fractal.api.control.IllegalBindingException; 33 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 34 35 55 public class BufferAscendingSequenceNumberImpl extends AbstractBufferImpl 56 implements 57 QueueSortedAttributeController 58 { 59 63 protected String sortingChunkName; 64 65 66 protected Element first = null; 67 68 69 protected Element last = null; 70 71 72 protected ObjectPool objectPoolItf; 73 74 75 protected long lastInSequence = -1; 76 77 81 84 protected boolean hasAvailableMessage() 85 { 86 return (availableMessagesIndicator > 0); 87 } 88 89 92 protected boolean canAdd(Message message) 93 { 94 if (maxCapacity > 0 && (maxCapacity - storedMessagesIndicator <= 0)) 95 { 96 return false; 97 } 98 return true; 99 } 100 101 105 108 protected void doAdd(Message message) 109 { 110 long sequenceNumber = ((SequenceNumberChunk) message 111 .getChunk(sortingChunkName)).getSequenceNumber(); 112 if (sequenceNumber <= lastInSequence) 113 { 114 messageManagerItf.deleteMessage(message); 116 return; 117 } 118 Element current = last; 119 while (current != null && sequenceNumber < current.sequenceNumber) 120 { 121 current = current.previous; 122 } 123 124 if (current != null && sequenceNumber == current.sequenceNumber) 125 { 126 messageManagerItf.deleteMessage(message); 128 return; 129 } 130 Element element = (Element) objectPoolItf.newInstance(); 131 element.msg = message; 132 element.sequenceNumber = sequenceNumber; 133 134 if (current == null) 135 { 136 if (first != null) 138 { 139 element.next = first; 141 first.previous = element; 142 } 143 else 144 { 145 last = element; 147 } 148 first = element; 149 } 150 else 151 { 152 element.previous = current; 154 if (current.next != null) 155 { 156 element.next = current.next; 158 element.next.previous = element; 159 } 160 else 161 { 162 last = element; 164 } 165 element.previous.next = element; 166 167 } 168 169 current = element; 173 int counter = 0; 174 while (current != null && current.sequenceNumber == lastInSequence + 1) 175 { 176 counter++; 177 lastInSequence++; 178 current = current.next; 179 } 180 if (counter > 0) 181 { 182 incrementAvailableMessagesIndicator(counter); 183 } 184 incrementStoredMessagesIndicator(1); 185 } 186 187 190 protected Message doRemove() 191 { 192 incrementAvailableMessagesIndicator(-1); 193 incrementStoredMessagesIndicator(-1); 194 Element newFirst = first.next; 195 if (newFirst != null) 196 { 197 newFirst.previous = null; 198 } 199 else 200 { 201 last = null; 202 } 203 Message msg = first.msg; 204 objectPoolItf.recycleInstance(first); 205 first = newFirst; 206 return msg; 207 } 208 209 212 protected Message doGet() 213 { 214 return first.msg; 215 } 216 217 221 224 public String getSortingChunkName() 225 { 226 return sortingChunkName; 227 } 228 229 232 public void setSortingChunkName(String name) 233 { 234 this.sortingChunkName = name; 235 } 236 237 241 246 public static class Element implements Recyclable 247 { 248 Element previous; 249 Element next; 250 Message msg; 251 long sequenceNumber; 252 253 257 260 public void recycle() 261 { 262 previous = null; 263 next = null; 264 msg = null; 265 sequenceNumber = -1; 266 } 267 } 268 269 273 277 public synchronized void bindFc(String clientItfName, Object serverItf) 278 throws NoSuchInterfaceException, IllegalBindingException, 279 IllegalLifeCycleException 280 { 281 super.bindFc(clientItfName, serverItf); 282 if (clientItfName.equals(ObjectPool.ITF_NAME)) 283 { 284 objectPoolItf = (ObjectPool) serverItf; 285 } 286 } 287 288 291 public String [] listFc() 292 { 293 return new String []{MessageManager.ITF_NAME, ObjectPool.ITF_NAME}; 294 } 295 } | Popular Tags |