1 8 package org.apache.avalon.excalibur.event; 9 10 import org.apache.avalon.excalibur.collections.Buffer; 11 import org.apache.avalon.excalibur.collections.VariableSizeBuffer; 12 import org.apache.avalon.excalibur.concurrent.Mutex; 13 14 20 public final class DefaultQueue extends AbstractQueue 21 { 22 private final Buffer m_elements; 23 private final Mutex m_mutex; 24 private int m_reserve; 25 private final int m_maxSize; 26 27 public DefaultQueue( int size ) 28 { 29 int maxSize; 30 31 if ( size > 0 ) 32 { 33 m_elements = new VariableSizeBuffer( size ); 34 maxSize = size; 35 } 36 else 37 { 38 m_elements = new VariableSizeBuffer(); 39 maxSize = -1; 40 } 41 42 m_mutex = new Mutex(); 43 m_reserve = 0; 44 m_maxSize = maxSize; 45 } 46 47 public DefaultQueue() 48 { 49 this( -1 ); 50 } 51 52 public int size() 53 { 54 return m_elements.size(); 55 } 56 57 public int maxSize() 58 { 59 return m_maxSize; 60 } 61 62 public PreparedEnqueue prepareEnqueue( final QueueElement[] elements ) 63 throws SourceException 64 { 65 PreparedEnqueue enqueue = null; 66 67 try 68 { 69 m_mutex.acquire(); 70 71 if ( maxSize() > 0 && elements.length + m_reserve + size() > maxSize() ) 72 { 73 throw new SourceFullException("Not enough room to enqueue these elements."); 74 } 75 76 enqueue = new DefaultPreparedEnqueue( this, elements ); 77 } 78 catch ( InterruptedException ie ) 79 { 80 } 81 finally 82 { 83 m_mutex.release(); 84 } 85 86 return enqueue; 87 } 88 89 public boolean tryEnqueue( final QueueElement element ) 90 { 91 boolean success = false; 92 93 try 94 { 95 m_mutex.acquire(); 96 97 if ( maxSize() > 0 && 1 + m_reserve + size() > maxSize() ) 98 { 99 return false; 100 } 101 102 m_elements.add( element ); 103 success = true; 104 } 105 catch ( InterruptedException ie ) 106 { 107 } 108 finally 109 { 110 m_mutex.release(); 111 } 112 113 return success; 114 } 115 116 public void enqueue( final QueueElement[] elements ) 117 throws SourceException 118 { 119 final int len = elements.length; 120 121 try 122 { 123 m_mutex.acquire(); 124 if ( maxSize() > 0 && elements.length + m_reserve + size() > maxSize() ) 125 { 126 throw new SourceFullException("Not enough room to enqueue these elements."); 127 } 128 129 for ( int i = 0; i < len; i++ ) 130 { 131 m_elements.add( elements[i] ); 132 } 133 } 134 catch ( InterruptedException ie ) 135 { 136 } 137 finally 138 { 139 m_mutex.release(); 140 } 141 } 142 143 public void enqueue( final QueueElement element ) 144 throws SourceException 145 { 146 try 147 { 148 m_mutex.acquire(); 149 if ( maxSize() > 0 && 1 + m_reserve + size() > maxSize() ) 150 { 151 throw new SourceFullException("Not enough room to enqueue these elements."); 152 } 153 154 m_elements.add( element ); 155 } 156 catch ( InterruptedException ie ) 157 { 158 } 159 finally 160 { 161 m_mutex.release(); 162 } 163 } 164 165 public QueueElement[] dequeue( final int numElements ) 166 { 167 int arraySize = numElements; 168 169 if ( size() < numElements ) 170 { 171 arraySize = size(); 172 } 173 174 QueueElement[] elements = null; 175 176 try 177 { 178 m_mutex.attempt( m_timeout ); 179 180 if ( size() < numElements ) 181 { 182 arraySize = size(); 183 } 184 185 elements = new QueueElement[ arraySize ]; 186 187 for ( int i = 0; i < arraySize; i++ ) 188 { 189 elements[i] = (QueueElement) m_elements.remove(); 190 } 191 } 192 catch ( InterruptedException ie ) 193 { 194 } 195 finally 196 { 197 m_mutex.release(); 198 } 199 200 return elements; 201 } 202 203 public QueueElement[] dequeueAll() 204 { 205 QueueElement[] elements = null; 206 207 try 208 { 209 m_mutex.attempt( m_timeout ); 210 211 elements = new QueueElement[ size() ]; 212 213 for ( int i = 0; i < elements.length; i++ ) 214 { 215 elements[i] = (QueueElement) m_elements.remove(); 216 } 217 } 218 catch ( InterruptedException ie ) 219 { 220 } 221 finally 222 { 223 m_mutex.release(); 224 } 225 226 return elements; 227 } 228 229 public QueueElement dequeue() 230 { 231 QueueElement element = null; 232 233 try 234 { 235 m_mutex.attempt( m_timeout ); 236 237 if ( size() > 0 ) 238 { 239 element = (QueueElement) m_elements.remove(); 240 } 241 } 242 catch ( InterruptedException ie ) 243 { 244 } 245 finally 246 { 247 m_mutex.release(); 248 } 249 250 return element; 251 } 252 253 private final static class DefaultPreparedEnqueue implements PreparedEnqueue 254 { 255 private final DefaultQueue m_parent; 256 private QueueElement[] m_elements; 257 258 private DefaultPreparedEnqueue( DefaultQueue parent, QueueElement[] elements ) 259 { 260 m_parent = parent; 261 m_elements = elements; 262 } 263 264 265 public void commit() 266 { 267 if ( null == m_elements ) 268 { 269 throw new IllegalStateException ("This PreparedEnqueue has already been processed!"); 270 } 271 272 try 273 { 274 m_parent.enqueue( m_elements ); 275 m_parent.m_reserve -= m_elements.length; 276 m_elements = null; 277 } 278 catch (Exception e) 279 { 280 throw new IllegalStateException ("Default enqueue did not happen--should be impossible"); 281 } 283 } 284 285 public void abort() 286 { 287 if ( null == m_elements ) 288 { 289 throw new IllegalStateException ("This PreparedEnqueue has already been processed!"); 290 } 291 292 m_parent.m_reserve -= m_elements.length; 293 m_elements = null; 294 } 295 } 296 } 297 | Popular Tags |