KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > quikj > server > framework > AceTimer


1 package com.quikj.server.framework;
2
3 import java.io.*;
4 import java.util.*;
5
6 public class AceTimer extends AceThread implements AceCompareMessageInterface
7 {
8     public AceTimer() throws IOException
9     {
10         super("AceTimer", true);
11         // setPriority (MAX_PRIORITY);
12
instance = this;
13     }
14     
15     public static AceTimer Instance() throws IOException
16     {
17         if (instance == null)
18         {
19             new AceTimer();
20         }
21         
22         return instance;
23     }
24     
25     public void run()
26     {
27         while (true)
28         {
29             try
30             {
31                 int interval = 0;
32                 
33                 if (timerQueue.size() == 0) // queue is empty
34
{
35                     // the sleep interval is the max possible
36
interval = Integer.MAX_VALUE;
37                 }
38                 else
39                 {
40                     interval = (int)(((AceTimerMessage)timerQueue.getFirst()).getExpiryTime()
41                     - (new Date()).getTime());
42                     if (interval < 0) interval = 0;
43                 }
44                 
45                 sleep(interval);
46                 
47                 // now that I am awake, iterate through the queue and send message to
48
// all the threads whose times has expired.
49

50                 long cur_time = (new Date()).getTime();
51                 synchronized (timerQueue)
52                 {
53                     ListIterator iter = timerQueue.listIterator(0);
54                     
55                     while (iter.hasNext() == true)
56                     {
57                         AceTimerMessage queued_msg = (AceTimerMessage)iter.next();
58                         long exp_time = queued_msg.getExpiryTime();
59                         if (exp_time <= cur_time)
60                         {
61                             if (queued_msg.getRequestingThread().sendMessage(queued_msg) == false)
62                             {
63                                 System.err.println("AceTimer.run() -- could not send timer expiry message to thread "
64                                 + queued_msg.getRequestingThread().getName()
65                                 + ", timer id : "
66                                 + queued_msg.getTimerId()
67                                 + ", error : "
68                                 + getErrorMessage());
69                             }
70                             iter.remove();
71                             
72                         }
73                         else // the rest of the timers have not expired
74
{
75                             break;
76                         }
77                     }
78                 }
79                 
80             }
81             catch (InterruptedException JavaDoc ex)
82             {
83                 if (killThread == false)
84                 {
85                     continue;
86                 }
87                 else
88                 {
89                     break;
90                 }
91             }
92             
93         } // forever
94

95     }
96     
97     public int startTimer(long interval,
98     AceThread cthread,
99     long user_parm)
100     {
101         return addTimer((new Date()).getTime() + interval,
102         cthread, user_parm);
103     }
104     
105     public int startTimer(Date abs_time,
106     AceThread cthread,
107     long user_parm)
108     {
109         return addTimer(abs_time.getTime(), cthread, user_parm);
110     }
111     
112     public int startTimer(long interval,
113     long user_parm)
114     {
115         return addTimer((new Date()).getTime() + interval,
116         null, user_parm);
117     }
118     
119     public int startTimer(Date abs_time,
120     long user_parm)
121     {
122         return addTimer(abs_time.getTime(), null, user_parm);
123     }
124     
125     public boolean cancelTimer(int timer_id, AceThread cthread)
126     {
127         boolean ret = false;
128         synchronized (timerQueue)
129         {
130             ListIterator iter = timerQueue.listIterator(0);
131             
132             while (iter.hasNext() == true)
133             {
134                 AceTimerMessage msg = (AceTimerMessage)iter.next();
135                 
136                 if (msg.getTimerId() == timer_id)
137                 {
138                     ret = true;
139                     iter.remove();
140                     break;
141                 }
142             }
143             
144             if (ret == false) // not found in the timer queue
145
{
146                 // the message may already have been delivered to the thread,
147
// try to remove it from the queue
148
if (cthread == null)
149                 {
150                     Thread JavaDoc cur_thr = Thread.currentThread();
151                     
152                     if ((cur_thr instanceof AceThread) == false)
153                     {
154                         writeErrorMessage("Element not found");
155                         return ret;
156                     }
157                     
158                     cthread = (AceThread)cur_thr;
159                 }
160                 
161                 ret = cthread.removeMessage(new AceTimerMessage(0L, null, timer_id, 0L),
162                 this);
163                 
164                 if (ret == true)
165                 {
166                     System.out.println("AceTimer.cancelTimer() -- Request to cancel timer id "
167                     + timer_id
168                     + " received - the timer message was already delivered to the requesting thread but not read from its queue, removed the message from queue");
169                 }
170             }
171         }
172         
173         if (ret == false)
174         {
175             writeErrorMessage("Element not found");
176         }
177         
178         return ret;
179     }
180     
181     public boolean cancelTimer(int timer_id)
182     {
183         return cancelTimer(timer_id, null);
184     }
185     
186     public void cancelAllTimers(AceThread cthread)
187     {
188         if (cthread == null)
189         {
190             cthread = (AceThread)Thread.currentThread();
191         }
192         
193         synchronized (timerQueue)
194         {
195             ListIterator iter = timerQueue.listIterator(0);
196             
197             while (iter.hasNext() == true)
198             {
199                 AceTimerMessage msg = (AceTimerMessage)iter.next();
200                 
201                 if (msg.getRequestingThread() == cthread)
202                 {
203                     iter.remove();
204                 }
205             }
206         }
207     }
208     
209     public void cancelAllTimers()
210     {
211         cancelAllTimers(null);
212     }
213     
214     // Note: assumes that the timer with timer_id was started by the calling thread.
215
public AceMessageInterface waitTimer(int timerid)
216     {
217         Thread JavaDoc thr = Thread.currentThread();
218         
219         if ((thr instanceof AceThread) == false)
220         {
221             writeErrorMessage("This method is not being called from an object which is a sub-class of type AceThread");
222             return null;
223         }
224         
225         AceThread cthread = (AceThread)thr;
226         
227         while (true)
228         {
229             AceMessageInterface msg = cthread.waitMessage();
230             if ((msg instanceof AceTimerMessage) == true)
231             {
232                 if (((AceTimerMessage)msg).getTimerId() == timerid)
233                 {
234                     return msg;
235                 }
236             }
237             else if ((msg instanceof AceSignalMessage) == true)
238             {
239                 return msg;
240             }
241         }
242     }
243     
244     public void dispose()
245     {
246         killThread = true; // mark the thread as being killed
247
this.interrupt(); // interrupt the thread
248

249         super.dispose();
250     }
251     
252     public boolean same(AceMessageInterface obj1, AceMessageInterface obj2)
253     {
254         boolean ret = false;
255         
256         if (((obj1 instanceof AceTimerMessage) == true) &&
257         ((obj2 instanceof AceTimerMessage) == true))
258         {
259             if (((AceTimerMessage)obj1).getTimerId() == ((AceTimerMessage)obj2).getTimerId())
260             {
261                 ret = true;
262             }
263         }
264         
265         return ret;
266     }
267     
268     private int addTimer(long abs_time,
269     AceThread cthread,
270     long user_parm)
271     {
272         Thread JavaDoc calling_thread;
273         if (cthread != null)
274         {
275             calling_thread = cthread;
276         }
277         else
278         {
279             calling_thread = Thread.currentThread();
280         }
281         
282         // make sure that the thread to which the timer expiry message has to be delivered
283
// has the capacity to receive messages.
284
if ((calling_thread instanceof AceThread) == false)
285         {
286             writeErrorMessage("This method is not being called from an object which is a sub-class of type AceThread");
287             return -1;
288         }
289         
290         int timer_id = nextTimerId++;
291         AceTimerMessage msg = new AceTimerMessage(abs_time,
292         (AceThread)calling_thread,
293         timer_id,
294         user_parm);
295         
296         // insert the element in the array
297
if (insertElementInTimerQueue(msg) == true)
298         {
299             this.interrupt(); //interrupt the thread, so that it can re-adjust the time
300
}
301         else // failure
302
{
303             timer_id = -1;
304         }
305         return timer_id;
306     }
307     
308     
309     private boolean insertElementInTimerQueue(AceTimerMessage msg)
310     {
311         long exp_time = msg.getExpiryTime();
312         
313         synchronized (timerQueue)
314         {
315             ListIterator iter = timerQueue.listIterator(0);
316             
317             try
318             {
319                 while (true)
320                 {
321                     AceTimerMessage queued_msg = (AceTimerMessage)iter.next();
322                     
323                     if (exp_time <= queued_msg.getExpiryTime())
324                     {
325                         // element must be inserted in the previous position
326
try
327                         {
328                             Object JavaDoc prev = iter.previous();
329                             iter.add(msg);
330                             break;
331                         }
332                         catch (NoSuchElementException ex1) // first element
333
{
334                             timerQueue.addFirst(msg);
335                             break;
336                         }
337                     }
338                 }
339             }
340             catch (NoSuchElementException ex2)
341             {
342                 iter.add(msg);
343             }
344         }
345         
346         return true;
347     }
348     
349     
350     private LinkedList timerQueue = new LinkedList();
351     public boolean killThread = false;
352     public int nextTimerId = 0;
353     public static AceTimer instance = null;
354     
355     
356     // test program
357
public static void main(String JavaDoc[] args)
358     {
359         // inner classes
360
class MyAceWaitThread extends AceThread
361         {
362             public MyAceWaitThread() throws IOException
363             {
364                 super("WaitTestThread");
365             }
366             
367             public void run()
368             {
369                 // local classes
370
class MyEventMessage implements AceMessageInterface
371                 {
372                     public String JavaDoc messageType()
373                     {
374                         return new String JavaDoc("my_event");
375                     }
376                 }
377                 // end local classes
378

379                 System.out.println(getName() + "-- started");
380                 
381                 long time = 0;
382                 try
383                 {
384                     while (true)
385                     {
386                         // start a timer
387
++time;
388                         Date cur_time = new Date();
389                         System.out.println(getName()
390                         + "-- Starting a " + time + " seconds timer at "
391                         + cur_time
392                         + " (" + cur_time.getTime() + ")");
393                         int id = AceTimer.Instance().startTimer(time * 1000,
394                         time);
395                         if (id < 0)
396                         {
397                             System.err.println(getName()
398                             + "-- "
399                             + getErrorMessage());
400                             break;
401                         }
402                         
403                         // uncomment the portion below to test receiving other events and to test resumeWaitTimer()
404
// if (sendMessage (new MyEventMessage()) == false)
405
// {
406
// System.err.println (getName()
407
// + "-- Could not send my event message : "
408
// + getErrorMessage());
409
// }
410

411                         boolean res;
412                         AceMessageInterface msg;
413                         msg = AceTimer.Instance().waitTimer(id);
414                         
415                         if ((msg instanceof AceTimerMessage) == true)
416                         {
417                             cur_time = new Date();
418                             System.out.println(getName()
419                             + "-- Timer expired at "
420                             + cur_time
421                             + " (" + cur_time.getTime() + ")");
422                         }
423                         else if ((msg instanceof AceSignalMessage) == true)
424                         {
425                             AceSignalMessage signal = (AceSignalMessage)msg;
426                             
427                             System.out.println(getName()
428                             + "-- Signal of type "
429                             + signal.getSignalId()
430                             + " received");
431                             AceTimer.Instance().cancelAllTimers();
432                             return;
433                         }
434                         else // should not happen
435
{
436                             System.err.println(getName()
437                             + "-- "
438                             + "Unexpected message : "
439                             + msg.messageType()
440                             + " received");
441                             AceTimer.Instance().cancelAllTimers();
442                             return;
443                         }
444                     }
445                 }
446                 catch (IOException ex)
447                 {
448                     System.err.println(getName() + "-- "
449                     + ex.getMessage());
450                     return;
451                 }
452                 
453             }
454             
455         }
456         
457         class MyAceCancelThread extends AceThread
458         {
459             public MyAceCancelThread() throws IOException
460             {
461                 super("CancelTestThread");
462             }
463             
464             public void run()
465             {
466                 System.out.println(getName() + "-- started");
467                 
468                 try
469                 {
470                     while (true)
471                     {
472                         Date cur_time = new Date();
473                         System.out.println(getName()
474                         + "-- Starting a 1 second timer at "
475                         + cur_time
476                         + " (" + cur_time.getTime() + ")");
477                         int id = AceTimer.Instance().startTimer(1000,
478                         1);
479                         if (id < 0)
480                         {
481                             System.err.println(getErrorMessage());
482                             break;
483                         }
484                         
485                         cur_time = new Date();
486                         System.out.println(getName()
487                         + "-- Starting a 2 second timer at "
488                         + cur_time
489                         + " (" + cur_time.getTime() + ")");
490                         int id2 = AceTimer.Instance().startTimer(2000,
491                         2);
492                         if (id2 < 0)
493                         {
494                             System.err.println(getName()
495                             + "-- "
496                             + getErrorMessage());
497                             break;
498                         }
499                         
500                         // Uncomment the code below to test the case where a cancel is being
501
// called after the timer expiry message has been delivered to the
502
// calling thread's queue (but not read).
503
// try
504
// {
505
// sleep (5 * 1000);
506
// }
507
// catch (InterruptedException ex)
508
// {
509
// System.err.println (ex.getMessage());
510
// return;
511
// }
512

513                         System.out.println(getName() + "-- Cancelling timer "
514                         + id);
515                         if (AceTimer.Instance().cancelTimer(id) == false)
516                         {
517                             System.err.println(getErrorMessage());
518                             break;
519                         }
520                         
521                         AceMessageInterface message = waitMessage();
522                         if ((message instanceof AceTimerMessage) == true)
523                         {
524                             cur_time = new Date();
525                             System.out.println(getName()
526                             + "-- Timer expired at "
527                             + cur_time
528                             + " (" + cur_time.getTime() + ")"
529                             + " timer id : "
530                             + ((AceTimerMessage)message).getTimerId()
531                             + " user parm : "
532                             + ((AceTimerMessage)message).getUserSpecifiedParm());
533                         }
534                         else
535                         {
536                             System.out.println(getName()
537                             + "-- "
538                             + "Message of type "
539                             + message.messageType()
540                             + " received");
541                         }
542                         
543                     }
544                 }
545                 catch (IOException ex)
546                 {
547                     System.err.println(getName() + "-- "
548                     + ex.getMessage());
549                     return;
550                 }
551             }
552         }
553         
554         
555         class MyAceThread extends AceThread
556         {
557             public MyAceThread() throws IOException
558             {
559                 super();
560             }
561             
562             public void run()
563             {
564                 System.out.println(getName() + "-- MyAceThread started");
565                 long time = 0;
566                 
567                 try
568                 {
569                     while (true)
570                     {
571                         // start a timer
572
++time;
573                         Date cur_time = new Date();
574                         System.out.println(getName()
575                         + "-- Starting a " + time + " seconds timer at "
576                         + cur_time
577                         + " (" + cur_time.getTime() + ")");
578                         int id = AceTimer.Instance().startTimer(time * 1000,
579                         time);
580                         if (id < 0)
581                         {
582                             System.err.println(getName()
583                             + "-- "
584                             + getErrorMessage());
585                             break;
586                         }
587                         
588                         AceMessageInterface message = waitMessage();
589                         if ((message instanceof AceTimerMessage) == true)
590                         {
591                             cur_time = new Date();
592                             System.out.println(getName()
593                             + "-- Timer expired at "
594                             + cur_time
595                             + " (" + cur_time.getTime() + ")"
596                             + " timer id : "
597                             + ((AceTimerMessage)message).getTimerId()
598                             + " user parm : "
599                             + ((AceTimerMessage)message).getUserSpecifiedParm());
600                         }
601                         else
602                         {
603                             System.out.println("Message of type "
604                             + message.messageType()
605                             + " received");
606                         }
607                         
608                     }
609                     
610                 }
611                 catch (IOException ex)
612                 {
613                     System.err.println(getName() + "-- "
614                     + ex.getMessage());
615                     return;
616                 }
617             }
618         }
619         
620         // end inner classes
621

622         int num_threads = 10;
623         
624         if (args.length > 0)
625         {
626             try
627             {
628                 num_threads = Integer.parseInt(args[0]);
629             }
630             catch (NumberFormatException JavaDoc ex)
631             {
632                 System.err.println("Command line syntax error - the first argument should specify the number of threads to run");
633                 System.exit(1);
634             }
635         }
636         else
637         {
638             System.out.println("No argument specified, defaulting number of threads to "
639             + num_threads);
640         }
641         
642         boolean cancel_test = false;
643         boolean wait_test = false;
644         
645         if (args.length > 1)
646         {
647             if (args[1].equals("cancel") == true)
648             {
649                 cancel_test = true;
650             }
651             else if (args[1].equals("wait") == true)
652             {
653                 wait_test = true;
654             }
655             else
656             {
657                 System.err.println("The second argument must specify the keyword \"cancel\" to specify running cancel test");
658             }
659         }
660         
661         try
662         {
663             AceTimer.Instance().start(); // start the timer thread
664

665             for (int i = 0; i < num_threads; i++)
666             {
667                 MyAceThread thr = new MyAceThread();
668                 thr.start();
669                 
670                 try
671                 {
672                     sleep(1000);
673                 }
674                 catch (InterruptedException JavaDoc ex1)
675                 {
676                     System.err.println(ex1.getMessage());
677                     System.exit(1);
678                 }
679             }
680             
681             if (cancel_test == true)
682             {
683                 MyAceCancelThread thr = new MyAceCancelThread();
684                 thr.start();
685             }
686             
687             if (wait_test == true)
688             {
689                 MyAceWaitThread thr = new MyAceWaitThread();
690                 thr.start();
691             }
692         }
693         catch (IOException ex1)
694         {
695             System.err.println(ex1.getMessage());
696             System.exit(1);
697         }
698         
699     }
700     
701 }
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
Popular Tags