1 17 package org.apache.excalibur.event.impl; 18 19 import org.apache.commons.collections.Buffer; 20 import org.apache.commons.collections.UnboundedFifoBuffer; 21 import org.apache.excalibur.event.EnqueuePredicate; 22 import org.apache.excalibur.event.PreparedEnqueue; 23 import org.apache.excalibur.event.SinkException; 24 import org.apache.excalibur.event.SinkFullException; 25 26 import EDU.oswego.cs.dl.util.concurrent.ReentrantLock; 27 28 35 public final class DefaultQueue extends AbstractQueue 36 { 37 private final Buffer m_elements; 38 private final ReentrantLock m_mutex; 39 protected int m_reserve; 40 private final int m_maxSize; 41 42 51 public DefaultQueue( int size ) 52 { 53 this( new ThresholdEnqueuePredicate( size ) ); 54 } 55 56 public DefaultQueue( EnqueuePredicate predicate ) 57 { 58 setEnqueuePredicate( predicate ); 59 60 m_mutex = new ReentrantLock(); 61 m_elements = new UnboundedFifoBuffer(); 62 m_reserve = 0; 63 m_maxSize = -1; 64 } 65 66 69 public DefaultQueue() 70 { 71 this( new NullEnqueuePredicate() ); 72 } 73 74 79 public int size() 80 { 81 return m_elements.size() + m_reserve; 82 } 83 84 91 public int maxSize() 92 { 93 return m_maxSize; 94 } 95 96 public PreparedEnqueue prepareEnqueue( final Object [] elements ) 97 throws SinkException 98 { 99 PreparedEnqueue enqueue = null; 100 101 try 102 { 103 m_mutex.acquire(); 104 try 105 { 106 if( getEnqueuePredicate().accept(elements, this) ) 107 { 108 enqueue = new DefaultPreparedEnqueue( this, elements ); 109 } 110 else 111 { 112 throw new SinkFullException( "Not enough room to enqueue these elements." ); 113 } 114 } 115 finally 116 { 117 m_mutex.release(); 118 } 119 } 120 catch( InterruptedException ie ) 121 { 122 if ( null == enqueue ) 123 { 124 throw new SinkException("The mutex was interrupted before it could be released"); 125 } 126 } 127 128 return enqueue; 129 } 130 131 public boolean tryEnqueue( final Object element ) 132 { 133 boolean success = false; 134 135 try 136 { 137 m_mutex.acquire(); 138 try 139 { 140 success = getEnqueuePredicate().accept( element, this ); 141 142 if ( success ) 143 { 144 m_elements.add( element ); 145 } 146 } 147 finally 148 { 149 m_mutex.release(); 150 } 151 } 152 catch( InterruptedException ie ) 153 { 154 } 155 156 return success; 157 } 158 159 public void enqueue( final Object [] elements ) 160 throws SinkException 161 { 162 final int len = elements.length; 163 164 try 165 { 166 m_mutex.acquire(); 167 try 168 { 169 if( ! getEnqueuePredicate().accept( elements, this ) ) 170 { 171 throw new SinkFullException( "Not enough room to enqueue these elements." ); 172 } 173 174 for( int i = 0; i < len; i++ ) 175 { 176 m_elements.add( elements[ i ] ); 177 } 178 } 179 finally 180 { 181 m_mutex.release(); 182 } 183 } 184 catch( InterruptedException ie ) 185 { 186 } 187 } 188 189 public void enqueue( final Object element ) 190 throws SinkException 191 { 192 try 193 { 194 m_mutex.acquire(); 195 try 196 { 197 if( ! getEnqueuePredicate().accept(element, this) ) 198 { 199 throw new SinkFullException( "Not enough room to enqueue these elements." ); 200 } 201 202 m_elements.add( element ); 203 } 204 finally 205 { 206 m_mutex.release(); 207 } 208 } 209 catch( InterruptedException ie ) 210 { 211 } 212 } 213 214 public Object [] dequeue( final int numElements ) 215 { 216 getDequeueInterceptor().before(this); 217 Object [] elements = EMPTY_ARRAY; 218 219 try 220 { 221 if( m_mutex.attempt( m_timeout ) ) 222 { 223 try 224 { 225 elements = retrieveElements( m_elements, 226 Math.min( size(), 227 numElements ) ); 228 } 229 finally 230 { 231 m_mutex.release(); 232 } 233 } 234 } 235 catch( InterruptedException ie ) 236 { 237 } 239 240 getDequeueInterceptor().after(this); 241 return elements; 242 } 243 244 public Object [] dequeueAll() 245 { 246 getDequeueInterceptor().before(this); 247 Object [] elements = EMPTY_ARRAY; 248 249 try 250 { 251 if( m_mutex.attempt( m_timeout ) ) 252 { 253 try 254 { 255 elements = retrieveElements( m_elements, size() ); 256 } 257 finally 258 { 259 m_mutex.release(); 260 } 261 } 262 } 263 catch( InterruptedException ie ) 264 { 265 } 267 268 getDequeueInterceptor().after(this); 269 return elements; 270 } 271 272 283 private static Object [] retrieveElements( Buffer buf, int count ) 284 { 285 Object [] elements = new Object [ count ]; 286 287 for( int i = 0; i < count; i++ ) 288 { 289 elements[ i ] = buf.remove(); 290 } 291 292 return elements; 293 } 294 295 public Object dequeue() 296 { 297 getDequeueInterceptor().before(this); 298 Object element = null; 299 300 try 301 { 302 if( m_mutex.attempt( m_timeout ) ) 303 { 304 try 305 { 306 if( size() > 0 ) 307 { 308 element = m_elements.remove(); 309 } 310 } 311 finally 312 { 313 m_mutex.release(); 314 } 315 } 316 } 317 catch( InterruptedException ie ) 318 { 319 } 321 322 getDequeueInterceptor().after(this); 323 return element; 324 } 325 326 private static final class DefaultPreparedEnqueue implements PreparedEnqueue 327 { 328 private final DefaultQueue m_parent; 329 private Object [] m_elements; 330 331 private DefaultPreparedEnqueue( DefaultQueue parent, Object [] elements ) 332 { 333 m_parent = parent; 334 m_elements = elements; 335 m_parent.m_reserve += elements.length; 336 } 337 338 public void commit() 339 { 340 if( null == m_elements ) 341 { 342 throw new IllegalStateException ( "This PreparedEnqueue has already been processed!" ); 343 } 344 345 try 346 { 347 m_parent.m_reserve -= m_elements.length; 348 m_parent.enqueue( m_elements ); 349 m_elements = null; 350 } 351 catch( Exception e ) 352 { 353 throw new IllegalStateException ( "Default enqueue did not happen--should be impossible" ); 354 } 356 } 357 358 public void abort() 359 { 360 if( null == m_elements ) 361 { 362 throw new IllegalStateException ( "This PreparedEnqueue has already been processed!" ); 363 } 364 365 m_parent.m_reserve -= m_elements.length; 366 m_elements = null; 367 } 368 } 369 } 370 | Popular Tags |