1 8 package org.ozoneDB.core.storage.gammaStore; 9 10 import java.io.ByteArrayOutputStream ; 11 import java.io.PrintStream ; 12 import java.util.LinkedHashMap ; 13 import java.util.Map ; 14 import java.util.logging.Level ; 15 import java.util.logging.Logger ; 16 17 32 public class AsyncExec { 33 34 private static final Logger log = Logger.getLogger(AsyncExec.class.getName()); 35 36 private static final int NOT_STARTED = 0; 37 private static final int RUNNING = 1; 38 private static final int STOPPING = 2; 39 private static final int STOPPED = 3; 40 private LinkedHashMap _map = new LinkedHashMap (); 41 private volatile int status = NOT_STARTED; 42 43 47 private volatile Object currentKey = null; 48 private ProcessorThread thread; 49 50 public AsyncExec(String threadName, int priority, boolean useDaemonThread) { 51 thread = new ProcessorThread(threadName); 52 thread.setPriority(priority); 53 thread.setDaemon(useDaemonThread); 54 thread.start(); 55 synchronized (this) { 56 while (getStatus() < RUNNING) { 57 try { 58 wait(1000); 59 } catch (InterruptedException ignore) { 60 } 61 } 62 } 63 } 64 65 75 public synchronized Runnable put(Object key, Runnable task) { 76 77 remove(key); 79 80 checkStatus(); 81 Runnable result = (Runnable ) getMap().put(key, task); 82 notifyAll(); 83 return result; 84 } 85 86 89 private void checkStatus() { 90 switch (getStatus()) { 91 case NOT_STARTED: 92 throw new IllegalStateException ("not started"); 93 case RUNNING: 95 break; 96 case STOPPING: 97 throw new IllegalStateException ("stopping"); 98 case STOPPED: 99 throw new IllegalStateException ("stopped"); 100 default: 102 throw new IllegalStateException ("It's worse than that: he's dead Jim."); 103 } 104 } 105 106 116 public synchronized Runnable get(Object key) { 117 checkStatus(); 118 while (key.equals(getCurrentKey())) { 119 try { 120 wait(); 121 } catch (InterruptedException ignore) { 122 } 123 } 124 return (Runnable ) getMap().get(key); 125 } 126 127 133 public synchronized Runnable remove(Object key) { 134 if (key == null) { 135 throw new IllegalArgumentException ("key must not be null"); 136 } 137 while (key.equals(getCurrentKey())) { 138 if (log.isLoggable(Level.FINE)) log.fine(this + ": key is currentKey: " + getCurrentKey()); 139 try { 140 wait(); 141 } catch (InterruptedException ignore) { 142 } 143 } 144 return (Runnable ) getMap().remove(key); 145 } 146 147 154 public synchronized int size() { 155 return getMap().size(); 156 } 157 158 162 public void stopWhenReady() { 163 synchronized (this) { 164 checkStatus(); 165 setStatus(STOPPING); 166 notifyAll(); 167 } 168 for (boolean interrupted = true; interrupted; ) { 169 try { 170 thread.join(); 171 interrupted = false; 172 } catch (InterruptedException ignore) { 173 } 174 } 175 } 176 177 private void setStatus(int status) { 178 this.status = status; 179 } 180 181 private int getStatus() { 182 return status; 183 } 184 185 private Object getCurrentKey() { 186 return currentKey; 187 } 188 189 private void setCurrentKey(Object currentKey) { 190 this.currentKey = currentKey; 191 } 192 193 private Map getMap() { 194 return _map; 195 } 196 197 200 private class ProcessorThread extends Thread { 201 202 public ProcessorThread(String name) { 203 super(name); 204 } 205 206 public void run() { 207 synchronized (AsyncExec.this) { 208 setStatus(RUNNING); 209 AsyncExec.this.notifyAll(); 210 } 211 if (log.isLoggable(Level.FINE)) log.fine(this + " started"); 212 for (;;) { 213 Runnable task; 214 synchronized (AsyncExec.this) { 215 216 if (log.isLoggable(Level.FINER)) log.finer("key has finished: " + getCurrentKey()); 219 getMap().remove(getCurrentKey()); 220 setCurrentKey(null); 221 AsyncExec.this.notifyAll(); 222 while (getMap().size() == 0) { 223 if (getStatus() == STOPPING) { 224 setStatus(STOPPED); 225 if (log.isLoggable(Level.FINE)) log.fine(this + " almost stopped"); 226 return; 227 } 228 try { 229 AsyncExec.this.notifyAll(); 231 AsyncExec.this.wait(); 232 } catch (InterruptedException ignore) { 233 } 234 } 235 Map.Entry entry = (Map.Entry ) getMap().entrySet().iterator().next(); 236 setCurrentKey(entry.getKey()); 237 task = (Runnable ) entry.getValue(); 238 } 239 try { 240 if (log.isLoggable(Level.FINER)) log.finer("running task with key: " + getCurrentKey()); 241 task.run(); 242 } catch (Throwable t) { 243 ByteArrayOutputStream buf = new ByteArrayOutputStream (); 244 PrintStream printStream = new PrintStream (buf); 245 t.printStackTrace(printStream); 246 printStream.close(); 247 log.severe(this + " has caught unhandled Throwable." 248 + " Thread will NOT stop, Stacktrace will follow" 249 + " both on stdout and here.\n" + buf.toString()); 250 t.printStackTrace(); 251 } 252 } 253 } 254 255 } 256 257 } | Popular Tags |