1 22 package org.jboss.util.timeout; 23 24 151 public class TimeoutPriorityQueueImpl implements TimeoutPriorityQueue 152 { 153 176 177 private Object lock = new Object (); 178 179 180 private int size; 181 182 183 private TimeoutExtImpl[] queue; 184 185 188 public TimeoutPriorityQueueImpl() 189 { 190 queue = new TimeoutExtImpl[16]; 191 size = 0; 192 } 193 194 public TimeoutExt offer(long time, TimeoutTarget target) 195 { 196 if (queue == null) 197 throw new IllegalStateException ("TimeoutPriorityQueue has been cancelled"); 198 if (time < 0) 199 throw new IllegalArgumentException ("Negative time"); 200 if (target == null) 201 throw new IllegalArgumentException ("Null timeout target"); 202 203 synchronized (lock) 204 { 205 if (++size == queue.length) 208 { 209 TimeoutExtImpl[] newQ = new TimeoutExtImpl[2 * queue.length]; 210 System.arraycopy(queue, 0, newQ, 0, queue.length); 211 queue = newQ; 212 } 213 TimeoutExtImpl timeout; 216 timeout = queue[size] = new TimeoutExtImpl(); 217 timeout.index = size; 218 timeout.time = time; 219 timeout.target = target; 220 normalizeUp(size); 221 if (timeout.index == 1) 222 lock.notify(); 223 return timeout; 225 } 226 } 227 228 public TimeoutExt take() 229 { 230 return poll(-1); 231 } 232 233 public TimeoutExt poll() 234 { 235 return poll(1); 236 } 237 238 public TimeoutExt poll(long wait) 239 { 240 long endWait = -1; 241 if (wait > 0) 242 endWait = System.currentTimeMillis() + wait; 243 synchronized (lock) 245 { 246 while (queue != null && (wait >= 0 || endWait == -1)) 247 { 248 if (size == 0) 249 { 250 try 251 { 252 if (endWait == -1) 253 lock.wait(); 254 else 255 lock.wait(wait); 256 } 257 catch (InterruptedException ex) 258 { 259 } 260 } 261 else 262 { 263 long now = System.currentTimeMillis(); 264 if (queue[1].time > now) 265 { 266 long waitForFirst = queue[1].time - now; 267 if (endWait != -1 && waitForFirst > wait) 268 waitForFirst = wait; 269 try 270 { 271 lock.wait(waitForFirst); 272 } 273 catch (InterruptedException ex) 274 { 275 } 276 } 277 if (size > 0 && queue != null && queue[1].time <= System.currentTimeMillis()) 278 { 279 TimeoutExtImpl result = removeNode(1); 280 result.index = TimeoutExtImpl.TIMEOUT; 281 return result; 282 } 283 } 284 if (endWait != -1) 285 wait = endWait - System.currentTimeMillis(); 286 } 287 } 288 return null; 289 } 290 291 public TimeoutExt peek() 292 { 293 synchronized (lock) 294 { 295 if (size > 0) 296 return queue[1]; 297 else 298 return null; 299 } 300 } 301 302 public boolean remove(TimeoutExt timeout) 303 { 304 TimeoutExtImpl timeoutImpl = (TimeoutExtImpl) timeout; 305 synchronized (lock) 306 { 307 if (timeoutImpl.index > 0) 308 { 309 removeNode(timeoutImpl.index); 313 timeoutImpl.index = TimeoutExtImpl.DONE; 315 316 return true; 318 } 319 else 320 { 321 return false; 324 } 325 } 326 } 327 328 public void clear() 329 { 330 synchronized (lock) 331 { 332 if (queue == null) 333 return; 334 335 for (int i = 1; i <= size; ++i) 337 queue[i] = cleanupTimeoutExtImpl(queue[i]); 338 } 339 } 340 341 public void cancel() 342 { 343 synchronized (lock) 344 { 345 if (queue == null) 346 return; 347 clear(); 348 queue = null; 349 size = 0; 350 lock.notifyAll(); 351 } 352 } 353 354 public int size() 355 { 356 return size; 357 } 358 359 364 public boolean isCancelled() 365 { 366 return queue == null; 367 } 368 369 375 private boolean normalizeUp(int index) 376 { 377 if (index == 1) 381 return false; boolean ret = false; 383 long t = queue[index].time; 384 int p = index >> 1; 385 while (queue[p].time > t) 386 { 387 swap(p, index); 389 ret = true; 390 if (p == 1) 391 break; index = p; 393 p >>= 1; 394 } 395 return ret; 396 } 397 398 404 private void swap(int a, int b) 405 { 406 TimeoutExtImpl temp = queue[a]; 415 queue[a] = queue[b]; 416 queue[a].index = a; 417 queue[b] = temp; 418 queue[b].index = b; 419 } 420 421 427 private TimeoutExtImpl removeNode(int index) 428 { 429 TimeoutExtImpl res = queue[index]; 432 if (index == size) 435 { 436 --size; 437 queue[index] = null; 438 return res; 439 } 440 swap(index, size); --size; 442 queue[res.index] = null; 444 if (normalizeUp(index)) 445 return res; long t = queue[index].time; 447 int c = index << 1; 448 while (c <= size) 449 { 450 TimeoutExtImpl l = queue[c]; 452 if (c + 1 <= size) 455 { 456 TimeoutExtImpl r = queue[c + 1]; 458 if (l.time <= r.time) 461 { 462 if (t <= l.time) 463 break; swap(index, c); 465 index = c; 466 } 467 else 468 { 469 if (t <= r.time) 470 break; swap(index, c + 1); 472 index = c + 1; 473 } 474 } 475 else 476 { if (t <= l.time) 478 break; swap(index, c); 480 index = c; 481 } 482 c = index << 1; 483 } 484 return res; 485 } 486 487 492 private TimeoutExtImpl cleanupTimeoutExtImpl(TimeoutExtImpl timeout) 493 { 494 if (timeout != null) 495 timeout.target = null; 496 return null; 497 } 498 499 502 void checkTree() 503 { 504 assertExpr(size >= 0); 505 assertExpr(size < queue.length); 506 assertExpr(queue[0] == null); 507 if (size > 0) 508 { 509 assertExpr(queue[1] != null); 510 assertExpr(queue[1].index == 1); 511 for (int i = 2; i <= size; ++i) 512 { 513 assertExpr(queue[i] != null); 514 assertExpr(queue[i].index == i); 515 assertExpr(queue[i >> 1].time <= queue[i].time); } 517 for (int i = size + 1; i < queue.length; ++i) 518 assertExpr(queue[i] == null); 519 } 520 } 521 522 525 private void assertExpr(boolean expr) 526 { 527 if (!expr) 528 throw new IllegalStateException ("***** assert failed *****"); 529 } 530 531 534 private class TimeoutExtImpl implements TimeoutExt 535 { 536 537 static final int DONE = -1; 538 539 540 static final int TIMEOUT = -2; 541 542 543 int index; 544 545 546 long time; 547 548 549 TimeoutTarget target; 550 551 public long getTime() 552 { 553 return time; 554 } 555 556 public TimeoutTarget getTimeoutTarget() 557 { 558 return target; 559 } 560 561 public void done() 562 { 563 index = DONE; 564 } 565 566 public boolean cancel() 567 { 568 return remove(this); 569 } 570 } 571 } 572 | Popular Tags |