KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > mchange > v2 > async > ThreadPerTaskAsynchronousRunner


1 /*
2  * Distributed as part of c3p0 v.0.9.1
3  *
4  * Copyright (C) 2005 Machinery For Change, Inc.
5  *
6  * Author: Steve Waldman <swaldman@mchange.com>
7  *
8  * This library is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser General Public License version 2.1, as
10  * published by the Free Software Foundation.
11  *
12  * This software is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this software; see the file LICENSE. If not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */

22
23
24 package com.mchange.v2.async;
25
26 import java.util.*;
27 import com.mchange.v2.log.*;
28 import com.mchange.v2.util.ResourceClosedException;
29
30 public class ThreadPerTaskAsynchronousRunner implements AsynchronousRunner
31 {
32     final static int PRESUME_DEADLOCKED_MULTIPLE = 3; //after three times the interrupt period, we presume deadlock
33

34     final static MLogger logger = MLog.getLogger( ThreadPerTaskAsynchronousRunner.class );
35
36     //MT: unchanged post-ctor
37
final int max_task_threads;
38     final long interrupt_task_delay;
39     
40     //MT: protected by this' lock
41
LinkedList queue = new LinkedList();
42     ArrayList running = new ArrayList(); //as a Collection -- duplicate-accepting-ness is important, order is not
43
ArrayList deadlockSnapshot = null;
44     boolean still_open = true;
45
46     //MT: thread-safe and not reassigned post-ctor
47
Thread JavaDoc dispatchThread = new DispatchThread();
48     Timer interruptAndDeadlockTimer;
49
50     public ThreadPerTaskAsynchronousRunner(int max_task_threads)
51     { this( max_task_threads, 0); }
52
53     public ThreadPerTaskAsynchronousRunner(int max_task_threads, long interrupt_task_delay)
54     {
55     this.max_task_threads = max_task_threads;
56     this.interrupt_task_delay = interrupt_task_delay;
57     if ( hasIdTimer() )
58         {
59         interruptAndDeadlockTimer = new Timer( true );
60         TimerTask deadlockChecker = new TimerTask()
61             {
62             public void run()
63             { checkForDeadlock(); }
64             };
65         long delay = interrupt_task_delay * PRESUME_DEADLOCKED_MULTIPLE;
66         interruptAndDeadlockTimer.schedule(deadlockChecker, delay, delay);
67         }
68
69     dispatchThread.start();
70     }
71
72     private boolean hasIdTimer()
73     { return (interrupt_task_delay > 0); }
74
75     public synchronized void postRunnable(Runnable JavaDoc r)
76     {
77     if ( still_open )
78         {
79         queue.add( r );
80         this.notifyAll();
81         }
82     else
83         throw new ResourceClosedException("Attempted to use a ThreadPerTaskAsynchronousRunner in a closed or broken state.");
84
85     }
86
87     public void close()
88     { close( true ); }
89
90     public synchronized void close( boolean skip_remaining_tasks )
91     {
92     if ( still_open )
93         {
94         this.still_open = false;
95         if (skip_remaining_tasks)
96             {
97             queue.clear();
98             for (Iterator ii = running.iterator(); ii.hasNext(); )
99                 ((Thread JavaDoc) ii.next()).interrupt();
100             closeThreadResources();
101             }
102         }
103     }
104
105     public synchronized int getRunningCount()
106     { return running.size(); }
107
108     public synchronized Collection getRunningTasks()
109     { return (Collection) running.clone(); }
110
111     public synchronized int getWaitingCount()
112     { return queue.size(); }
113
114     public synchronized Collection getWaitingTasks()
115     { return (Collection) queue.clone(); }
116
117     public synchronized boolean isClosed()
118     { return !still_open; }
119
120     public synchronized boolean isDoneAndGone()
121     { return (!dispatchThread.isAlive() && running.isEmpty() && interruptAndDeadlockTimer == null); }
122
123     private synchronized void acknowledgeComplete(TaskThread tt)
124     {
125     if (! tt.isCompleted())
126         {
127         running.remove( tt );
128         tt.markCompleted();
129         ThreadPerTaskAsynchronousRunner.this.notifyAll();
130         
131         if (! still_open && queue.isEmpty() && running.isEmpty())
132             closeThreadResources();
133         }
134     }
135
136     private synchronized void checkForDeadlock()
137     {
138     if (deadlockSnapshot == null)
139         {
140         if (running.size() == max_task_threads)
141             deadlockSnapshot = (ArrayList) running.clone();
142         }
143     else if (running.size() < max_task_threads)
144         deadlockSnapshot = null;
145     else if (deadlockSnapshot.equals( running )) //deadlock!
146
{
147         if (logger.isLoggable(MLevel.WARNING))
148             {
149             StringBuffer JavaDoc warningMsg = new StringBuffer JavaDoc(1024);
150             warningMsg.append("APPARENT DEADLOCK! (");
151             warningMsg.append( this );
152             warningMsg.append(") Deadlocked threads (unresponsive to interrupt()) are being set aside as hopeless and up to ");
153             warningMsg.append( max_task_threads );
154             warningMsg.append(" may now be spawned for new tasks. If tasks continue to deadlock, you may run out of memory. Deadlocked task list: ");
155             for (int i = 0, len = deadlockSnapshot.size(); i < len; ++i)
156                 {
157                 if (i != 0) warningMsg.append(", ");
158                 warningMsg.append( ((TaskThread) deadlockSnapshot.get(i)).getTask() );
159                 }
160             
161             logger.log(MLevel.WARNING, warningMsg.toString());
162             }
163
164         // note "complete" here means from the perspective of the async runner, as complete
165
// as it will ever be, since the task is presumed hopelessly hung
166
for (int i = 0, len = deadlockSnapshot.size(); i < len; ++i)
167             acknowledgeComplete( (TaskThread) deadlockSnapshot.get(i) );
168         deadlockSnapshot = null;
169         }
170     else
171         deadlockSnapshot = (ArrayList) running.clone();
172     }
173
174     private void closeThreadResources()
175     {
176     if (interruptAndDeadlockTimer != null)
177         {
178         interruptAndDeadlockTimer.cancel();
179         interruptAndDeadlockTimer = null;
180         }
181     dispatchThread.interrupt();
182     }
183
184     class DispatchThread extends Thread JavaDoc
185     {
186     DispatchThread()
187     { super( "Dispatch-Thread-for-" + ThreadPerTaskAsynchronousRunner.this ); }
188
189     public void run()
190     {
191         synchronized (ThreadPerTaskAsynchronousRunner.this)
192         {
193             try
194             {
195                 while (true)
196                 {
197                     while (queue.isEmpty() || running.size() == max_task_threads)
198                     ThreadPerTaskAsynchronousRunner.this.wait();
199                     
200                     Runnable JavaDoc next = (Runnable JavaDoc) queue.remove(0);
201                     TaskThread doer = new TaskThread( next );
202                     doer.start();
203                     running.add( doer );
204                 }
205             }
206             catch (InterruptedException JavaDoc e)
207             {
208                 if (still_open) //we're not closed...
209
{
210                     if ( logger.isLoggable( MLevel.WARNING ) )
211                     logger.log( MLevel.WARNING, this.getName() + " unexpectedly interrupted! Shutting down!" );
212                     close( false );
213                 }
214             }
215         }
216     }
217     }
218
219     class TaskThread extends Thread JavaDoc
220     {
221     //MT: post-ctor constant
222
Runnable JavaDoc r;
223
224     //MT: protected by this' lock
225
boolean completed = false;
226
227     TaskThread( Runnable JavaDoc r )
228     {
229         super( "Task-Thread-for-" + ThreadPerTaskAsynchronousRunner.this );
230         this.r = r;
231     }
232
233     Runnable JavaDoc getTask()
234     { return r; }
235
236     synchronized void markCompleted()
237     { completed = true; }
238
239     synchronized boolean isCompleted()
240     { return completed; }
241
242     public void run()
243     {
244         try
245         {
246             if (hasIdTimer())
247             {
248                 TimerTask interruptTask = new TimerTask()
249                 {
250                     public void run()
251                     { TaskThread.this.interrupt(); }
252                 };
253                 interruptAndDeadlockTimer.schedule( interruptTask , interrupt_task_delay );
254             }
255             r.run();
256         }
257         finally
258         { acknowledgeComplete( this ); }
259     }
260     }
261 }
262
Popular Tags