KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > mchange > v2 > async > CarefulRunnableQueue


1 /*
2  * Distributed as part of c3p0 v.0.9.1
3  *
4  * Copyright (C) 2005 Machinery For Change, Inc.
5  *
6  * Author: Steve Waldman <swaldman@mchange.com>
7  *
8  * This library is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser General Public License version 2.1, as
10  * published by the Free Software Foundation.
11  *
12  * This software is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this software; see the file LICENSE. If not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */

22
23
24 package com.mchange.v2.async;
25
26 import java.util.Collections JavaDoc;
27 import java.util.List JavaDoc;
28 import java.util.LinkedList JavaDoc;
29 import com.mchange.v2.log.MLevel;
30 import com.mchange.v2.log.MLog;
31 import com.mchange.v2.log.MLogger;
32 import com.mchange.v2.util.ResourceClosedException;
33
34 public class CarefulRunnableQueue implements RunnableQueue, Queuable, StrandedTaskReporting
35 {
36     private final static MLogger logger = MLog.getLogger( CarefulRunnableQueue.class );
37
38     private List JavaDoc taskList = new LinkedList JavaDoc();
39     private TaskThread t = new TaskThread();
40
41     private boolean shutdown_on_interrupt;
42
43     private boolean gentle_close_requested = false;
44
45     private List JavaDoc strandedTasks = null;
46
47     public CarefulRunnableQueue(boolean daemon, boolean shutdown_on_interrupt)
48     {
49     this.shutdown_on_interrupt = shutdown_on_interrupt;
50     t.setDaemon( daemon );
51     t.start();
52     }
53
54     public RunnableQueue asRunnableQueue()
55     { return this; }
56
57     public synchronized void postRunnable(Runnable JavaDoc r)
58     {
59     try
60         {
61         if (gentle_close_requested)
62             throw new ResourceClosedException("Attempted to post a task to a closing " +
63                               "CarefulRunnableQueue.");
64         
65         taskList.add(r);
66         this.notifyAll();
67         }
68     catch (NullPointerException JavaDoc e)
69         {
70         //e.printStackTrace();
71
if (Debug.DEBUG)
72             {
73             if ( logger.isLoggable( MLevel.FINE ) )
74                 logger.log( MLevel.FINE, "NullPointerException while posting Runnable.", e );
75             }
76         if (taskList == null)
77             throw new ResourceClosedException("Attempted to post a task to a CarefulRunnableQueue " +
78                               "which has been closed, or whose TaskThread has been " +
79                               "interrupted.");
80         else
81             throw e;
82         }
83     }
84
85     public synchronized void close( boolean skip_remaining_tasks )
86     {
87     if (skip_remaining_tasks)
88         {
89         t.safeStop();
90         t.interrupt();
91         }
92     else
93         gentle_close_requested = true;
94     }
95
96     public synchronized void close()
97     { this.close( true ); }
98
99     public synchronized List JavaDoc getStrandedTasks()
100     {
101     try
102         {
103         while (gentle_close_requested && taskList != null)
104             this.wait();
105         return strandedTasks;
106         }
107     catch (InterruptedException JavaDoc e)
108         {
109         // very, very rare I think...
110
// if necessary I'll try a more complex solution, but I don't think
111
// it's worth it.
112
//e.printStackTrace();
113
if ( logger.isLoggable( MLevel.WARNING ) )
114             logger.log( MLevel.WARNING,
115                 Thread.currentThread() + " interrupted while waiting for stranded tasks from CarefulRunnableQueue.",
116                 e );
117
118         throw new RuntimeException JavaDoc(Thread.currentThread() +
119                        " interrupted while waiting for stranded tasks from CarefulRunnableQueue.");
120         }
121     }
122
123     private synchronized Runnable JavaDoc dequeueRunnable()
124     {
125     Runnable JavaDoc r = (Runnable JavaDoc) taskList.get(0);
126     taskList.remove(0);
127     return r;
128     }
129
130     private synchronized void awaitTask() throws InterruptedException JavaDoc
131     {
132     while (taskList.size() == 0)
133         {
134         if ( gentle_close_requested )
135             {
136             t.safeStop(); // remember t == Thread.currentThread()
137
t.interrupt();
138             }
139         this.wait();
140         }
141     }
142
143     class TaskThread extends Thread JavaDoc
144     {
145     boolean should_stop = false;
146
147     TaskThread()
148     { super("CarefulRunnableQueue.TaskThread"); }
149
150     public synchronized void safeStop()
151     { should_stop = true; }
152
153     private synchronized boolean shouldStop()
154     { return should_stop; }
155
156     public void run()
157     {
158         try
159         {
160             while (! shouldStop() )
161             {
162                 try
163                 {
164                     awaitTask();
165                     Runnable JavaDoc r = dequeueRunnable();
166                     try
167                     { r.run(); }
168                     catch (Exception JavaDoc e)
169                     {
170                         //System.err.println(this.getClass().getName() + " -- Unexpected exception in task!");
171
//e.printStackTrace();
172

173                         if ( logger.isLoggable( MLevel.WARNING ) )
174                         logger.log(MLevel.WARNING, this.getClass().getName() + " -- Unexpected exception in task!", e);
175                     }
176                 }
177                 catch (InterruptedException JavaDoc e)
178                 {
179                     if (shutdown_on_interrupt)
180                     {
181                         CarefulRunnableQueue.this.close( false );
182 // if (Debug.DEBUG && Debug.TRACE >= Debug.TRACE_MED )
183
// System.err.println( this.toString() +
184
// " interrupted. Shutting down after current tasks" +
185
// " have completed." );
186
if ( logger.isLoggable( MLevel.INFO ) )
187                         logger.info(this.toString() +
188                                 " interrupted. Shutting down after current tasks" +
189                                 " have completed." );
190                     }
191                     else
192                     {
193 // if (Debug.DEBUG && Debug.TRACE >= Debug.TRACE_MED )
194
// System.err.println( this.toString() +
195
// " received interrupt. IGNORING." );
196
logger.info(this.toString() + " received interrupt. IGNORING." );
197                     }
198                 }
199             }
200         }
201 // catch (ThreadDeath td) //DEBUG ONLY -- remove soon, swaldman 08-Jun-2003
202
// {
203
// System.err.print("c3p0-TRAVIS: ");
204
// System.err.println(this.getName() + ": Some bastard used the deprecated stop() method to kill me!!!!");
205
// td.printStackTrace();
206
// throw td;
207
// }
208
// catch (Throwable t) //DEBUG ONLY -- remove soon, swaldman 08-Jun-2003
209
// {
210
// System.err.print("c3p0-TRAVIS: ");
211
// System.err.println(this.getName() + ": Some unexpected Throwable occurred and killed me!!!!");
212
// t.printStackTrace();
213
// if (t instanceof Error)
214
// throw (Error) t;
215
// else if (t instanceof RuntimeException)
216
// throw (RuntimeException) t;
217
// else
218
// throw new InternalError( t.toString() ); //we don't expect any checked Exceptions can happen here.
219
// }
220
finally
221         {
222             synchronized ( CarefulRunnableQueue.this )
223             {
224                 strandedTasks = Collections.unmodifiableList( taskList );
225                 taskList = null;
226                 t = null;
227                 CarefulRunnableQueue.this.notifyAll(); //if anyone is waiting for stranded tasks...
228
//System.err.print("c3p0-TRAVIS: ");
229
//System.err.println("TaskThread dead. strandedTasks: " + strandedTasks);
230
}
231         }
232     }
233     }
234 }
235
236
Popular Tags