package org.exolab.jms.common.threads;

import java.util.LinkedList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


/**
 * Buffers {@link Runnable} tasks in a FIFO queue and hands them, one at a
 * time, to a {@link ThreadPool} from a single dedicated daemon thread.
 * <p>
 * Callers enqueue work with {@link #add}; the call only appends to the queue
 * and wakes the worker thread. The worker thread is the one that invokes
 * <code>ThreadPool.execute</code> and absorbs any interruption it raises.
 * <p>
 * Tasks that are still queued when {@link #stop} is invoked are never handed
 * to the pool, and their completion listeners are not notified by this class.
 */
class QueueWorker {

    /**
     * The pool that queued tasks are handed to.
     */
    private final ThreadPool _pool;

    /**
     * Pending tasks, oldest first. Also used as the monitor that guards the
     * queue and on which the worker thread waits for new work.
     */
    private final LinkedList _queue = new LinkedList();

    /**
     * The thread that drains the queue.
     */
    private final Thread _worker;

    /**
     * Set by {@link #stop} to make the worker thread leave its loop.
     */
    private volatile boolean _stop = false;

    /**
     * The logger.
     */
    private static final Log _log = LogFactory.getLog(QueueWorker.class);


    /**
     * Creates the worker and immediately starts its daemon thread.
     *
     * @param pool  the pool that queued tasks are handed to
     * @param group the thread group the worker thread is created in
     * @param name  the name of the worker thread
     */
    public QueueWorker(ThreadPool pool, ThreadGroup group, String name) {
        _pool = pool;

        Runnable worker = new Runnable() {
            public void run() {
                try {
                    runWork();
                } catch (Exception exception) {
                    // This Runnable is only ever run by the worker thread, so
                    // the current thread is the worker; using it avoids
                    // reading the enclosing instance's field from here.
                    _log.error("Thread " + Thread.currentThread().getName()
                               + ": terminating on exception", exception);
                }
            }
        };

        _worker = new Thread(group, worker, name);
        _worker.setDaemon(true);
        // Started last, after every field has been assigned.
        _worker.start();
    }

    /**
     * Appends a task to the end of the queue and wakes the worker thread.
     *
     * @param target   the task to hand to the pool
     * @param listener notified via <code>completed(target)</code> once the
     *                 task has run, whether or not it threw. May be
     *                 <code>null</code>
     */
    public void add(Runnable target, CompletionListener listener) {
        synchronized (_queue) {
            _queue.add(new Executor(target, listener));
            _queue.notify();
        }
    }

    /**
     * Asks the worker thread to terminate by setting the stop flag and
     * interrupting it. Does not wait for the thread to exit; tasks still in
     * the queue are left there and are not executed.
     */
    public void stop() {
        _stop = true;
        _worker.interrupt();
    }

    /**
     * Determines if the worker thread is alive.
     *
     * @return <code>true</code> if the worker thread has been started and has
     *         not yet died
     */
    public boolean isAlive() {
        return _worker.isAlive();
    }

    /**
     * Worker loop: waits for tasks and hands them to the pool in arrival
     * order until {@link #stop} is invoked.
     */
    private void runWork() {
        while (!_stop) {
            Runnable target = null;
            synchronized (_queue) {
                if (_queue.isEmpty()) {
                    try {
                        _queue.wait();
                    } catch (InterruptedException ignored) {
                        // stop() interrupts the thread to wake it; the loop
                        // condition re-checks the stop flag. The interrupt
                        // status is deliberately not restored, as that would
                        // make the next wait() fail immediately.
                    }
                } else {
                    target = (Runnable) _queue.removeFirst();
                }
            }

            // The flag is re-checked because stop() may have been called
            // while the task was being dequeued.
            if (target != null && !_stop) {
                try {
                    _pool.execute(target);
                } catch (InterruptedException exception) {
                    // Expected when stop() interrupts the hand-off. Any other
                    // interruption means this task is being dropped, so make
                    // that visible rather than losing it silently.
                    if (!_stop) {
                        _log.warn("Thread " + Thread.currentThread().getName()
                                  + ": interrupted while handing task to the"
                                  + " pool - task discarded", exception);
                    }
                }
            }
        }
    }

    /**
     * Wraps a task so that anything it throws is logged rather than
     * propagated, and so that its listener is always notified afterwards.
     */
    private static class Executor implements Runnable {

        /**
         * The task to run.
         */
        private final Runnable _target;

        /**
         * The listener to notify on completion. May be <code>null</code>.
         */
        private final CompletionListener _listener;

        /**
         * Creates a wrapper for a task.
         *
         * @param target   the task to run
         * @param listener the listener to notify on completion. May be
         *                 <code>null</code>
         */
        public Executor(Runnable target, CompletionListener listener) {
            _target = target;
            _listener = listener;
        }

        /**
         * Runs the task, logs any <code>Throwable</code> it raises, and then
         * notifies the listener, if there is one.
         */
        public void run() {
            try {
                _target.run();
            } catch (Throwable exception) {
                _log.error("Thread " + Thread.currentThread().getName()
                           + " - uncaught exception fell through from run",
                           exception);
            } finally {
                if (_listener != null) {
                    _listener.completed(_target);
                }
            }
        }
    }

}