1 22 23 24 package com.mchange.v2.async; 25 26 import java.util.*; 27 import com.mchange.v2.log.*; 28 import com.mchange.v2.util.ResourceClosedException; 29 30 public class ThreadPerTaskAsynchronousRunner implements AsynchronousRunner 31 { 32 final static int PRESUME_DEADLOCKED_MULTIPLE = 3; 34 final static MLogger logger = MLog.getLogger( ThreadPerTaskAsynchronousRunner.class ); 35 36 final int max_task_threads; 38 final long interrupt_task_delay; 39 40 LinkedList queue = new LinkedList(); 42 ArrayList running = new ArrayList(); ArrayList deadlockSnapshot = null; 44 boolean still_open = true; 45 46 Thread dispatchThread = new DispatchThread(); 48 Timer interruptAndDeadlockTimer; 49 50 public ThreadPerTaskAsynchronousRunner(int max_task_threads) 51 { this( max_task_threads, 0); } 52 53 public ThreadPerTaskAsynchronousRunner(int max_task_threads, long interrupt_task_delay) 54 { 55 this.max_task_threads = max_task_threads; 56 this.interrupt_task_delay = interrupt_task_delay; 57 if ( hasIdTimer() ) 58 { 59 interruptAndDeadlockTimer = new Timer( true ); 60 TimerTask deadlockChecker = new TimerTask() 61 { 62 public void run() 63 { checkForDeadlock(); } 64 }; 65 long delay = interrupt_task_delay * PRESUME_DEADLOCKED_MULTIPLE; 66 interruptAndDeadlockTimer.schedule(deadlockChecker, delay, delay); 67 } 68 69 dispatchThread.start(); 70 } 71 72 private boolean hasIdTimer() 73 { return (interrupt_task_delay > 0); } 74 75 public synchronized void postRunnable(Runnable r) 76 { 77 if ( still_open ) 78 { 79 queue.add( r ); 80 this.notifyAll(); 81 } 82 else 83 throw new ResourceClosedException("Attempted to use a ThreadPerTaskAsynchronousRunner in a closed or broken state."); 84 85 } 86 87 public void close() 88 { close( true ); } 89 90 public synchronized void close( boolean skip_remaining_tasks ) 91 { 92 if ( still_open ) 93 { 94 this.still_open = false; 95 if (skip_remaining_tasks) 96 { 97 queue.clear(); 98 for (Iterator ii = running.iterator(); ii.hasNext(); ) 99 ((Thread ) ii.next()).interrupt(); 100 closeThreadResources(); 101 } 102 } 103 } 104 105 public synchronized int getRunningCount() 106 { return running.size(); } 107 108 public synchronized Collection getRunningTasks() 109 { return (Collection) running.clone(); } 110 111 public synchronized int getWaitingCount() 112 { return queue.size(); } 113 114 public synchronized Collection getWaitingTasks() 115 { return (Collection) queue.clone(); } 116 117 public synchronized boolean isClosed() 118 { return !still_open; } 119 120 public synchronized boolean isDoneAndGone() 121 { return (!dispatchThread.isAlive() && running.isEmpty() && interruptAndDeadlockTimer == null); } 122 123 private synchronized void acknowledgeComplete(TaskThread tt) 124 { 125 if (! tt.isCompleted()) 126 { 127 running.remove( tt ); 128 tt.markCompleted(); 129 ThreadPerTaskAsynchronousRunner.this.notifyAll(); 130 131 if (! still_open && queue.isEmpty() && running.isEmpty()) 132 closeThreadResources(); 133 } 134 } 135 136 private synchronized void checkForDeadlock() 137 { 138 if (deadlockSnapshot == null) 139 { 140 if (running.size() == max_task_threads) 141 deadlockSnapshot = (ArrayList) running.clone(); 142 } 143 else if (running.size() < max_task_threads) 144 deadlockSnapshot = null; 145 else if (deadlockSnapshot.equals( running )) { 147 if (logger.isLoggable(MLevel.WARNING)) 148 { 149 StringBuffer warningMsg = new StringBuffer (1024); 150 warningMsg.append("APPARENT DEADLOCK! ("); 151 warningMsg.append( this ); 152 warningMsg.append(") Deadlocked threads (unresponsive to interrupt()) are being set aside as hopeless and up to "); 153 warningMsg.append( max_task_threads ); 154 warningMsg.append(" may now be spawned for new tasks. If tasks continue to deadlock, you may run out of memory. Deadlocked task list: "); 155 for (int i = 0, len = deadlockSnapshot.size(); i < len; ++i) 156 { 157 if (i != 0) warningMsg.append(", "); 158 warningMsg.append( ((TaskThread) deadlockSnapshot.get(i)).getTask() ); 159 } 160 161 logger.log(MLevel.WARNING, warningMsg.toString()); 162 } 163 164 for (int i = 0, len = deadlockSnapshot.size(); i < len; ++i) 167 acknowledgeComplete( (TaskThread) deadlockSnapshot.get(i) ); 168 deadlockSnapshot = null; 169 } 170 else 171 deadlockSnapshot = (ArrayList) running.clone(); 172 } 173 174 private void closeThreadResources() 175 { 176 if (interruptAndDeadlockTimer != null) 177 { 178 interruptAndDeadlockTimer.cancel(); 179 interruptAndDeadlockTimer = null; 180 } 181 dispatchThread.interrupt(); 182 } 183 184 class DispatchThread extends Thread 185 { 186 DispatchThread() 187 { super( "Dispatch-Thread-for-" + ThreadPerTaskAsynchronousRunner.this ); } 188 189 public void run() 190 { 191 synchronized (ThreadPerTaskAsynchronousRunner.this) 192 { 193 try 194 { 195 while (true) 196 { 197 while (queue.isEmpty() || running.size() == max_task_threads) 198 ThreadPerTaskAsynchronousRunner.this.wait(); 199 200 Runnable next = (Runnable ) queue.remove(0); 201 TaskThread doer = new TaskThread( next ); 202 doer.start(); 203 running.add( doer ); 204 } 205 } 206 catch (InterruptedException e) 207 { 208 if (still_open) { 210 if ( logger.isLoggable( MLevel.WARNING ) ) 211 logger.log( MLevel.WARNING, this.getName() + " unexpectedly interrupted! Shutting down!" ); 212 close( false ); 213 } 214 } 215 } 216 } 217 } 218 219 class TaskThread extends Thread 220 { 221 Runnable r; 223 224 boolean completed = false; 226 227 TaskThread( Runnable r ) 228 { 229 super( "Task-Thread-for-" + ThreadPerTaskAsynchronousRunner.this ); 230 this.r = r; 231 } 232 233 Runnable getTask() 234 { return r; } 235 236 synchronized void markCompleted() 237 { completed = true; } 238 239 synchronized boolean isCompleted() 240 { return completed; } 241 242 public void run() 243 { 244 try 245 { 246 if (hasIdTimer()) 247 { 248 TimerTask interruptTask = new TimerTask() 249 { 250 public void run() 251 { TaskThread.this.interrupt(); } 252 }; 253 interruptAndDeadlockTimer.schedule( interruptTask , interrupt_task_delay ); 254 } 255 r.run(); 256 } 257 finally 258 { acknowledgeComplete( this ); } 259 } 260 } 261 } 262 | Popular Tags |