1 3 package org.jgroups.util; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 import org.jgroups.TimeoutException; 9 10 import java.util.Iterator ; 11 import java.util.LinkedList ; 12 import java.util.NoSuchElementException ; 13 import java.util.Vector ; 14 15 16 24 public class LinkedListQueue { 25 26 final LinkedList l=new LinkedList (); 27 28 29 boolean closed=false; 30 31 32 final Object mutex=new Object (); 33 34 35 int num_markers=0; 36 37 38 46 private static final Object endMarker=new Object (); 47 48 protected static final Log log=LogFactory.getLog(LinkedListQueue.class); 49 50 51 54 public LinkedListQueue() { 55 } 56 57 58 65 public boolean closed() { 66 return closed; 67 } 68 69 70 78 public void add(Object obj) throws QueueClosedException { 79 if(closed) 80 throw new QueueClosedException(); 81 if(this.num_markers > 0) 82 throw new QueueClosedException("LinkedListQueue.add(): queue has been closed. You can not add more elements. " + 83 "Waiting for removal of remaining elements."); 84 85 86 synchronized(mutex) { 87 l.add(obj); 88 89 90 mutex.notifyAll(); 91 } 92 } 93 94 95 104 public void addAtHead(Object obj) throws QueueClosedException { 105 if(closed) 106 throw new QueueClosedException(); 107 if(this.num_markers > 0) 108 throw new QueueClosedException("LinkedListQueue.addAtHead(): queue has been closed. You can not add more elements. " + 109 "Waiting for removal of remaining elements."); 110 111 112 synchronized(mutex) { 113 l.addFirst(obj); 114 115 116 mutex.notifyAll(); 117 } 118 } 119 120 121 127 public Object remove() throws QueueClosedException { 128 Object retval=null; 129 130 131 synchronized(mutex) { 132 133 while(l.size() == 0) { 134 if(closed) 135 throw new QueueClosedException(); 136 try { 137 mutex.wait(); 138 } 139 catch(InterruptedException ex) { 140 } 141 } 142 143 if(closed) 144 throw new QueueClosedException(); 145 146 147 try { 148 retval=l.removeFirst(); 149 if(l.size() == 1 && l.getFirst().equals(endMarker)) 150 closed=true; 151 } 152 catch(NoSuchElementException ex) { 153 if(log.isErrorEnabled()) log.error("retval == null, size()=" + l.size()); 154 return null; 155 } 156 157 if(retval == endMarker) { 160 close(false); throw new QueueClosedException(); 162 } 163 } 164 165 166 return retval; 167 } 168 169 170 178 public Object remove(long timeout) throws QueueClosedException, TimeoutException { 179 Object retval=null; 180 181 182 synchronized(mutex) { 183 184 if(l.size() == 0) { 185 if(closed) 186 throw new QueueClosedException(); 187 try { 188 189 mutex.wait(timeout); 190 } 191 catch(InterruptedException ex) { 192 } 193 } 194 195 196 197 if(closed) 198 throw new QueueClosedException(); 199 200 201 try { 202 retval=l.removeFirst(); 203 if(l.size() == 1 && l.getFirst().equals(endMarker)) 204 closed=true; 205 } 206 catch(NoSuchElementException ex) { 207 208 throw new TimeoutException(); 209 } 210 211 212 if(retval == endMarker) { 213 close(false); 214 throw new QueueClosedException(); 215 } 216 217 return retval; 218 } 219 } 220 221 222 228 public void removeElement(Object obj) throws QueueClosedException { 229 boolean removed; 230 231 if(obj == null) return; 232 233 234 synchronized(mutex) { 235 removed=l.remove(obj); 236 if(!removed) 237 if(log.isWarnEnabled()) log.warn("element " + obj + " was not found in the queue"); 238 } 239 } 240 241 242 249 public Object peek() throws QueueClosedException { 250 Object retval=null; 251 252 synchronized(mutex) { 253 while(l.size() == 0) { 254 if(closed) 255 throw new QueueClosedException(); 256 try { 257 mutex.wait(); 258 } 259 catch(InterruptedException ex) { 260 } 261 } 262 263 if(closed) 264 throw new QueueClosedException(); 265 266 try { 267 retval=l.getFirst(); 268 } 269 catch(NoSuchElementException ex) { 270 if(log.isErrorEnabled()) log.error("retval == null, size()=" + l.size()); 271 return null; 272 } 273 } 274 275 if(retval == endMarker) { 276 close(false); throw new QueueClosedException(); 278 } 279 280 return retval; 281 } 282 283 284 293 294 public Object peek(long timeout) throws QueueClosedException, TimeoutException { 295 Object retval=null; 296 297 synchronized(mutex) { 298 if(l.size() == 0) { 299 if(closed) 300 throw new QueueClosedException(); 301 try { 302 mutex.wait(timeout); 303 } 304 catch(InterruptedException ex) { 305 } 306 } 307 if(closed) 308 throw new QueueClosedException(); 309 310 311 try { 312 retval=l.getFirst(); 313 } 314 catch(NoSuchElementException ex) { 315 316 throw new TimeoutException(); 317 } 318 319 if(retval == endMarker) { 320 close(false); 321 throw new QueueClosedException(); 322 } 323 return retval; 324 } 325 } 326 327 328 337 public void close(boolean flush_entries) { 338 if(flush_entries) { 339 try { 340 add(endMarker); num_markers++; 342 } 343 catch(QueueClosedException closed) { 344 } 345 return; 346 } 347 348 synchronized(mutex) { 349 closed=true; 350 try { 351 mutex.notifyAll(); 352 } 353 catch(Exception e) { 354 if(log.isErrorEnabled()) log.error("exception=" + e); 355 } 356 } 357 } 358 359 360 364 public void reset() { 365 num_markers=0; 366 if(!closed) 367 close(false); 368 369 synchronized(mutex) { 370 l.clear(); 371 closed=false; 372 } 373 } 374 375 376 379 public int size() { 380 return l.size() - num_markers; 381 } 382 383 386 public String toString() { 387 return "LinkedListQueue (" + size() + ") messages [closed=" + closed + ']'; 388 } 389 390 391 394 public Vector getContents() { 395 Vector retval=new Vector (); 396 397 synchronized(mutex) { 398 for(Iterator it=l.iterator(); it.hasNext();) { 399 retval.addElement(it.next()); 400 } 401 } 402 return retval; 403 } 404 405 406 } 407 | Popular Tags |