KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > Jt > JtThread


1 package Jt;
2 import java.util.*;
3 import java.lang.reflect.*;
4 import java.beans.*;
5 import java.io.*;
6
7
8 /**
9  * Jt objects that inherit from this class execute in a separate/independent thread:
10  * Jt Messages are processed asynchronously using a separate thread.
11  * A queue of messages is required to accomplish this asynchronous behavior.
12  * When a Jt message is sent to this object (sendMessage),
13  * the message is automatically added to the message queue.
14  * The run method (separate thread) is constantly extracting the next message in
15  * the queue and calling processMessage() to process it.
16  */

17
18
19 public class JtThread extends JtObject implements Runnable JavaDoc {
20
21   // Object states
22

23   public static final int JtINACTIVE_STATE = 0; // Before start
24
public static final int JtACTIVE_STATE = 1; // After start
25
//public static final int JtSUSPENDED_STATE = 2;
26
public static final int JtSTOPPED_STATE = 3; // After stop event
27
public static final int JtIDLE_STATE = 4; // No messages in the queue
28

29
30   // Events
31

32   private static final int JtSTART = 21; // Start the object (start processing messages)
33
private static final int JtSTOP = 20; // Stop the object
34
private static final int JtEMPTY_QUEUE = 15; // Message queue is empty
35
private static final int JtNEW_MESSAGE = 16; // New message in the queue
36

37   //public static final int JtRESUME = 19;
38

39   //public static final int JtSUSPEND = 18;
40
//public static final int JtNOEVENT = 0;
41

42
43   private int state = JtINACTIVE_STATE; // Current state
44
private boolean daemon = true; // Daemon thread (default)
45
private int priority = Thread.NORM_PRIORITY;
46
47   private transient Thread JavaDoc thread = null; // Thread
48
JtQueue msgQueue = new JtQueue (); // Message queue
49

50
51   public JtThread() {
52   }
53
54 /**
55   * Returns the Thread instance.
56   */

57
58   public Thread JavaDoc getThread () {
59     return (thread);
60   }
61
62 /**
63   * Changes the Thread instance.
64   * @param thread thread
65   */

66
67   public void setThread (Thread JavaDoc thread) {
68     this.thread = thread; // check
69
}
70
71 /**
72   * Returns this object's state.
73   */

74
75   public int getState () {
76     return (state);
77   }
78
79 /**
80   * Changes this object's state.
81   */

82
83   public void setState (int state) {
84     this.state = state;
85   }
86
87 /**
88   * Returns the thread priority.
89   */

90
91   public int getPriority () {
92     if (thread != null)
93       priority = thread.getPriority ();
94     return (priority);
95   }
96
97
98 /**
99   * Changes the thread priority.
100   */

101
102   public void setPriority (int priority) {
103
104     if (thread != null) {
105       thread.setPriority (priority); // check
106
this.priority = thread.getPriority ();
107     } else
108       this.priority = priority;
109
110
111   }
112
113
114
115
116 /**
117   * Verifies if the object's thread is a daemon thread or a user thread.
118   */

119
120   public boolean getDaemon () {
121
122    if (thread != null) {
123       daemon = thread.isDaemon (); // check
124
}
125      
126     return (daemon);
127   }
128
129
130 /**
131   * Marks the object's thread as either a daemon thread or a user thread.
132   */

133
134
135   public void setdaemon (boolean daemon) {
136     
137     if (thread != null) {
138       try {
139        thread.setDaemon (daemon); // check
140
} catch (Exception JavaDoc e) {
141        handleException (e);
142       }
143       this.daemon = thread.isDaemon ();
144     } else
145       this.daemon = daemon;
146
147   }
148
149   // Activate object
150

151   void activate () {
152
153      thread = new Thread JavaDoc ((Runnable JavaDoc) this);
154
155      if (thread == null) {
156         handleError("JtThread.activate: unable to create new thread");
157         return;
158      }
159
160      handleTrace ("JtThread.activate: ..." + thread.getName());
161
162      try { // just in case
163

164
165         if (daemon)
166          thread.setDaemon (true);
167
168         thread.start ();
169
170         thread.setPriority (priority); // check
171
} catch (Exception JavaDoc e) {
172         //updateState (JtINACTIVE_STATE); check
173
handleException (e);
174      }
175
176   }
177
178
179
180   // sleep_for_awhile: sleep for a period of time
181

182   void sleep_for_awhile (long period) {
183
184     try {
185         Thread.sleep (period);
186     } catch (Exception JavaDoc e) {
187         handleException (e);
188     }
189
190   }
191   
192
193   // updateState
194

195
196   synchronized private void updateState (int state)
197   {
198     this.state = state;
199     handleTrace ("JtThread.updateState:" + stateName (state));
200   }
201
202
203   // State name
204

205   private String JavaDoc stateName (int state) {
206
207     switch (state) {
208       case JtINACTIVE_STATE:
209         return ("INACTIVE_STATE");
210       case JtACTIVE_STATE:
211         return ("ACTIVE_STATE");
212       //case JtSUSPENDED_STATE:
213
//return ("SUSPENDED_STATE");
214
case JtSTOPPED_STATE:
215         return ("STOPPED_STATE");
216       case JtIDLE_STATE:
217         return ("IDLE_STATE");
218       default:
219         return ("UNKNOWN");
220
221     }
222
223   }
224
225
226   // Event name
227

228   private String JavaDoc eventName (int event) {
229
230     switch (event) {
231       case JtSTOP:
232         return ("STOP");
233       //case JtRESUME:
234
//return ("RESUME");
235
//case JtSUSPEND:
236
//return ("SUSPEND");
237
case JtSTART:
238         return ("START");
239       case JtEMPTY_QUEUE:
240         return ("EMPTY_QUEUE");
241       case JtNEW_MESSAGE:
242         return ("NEW_MESSAGE");
243       case 0:
244         return ("NOEVENT");
245       default:
246         return ("UNKNOWN");
247
248     }
249
250   }
251
252   // Request state transition
253

254   synchronized private void requestStateTransition (int event) {
255
256     if (event < 0)
257       return;
258     
259     switch (state) {
260       case JtINACTIVE_STATE:
261         if (event == JtSTART || event == JtNEW_MESSAGE) {
262           updateState (JtACTIVE_STATE); // check
263
activate ();
264           break;
265         }
266         if (event == JtSTOP) {
267           updateState (JtSTOPPED_STATE);
268           break;
269         }
270         invalidTransition (state, event);
271         break;
272       case JtACTIVE_STATE:
273         if (event == JtEMPTY_QUEUE) {
274           updateState (JtIDLE_STATE);
275           suspendThread ();
276           break;
277         }
278         if (event == JtNEW_MESSAGE || event == JtSTART) {
279           break;
280         }
281         if (event == JtSTOP) {
282           updateState (JtSTOPPED_STATE);
283           break;
284         }
285         invalidTransition (state, event);
286         break;
287       case JtIDLE_STATE:
288         if (event == JtNEW_MESSAGE) {
289           updateState (JtACTIVE_STATE);
290           resumeThread ();
291           break;
292         }
293         if (event == JtSTOP) {
294           updateState (JtSTOPPED_STATE);
295           // resume the Thread
296
resumeThread ();
297           break;
298         }
299         invalidTransition (state, event);
300         break;
301        default:
302      }
303
304   }
305
306   // Invalid state transition
307

308   private void invalidTransition (int state, int event) {
309
310     handleError ("JtThread.invalidTransition: invalid state transition(state, event):"
311             + stateName(state)
312             + "," + eventName (event));
313
314   }
315
316
317   // Dequeue a message
318

319   synchronized private Object JavaDoc dequeueMessage () {
320
321     Object JavaDoc omsg;
322
323     if (msgQueue == null)
324       return (null); // this should never happen
325

326     if (msgQueue.getSize () == 0)
327       return (null);
328
329     omsg = msgQueue.processMessage (new JtMessage ("JtDEQUEUE"));
330
331     return (omsg);
332   }
333
334
335
336   // Check empty queue
337

338   synchronized private boolean checkEmptyQueue ()
339   {
340
341     if (msgQueue.getSize () == 0) {
342       handleTrace
343        ("JtThread.checkEmptyQueue:empty queue");
344
345
346       requestStateTransition (JtEMPTY_QUEUE);
347       return (true);
348     } else
349       return (false);
350    
351   }
352
353
354
355   // processQueue: process queue of messages
356

357   private void processQueue ()
358   {
359     int i;
360     Object JavaDoc omsg;
361     int size;
362
363
364
365     if (checkEmptyQueue ())
366       return;
367
368     //handleTrace
369
// ("JtThread.processQueue:queue size:" + msgQueue.getSize ());
370

371     size = msgQueue.getSize ();
372     for (i = 0; i < size; i++)
373     {
374       try { // just in case
375

376
377         if (state == JtSTOPPED_STATE) // Do I need to stop ?
378
break;
379
380         //processNextMessage ();
381
omsg = dequeueMessage ();
382         processMessage (omsg);
383
384       } catch (Exception JavaDoc e) {
385         handleException (e);
386       }
387     }
388   }
389
390
391   /**
392    * Extracts Jt messages from the queue and processes them via
393    * processMessage (). Returns when the state becomes JtSTOPPED_STATE (JtSTOP message).
394    */

395
396   public final void run () {
397
398     while (true) {
399
400       if (state == JtSTOPPED_STATE)
401         break;
402       
403       //sleep_for_awhile (2000L);
404

405       processQueue (); // Process queue of messages
406

407     }
408     handleTrace ("JtThread.run: object (thread) is stopping ..." + thread.getName());
409   }
410
411   // Thread management
412

413   private synchronized void suspendThread () {
414
415
416     handleTrace ("JtThread.suspendThread:suspending thread " + thread.getName());
417     try {
418       wait ();
419     } catch (Exception JavaDoc e) {
420       handleException (e);
421     }
422     handleTrace ("JtThread.suspendThread:resuming ... " + thread.getName());
423   }
424
425   private synchronized void resumeThread () {
426
427     handleTrace ("JtThread.resumeThread: ... " + thread.getName());
428     notify ();
429   }
430
431   synchronized private void checkInactiveState () {
432
433     if (state == JtINACTIVE_STATE)
434      requestStateTransition (JtNEW_MESSAGE);
435
436   }
437
438
439 /**
440  * Add a message to the queue for further processing.
441  * When a Jt message is sent to this object via sendMessage,
442  * the message is automatically added to the message queue.
443  * The run method (separate thread) is contantly extracting the next message in
444  * the queue and calling processMessage() to process it.
445  */

446
447
448   synchronized public Object JavaDoc enqueueMessage (Object JavaDoc msg)
449   {
450     JtMessage imsg, tmp;
451     String JavaDoc msgid = null;
452
453
454     if (msg == null)
455       return (null);
456             
457     imsg = (JtMessage) msg;
458     msgid = (String JavaDoc) imsg.getMsgId ();
459
460     // If the current state is inactive, activate the thread since
461
// a new message has been received
462

463     checkInactiveState ();
464
465     // JtSTART and JtSTOP should be processed right away (don't add these to the queue)
466

467     if (msgid.equals ("JtSTART") || msgid.equals ("JtSTOP")) {
468        processMessage (msg);
469        return (msg);
470     }
471
472     // enqueue the message
473

474     tmp = new JtMessage ("JtENQUEUE");
475     tmp.setMsgContent (msg);
476     msgQueue.processMessage (tmp);
477
478     requestStateTransition (JtNEW_MESSAGE);
479     return (msg); //check
480

481   }
482   /**
483     * Process object messages.
484     * <ul>
485     * <li> JtSTART - Starts the thread associated with this object.
486     * <li> JtSTOP - Stops the thread by resetting the state variable to JtSTOPPED_STATE.
487     * This causes the run method to return.
488     * <li> JtREMOVE - Performs any housekeeping needed before this object is removed.
489     * This includes stopping its execution thread.
490     * </ul>
491     * @param event Jt Message
492     */

493
494
495   public Object JavaDoc processMessage (Object JavaDoc event) {
496
497    String JavaDoc msgid = null;
498    JtMessage e = (JtMessage) event;
499
500      if (e == null)
501     return null;
502             
503
504      msgid = (String JavaDoc) e.getMsgId ();
505
506      if (msgid == null)
507        return null;
508
509      handleTrace ("JtThread.processMessage:" + msgid);
510
511      if (msgid.equals ("JtSTART")) {
512        requestStateTransition (JtSTART);
513        return (null);
514      }
515
516      if (msgid.equals ("JtSTOP") || msgid.equals ("JtREMOVE")) {
517        requestStateTransition (JtSTOP);
518        return (null);
519      }
520
521     
522      return (null);
523
524 /*
525      handleError ("JtThread.processMessage: invalid message id:" + msgid);
526      return (null);
527 */

528
529   }
530
531
532
533   static private char waitForInputKey () {
534     char c = ' ';
535
536       try {
537
538       c = (char) System.in.read ();
539       while (System.in.available () > 0)
540         System.in.read ();
541
542       } catch (Exception JavaDoc e) {
543         e.printStackTrace ();
544       }
545
546       return (c);
547   }
548
549   static private char readInputKey () {
550     char c = ' ';
551
552       try {
553
554       if (System.in.available () <= 0) {
555         return (' ');
556       }
557
558       c = (char) System.in.read ();
559       while (System.in.available () > 0)
560         System.in.read ();
561
562       } catch (Exception JavaDoc e) {
563         e.printStackTrace ();
564       }
565
566       return (c);
567   }
568
569
570
571   /**
572     * Unit tests the messages processed by JtThread and illustrates its use.
573     */

574
575   public static void main(String JavaDoc[] args) {
576
577     JtThread thread;
578     JtObject main;
579     char c = 0;
580     int flag = 0;
581     int i;
582
583     main = new JtObject ();
584     //main.setObjTrace (1);
585
thread = (JtThread) main.createObject ("Jt.JtThread", "thread");
586     main.setValue (thread, "daemon", "true");
587     main.setValue (thread, "priority", "" + Thread.MIN_PRIORITY);
588
589
590
591
592
593     System.out.print ("Press any Key to Start the object (thread) ...");
594
595     waitForInputKey ();
596
597     main.sendMessage ("thread", new JtMessage ("JtSTART"));
598
599     System.out.println ("Press X to stop the object. ");
600     System.out.println ("Press any other key to start/stop sending messages to the object ...");
601
602     
603     c = waitForInputKey ();
604     if (c != 'X') {
605     flag = 1;
606     for (i = 1; ;) {
607       String JavaDoc tmp = "JtTEST" + i;
608
609       if (flag == 1) {
610         i++;
611         // Send a test message to the object
612
main.sendMessage (thread, new JtMessage (tmp));
613         System.out.println (i);
614
615       } else
616         c = waitForInputKey ();
617
618       if (flag == 1)
619         c = readInputKey ( );
620       if (c != ' ') {
621         flag = 1 - flag; // Start/Stop sending messages
622
}
623
624       // Stop the object
625
if (c == 'X') {
626         break;
627
628       }
629
630     }
631     }
632
633      // Stop the object
634

635      main.sendMessage (thread, new JtMessage ("JtSTOP"));
636
637      System.out.println ("Press any key to exit");
638      waitForInputKey ();
639
640   }
641
642 }
643
644
645
Popular Tags