1 26 27 package net.sourceforge.groboutils.util.thread.v1; 28 29 30 import net.sourceforge.groboutils.util.datastruct.v1.SynchQueue; 31 32 33 69 public class QueueThread extends LoopThread 70 { 71 private IObjectListener m_objListener = null; 72 73 private SynchQueue m_queue = null; 74 75 private boolean m_isProcessingObject = false; 76 77 private long m_timeout = 0; 78 private int m_nanos = 0; 79 80 81 85 private class QueueRunnable implements Runnable 86 { 87 public void run() 88 { 89 if (m_queue.isEmpty()) 90 { 91 m_isProcessingObject = false; 92 } 93 94 try 95 { 96 Object o = m_queue.dequeue( m_timeout, m_nanos ); 97 m_isProcessingObject = true; 98 m_objListener.processObject( o ); 99 100 if (m_queue.isEmpty()) 101 { 102 m_isProcessingObject = false; 103 } 104 } 105 catch (InterruptedException ie) 106 { 107 } 109 } 110 } 111 112 113 114 115 118 119 122 public QueueThread( IObjectListener ol ) 123 { 124 this( ol, new SynchQueue() ); 125 } 126 127 130 public QueueThread( IObjectListener ol, SynchQueue sq ) 131 { 132 super(); 133 134 initialize( ol, sq ); 135 } 136 137 140 public QueueThread( IObjectListener ol, ThreadGroup tg ) 141 { 142 this( ol, new SynchQueue(), tg ); 143 } 144 145 146 149 public QueueThread( IObjectListener ol, SynchQueue sq, ThreadGroup tg ) 150 { 151 super( null, tg ); 152 153 initialize( ol, sq ); 154 } 155 156 157 160 public QueueThread( IObjectListener ol, String threadName ) 161 { 162 this( ol, new SynchQueue(), threadName ); 163 } 164 165 166 169 public QueueThread( IObjectListener ol, SynchQueue sq, String threadName ) 170 { 171 super( null, threadName ); 172 173 initialize( ol, sq ); 174 } 175 176 177 180 public QueueThread( IObjectListener ol, ThreadGroup tg, String threadName ) 181 { 182 this( ol, new SynchQueue(), tg, threadName ); 183 } 184 185 186 189 public QueueThread( IObjectListener ol, SynchQueue sq, ThreadGroup tg, 190 String threadName ) 191 { 192 super( null, tg, threadName ); 193 194 initialize( ol, sq ); 195 } 196 197 198 199 200 203 204 207 public SynchQueue getQueue() 208 { 209 return this.m_queue; 210 } 211 212 213 218 public boolean isProcessingObjects() 219 { 220 return this.m_isProcessingObject; 221 } 222 223 224 234 public void setTimeout( long timeout, int nanos ) 235 { 236 this.m_timeout = timeout; 237 this.m_nanos = nanos; 238 } 239 240 241 252 public void setTimeout( long timeout ) 253 { 254 setTimeout( timeout, 0 ); 255 } 256 257 258 265 public long getTimeoutMilliseconds() 266 { 267 return this.m_timeout; 268 } 269 270 271 278 public int getTimeoutNanoseconds() 279 { 280 return this.m_nanos; 281 } 282 283 284 293 public void processRemaining() 294 throws InterruptedException 295 { 296 if (isRunning()) 297 { 298 throw new IllegalStateException ( 299 "cannot call processRemaining() while the underlying thread "+ 300 "is still running." ); 301 } 302 if (!this.m_queue.isEmpty()) 303 { 304 this.m_isProcessingObject = true; 305 do 306 { 307 Object o = this.m_queue.dequeue(); 308 this.m_objListener.processObject( o ); 309 } 310 while (!this.m_queue.isEmpty()); 311 this.m_isProcessingObject = false; 312 } 313 } 314 315 316 319 320 protected void initialize( IObjectListener ol, SynchQueue sq ) 321 { 322 setRunnable( new QueueRunnable() ); 323 324 this.m_objListener = ol; 325 this.m_queue = sq; 326 327 initializeDefaults(); 328 } 329 330 331 protected void initializeDefaults() 332 { 333 setSleepTime( 0 ); 334 setDaemon( true ); 335 setPriority( Thread.MIN_PRIORITY ); 336 } 337 338 339 } 340 | Popular Tags |