package net.sourceforge.groboutils.util.thread.v1;


import net.sourceforge.groboutils.util.datastruct.v1.SynchQueue;

import java.util.Vector;


/**
 * A pool of {@link QueueThread} workers that all read from one shared
 * {@link SynchQueue}.  Each worker is given its own listener instance,
 * created reflectively from the listener class passed to the constructor.
 * <P>
 * The pool starts with a configurable number of threads and adds one more
 * thread (up to the maximum) whenever an object is added while the queue
 * holds more than the depth threshold of pending objects.
 */
public class ThreadPool
{
    /** Class used to create one listener per worker thread. */
    private Class m_objListenerClass = null;

    /** Data handed to listeners implementing IThreadObjectListener. */
    private Object m_objListenerInitData = null;

    /** Worker threads; only slots [0, m_numThreads) are populated. */
    private QueueThread[] m_pool = null;

    /** Queue shared by every worker in the pool. */
    private SynchQueue m_sharedQueue = new SynchQueue();

    private int m_maxThreads = 10;
    private int m_numThreads = 0;
    private int m_depthThreshold = 5;


    /**
     * Creates a pool with 1 starting thread and a maximum of 10 threads.
     *
     * @param objectListenerClass class used to create listener instances.
     * @exception IllegalArgumentException if a listener cannot be created
     *      from the given class.
     */
    public ThreadPool( Class objectListenerClass )
    {
        this( objectListenerClass, null, 1, 10 );
    }


    /**
     * Creates a pool with 1 starting thread and the given maximum.
     *
     * @param objectListenerClass class used to create listener instances.
     * @param maxThreads maximum number of threads; must be at least 1.
     * @exception IllegalArgumentException if a listener cannot be created
     *      from the given class, or if <tt>maxThreads</tt> is below 1.
     */
    public ThreadPool( Class objectListenerClass, int maxThreads )
    {
        this( objectListenerClass, null, 1, maxThreads );
    }


    /**
     * Creates a pool with 1 starting thread and a maximum of 10 threads.
     *
     * @param objectListenerClass class used to create listener instances.
     * @param initData passed to <tt>initialize</tt> on every created
     *      listener that implements IThreadObjectListener.
     * @exception IllegalArgumentException if a listener cannot be created
     *      from the given class.
     */
    public ThreadPool( Class objectListenerClass, Object initData )
    {
        this( objectListenerClass, initData, 1, 10 );
    }


    /**
     * Creates a pool with 1 starting thread and the given maximum.
     *
     * @param objectListenerClass class used to create listener instances.
     * @param initData passed to <tt>initialize</tt> on every created
     *      listener that implements IThreadObjectListener.
     * @param maxThreads maximum number of threads; must be at least 1.
     * @exception IllegalArgumentException if a listener cannot be created
     *      from the given class, or if <tt>maxThreads</tt> is below 1.
     */
    public ThreadPool( Class objectListenerClass, Object initData,
        int maxThreads )
    {
        this( objectListenerClass, initData, 1, maxThreads );
    }


    /**
     * Creates a pool and starts the initial worker threads.
     *
     * @param objectListenerClass class used to create listener instances.
     * @param initData passed to <tt>initialize</tt> on every created
     *      listener that implements IThreadObjectListener.
     * @param startingThreadCount number of threads to start immediately.
     *      Values above <tt>maxThreads</tt> are capped at
     *      <tt>maxThreads</tt>.
     * @param maxThreads maximum number of threads; must be at least 1.
     * @exception IllegalArgumentException if a listener cannot be created
     *      from the given class, or if <tt>maxThreads</tt> is below 1.
     */
    public ThreadPool( Class objectListenerClass, Object initData,
        int startingThreadCount, int maxThreads )
    {
        this.m_objListenerClass = objectListenerClass;
        this.m_objListenerInitData = initData;
        try
        {
            // Trial creation: fail now, not later inside addNewThread(),
            // if the class cannot produce listeners.  The instance is
            // discarded.
            createObjectListenerInstance();
        }
        catch (Exception ex)
        {
            // The thrown exception carries no cause, so the trace is
            // printed to keep the underlying failure visible.
            ex.printStackTrace();
            throw new IllegalArgumentException( "Class "+objectListenerClass+
                " does not create ObjectListener instances");
        }

        setMaximumThreadCount( maxThreads );
        this.m_pool = new QueueThread[ maxThreads ];
        while (this.m_numThreads < startingThreadCount)
        {
            // addNewThread() returns null once the maximum is reached.
            // Without this exit, a starting count above the maximum
            // would spin here forever.
            if (addNewThread() == null)
            {
                break;
            }
        }
    }


    /**
     * Sets the queue depth above which {@link #addObject( Object )} asks
     * for an additional worker thread.
     *
     * @param threshold the new threshold; must be at least 1.
     * @exception IllegalArgumentException if <tt>threshold</tt> is below 1.
     */
    public void setDepthThreshold( int threshold )
    {
        if (threshold < 1)
        {
            throw new IllegalArgumentException( "threshold "+threshold+
                " is too low");
        }
        this.m_depthThreshold = threshold;
    }


    /**
     * @return the current size of the shared queue.
     */
    public int getObjectDepth()
    {
        return this.m_sharedQueue.size();
    }


    /**
     * Adds an object to the shared queue.  The depth threshold is checked
     * before the object is enqueued, so the new object itself does not
     * count toward that check.
     *
     * @param o the object to enqueue.
     */
    public void addObject( Object o )
    {
        checkThreshold();
        this.m_sharedQueue.enqueue( o );
    }


    /**
     * @return the number of threads created by this pool so far.
     */
    public int getThreadCount()
    {
        return this.m_numThreads;
    }


