1 3 package org.jgroups.util; 4 5 6 import org.apache.commons.logging.Log; 7 import org.apache.commons.logging.LogFactory; 8 9 10 30 public class ReusableThread implements Runnable { 31 volatile Thread thread=null; Runnable task=null; String thread_name="ReusableThread"; 34 volatile boolean suspended=false; 35 protected static final Log log=LogFactory.getLog(ReusableThread.class); 36 final long TASK_JOIN_TIME=3000; 38 39 public ReusableThread() { 40 } 41 42 43 public ReusableThread(String thread_name) { 44 this.thread_name=thread_name; 45 } 46 47 48 public boolean done() { 49 return task == null; 50 } 51 52 public boolean available() { 53 return done(); 54 } 55 56 public boolean isAlive() { 57 synchronized(this) { 58 return thread != null && thread.isAlive(); 59 } 60 } 61 62 63 66 public void start() { 67 if(thread == null || (thread != null && !thread.isAlive())) { 68 thread=new Thread (this, thread_name); 69 thread.setDaemon(true); 70 thread.start(); 71 } 72 } 73 74 75 79 public void stop() { 80 Thread tmp=null; 81 82 if(log.isTraceEnabled()) log.trace("entering THIS"); 83 synchronized(this) { 84 if(log.isTraceEnabled()) 85 log.trace("entered THIS (thread=" + printObj(thread) + 86 ", task=" + printObj(task) + ", suspended=" + suspended + ')'); 87 if(thread != null && thread.isAlive()) { 88 tmp=thread; 89 thread=null; task=null; 91 if(log.isTraceEnabled()) log.trace("notifying thread"); 92 notifyAll(); 93 if(log.isTraceEnabled()) log.trace("notifying thread completed"); 94 } 95 thread=null; 96 task=null; 97 } 98 99 if(tmp != null && tmp.isAlive()) { 100 long s1=System.currentTimeMillis(), s2=0; 101 if(log.isTraceEnabled()) log.trace("join(" + TASK_JOIN_TIME + ')'); 102 103 tmp.interrupt(); 104 105 try { 106 tmp.join(TASK_JOIN_TIME); 107 } 108 catch(Exception e) { 109 } 110 s2=System.currentTimeMillis(); 111 if(log.isTraceEnabled()) log.trace("join(" + TASK_JOIN_TIME + ") completed in " + (s2 - s1)); 112 if(tmp.isAlive()) 113 if(log.isErrorEnabled()) log.error("thread is still alive"); 114 tmp=null; 115 } 116 } 117 118 119 124 125 public void suspend() { 126 synchronized(this) { 127 if(log.isTraceEnabled()) log.trace("suspended=" + suspended + ", task=" + printObj(task)); 128 if(suspended) 129 return; else 131 suspended=true; 132 } 133 } 134 135 136 139 public void resume() { 140 synchronized(this) { 141 suspended=false; 142 notifyAll(); } 144 } 145 146 147 152 public boolean assignTask(Runnable t) { 153 synchronized(this) { 154 start(); if(task == null) { 156 task=t; 157 notifyAll(); return true; 159 } 160 else { 161 if(log.isErrorEnabled()) 162 log.error("already working on a thread: current_task=" + task + ", new task=" + t + 163 ", thread=" + thread + ", is alive=" + (thread != null ? "" + thread.isAlive() : "null")); 164 return false; 165 } 166 } 167 } 168 169 170 180 public void run() { 181 while(thread != null) { try { 183 if(log.isTraceEnabled()) log.trace("entering ASSIGN"); 184 synchronized(this) { 185 if(log.isTraceEnabled()) 186 log.trace("entered ASSIGN (task=" + printObj(task) + ", thread=" + printObj(thread) + ')'); 187 188 while(task == null && thread != null) { if(log.isTraceEnabled()) log.trace("wait ASSIGN"); 190 wait(); 191 if(log.isTraceEnabled()) log.trace("wait ASSIGN completed"); 192 } 193 } 194 } 195 catch(InterruptedException ex) { if(log.isTraceEnabled()) log.trace("interrupt on ASSIGN"); 197 } 198 if(thread == null) return; 200 try { 201 if(log.isTraceEnabled()) log.trace("entering SUSPEND"); 202 synchronized(this) { 203 if(log.isTraceEnabled()) 204 log.trace("entered SUSPEND (suspended=" + suspended + ", task=" + printObj(task) + ')'); 205 while(suspended && thread != null) { if(log.isTraceEnabled()) log.trace("wait SUSPEND"); 207 wait(); 208 if(log.isTraceEnabled()) log.trace("wait SUSPEND completed"); 209 } 210 } 211 } 212 catch(InterruptedException ex) { if(log.isTraceEnabled()) log.trace("interrupt on RESUME"); 214 } 215 if(thread == null) return; 217 if(task != null) { 218 if(log.isTraceEnabled()) log.trace("running task"); 219 try { 220 task.run(); } 222 catch(Throwable ex) { 223 if(log.isErrorEnabled()) log.error("failed running task", ex); 224 } 225 if(log.isTraceEnabled()) log.trace("task completed"); 226 } 227 228 if(log.isTraceEnabled()) log.trace("entering THIS"); 229 synchronized(this) { 230 if(log.isTraceEnabled()) log.trace("entered THIS"); 231 task=null; 232 if(log.isTraceEnabled()) log.trace("notify THIS"); 233 notifyAll(); 234 if(log.isTraceEnabled()) log.trace("notify THIS completed"); 235 } 236 } 237 if(log.isTraceEnabled()) log.trace("terminated"); 238 } 239 240 241 String printObj(Object obj) { 242 if(obj == null) 243 return "null"; 244 else 245 return "non-null"; 246 } 247 248 249 public void waitUntilDone() { 250 251 if(log.isTraceEnabled()) log.trace("entering THIS"); 252 synchronized(this) { 253 if(log.isTraceEnabled()) log.trace("entered THIS (task=" + printObj(task) + ')'); 254 while(task != null) { 255 try { 256 if(log.isTraceEnabled()) log.trace("wait THIS"); 257 wait(); 258 if(log.isTraceEnabled()) log.trace("wait THIS completed"); 259 } 260 catch(InterruptedException interrupted) { 261 } 262 } 263 } 264 } 265 266 267 public String toString() { 268 return "suspended=" + suspended; 269 } 270 271 272 } 273 274 275 276 277 278 279 280 | Popular Tags |