1 8 9 package com.sleepycat.je.utilint; 10 11 import java.util.Collection ; 12 import java.util.HashSet ; 13 import java.util.Set ; 14 15 import com.sleepycat.je.DbInternal; 16 import com.sleepycat.je.DatabaseException; 17 import com.sleepycat.je.DeadlockException; 18 import com.sleepycat.je.ExceptionListener; 19 import com.sleepycat.je.dbi.EnvironmentImpl; 20 import com.sleepycat.je.latch.Latch; 21 import com.sleepycat.je.latch.LatchSupport; 22 23 26 public abstract class DaemonThread implements DaemonRunner, Runnable { 27 private static final int JOIN_MILLIS = 10; 28 private long waitTime; 29 private Object synchronizer = new Object (); 30 private Thread thread; 31 private EnvironmentImpl env; 32 protected String name; 33 protected Set workQueue; 34 protected Latch workQueueLatch; 35 protected int nWakeupRequests; 36 protected boolean stifleExceptionChatter = false; 37 38 39 private volatile boolean shutdownRequest = false; 40 private volatile boolean paused = false; 41 42 43 private boolean running = false; 44 45 public DaemonThread(long waitTime, String name, EnvironmentImpl env) { 46 this.waitTime = waitTime; 47 this.name = name; 48 this.env = env; 49 workQueue = new HashSet (); 50 workQueueLatch = LatchSupport.makeLatch(name + " work queue", env); 51 } 52 53 56 public Thread getThread() { 57 return thread; 58 } 59 60 65 public void runOrPause(boolean run) { 66 if (run) { 67 paused = false; 68 if (thread != null) { 69 wakeup(); 70 } else { 71 thread = new Thread (this, name); 72 thread.setDaemon(true); 73 thread.start(); 74 } 75 } else { 76 paused = true; 77 } 78 } 79 80 public void requestShutdown() { 81 shutdownRequest = true; 82 } 83 84 87 public void shutdown() { 88 if (thread != null) { 89 shutdownRequest = true; 90 while (thread.isAlive()) { 91 synchronized (synchronizer) { 92 synchronizer.notifyAll(); 93 } 94 try { 95 thread.join(JOIN_MILLIS); 96 } catch (InterruptedException e) { 97 98 102 } 103 } 104 thread = null; 105 } 106 } 107 108 public String toString() { 109 StringBuffer sb = new StringBuffer (); 110 sb.append("<DaemonThread name=\"").append(name).append("\"/>"); 111 return sb.toString(); 112 } 113 114 public void addToQueue(Object o) 115 throws DatabaseException { 116 117 workQueueLatch.acquire(); 118 workQueue.add(o); 119 wakeup(); 120 workQueueLatch.release(); 121 } 122 123 public int getQueueSize() 124 throws DatabaseException { 125 126 workQueueLatch.acquire(); 127 int count = workQueue.size(); 128 workQueueLatch.release(); 129 return count; 130 } 131 132 136 public void addToQueueAlreadyLatched(Collection c) 137 throws DatabaseException { 138 139 workQueue.addAll(c); 140 } 141 142 public void wakeup() { 143 if (!paused) { 144 synchronized (synchronizer) { 145 synchronizer.notifyAll(); 146 } 147 } 148 } 149 150 public void run() { 151 while (true) { 152 153 if (shutdownRequest) { 154 break; 155 } 156 try { 157 workQueueLatch.acquire(); 158 boolean nothingToDo = workQueue.size() == 0; 159 workQueueLatch.release(); 160 if (nothingToDo) { 161 synchronized (synchronizer) { 162 if (waitTime == 0) { 163 synchronizer.wait(); 164 } else { 165 synchronizer.wait(waitTime); 166 } 167 } 168 } 169 170 171 if (shutdownRequest) { 172 break; 173 } 174 175 176 if (paused) { 177 synchronized (synchronizer) { 178 179 synchronizer.wait(); 180 } 181 continue; 182 } 183 184 int numTries = 0; 185 int maxRetries = nDeadlockRetries(); 186 187 do { 188 try { 189 nWakeupRequests++; 190 running = true; 191 onWakeup(); 192 break; 193 } catch (DeadlockException e) { 194 } finally { 195 running = false; 196 } 197 numTries++; 198 199 200 if (shutdownRequest) { 201 break; 202 } 203 204 } while (numTries <= maxRetries); 205 206 207 if (shutdownRequest) { 208 break; 209 } 210 } catch (InterruptedException IE) { 211 ExceptionListener exceptionListener = 212 env.getExceptionListener(); 213 if (exceptionListener != null) { 214 exceptionListener.exceptionThrown 215 (DbInternal.makeExceptionEvent(IE, name)); 216 } 217 218 if (!stifleExceptionChatter) { 219 System.err.println 220 ("Shutting down " + this + " due to exception: " + IE); 221 } 222 shutdownRequest = true; 223 } catch (Exception E) { 224 ExceptionListener exceptionListener = 225 env.getExceptionListener(); 226 if (exceptionListener != null) { 227 exceptionListener.exceptionThrown 228 (DbInternal.makeExceptionEvent(E, name)); 229 } 230 231 if (!stifleExceptionChatter) { 232 System.err.println(this + " caught exception: " + E); 233 E.printStackTrace(System.err); 234 } 235 if (env.mayNotWrite()) { 236 if (!stifleExceptionChatter) { 237 System.err.println("Exiting"); 238 } 239 shutdownRequest = true; 240 } else { 241 if (!stifleExceptionChatter) { 242 System.err.println("Continuing"); 243 } 244 } 245 } 246 } 247 } 248 249 253 protected int nDeadlockRetries() 254 throws DatabaseException { 255 256 return 0; 257 } 258 259 270 abstract protected void onWakeup() 271 throws DatabaseException; 272 273 277 protected boolean isShutdownRequested() { 278 return shutdownRequest; 279 } 280 281 285 protected boolean isPaused() { 286 return paused; 287 } 288 289 293 public boolean isRunning() { 294 return running; 295 } 296 297 300 public int getNWakeupRequests() { 301 return nWakeupRequests; 302 } 303 } 304 | Popular Tags |