1 17 package org.apache.excalibur.event.impl; 18 19 import org.apache.excalibur.event.PreparedEnqueue; 20 import org.apache.excalibur.event.SinkException; 21 import org.apache.excalibur.event.SinkFullException; 22 23 import EDU.oswego.cs.dl.util.concurrent.ReentrantLock; 24 25 34 public final class FixedSizeQueue 35 extends AbstractQueue 36 { 37 private final Object [] m_elements; 38 private final ReentrantLock m_mutex; 39 private int m_start = 0; 40 private int m_end = 0; 41 protected int m_reserve = 0; 42 43 49 public FixedSizeQueue( int size ) 50 { 51 if( size < 1 ) 52 throw new IllegalArgumentException ( "Cannot specify an unbounded Queue" ); 53 54 m_elements = new Object [ size + 1 ]; 55 m_mutex = new ReentrantLock(); 56 } 57 58 public int size() 59 { 60 int size = 0; 61 62 if( m_end < m_start ) 63 { 64 size = maxSize() - m_start + m_end; 65 } 66 else 67 { 68 size = m_end - m_start; 69 } 70 71 return size + m_reserve; 72 } 73 74 public int maxSize() 75 { 76 return m_elements.length; 77 } 78 79 public PreparedEnqueue prepareEnqueue( final Object [] elements ) 80 throws SinkException 81 { 82 PreparedEnqueue enqueue = null; 83 84 try 85 { 86 m_mutex.acquire(); 87 try 88 { 89 if( elements.length + size() > maxSize() ) 90 { 91 throw new SinkFullException( "Not enough room to enqueue these elements." ); 92 } 93 94 enqueue = new FixedSizePreparedEnqueue( this, elements ); 95 } 96 finally 97 { 98 m_mutex.release(); 99 } 100 } 101 catch( InterruptedException ie ) 102 { 103 } 104 105 return enqueue; 106 } 107 108 public boolean tryEnqueue( final Object element ) 109 { 110 boolean success = false; 111 112 try 113 { 114 m_mutex.acquire(); 115 try 116 { 117 if( 1 + size() > maxSize() ) 118 { 119 return false; 120 } 121 122 addElement( element ); 123 success = true; 124 } 125 finally 126 { 127 m_mutex.release(); 128 } 129 } 130 catch( InterruptedException ie ) 131 { 132 } 133 134 return success; 135 } 136 137 public void enqueue( final Object [] elements ) 138 throws SinkException 139 { 140 final int len = elements.length; 141 142 try 143 { 144 m_mutex.acquire(); 145 try 146 { 147 if( elements.length + size() > maxSize() ) 148 { 149 throw new SinkFullException( "Not enough room to enqueue these elements." ); 150 } 151 152 for( int i = 0; i < len; i++ ) 153 { 154 addElement( elements[ i ] ); 155 } 156 } 157 finally 158 { 159 m_mutex.release(); 160 } 161 } 162 catch( InterruptedException ie ) 163 { 164 } 165 } 166 167 public void enqueue( final Object element ) 168 throws SinkException 169 { 170 try 171 { 172 m_mutex.acquire(); 173 try 174 { 175 if( 1 + size() > maxSize() ) 176 { 177 throw new SinkFullException( "Not enough room to enqueue these elements." ); 178 } 179 180 addElement( element ); 181 } 182 finally 183 { 184 m_mutex.release(); 185 } 186 } 187 catch( InterruptedException ie ) 188 { 189 } 190 } 191 192 public Object [] dequeue( final int numElements ) 193 { 194 Object [] elements = EMPTY_ARRAY; 195 196 try 197 { 198 if( m_mutex.attempt( m_timeout ) ) 199 { 200 try 201 { 202 elements = retrieveElements( Math.min( size(), 203 numElements ) ); 204 } 205 finally 206 { 207 m_mutex.release(); 208 } 209 } 210 } 211 catch( InterruptedException ie ) 212 { 213 } 214 215 return elements; 216 } 217 218 private final void addElement( Object element ) 219 { 220 m_elements[ m_end ] = element; 221 222 m_end++; 223 if( m_end >= maxSize() ) 224 { 225 m_end = 0; 226 } 227 } 228 229 private final Object removeElement() 230 { 231 Object element = m_elements[ m_start ]; 232 233 if( null != element ) 234 { 235 m_elements[ m_start ] = null; 236 237 m_start++; 238 if( m_start >= maxSize() ) 239 { 240 m_start = 0; 241 } 242 } 243 244 return element; 245 } 246 247 260 private final Object [] retrieveElements( int count ) 261 { 262 Object [] elements = new Object [ count ]; 263 264 for( int i = 0; i < count; i++ ) 265 { 266 elements[ i ] = removeElement(); 267 } 268 269 return elements; 270 } 271 272 public Object [] dequeueAll() 273 { 274 Object [] elements = EMPTY_ARRAY; 275 276 try 277 { 278 if( m_mutex.attempt( m_timeout ) ) 279 { 280 try 281 { 282 elements = retrieveElements( size() ); 283 } 284 finally 285 { 286 m_mutex.release(); 287 } 288 } 289 } 290 catch( InterruptedException ie ) 291 { 292 } 293 294 return elements; 295 } 296 297 public Object dequeue() 298 { 299 Object element = null; 300 301 try 302 { 303 if( m_mutex.attempt( m_timeout ) ) 304 { 305 try 306 { 307 if( size() > 0 ) 308 { 309 element = removeElement(); 310 } 311 } 312 finally 313 { 314 m_mutex.release(); 315 } 316 } 317 } 318 catch( InterruptedException ie ) 319 { 320 } 321 322 return element; 323 } 324 325 private static final class FixedSizePreparedEnqueue implements PreparedEnqueue 326 { 327 private final FixedSizeQueue m_parent; 328 private Object [] m_elements; 329 330 private FixedSizePreparedEnqueue( FixedSizeQueue parent, Object [] elements ) 331 { 332 m_parent = parent; 333 m_elements = elements; 334 m_parent.m_reserve += m_elements.length; 335 } 336 337 public void commit() 338 { 339 if( null == m_elements ) 340 { 341 throw new IllegalStateException ( "This PreparedEnqueue has already been processed!" ); 342 } 343 344 try 345 { 346 m_parent.enqueue( m_elements ); 347 m_parent.m_reserve -= m_elements.length; 348 m_elements = null; 349 } 350 catch( Exception e ) 351 { 352 throw new IllegalStateException ( "Default enqueue did not happen--should be impossible" ); 353 } 355 } 356 357 public void abort() 358 { 359 if( null == m_elements ) 360 { 361 throw new IllegalStateException ( "This PreparedEnqueue has already been processed!" ); 362 } 363 364 m_parent.m_reserve -= m_elements.length; 365 m_elements = null; 366 } 367 } 368 } 369 | Popular Tags |