1 package com.quikj.server.framework; 2 3 import java.io.*; 4 import java.util.*; 5 6 public class AceTimer extends AceThread implements AceCompareMessageInterface 7 { 8 public AceTimer() throws IOException 9 { 10 super("AceTimer", true); 11 instance = this; 13 } 14 15 public static AceTimer Instance() throws IOException 16 { 17 if (instance == null) 18 { 19 new AceTimer(); 20 } 21 22 return instance; 23 } 24 25 public void run() 26 { 27 while (true) 28 { 29 try 30 { 31 int interval = 0; 32 33 if (timerQueue.size() == 0) { 35 interval = Integer.MAX_VALUE; 37 } 38 else 39 { 40 interval = (int)(((AceTimerMessage)timerQueue.getFirst()).getExpiryTime() 41 - (new Date()).getTime()); 42 if (interval < 0) interval = 0; 43 } 44 45 sleep(interval); 46 47 50 long cur_time = (new Date()).getTime(); 51 synchronized (timerQueue) 52 { 53 ListIterator iter = timerQueue.listIterator(0); 54 55 while (iter.hasNext() == true) 56 { 57 AceTimerMessage queued_msg = (AceTimerMessage)iter.next(); 58 long exp_time = queued_msg.getExpiryTime(); 59 if (exp_time <= cur_time) 60 { 61 if (queued_msg.getRequestingThread().sendMessage(queued_msg) == false) 62 { 63 System.err.println("AceTimer.run() -- could not send timer expiry message to thread " 64 + queued_msg.getRequestingThread().getName() 65 + ", timer id : " 66 + queued_msg.getTimerId() 67 + ", error : " 68 + getErrorMessage()); 69 } 70 iter.remove(); 71 72 } 73 else { 75 break; 76 } 77 } 78 } 79 80 } 81 catch (InterruptedException ex) 82 { 83 if (killThread == false) 84 { 85 continue; 86 } 87 else 88 { 89 break; 90 } 91 } 92 93 } 95 } 96 97 public int startTimer(long interval, 98 AceThread cthread, 99 long user_parm) 100 { 101 return addTimer((new Date()).getTime() + interval, 102 cthread, user_parm); 103 } 104 105 public int startTimer(Date abs_time, 106 AceThread cthread, 107 long user_parm) 108 { 109 return addTimer(abs_time.getTime(), cthread, user_parm); 110 } 111 112 public int startTimer(long interval, 113 long user_parm) 114 { 115 return addTimer((new Date()).getTime() + interval, 116 null, user_parm); 117 } 118 119 public int startTimer(Date abs_time, 120 long user_parm) 121 { 122 return addTimer(abs_time.getTime(), null, user_parm); 123 } 124 125 public boolean cancelTimer(int timer_id, AceThread cthread) 126 { 127 boolean ret = false; 128 synchronized (timerQueue) 129 { 130 ListIterator iter = timerQueue.listIterator(0); 131 132 while (iter.hasNext() == true) 133 { 134 AceTimerMessage msg = (AceTimerMessage)iter.next(); 135 136 if (msg.getTimerId() == timer_id) 137 { 138 ret = true; 139 iter.remove(); 140 break; 141 } 142 } 143 144 if (ret == false) { 146 if (cthread == null) 149 { 150 Thread cur_thr = Thread.currentThread(); 151 152 if ((cur_thr instanceof AceThread) == false) 153 { 154 writeErrorMessage("Element not found"); 155 return ret; 156 } 157 158 cthread = (AceThread)cur_thr; 159 } 160 161 ret = cthread.removeMessage(new AceTimerMessage(0L, null, timer_id, 0L), 162 this); 163 164 if (ret == true) 165 { 166 System.out.println("AceTimer.cancelTimer() -- Request to cancel timer id " 167 + timer_id 168 + " received - the timer message was already delivered to the requesting thread but not read from its queue, removed the message from queue"); 169 } 170 } 171 } 172 173 if (ret == false) 174 { 175 writeErrorMessage("Element not found"); 176 } 177 178 return ret; 179 } 180 181 public boolean cancelTimer(int timer_id) 182 { 183 return cancelTimer(timer_id, null); 184 } 185 186 public void cancelAllTimers(AceThread cthread) 187 { 188 if (cthread == null) 189 { 190 cthread = (AceThread)Thread.currentThread(); 191 } 192 193 synchronized (timerQueue) 194 { 195 ListIterator iter = timerQueue.listIterator(0); 196 197 while (iter.hasNext() == true) 198 { 199 AceTimerMessage msg = (AceTimerMessage)iter.next(); 200 201 if (msg.getRequestingThread() == cthread) 202 { 203 iter.remove(); 204 } 205 } 206 } 207 } 208 209 public void cancelAllTimers() 210 { 211 cancelAllTimers(null); 212 } 213 214 public AceMessageInterface waitTimer(int timerid) 216 { 217 Thread thr = Thread.currentThread(); 218 219 if ((thr instanceof AceThread) == false) 220 { 221 writeErrorMessage("This method is not being called from an object which is a sub-class of type AceThread"); 222 return null; 223 } 224 225 AceThread cthread = (AceThread)thr; 226 227 while (true) 228 { 229 AceMessageInterface msg = cthread.waitMessage(); 230 if ((msg instanceof AceTimerMessage) == true) 231 { 232 if (((AceTimerMessage)msg).getTimerId() == timerid) 233 { 234 return msg; 235 } 236 } 237 else if ((msg instanceof AceSignalMessage) == true) 238 { 239 return msg; 240 } 241 } 242 } 243 244 public void dispose() 245 { 246 killThread = true; this.interrupt(); 249 super.dispose(); 250 } 251 252 public boolean same(AceMessageInterface obj1, AceMessageInterface obj2) 253 { 254 boolean ret = false; 255 256 if (((obj1 instanceof AceTimerMessage) == true) && 257 ((obj2 instanceof AceTimerMessage) == true)) 258 { 259 if (((AceTimerMessage)obj1).getTimerId() == ((AceTimerMessage)obj2).getTimerId()) 260 { 261 ret = true; 262 } 263 } 264 265 return ret; 266 } 267 268 private int addTimer(long abs_time, 269 AceThread cthread, 270 long user_parm) 271 { 272 Thread calling_thread; 273 if (cthread != null) 274 { 275 calling_thread = cthread; 276 } 277 else 278 { 279 calling_thread = Thread.currentThread(); 280 } 281 282 if ((calling_thread instanceof AceThread) == false) 285 { 286 writeErrorMessage("This method is not being called from an object which is a sub-class of type AceThread"); 287 return -1; 288 } 289 290 int timer_id = nextTimerId++; 291 AceTimerMessage msg = new AceTimerMessage(abs_time, 292 (AceThread)calling_thread, 293 timer_id, 294 user_parm); 295 296 if (insertElementInTimerQueue(msg) == true) 298 { 299 this.interrupt(); } 301 else { 303 timer_id = -1; 304 } 305 return timer_id; 306 } 307 308 309 private boolean insertElementInTimerQueue(AceTimerMessage msg) 310 { 311 long exp_time = msg.getExpiryTime(); 312 313 synchronized (timerQueue) 314 { 315 ListIterator iter = timerQueue.listIterator(0); 316 317 try 318 { 319 while (true) 320 { 321 AceTimerMessage queued_msg = (AceTimerMessage)iter.next(); 322 323 if (exp_time <= queued_msg.getExpiryTime()) 324 { 325 try 327 { 328 Object prev = iter.previous(); 329 iter.add(msg); 330 break; 331 } 332 catch (NoSuchElementException ex1) { 334 timerQueue.addFirst(msg); 335 break; 336 } 337 } 338 } 339 } 340 catch (NoSuchElementException ex2) 341 { 342 iter.add(msg); 343 } 344 } 345 346 return true; 347 } 348 349 350 private LinkedList timerQueue = new LinkedList(); 351 public boolean killThread = false; 352 public int nextTimerId = 0; 353 public static AceTimer instance = null; 354 355 356 public static void main(String [] args) 358 { 359 class MyAceWaitThread extends AceThread 361 { 362 public MyAceWaitThread() throws IOException 363 { 364 super("WaitTestThread"); 365 } 366 367 public void run() 368 { 369 class MyEventMessage implements AceMessageInterface 371 { 372 public String messageType() 373 { 374 return new String ("my_event"); 375 } 376 } 377 379 System.out.println(getName() + "-- started"); 380 381 long time = 0; 382 try 383 { 384 while (true) 385 { 386 ++time; 388 Date cur_time = new Date(); 389 System.out.println(getName() 390 + "-- Starting a " + time + " seconds timer at " 391 + cur_time 392 + " (" + cur_time.getTime() + ")"); 393 int id = AceTimer.Instance().startTimer(time * 1000, 394 time); 395 if (id < 0) 396 { 397 System.err.println(getName() 398 + "-- " 399 + getErrorMessage()); 400 break; 401 } 402 403 411 boolean res; 412 AceMessageInterface msg; 413 msg = AceTimer.Instance().waitTimer(id); 414 415 if ((msg instanceof AceTimerMessage) == true) 416 { 417 cur_time = new Date(); 418 System.out.println(getName() 419 + "-- Timer expired at " 420 + cur_time 421 + " (" + cur_time.getTime() + ")"); 422 } 423 else if ((msg instanceof AceSignalMessage) == true) 424 { 425 AceSignalMessage signal = (AceSignalMessage)msg; 426 427 System.out.println(getName() 428 + "-- Signal of type " 429 + signal.getSignalId() 430 + " received"); 431 AceTimer.Instance().cancelAllTimers(); 432 return; 433 } 434 else { 436 System.err.println(getName() 437 + "-- " 438 + "Unexpected message : " 439 + msg.messageType() 440 + " received"); 441 AceTimer.Instance().cancelAllTimers(); 442 return; 443 } 444 } 445 } 446 catch (IOException ex) 447 { 448 System.err.println(getName() + "-- " 449 + ex.getMessage()); 450 return; 451 } 452 453 } 454 455 } 456 457 class MyAceCancelThread extends AceThread 458 { 459 public MyAceCancelThread() throws IOException 460 { 461 super("CancelTestThread"); 462 } 463 464 public void run() 465 { 466 System.out.println(getName() + "-- started"); 467 468 try 469 { 470 while (true) 471 { 472 Date cur_time = new Date(); 473 System.out.println(getName() 474 + "-- Starting a 1 second timer at " 475 + cur_time 476 + " (" + cur_time.getTime() + ")"); 477 int id = AceTimer.Instance().startTimer(1000, 478 1); 479 if (id < 0) 480 { 481 System.err.println(getErrorMessage()); 482 break; 483 } 484 485 cur_time = new Date(); 486 System.out.println(getName() 487 + "-- Starting a 2 second timer at " 488 + cur_time 489 + " (" + cur_time.getTime() + ")"); 490 int id2 = AceTimer.Instance().startTimer(2000, 491 2); 492 if (id2 < 0) 493 { 494 System.err.println(getName() 495 + "-- " 496 + getErrorMessage()); 497 break; 498 } 499 500 513 System.out.println(getName() + "-- Cancelling timer " 514 + id); 515 if (AceTimer.Instance().cancelTimer(id) == false) 516 { 517 System.err.println(getErrorMessage()); 518 break; 519 } 520 521 AceMessageInterface message = waitMessage(); 522 if ((message instanceof AceTimerMessage) == true) 523 { 524 cur_time = new Date(); 525 System.out.println(getName() 526 + "-- Timer expired at " 527 + cur_time 528 + " (" + cur_time.getTime() + ")" 529 + " timer id : " 530 + ((AceTimerMessage)message).getTimerId() 531 + " user parm : " 532 + ((AceTimerMessage)message).getUserSpecifiedParm()); 533 } 534 else 535 { 536 System.out.println(getName() 537 + "-- " 538 + "Message of type " 539 + message.messageType() 540 + " received"); 541 } 542 543 } 544 } 545 catch (IOException ex) 546 { 547 System.err.println(getName() + "-- " 548 + ex.getMessage()); 549 return; 550 } 551 } 552 } 553 554 555 class MyAceThread extends AceThread 556 { 557 public MyAceThread() throws IOException 558 { 559 super(); 560 } 561 562 public void run() 563 { 564 System.out.println(getName() + "-- MyAceThread started"); 565 long time = 0; 566 567 try 568 { 569 while (true) 570 { 571 ++time; 573 Date cur_time = new Date(); 574 System.out.println(getName() 575 + "-- Starting a " + time + " seconds timer at " 576 + cur_time 577 + " (" + cur_time.getTime() + ")"); 578 int id = AceTimer.Instance().startTimer(time * 1000, 579 time); 580 if (id < 0) 581 { 582 System.err.println(getName() 583 + "-- " 584 + getErrorMessage()); 585 break; 586 } 587 588 AceMessageInterface message = waitMessage(); 589 if ((message instanceof AceTimerMessage) == true) 590 { 591 cur_time = new Date(); 592 System.out.println(getName() 593 + "-- Timer expired at " 594 + cur_time 595 + " (" + cur_time.getTime() + ")" 596 + " timer id : " 597 + ((AceTimerMessage)message).getTimerId() 598 + " user parm : " 599 + ((AceTimerMessage)message).getUserSpecifiedParm()); 600 } 601 else 602 { 603 System.out.println("Message of type " 604 + message.messageType() 605 + " received"); 606 } 607 608 } 609 610 } 611 catch (IOException ex) 612 { 613 System.err.println(getName() + "-- " 614 + ex.getMessage()); 615 return; 616 } 617 } 618 } 619 620 622 int num_threads = 10; 623 624 if (args.length > 0) 625 { 626 try 627 { 628 num_threads = Integer.parseInt(args[0]); 629 } 630 catch (NumberFormatException ex) 631 { 632 System.err.println("Command line syntax error - the first argument should specify the number of threads to run"); 633 System.exit(1); 634 } 635 } 636 else 637 { 638 System.out.println("No argument specified, defaulting number of threads to " 639 + num_threads); 640 } 641 642 boolean cancel_test = false; 643 boolean wait_test = false; 644 645 if (args.length > 1) 646 { 647 if (args[1].equals("cancel") == true) 648 { 649 cancel_test = true; 650 } 651 else if (args[1].equals("wait") == true) 652 { 653 wait_test = true; 654 } 655 else 656 { 657 System.err.println("The second argument must specify the keyword \"cancel\" to specify running cancel test"); 658 } 659 } 660 661 try 662 { 663 AceTimer.Instance().start(); 665 for (int i = 0; i < num_threads; i++) 666 { 667 MyAceThread thr = new MyAceThread(); 668 thr.start(); 669 670 try 671 { 672 sleep(1000); 673 } 674 catch (InterruptedException ex1) 675 { 676 System.err.println(ex1.getMessage()); 677 System.exit(1); 678 } 679 } 680 681 if (cancel_test == true) 682 { 683 MyAceCancelThread thr = new MyAceCancelThread(); 684 thr.start(); 685 } 686 687 if (wait_test == true) 688 { 689 MyAceWaitThread thr = new MyAceWaitThread(); 690 thr.start(); 691 } 692 } 693 catch (IOException ex1) 694 { 695 System.err.println(ex1.getMessage()); 696 System.exit(1); 697 } 698 699 } 700 701 } 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 | Popular Tags |