1 8 package org.apache.avalon.excalibur.event; 9 10 import java.util.ArrayList ; 11 import org.apache.avalon.excalibur.concurrent.Mutex; 12 13 19 public final class FixedSizeQueue extends AbstractQueue 20 { 21 private final QueueElement[] m_elements; 22 private final Mutex m_mutex; 23 private int m_start = 0; 24 private int m_end = 0; 25 private int m_reserve = 0; 26 27 public FixedSizeQueue( int size ) 28 { 29 m_elements = new QueueElement[ size + 1 ]; 30 m_mutex = new Mutex(); 31 } 32 33 public int size() 34 { 35 int size = 0; 36 37 if ( m_end < m_start ) 38 { 39 size = maxSize() - m_start + m_end; 40 } 41 else 42 { 43 size = m_end - m_start; 44 } 45 46 return size; 47 } 48 49 public int maxSize() 50 { 51 return m_elements.length; 52 } 53 54 public PreparedEnqueue prepareEnqueue( final QueueElement[] elements ) 55 throws SourceException 56 { 57 PreparedEnqueue enqueue = null; 58 59 try 60 { 61 m_mutex.acquire(); 62 63 if ( elements.length + m_reserve + size() > maxSize() ) 64 { 65 throw new SourceFullException("Not enough room to enqueue these elements."); 66 } 67 68 enqueue = new FixedSizePreparedEnqueue( this, elements ); 69 } 70 catch ( InterruptedException ie ) 71 { 72 } 73 finally 74 { 75 m_mutex.release(); 76 } 77 78 return enqueue; 79 } 80 81 public boolean tryEnqueue( final QueueElement element ) 82 { 83 boolean success = false; 84 85 try 86 { 87 m_mutex.acquire(); 88 89 if ( 1 + m_reserve + size() > maxSize() ) 90 { 91 return false; 92 } 93 94 addElement( element ); 95 success = true; 96 } 97 catch ( InterruptedException ie ) 98 { 99 } 100 finally 101 { 102 m_mutex.release(); 103 } 104 105 return success; 106 } 107 108 public void enqueue( final QueueElement[] elements ) 109 throws SourceException 110 { 111 final int len = elements.length; 112 113 try 114 { 115 m_mutex.acquire(); 116 if ( elements.length + m_reserve + size() > maxSize() ) 117 { 118 throw new SourceFullException("Not enough room to enqueue these elements."); 119 } 120 121 for ( int i = 0; i < len; i++ ) 122 { 123 addElement( elements[i] ); 124 } 125 } 126 catch ( InterruptedException ie ) 127 { 128 } 129 finally 130 { 131 m_mutex.release(); 132 } 133 } 134 135 public void enqueue( final QueueElement element ) 136 throws SourceException 137 { 138 try 139 { 140 m_mutex.acquire(); 141 if ( 1 + m_reserve + size() > maxSize() ) 142 { 143 throw new SourceFullException("Not enough room to enqueue these elements."); 144 } 145 146 addElement( element ); 147 } 148 catch ( InterruptedException ie ) 149 { 150 } 151 finally 152 { 153 m_mutex.release(); 154 } 155 } 156 157 public QueueElement[] dequeue( final int numElements ) 158 { 159 int arraySize = numElements; 160 161 if ( size() < numElements ) 162 { 163 arraySize = size(); 164 } 165 166 QueueElement[] elements = null; 167 168 try 169 { 170 m_mutex.attempt( m_timeout ); 171 172 if ( size() < numElements ) 173 { 174 arraySize = size(); 175 } 176 177 elements = new QueueElement[ arraySize ]; 178 179 for ( int i = 0; i < arraySize; i++ ) 180 { 181 elements[i] = removeElement(); 182 } 183 } 184 catch ( InterruptedException ie ) 185 { 186 } 187 finally 188 { 189 m_mutex.release(); 190 } 191 192 return elements; 193 } 194 195 private final void addElement( QueueElement element ) 196 { 197 m_elements[ m_end ] = element; 198 199 m_end++; 200 if ( m_end >= maxSize() ) 201 { 202 m_end = 0; 203 } 204 } 205 206 private final QueueElement removeElement() 207 { 208 QueueElement element = m_elements[ m_start ]; 209 210 if ( null != element ) 211 { 212 m_elements[ m_start ] = null; 213 214 m_start++; 215 if ( m_start >= maxSize() ) 216 { 217 m_start = 0; 218 } 219 } 220 221 return element; 222 } 223 224 public QueueElement[] dequeueAll() 225 { 226 QueueElement[] elements = null; 227 228 try 229 { 230 m_mutex.attempt( m_timeout ); 231 232 elements = new QueueElement[ size() ]; 233 234 for ( int i = 0; i < elements.length; i++ ) 235 { 236 elements[i] = removeElement(); 237 } 238 } 239 catch ( InterruptedException ie ) 240 { 241 } 242 finally 243 { 244 m_mutex.release(); 245 } 246 247 return elements; 248 } 249 250 public QueueElement dequeue() 251 { 252 QueueElement element = null; 253 254 try 255 { 256 m_mutex.attempt( m_timeout ); 257 258 if ( size() > 0 ) 259 { 260 element = removeElement(); 261 } 262 } 263 catch ( InterruptedException ie ) 264 { 265 } 266 finally 267 { 268 m_mutex.release(); 269 } 270 271 return element; 272 } 273 274 private final static class FixedSizePreparedEnqueue implements PreparedEnqueue 275 { 276 private final FixedSizeQueue m_parent; 277 private QueueElement[] m_elements; 278 279 private FixedSizePreparedEnqueue( FixedSizeQueue parent, QueueElement[] elements ) 280 { 281 m_parent = parent; 282 m_elements = elements; 283 } 284 285 public void commit() 286 { 287 if ( null == m_elements ) 288 { 289 throw new IllegalStateException ("This PreparedEnqueue has already been processed!"); 290 } 291 292 try 293 { 294 m_parent.enqueue( m_elements ); 295 m_parent.m_reserve -= m_elements.length; 296 m_elements = null; 297 } 298 catch (Exception e) 299 { 300 throw new IllegalStateException ("Default enqueue did not happen--should be impossible"); 301 } 303 } 304 305 public void abort() 306 { 307 if ( null == m_elements ) 308 { 309 throw new IllegalStateException ("This PreparedEnqueue has already been processed!"); 310 } 311 312 m_parent.m_reserve -= m_elements.length; 313 m_elements = null; 314 } 315 } 316 } | Popular Tags |