1 22 23 24 package com.mchange.v2.async; 25 26 import java.util.Collections ; 27 import java.util.List ; 28 import java.util.LinkedList ; 29 import com.mchange.v2.log.MLevel; 30 import com.mchange.v2.log.MLog; 31 import com.mchange.v2.log.MLogger; 32 import com.mchange.v2.util.ResourceClosedException; 33 34 public class CarefulRunnableQueue implements RunnableQueue, Queuable, StrandedTaskReporting 35 { 36 private final static MLogger logger = MLog.getLogger( CarefulRunnableQueue.class ); 37 38 private List taskList = new LinkedList (); 39 private TaskThread t = new TaskThread(); 40 41 private boolean shutdown_on_interrupt; 42 43 private boolean gentle_close_requested = false; 44 45 private List strandedTasks = null; 46 47 public CarefulRunnableQueue(boolean daemon, boolean shutdown_on_interrupt) 48 { 49 this.shutdown_on_interrupt = shutdown_on_interrupt; 50 t.setDaemon( daemon ); 51 t.start(); 52 } 53 54 public RunnableQueue asRunnableQueue() 55 { return this; } 56 57 public synchronized void postRunnable(Runnable r) 58 { 59 try 60 { 61 if (gentle_close_requested) 62 throw new ResourceClosedException("Attempted to post a task to a closing " + 63 "CarefulRunnableQueue."); 64 65 taskList.add(r); 66 this.notifyAll(); 67 } 68 catch (NullPointerException e) 69 { 70 if (Debug.DEBUG) 72 { 73 if ( logger.isLoggable( MLevel.FINE ) ) 74 logger.log( MLevel.FINE, "NullPointerException while posting Runnable.", e ); 75 } 76 if (taskList == null) 77 throw new ResourceClosedException("Attempted to post a task to a CarefulRunnableQueue " + 78 "which has been closed, or whose TaskThread has been " + 79 "interrupted."); 80 else 81 throw e; 82 } 83 } 84 85 public synchronized void close( boolean skip_remaining_tasks ) 86 { 87 if (skip_remaining_tasks) 88 { 89 t.safeStop(); 90 t.interrupt(); 91 } 92 else 93 gentle_close_requested = true; 94 } 95 96 public synchronized void close() 97 { this.close( true ); } 98 99 public synchronized List getStrandedTasks() 100 { 101 try 102 { 103 while (gentle_close_requested && taskList != null) 104 this.wait(); 105 return strandedTasks; 106 } 107 catch (InterruptedException e) 108 { 109 if ( logger.isLoggable( MLevel.WARNING ) ) 114 logger.log( MLevel.WARNING, 115 Thread.currentThread() + " interrupted while waiting for stranded tasks from CarefulRunnableQueue.", 116 e ); 117 118 throw new RuntimeException (Thread.currentThread() + 119 " interrupted while waiting for stranded tasks from CarefulRunnableQueue."); 120 } 121 } 122 123 private synchronized Runnable dequeueRunnable() 124 { 125 Runnable r = (Runnable ) taskList.get(0); 126 taskList.remove(0); 127 return r; 128 } 129 130 private synchronized void awaitTask() throws InterruptedException 131 { 132 while (taskList.size() == 0) 133 { 134 if ( gentle_close_requested ) 135 { 136 t.safeStop(); t.interrupt(); 138 } 139 this.wait(); 140 } 141 } 142 143 class TaskThread extends Thread 144 { 145 boolean should_stop = false; 146 147 TaskThread() 148 { super("CarefulRunnableQueue.TaskThread"); } 149 150 public synchronized void safeStop() 151 { should_stop = true; } 152 153 private synchronized boolean shouldStop() 154 { return should_stop; } 155 156 public void run() 157 { 158 try 159 { 160 while (! shouldStop() ) 161 { 162 try 163 { 164 awaitTask(); 165 Runnable r = dequeueRunnable(); 166 try 167 { r.run(); } 168 catch (Exception e) 169 { 170 173 if ( logger.isLoggable( MLevel.WARNING ) ) 174 logger.log(MLevel.WARNING, this.getClass().getName() + " -- Unexpected exception in task!", e); 175 } 176 } 177 catch (InterruptedException e) 178 { 179 if (shutdown_on_interrupt) 180 { 181 CarefulRunnableQueue.this.close( false ); 182 if ( logger.isLoggable( MLevel.INFO ) ) 187 logger.info(this.toString() + 188 " interrupted. Shutting down after current tasks" + 189 " have completed." ); 190 } 191 else 192 { 193 logger.info(this.toString() + " received interrupt. IGNORING." ); 197 } 198 } 199 } 200 } 201 finally 221 { 222 synchronized ( CarefulRunnableQueue.this ) 223 { 224 strandedTasks = Collections.unmodifiableList( taskList ); 225 taskList = null; 226 t = null; 227 CarefulRunnableQueue.this.notifyAll(); } 231 } 232 } 233 } 234 } 235 236 | Popular Tags |