|                                                                                                              1
 17  package org.apache.excalibur.event.impl;
 18
 19  import org.apache.commons.collections.Buffer;
 20  import org.apache.commons.collections.UnboundedFifoBuffer;
 21  import org.apache.excalibur.event.EnqueuePredicate;
 22  import org.apache.excalibur.event.PreparedEnqueue;
 23  import org.apache.excalibur.event.SinkException;
 24  import org.apache.excalibur.event.SinkFullException;
 25
 26  import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
 27
 28
 35  public final class DefaultQueue extends AbstractQueue
 36  {
 37      private final Buffer m_elements;
 38      private final ReentrantLock m_mutex;
 39      protected int m_reserve;
 40      private final int m_maxSize;
 41
 42
 51      public DefaultQueue( int size )
 52      {
 53          this( new ThresholdEnqueuePredicate( size ) );
 54      }
 55
 56      public DefaultQueue( EnqueuePredicate predicate )
 57      {
 58          setEnqueuePredicate( predicate );
 59
 60          m_mutex = new ReentrantLock();
 61          m_elements = new UnboundedFifoBuffer();
 62          m_reserve = 0;
 63          m_maxSize = -1;
 64      }
 65
 66
 69      public DefaultQueue()
 70      {
 71          this( new NullEnqueuePredicate() );
 72      }
 73
 74
 79      public int size()
 80      {
 81          return m_elements.size() + m_reserve;
 82      }
 83
 84
 91      public int maxSize()
 92      {
 93          return m_maxSize;
 94      }
 95
 96      public PreparedEnqueue prepareEnqueue( final Object
  [] elements ) 97          throws SinkException
 98      {
 99          PreparedEnqueue enqueue = null;
 100
 101         try
 102         {
 103             m_mutex.acquire();
 104             try
 105             {
 106                 if( getEnqueuePredicate().accept(elements, this) )
 107                 {
 108                     enqueue = new DefaultPreparedEnqueue( this, elements );
 109                 }
 110                 else
 111                 {
 112                     throw new SinkFullException( "Not enough room to enqueue these elements." );
 113                 }
 114             }
 115             finally
 116             {
 117                 m_mutex.release();
 118             }
 119         }
 120         catch( InterruptedException
  ie ) 121         {
 122             if ( null == enqueue )
 123             {
 124                 throw new SinkException("The mutex was interrupted before it could be released");
 125             }
 126         }
 127
 128         return enqueue;
 129     }
 130
 131     public boolean tryEnqueue( final Object
  element ) 132     {
 133         boolean success = false;
 134
 135         try
 136         {
 137             m_mutex.acquire();
 138             try
 139             {
 140                 success = getEnqueuePredicate().accept( element, this );
 141
 142                 if ( success )
 143                 {
 144                     m_elements.add( element );
 145                 }
 146             }
 147             finally
 148             {
 149                 m_mutex.release();
 150             }
 151         }
 152         catch( InterruptedException
  ie ) 153         {
 154         }
 155
 156         return success;
 157     }
 158
 159     public void enqueue( final Object
  [] elements ) 160         throws SinkException
 161     {
 162         final int len = elements.length;
 163
 164         try
 165         {
 166             m_mutex.acquire();
 167             try
 168             {
 169                 if( ! getEnqueuePredicate().accept( elements, this ) )
 170                 {
 171                     throw new SinkFullException( "Not enough room to enqueue these elements." );
 172                 }
 173
 174                 for( int i = 0; i < len; i++ )
 175                 {
 176                     m_elements.add( elements[ i ] );
 177                 }
 178             }
 179             finally
 180             {
 181                 m_mutex.release();
 182             }
 183         }
 184         catch( InterruptedException
  ie ) 185         {
 186         }
 187     }
 188
 189     public void enqueue( final Object
  element ) 190         throws SinkException
 191     {
 192         try
 193         {
 194             m_mutex.acquire();
 195             try
 196             {
 197                 if( ! getEnqueuePredicate().accept(element, this) )
 198                 {
 199                     throw new SinkFullException( "Not enough room to enqueue these elements." );
 200                 }
 201
 202                 m_elements.add( element );
 203             }
 204             finally
 205             {
 206                 m_mutex.release();
 207             }
 208         }
 209         catch( InterruptedException
  ie ) 210         {
 211         }
 212     }
 213
 214     public Object
  [] dequeue( final int numElements ) 215     {
 216         getDequeueInterceptor().before(this);
 217         Object
  [] elements = EMPTY_ARRAY; 218
 219         try
 220         {
 221             if( m_mutex.attempt( m_timeout ) )
 222             {
 223                 try
 224                 {
 225                     elements = retrieveElements( m_elements,
 226                                                  Math.min( size(),
 227                                                            numElements ) );
 228                 }
 229                 finally
 230                 {
 231                     m_mutex.release();
 232                 }
 233             }
 234         }
 235         catch( InterruptedException
  ie ) 236         {
 237                     }
 239
 240         getDequeueInterceptor().after(this);
 241         return elements;
 242     }
 243
 244     public Object
  [] dequeueAll() 245     {
 246         getDequeueInterceptor().before(this);
 247         Object
  [] elements = EMPTY_ARRAY; 248
 249         try
 250         {
 251             if( m_mutex.attempt( m_timeout ) )
 252             {
 253                 try
 254                 {
 255                     elements = retrieveElements( m_elements, size() );
 256                 }
 257                 finally
 258                 {
 259                     m_mutex.release();
 260                 }
 261             }
 262         }
 263         catch( InterruptedException
  ie ) 264         {
 265                     }
 267
 268         getDequeueInterceptor().after(this);
 269         return elements;
 270     }
 271
 272
 283     private static Object
  [] retrieveElements( Buffer buf, int count ) 284     {
 285         Object
  [] elements = new Object  [ count ]; 286
 287         for( int i = 0; i < count; i++ )
 288         {
 289             elements[ i ] = buf.remove();
 290         }
 291
 292         return elements;
 293     }
 294
 295     public Object
  dequeue() 296     {
 297         getDequeueInterceptor().before(this);
 298         Object
  element = null; 299
 300         try
 301         {
 302             if( m_mutex.attempt( m_timeout ) )
 303             {
 304                 try
 305                 {
 306                     if( size() > 0 )
 307                     {
 308                         element = m_elements.remove();
 309                     }
 310                 }
 311                 finally
 312                 {
 313                     m_mutex.release();
 314                 }
 315             }
 316         }
 317         catch( InterruptedException
  ie ) 318         {
 319                     }
 321
 322         getDequeueInterceptor().after(this);
 323         return element;
 324     }
 325
 326     private static final class DefaultPreparedEnqueue implements PreparedEnqueue
 327     {
 328         private final DefaultQueue m_parent;
 329         private Object
  [] m_elements; 330
 331         private DefaultPreparedEnqueue( DefaultQueue parent, Object
  [] elements ) 332         {
 333             m_parent = parent;
 334             m_elements = elements;
 335             m_parent.m_reserve += elements.length;
 336         }
 337
 338         public void commit()
 339         {
 340             if( null == m_elements )
 341             {
 342                 throw new IllegalStateException
  ( "This PreparedEnqueue has already been processed!" ); 343             }
 344
 345             try
 346             {
 347                 m_parent.m_reserve -= m_elements.length;
 348                 m_parent.enqueue( m_elements );
 349                 m_elements = null;
 350             }
 351             catch( Exception
  e ) 352             {
 353                 throw new IllegalStateException
  ( "Default enqueue did not happen--should be impossible" ); 354                             }
 356         }
 357
 358         public void abort()
 359         {
 360             if( null == m_elements )
 361             {
 362                 throw new IllegalStateException
  ( "This PreparedEnqueue has already been processed!" ); 363             }
 364
 365             m_parent.m_reserve -= m_elements.length;
 366             m_elements = null;
 367         }
 368     }
 369 }
 370
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |