// KickJava   Java API By Example, From Geeks To Geeks.
//
// Java > Open Source Codes > com > quikj > server > framework > AceThread


package com.quikj.server.framework;

import java.io.*;
import java.lang.ref.WeakReference;
import java.util.*;

6 public class AceThread extends Thread JavaDoc
7 {
8     public AceThread()
9     throws IOException
10     {
11         super();
12         createMessageQueue();
13         aceThreadListId = getUniqueAceThreadId();
14         synchronized (aceThreadList)
15         {
16             aceThreadList.put(new Integer JavaDoc(aceThreadListId), this);
17         }
18     }
19     
20     public AceThread(String JavaDoc name)
21     throws IOException
22     {
23         super(name);
24         createMessageQueue();
25         aceThreadListId = getUniqueAceThreadId();
26         synchronized (aceThreadList)
27         {
28             aceThreadList.put(new Integer JavaDoc(aceThreadListId), this);
29         }
30     }
31     
32     public AceThread(ThreadGroup JavaDoc group, String JavaDoc name)
33     throws IOException
34     {
35         super(group, name);
36         createMessageQueue();
37         aceThreadListId = getUniqueAceThreadId();
38         synchronized (aceThreadList)
39         {
40             aceThreadList.put(new Integer JavaDoc(aceThreadListId), this);
41         }
42     }
43     
44     public AceThread(boolean nomsgq)
45     throws IOException
46     {
47         super();
48         
49         if (nomsgq == false)
50         {
51             createMessageQueue();
52         }
53         
54         aceThreadListId = getUniqueAceThreadId();
55         synchronized (aceThreadList)
56         {
57             aceThreadList.put(new Integer JavaDoc(aceThreadListId), this);
58         }
59     }
60     
61     
62     public AceThread(String JavaDoc name, boolean nomsgq)
63     throws IOException
64     {
65         super(name);
66         
67         if (nomsgq == false)
68         {
69             createMessageQueue();
70         }
71         
72         aceThreadListId = getUniqueAceThreadId();
73         synchronized (aceThreadList)
74         {
75             aceThreadList.put(new Integer JavaDoc(aceThreadListId), this);
76         }
77     }
78     
79     public AceThread(ThreadGroup JavaDoc group, String JavaDoc name, boolean nomsgq)
80     throws IOException
81     {
82         super(group, name);
83         
84         if (nomsgq == false)
85         {
86             createMessageQueue();
87         }
88         
89         aceThreadListId = getUniqueAceThreadId();
90         synchronized(aceThreadList)
91         {
92             aceThreadList.put(new Integer JavaDoc(aceThreadListId), this);
93         }
94     }
95     
96     public final boolean sendMessage(AceMessageInterface msg)
97     {
98         if (messageQueue == null)
99         {
100             writeErrorMessage("The thread has not enabled message queue");
101             return false;
102         }
103         
104         synchronized (messageQueue) //lock access
105
{
106             messageQueue.addLast(msg);
107             
108             // send a notification
109
messageQueue.notify();
110         }
111         
112         return true;
113     }
114     
115     public final boolean removeMessage(AceMessageInterface obj,
116     AceCompareMessageInterface comp)
117     {
118         boolean removed = false;
119         
120         if (messageQueue == null)
121         {
122             writeErrorMessage("The thread has not enabled message queue");
123             return false;
124         }
125         
126         synchronized (remLock)
127         {
128             synchronized (messageQueue)
129             {
130                 ListIterator iter = messageQueue.listIterator(0);
131                 
132                 while (iter.hasNext() == true)
133                 {
134                     AceMessageInterface msg = (AceMessageInterface)iter.next();
135                     if (comp.same(obj, msg) == true) // found
136
{
137                         // remove the message from the queue
138
iter.remove();
139                         removed = true;
140                     }
141                 }
142             }
143         }
144         
145         return removed;
146     }
147     
148     public final boolean flushMessages()
149     {
150         if (messageQueue == null)
151         {
152             writeErrorMessage("The thread has not enabled message queue");
153             return false;
154         }
155         
156         synchronized (remLock)
157         {
158             synchronized (messageQueue)
159             {
160                 messageQueue.clear();
161             }
162         }
163         return true;
164     }
165     
166     protected final AceMessageInterface waitMessage()
167     {
168         if (messageQueue == null)
169         {
170             writeErrorMessage("The thread has not enabled message queue");
171             return null;
172         }
173         
174         synchronized (remLock)
175         {
176             synchronized (messageQueue)
177             {
178                 // there is something in the queue
179
if (messageQueue.size() > 0)
180                 {
181                     try
182                     {
183                         return (AceMessageInterface)messageQueue.removeFirst();
184                     }
185                     catch (NoSuchElementException ex1)
186                     {
187                         writeErrorMessage(ex1.getMessage());
188                         return null;
189                     }
190                 }
191                 else //nothing in the message queue
192
{
193                     while (true)
194                     {
195                         try
196                         {
197                             messageQueue.wait();
198                             break;
199                         }
200                         catch (InterruptedException JavaDoc ex)
201                         {
202                             ; // ignore for the time-being
203
}
204                     }
205                     
206                     // we have been awakened
207
try
208                     {
209                         return (AceMessageInterface)messageQueue.removeFirst();
210                     }
211                     catch (NoSuchElementException ex1)
212                     {
213                         writeErrorMessage(ex1.getMessage());
214                         return null;
215                     }
216                 }
217             }
218         }
219     }
220     
221     public final boolean interruptWait(int sig_id, String JavaDoc message)
222     {
223         // send a signal message
224
return sendMessage(new AceSignalMessage(sig_id, message));
225     }
226     
227     public final boolean interruptWait(int sig_id)
228     {
229         return interruptWait(sig_id, "");
230     }
231     
232     public void dispose()
233     {
234         
235         if (messageQueue != null)
236         {
237             messageQueue.clear();
238             messageQueue = null;
239         }
240         
241         synchronized (aceThreadList)
242         {
243             aceThreadList.remove(new Integer JavaDoc(aceThreadListId));
244         }
245         
246     }
247     
248     public final String JavaDoc getErrorMessage()
249     {
250         synchronized (errorLock)
251         {
252             return new String JavaDoc(errorMessage);
253         }
254     }
255     
256     protected void writeErrorMessage(String JavaDoc error)
257     {
258         Thread JavaDoc cthread = Thread.currentThread();
259         
260         if ((cthread instanceof AceThread) == true)
261         {
262             ((AceThread)cthread).dispatchErrorMessage(error);
263         }
264         else
265         {
266             System.err.println(error);
267         }
268     }
269     
270     protected void dispatchErrorMessage(String JavaDoc error)
271     {
272         synchronized (errorLock)
273         {
274             errorMessage = new String JavaDoc(error);
275         }
276     }
277     
278     protected final void setOperationContext(AceOperationContextInterface context)
279     {
280         operationContext = context;
281     }
282     
283     protected final AceOperationContextInterface getOperationContext()
284     {
285         return operationContext;
286     }
287     
288     public static AceThread getAceThreadObject(int code)
289     {
290         synchronized (aceThreadList)
291         {
292             return (AceThread)aceThreadList.get(new Integer JavaDoc(code));
293         }
294     }
295     
296     protected final int getAceThreadId()
297     {
298         return aceThreadListId;
299     }
300     
301     public static int getUniqueAceThreadId()
302     {
303         do
304         {
305             int random_num = randomNumberGenerator.nextInt();
306             if (random_num <= 0) continue;
307             
308             synchronized (aceThreadList)
309             {
310                 if (aceThreadList.get(new Integer JavaDoc(random_num)) == null) // not found
311
{
312                     return random_num;
313                 }
314             }
315         }
316         while (true);
317     }
318     
319     private void createMessageQueue()
320     throws IOException
321     {
322         messageQueue = new LinkedList();
323     }
324     
325     private LinkedList messageQueue = null;
326     private Object JavaDoc errorLock = new Object JavaDoc();
327     private String JavaDoc errorMessage = "";
328     private AceOperationContextInterface operationContext = null;
329     private int aceThreadListId;
330     
331     // The remLock takes care of a rare situation when a different thread is removing an element from
332
// the queue while this thread is waiting for a message. If this lock is not used, there could
333
// be a race condition causing errors.
334
private Object JavaDoc remLock = new Object JavaDoc();
335     
336     
337     private static WeakHashMap aceThreadList = new WeakHashMap();
338     private static Random randomNumberGenerator = new Random((new Date()).getTime());
339     
340     // test program
341
public static void main(String JavaDoc[] args)
342     {
343         class MyMessage implements AceMessageInterface
344         {
345             public MyMessage(String JavaDoc message)
346             {
347                 this.message = message;
348             }
349             
350             public String JavaDoc messageType()
351             {
352                 return new String JavaDoc(message);
353             }
354             
355             private String JavaDoc message;
356         }
357         
358         
359         class MyAceThreadClass extends AceThread
360         {
361             public MyAceThreadClass(String JavaDoc name) throws IOException
362             {
363                 super(name);
364             }
365             
366             public void run()
367             {
368                 System.out.println(getName() + " started.");
369                 while (true)
370                 {
371                     AceMessageInterface message = this.waitMessage();
372                     if (message == null)
373                     {
374                         System.err.println(getName() + ": Null message received, killing thread");
375                         this.dispose();
376                         break;
377                     }
378                     
379                     if (message.messageType().equals("kill") == true)
380                     {
381                         System.out.println(getName() + ": kill received, killing thread");
382                         this.dispose();
383                         break;
384                     }
385                     
386                     System.out.println(getName() + ": " + message.messageType() + " received");
387                     
388                 }
389             }
390         }
391         
392         try
393         {
394             BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
395             
396             int num_threads = 10;
397             
398             if (args.length > 0)
399             {
400                 try
401                 {
402                     num_threads = Integer.parseInt(args[0]);
403                 }
404                 catch (NumberFormatException JavaDoc ex)
405                 {
406                     System.err.println("Command line syntax error - the argument should specify the number of threads to run");
407                     System.exit(1);
408                 }
409             }
410             else
411             {
412                 System.out.println("No argument specified, defaulting number of threads to "
413                 + num_threads);
414             }
415             
416             
417             MyAceThreadClass[] threads = new MyAceThreadClass [num_threads];
418             for (int i = 0; i < num_threads; i++)
419             {
420                 threads[i] = new MyAceThreadClass("AceThread-" + i);
421                 
422                 threads[i].start(); // run the thread
423
}
424             
425             String JavaDoc syntax = new String JavaDoc("Invalid syntax! Correct syntax : <thread number> <message> <number of times>");
426             while (true)
427             {
428                 System.out.print("Input> ");
429                 System.out.flush();
430                 
431                 String JavaDoc line = reader.readLine().trim();
432                 if (line.length() > 0)
433                 {
434                     StringTokenizer strtok = new StringTokenizer(line, " ");
435                     if (strtok.countTokens() >= 3)
436                     {
437                         int num_times;
438                         int thread_index;
439                         String JavaDoc message;
440                         
441                         try
442                         {
443                             thread_index = Integer.parseInt(strtok.nextToken());
444                             message = new String JavaDoc(strtok.nextToken());
445                             num_times = Integer.parseInt(strtok.nextToken());
446                             
447                             if (thread_index >= num_threads)
448                             {
449                                 System.err.println("That thread is not running");
450                                 continue;
451                             }
452                             
453                         }
454                         catch (NumberFormatException JavaDoc ex3)
455                         {
456                             System.err.println(syntax);
457                             continue;
458                         }
459                         
460                         for (int i = 0; i < num_times; i++)
461                         {
462                             if (threads[thread_index].sendMessage(new MyMessage(message)) == false)
463                             {
464                                 System.err.println("Error occured while sending message");
465                                 break;
466                             }
467                         }
468                     }
469                     else if (strtok.countTokens() == 1)
470                     {
471                         if (strtok.nextToken().equals("quit") == true)
472                         {
473                             for (int i = 0; i < num_threads; i++)
474                             {
475                                 if (threads[i].sendMessage(new MyMessage("kill")) == false)
476                                 {
477                                     System.err.println("Error occured while sending message");
478                                 }
479                             }
480                             System.exit(0);
481                         }
482                         else
483                         {
484                             System.err.println(syntax);
485                         }
486                     }
487                     else
488                     {
489                         System.err.println(syntax);
490                     }
491                 }
492             }
493         }
494         catch (IOException ex)
495         {
496             System.err.println(ex.getMessage());
497             System.exit(1);
498         }
499     }
500 }
// Popular Tags