1 8 9 package foxtrot.workers; 10 11 import foxtrot.AbstractWorkerThread; 12 import foxtrot.Task; 13 14 20 public class SingleWorkerThread extends AbstractWorkerThread implements Runnable 21 { 22 private static int sequence = 0; 23 static final boolean debug = false; 24 25 private Thread thread; 26 private Link current; 27 private boolean pending; 28 29 public void start() 30 { 31 if (isAlive()) return; 32 if (debug) System.out.println("[SingleWorkerThread] Starting"); 33 34 stop(); 35 36 thread = new Thread (this, getThreadName()); 37 thread.setDaemon(true); 39 thread.start(); 40 } 41 42 45 protected String getThreadName() 46 { 47 return "Foxtrot Single Worker Thread #" + nextSequence(); 48 } 49 50 static synchronized int nextSequence() 51 { 52 return ++sequence; 53 } 54 55 60 protected void stop() 61 { 62 if (thread != null) 63 { 64 if (debug) System.out.println("[SingleWorkerThread] Ending " + thread); 65 thread.interrupt(); 66 } 67 } 68 69 public boolean isAlive() 70 { 71 if (thread == null) return false; 72 return thread.isAlive() && !isThreadInterrupted(); 73 } 74 75 public boolean isWorkerThread() 76 { 77 return Thread.currentThread() == thread; 78 } 79 80 84 public void postTask(Task t) 85 { 86 if (!isAlive()) start(); 90 91 synchronized (this) 94 { 95 if (hasTasks()) 96 { 97 if (debug) System.out.println("[SingleWorkerThread] Task queue not empty, enqueueing task:" + t); 98 99 Link item = current; 101 while (item.next != null) item = item.next; 102 item.next = new Link(t); 103 } 104 else 105 { 106 if (debug) System.out.println("[SingleWorkerThread] Task queue empty, adding task:" + t); 107 108 current = new Link(t); 110 notifyAll(); 111 } 112 } 113 } 114 115 120 protected Task takeTask() throws InterruptedException 121 { 122 synchronized (this) 125 { 126 while (!hasTasks()) 127 { 128 if (debug) System.out.println("[SingleWorkerThread] Task queue empty, waiting for tasks"); 129 pending = false; 130 wait(); 131 } 132 pending = true; 133 Task t = current.task; 135 current = current.next; 136 return t; 137 } 138 } 139 140 private boolean hasTasks() 141 { 142 synchronized (this) 143 { 144 return current != null; 145 } 146 } 147 148 boolean hasPendingTasks() 149 { 150 synchronized (this) 151 { 152 return pending; 153 } 154 } 155 156 160 protected boolean isThreadInterrupted() 161 { 162 return thread.isInterrupted(); 163 } 164 165 169 public void run() 170 { 171 if (debug) System.out.println("[SingleWorkerThread] Started " + thread); 172 173 while (!isThreadInterrupted()) 174 { 175 try 176 { 177 Task t = takeTask(); 178 if (debug) System.out.println("[SingleWorkerThread] Dequeued Task " + t); 179 run(t); 180 } 181 catch (InterruptedException x) 182 { 183 if (debug) System.out.println("[SingleWorkerThread] Interrupted " + thread); 184 Thread.currentThread().interrupt(); 185 break; 186 } 187 } 188 } 189 190 194 protected void run(Task task) 195 { 196 runTask(task); 197 } 198 199 private static class Link 200 { 201 private Link next; 202 private final Task task; 203 204 private Link(Task task) 205 { 206 this.task = task; 207 } 208 } 209 } 210 | Popular Tags |