    /**
     * @return the maximum number of threads this pool will create.
     */
    public int getMaximumThreadCount()
    {
        return this.m_maxThreads;
    }


    /**
     * Sets the maximum number of threads this pool will create.  Threads
     * that already exist are left alone if the maximum is lowered below
     * the current thread count.
     *
     * @param max the new maximum; must be at least 1.
     * @exception IllegalArgumentException if <tt>max</tt> is below 1.
     */
    public void setMaximumThreadCount( int max )
    {
        if (max < 1)
        {
            throw new IllegalArgumentException( "maximum count "+max+
                " is out of bounds" );
        }
        this.m_maxThreads = max;
    }


    /**
     * Busy-waits (yielding between polls) until the shared queue is empty
     * and every thread that was processing objects at that moment reports
     * it no longer is.
     * <P>
     * NOTE(review): a worker that has dequeued an object but not yet
     * reported itself as processing could be missed; whether that window
     * exists depends on QueueThread, which is not visible here.
     */
    public void waitForThreadsToFinish()
    {
        // Wait for the backlog to be picked up.
        while (getObjectDepth() > 0)
        {
            Thread.yield();
        }

        // Snapshot the busy workers under the pool lock, the same lock
        // addNewThread() holds while it changes m_pool and m_numThreads.
        Vector busy = new Vector();
        synchronized (this)
        {
            for (int i = this.m_numThreads; --i >= 0;)
            {
                QueueThread worker = this.m_pool[i];
                if (worker != null && worker.isProcessingObjects())
                {
                    busy.addElement( worker );
                }
            }
        }

        // Poll outside the lock so other pool operations are not blocked.
        while (busy.size() > 0)
        {
            Thread.yield();
            for (int i = busy.size(); --i >= 0;)
            {
                QueueThread worker = (QueueThread)busy.elementAt(i);
                if (!worker.isProcessingObjects())
                {
                    busy.removeElementAt(i);
                }
            }
        }
    }


    /**
     * Calls <tt>stop()</tt> on every thread in the pool.
     */
    public synchronized void stopThreads()
    {
        for (int i = this.m_numThreads; --i >= 0;)
        {
            if (this.m_pool[i] != null)
            {
                this.m_pool[i].stop();
            }
        }
    }


    /**
     * Calls <tt>suspend()</tt> on every thread in the pool.
     */
    public synchronized void suspendThreads()
    {
        for (int i = this.m_numThreads; --i >= 0;)
        {
            if (this.m_pool[i] != null)
            {
                this.m_pool[i].suspend();
            }
        }
    }


    /**
     * Calls <tt>resume()</tt> on every thread in the pool.
     */
    public synchronized void resumeThreads()
    {
        for (int i = this.m_numThreads; --i >= 0;)
        {
            if (this.m_pool[i] != null)
            {
                this.m_pool[i].resume();
            }
        }
    }


    /**
     * Creates, registers and starts one more worker thread, unless the
     * pool already holds the maximum number of threads.
     *
     * @return the started thread, or <tt>null</tt> if the pool is full.
     * @exception IllegalStateException if a listener cannot be created.
     */
    protected synchronized QueueThread addNewThread()
    {
        if (this.m_numThreads >= this.m_maxThreads)
        {
            return null;
        }

        // Build the worker before touching pool state: if listener
        // creation throws, the count and the array stay consistent.
        QueueThread qt = new QueueThread( createObjectListenerInstance(),
            this.m_sharedQueue );

        // The array was sized from the constructor's maximum; grow it if
        // setMaximumThreadCount() has since raised the limit.
        if (this.m_pool == null || this.m_numThreads >= this.m_pool.length)
        {
            QueueThread[] grown = new QueueThread[ this.m_maxThreads ];
            if (this.m_pool != null)
            {
                System.arraycopy( this.m_pool, 0, grown, 0,
                    this.m_numThreads );
            }
            this.m_pool = grown;
        }

        this.m_pool[ this.m_numThreads ] = qt;
        this.m_numThreads++;
        qt.start();
        return qt;
    }


    /**
     * Requests another worker thread if the shared queue currently holds
     * more objects than the depth threshold.
     */
    protected void checkThreshold()
    {
        if (this.m_sharedQueue.size() > this.m_depthThreshold)
        {
            addNewThread();
        }
    }


    /**
     * Creates a listener from the configured class using its no-argument
     * constructor.  If the new listener implements IThreadObjectListener,
     * it is initialized with the configured init data.
     *
     * @return the newly created listener.
     * @exception IllegalStateException if the class cannot be
     *      instantiated, its constructor cannot be accessed, or the
     *      instance is not an IObjectListener.
     */
    protected IObjectListener createObjectListenerInstance()
    {
        try
        {
            IObjectListener ol = (IObjectListener)
                this.m_objListenerClass.newInstance();
            if (ol instanceof IThreadObjectListener)
            {
                ((IThreadObjectListener)ol).initialize(
                    this.m_objListenerInitData );
            }
            return ol;
        }
        catch (InstantiationException ie)
        {
            throw new IllegalStateException( "could not instantiate from class "+
                this.m_objListenerClass.getName()+
                ": general instantiation exception "+ie.getMessage());
        }
        catch (IllegalAccessException iae)
        {
            throw new IllegalStateException( "could not instantiate from class "+
                this.m_objListenerClass.getName()+
                ": could not access constructor "+iae.getMessage());
        }
        catch (ClassCastException cce)
        {
            throw new IllegalStateException( "could not instantiate from class "+
                this.m_objListenerClass.getName()+": instance of wrong type "+
                cce.getMessage());
        }
    }

}