KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > Ostermiller > util > CircularObjectBuffer


/*
 * Circular Object Buffer
 * Copyright (C) 2002-2004 Stephen Ostermiller
 * http://ostermiller.org/contact.pl?regarding=Java+Utilities
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * See COPYING.TXT for details.
 */

18 package com.Ostermiller.util;
19
20 /**
21  * Implements the Circular Buffer producer/consumer model for Objects.
22  * More information about this class is available from <a target="_top" HREF=
23  * "http://ostermiller.org/utils/CircularObjectBuffer.html">ostermiller.org</a>.
24  * <p>
25  * This class is thread safe.
26  *
27  * @see CircularCharBuffer
28  * @see CircularByteBuffer
29  *
30  * @author Stephen Ostermiller http://ostermiller.org/contact.pl?regarding=Java+Utilities
31  * @since ostermillerutils 1.00.00
32  */

33 public class CircularObjectBuffer <ElementType> {
34
35     /**
36      * The default size for a circular object buffer.
37      *
38      * @since ostermillerutils 1.00.00
39      */

40     private final static int DEFAULT_SIZE = 1024;
41
42     /**
43      * A buffer that will grow as things are added.
44      *
45      * @since ostermillerutils 1.00.00
46      */

47     public final static int INFINITE_SIZE = -1;
48
49     /**
50      * The circular buffer.
51      * <p>
52      * The actual capacity of the buffer is one less than the actual length
53      * of the buffer so that an empty and a full buffer can be
54      * distinguished. An empty buffer will have the readPostion and the
55      * writePosition equal to each other. A full buffer will have
56      * the writePosition one less than the readPostion.
57      * <p>
58      * There are two important indexes into the buffer:
59      * The readPosition, and the writePosition. The Objects
60      * available to be read go from the readPosition to the writePosition,
61      * wrapping around the end of the buffer. The space available for writing
62      * goes from the write position to one less than the readPosition,
63      * wrapping around the end of the buffer.
64      *
65      * @since ostermillerutils 1.00.00
66      */

67     protected ElementType[] buffer;
68     /**
69      * Index of the first Object available to be read.
70      *
71      * @since ostermillerutils 1.00.00
72      */

73     protected volatile int readPosition = 0;
74     /**
75      * Index of the first Object available to be written.
76      *
77      * @since ostermillerutils 1.00.00
78      */

79     protected volatile int writePosition = 0;
80     /**
81      * If this buffer is infinite (should resize itself when full)
82      *
83      * @since ostermillerutils 1.00.00
84      */

85     protected volatile boolean infinite = false;
86     /**
87      * True if a write to a full buffer should block until the buffer
88      * has room, false if the write method should throw an IOException
89      *
90      * @since ostermillerutils 1.00.00
91      */

92     protected boolean blockingWrite = true;
93
94     /**
95      * True when no more input is coming into this buffer. At that
96      * point reading from the buffer may return null if the buffer
97      * is empty, otherwise a read will block until an Object is available.
98      *
99      * @since ostermillerutils 1.00.00
100      */

101     protected boolean inputDone = false;
102
103     /**
104      * Make this buffer ready for reuse. The contents of the buffer
105      * will be cleared and the streams associated with this buffer
106      * will be reopened if they had been closed.
107      *
108      * @since ostermillerutils 1.00.00
109      */

110     public void clear(){
111         synchronized (this){
112             readPosition = 0;
113             writePosition = 0;
114             inputDone = false;
115         }
116     }
117
118     /**
119      * Get number of Objects that are available to be read.
120      * <p>
121      * Note that the number of Objects available plus
122      * the number of Objects free may not add up to the
123      * capacity of this buffer, as the buffer may reserve some
124      * space for other purposes.
125      *
126      * @return the size in Objects of this buffer
127      *
128      * @since ostermillerutils 1.00.00
129      */

130     public int getAvailable(){
131         synchronized (this){
132             return available();
133         }
134     }
135
136     /**
137      * Get the number of Objects this buffer has free for
138      * writing.
139      * <p>
140      * Note that the number of Objects available plus
141      * the number of Objects free may not add up to the
142      * capacity of this buffer, as the buffer may reserve some
143      * space for other purposes.
144      *
145      * @return the available space in Objects of this buffer
146      *
147      * @since ostermillerutils 1.00.00
148      */

149     public int getSpaceLeft(){
150         synchronized (this){
151             return spaceLeft();
152         }
153     }
154
155     /**
156      * Get the capacity of this buffer.
157      * <p>
158      * Note that the number of Objects available plus
159      * the number of Objects free may not add up to the
160      * capacity of this buffer, as the buffer may reserve some
161      * space for other purposes.
162      *
163      * @return the size in Objects of this buffer
164      *
165      * @since ostermillerutils 1.00.00
166      */

167     public int getSize(){
168         synchronized (this){
169             return buffer.length;
170         }
171     }
172
173     private ElementType[] createArray(int size){
174         // The Java 1.5 compiler complains that this is unsafe
175
// casting - it really isn't and there is no way to
176
// make the compiler think otherwise.
177
return (ElementType[]) new Object JavaDoc[size];
178     }
179
180     /**
181      * double the size of the buffer
182      *
183      * @since ostermillerutils 1.00.00
184      */

185     private void resize(){
186         ElementType[] newBuffer = createArray(buffer.length * 2);
187         int available = available();
188         if (readPosition <= writePosition){
189             // any space between the read and
190
// the first write needs to be saved.
191
// In this case it is all in one piece.
192
int length = writePosition - readPosition;
193             System.arraycopy(buffer, readPosition, newBuffer, 0, length);
194         } else {
195             int length1 = buffer.length - readPosition;
196             System.arraycopy(buffer, readPosition, newBuffer, 0, length1);
197             int length2 = writePosition;
198             System.arraycopy(buffer, 0, newBuffer, length1, length2);
199         }
200         buffer = newBuffer;
201         readPosition = 0;
202         writePosition = available;
203     }
204
205     /**
206      * Space available in the buffer which can be written.
207      *
208      * @since ostermillerutils 1.00.00
209      */

210     private int spaceLeft(){
211         if (writePosition < readPosition){
212             // any space between the first write and
213
// the read except one Object is available.
214
// In this case it is all in one piece.
215
return (readPosition - writePosition - 1);
216         } else {
217             // space at the beginning and end.
218
return ((buffer.length - 1) - (writePosition - readPosition));
219         }
220     }
221
222     /**
223      * Objects available for reading.
224      *
225      * @since ostermillerutils 1.00.00
226      */

227     private int available(){
228         if (readPosition <= writePosition){
229             // any space between the first read and
230
// the first write is available. In this case i
231
// is all in one piece.
232
return (writePosition - readPosition);
233         } else {
234             // space at the beginning and end.
235
return (buffer.length - (readPosition - writePosition));
236         }
237     }
238
239     /**
240      * Create a new buffer with a default capacity.
241      * Writing to a full buffer will block until space
242      * is available rather than throw an exception.
243      *
244      * @since ostermillerutils 1.00.00
245      */

246     public CircularObjectBuffer(){
247         this (DEFAULT_SIZE, true);
248     }
249
250     /**
251      * Create a new buffer with given capacity.
252      * Writing to a full buffer will block until space
253      * is available rather than throw an exception.
254      * <p>
255      * Note that the buffer may reserve some Objects for
256      * special purposes and capacity number of Objects may
257      * not be able to be written to the buffer.
258      * <p>
259      * Note that if the buffer is of INFINITE_SIZE it will
260      * neither block or throw exceptions, but rather grow
261      * without bound.
262      *
263      * @param size desired capacity of the buffer in Objects or CircularObjectBuffer.INFINITE_SIZE.
264      *
265      * @since ostermillerutils 1.00.00
266      */

267     public CircularObjectBuffer(int size){
268         this (size, true);
269     }
270
271     /**
272      * Create a new buffer with a default capacity and
273      * given blocking behavior.
274      *
275      * @param blockingWrite true writing to a full buffer should block
276      * until space is available, false if an exception should
277      * be thrown instead.
278      *
279      * @since ostermillerutils 1.00.00
280      */

281     public CircularObjectBuffer(boolean blockingWrite){
282         this (DEFAULT_SIZE, blockingWrite);
283     }
284
285     /**
286      * Create a new buffer with the given capacity and
287      * blocking behavior.
288      * <p>
289      * Note that the buffer may reserve some Objects for
290      * special purposes and capacity number of Objects may
291      * not be able to be written to the buffer.
292      * <p>
293      * Note that if the buffer is of INFINITE_SIZE it will
294      * neither block or throw exceptions, but rather grow
295      * without bound.
296      *
297      * @param size desired capacity of the buffer in Objects or CircularObjectBuffer.INFINITE_SIZE.
298      * @param blockingWrite true writing to a full buffer should block
299      * until space is available, false if an exception should
300      * be thrown instead.
301      *
302      * @since ostermillerutils 1.00.00
303      */

304     public CircularObjectBuffer(int size, boolean blockingWrite){
305         if (size == INFINITE_SIZE){
306             buffer = createArray(DEFAULT_SIZE);
307             infinite = true;
308         } else {
309             buffer = createArray(size);
310             infinite = false;
311         }
312         this.blockingWrite = blockingWrite;
313     }
314
315
316     /**
317      * Get a single Object from this buffer. This method should be called
318      * by the consumer.
319      * This method will block until a Object is available or no more
320      * objects are available.
321      *
322      * @return The Object read, or null if there are no more objects
323      * @throws InterruptedException if the thread is inturrupted while waiting.
324      *
325      * @since ostermillerutils 1.00.00
326      */

327     public ElementType read() throws InterruptedException JavaDoc {
328         while (true){
329             synchronized (this){
330                 int available = available();
331                 if (available > 0){
332                     ElementType result = buffer[readPosition];
333                     readPosition++;
334                     if (readPosition == buffer.length){
335                         readPosition = 0;
336                     }
337                     return result;
338                 } else if (inputDone){
339                     return null;
340                 }
341             }
342             Thread.sleep(100);
343         }
344     }
345
346     /**
347      * Get Objects into an array from this buffer. This method should
348      * be called by the consumer.
349      * This method will block until some input is available,
350      * or there is no more input.
351      *
352      * @param buf Destination buffer.
353      * @return The number of Objects read, or -1 there will
354      * be no more objects available.
355      * @throws InterruptedException if the thread is inturrupted while waiting.
356      *
357      * @since ostermillerutils 1.00.00
358      */

359     public int read(ElementType[] buf) throws InterruptedException JavaDoc {
360         return read(buf, 0, buf.length);
361     }
362
363     /**
364      * Get Objects into a portion of an array from this buffer. This
365      * method should be called by the consumer.
366      * This method will block until some input is available,
367      * an I/O error occurs, or the end of the stream is reached.
368      *
369      * @param buf Destination buffer.
370      * @param off Offset at which to start storing Objects.
371      * @param len Maximum number of Objects to read.
372      * @return The number of Objects read, or -1 there will
373      * be no more objects available.
374      * @throws InterruptedException if the thread is inturrupted while waiting.
375      *
376      * @since ostermillerutils 1.00.00
377      */

378     public int read(ElementType[] buf, int off, int len) throws InterruptedException JavaDoc {
379         while (true){
380             synchronized (this){
381                 int available = available();
382                 if (available > 0){
383                     int length = Math.min(len, available);
384                     int firstLen = Math.min(length, buffer.length - readPosition);
385                     int secondLen = length - firstLen;
386                     System.arraycopy(buffer, readPosition, buf, off, firstLen);
387                     if (secondLen > 0){
388                         System.arraycopy(buffer, 0, buf, off+firstLen, secondLen);
389                         readPosition = secondLen;
390                     } else {
391                         readPosition += length;
392                     }
393                     if (readPosition == buffer.length) {
394                         readPosition = 0;
395                     }
396                     return length;
397                 } else if (inputDone){
398                     return -1;
399                 }
400             }
401             Thread.sleep(100);
402         }
403     }
404
405
406     /**
407      * Skip Objects. This method should be used by the consumer
408      * when it does not care to examine some number of Objects.
409      * This method will block until some Objects are available,
410      * or there will be no more Objects available.
411      *
412      * @param n The number of Objects to skip
413      * @return The number of Objects actually skipped
414      * @throws IllegalArgumentException if n is negative.
415      * @throws InterruptedException if the thread is inturrupted while waiting.
416      *
417      * @since ostermillerutils 1.00.00
418      */

419     public long skip(long n) throws InterruptedException JavaDoc, IllegalArgumentException JavaDoc {
420         while (true){
421             synchronized (this){
422                 int available = available();
423                 if (available > 0){
424                     int length = Math.min((int)n, available);
425                     int firstLen = Math.min(length, buffer.length - readPosition);
426                     int secondLen = length - firstLen;
427                     if (secondLen > 0){
428                         readPosition = secondLen;
429                     } else {
430                         readPosition += length;
431                     }
432                     if (readPosition == buffer.length) {
433                         readPosition = 0;
434                     }
435                     return length;
436                 } else if (inputDone){
437                     return 0;
438                 }
439             }
440             Thread.sleep(100);
441         }
442     }
443
444     /**
445      * This method should be used by the producer to signal to the consumer
446      * that the producer is done producing objects and that the consumer
447      * should stop asking for objects once it has used up buffered objects.
448      * <p>
449      * Once the producer has signaled that it is done, further write() invocations
450      * will cause an IllegalStateException to be thrown. Calling done() multiple times,
451      * however, has no effect.
452      *
453      * @since ostermillerutils 1.00.00
454      */

455     public void done(){
456         synchronized (this){
457             inputDone = true;
458         }
459     }
460
461     /**
462      * Fill this buffer with array of Objects. This method should be called
463      * by the producer.
464      * If the buffer allows blocking writes, this method will block until
465      * all the data has been written rather than throw a BufferOverflowException.
466      *
467      * @param buf Array of Objects to be written
468      * @throws BufferOverflowException if buffer does not allow blocking writes
469      * and the buffer is full. If the exception is thrown, no data
470      * will have been written since the buffer was set to be non-blocking.
471      * @throws IllegalStateException if done() has been called.
472      * @throws InterruptedException if the write is interrupted.
473      *
474      * @since ostermillerutils 1.00.00
475      */

476     public void write(ElementType[] buf) throws BufferOverflowException, IllegalStateException JavaDoc, InterruptedException JavaDoc {
477         write(buf, 0, buf.length);
478     }
479
480     /**
481      * Fill this buffer with a portion of an array of Objects.
482      * This method should be called by the producer.
483      * If the buffer allows blocking writes, this method will block until
484      * all the data has been written rather than throw an IOException.
485      *
486      * @param buf Array of Objects
487      * @param off Offset from which to start writing Objects
488      * @param len - Number of Objects to write
489      * @throws BufferOverflowException if buffer does not allow blocking writes
490      * and the buffer is full. If the exception is thrown, no data
491      * will have been written since the buffer was set to be non-blocking.
492      * @throws IllegalStateException if done() has been called.
493      * @throws InterruptedException if the write is interrupted.
494      *
495      * @since ostermillerutils 1.00.00
496      */

497     public void write(ElementType[] buf, int off, int len) throws BufferOverflowException, IllegalStateException JavaDoc, InterruptedException JavaDoc {
498         while (len > 0){
499             synchronized (CircularObjectBuffer.this){
500                 if (inputDone) throw new IllegalStateException JavaDoc("CircularObjectBuffer.done() has been called, CircularObjectBuffer.write() failed.");
501                 int spaceLeft = spaceLeft();
502                 while (infinite && spaceLeft < len){
503                     resize();
504                     spaceLeft = spaceLeft();
505                 }
506                 if (!blockingWrite && spaceLeft < len) throw new BufferOverflowException("CircularObjectBuffer is full; cannot write " + len + " Objects");
507                 int realLen = Math.min(len, spaceLeft);
508                 int firstLen = Math.min(realLen, buffer.length - writePosition);
509                 int secondLen = Math.min(realLen - firstLen, buffer.length - readPosition - 1);
510                 int written = firstLen + secondLen;
511                 if (firstLen > 0){
512                     System.arraycopy(buf, off, buffer, writePosition, firstLen);
513                 }
514                 if (secondLen > 0){
515                     System.arraycopy(buf, off+firstLen, buffer, 0, secondLen);
516                     writePosition = secondLen;
517                 } else {
518                     writePosition += written;
519                 }
520                 if (writePosition == buffer.length) {
521                     writePosition = 0;
522                 }
523                 off += written;
524                 len -= written;
525             }
526             if (len > 0){
527                 Thread.sleep(100);
528             }
529         }
530     }
531
532     /**
533      * Add a single Object to this buffer. This method should be
534      * called by the producer.
535      * If the buffer allows blocking writes, this method will block until
536      * all the data has been written rather than throw an IOException.
537      *
538      * @param o Object to be written.
539      * @throws BufferOverflowException if buffer does not allow blocking writes
540      * and the buffer is full. If the exception is thrown, no data
541      * will have been written since the buffer was set to be non-blocking.
542      * @throws IllegalStateException if done() has been called.
543      * @throws InterruptedException if the write is interrupted.
544      *
545      * @since ostermillerutils 1.00.00
546      */

547     public void write(ElementType o) throws BufferOverflowException, IllegalStateException JavaDoc, InterruptedException JavaDoc {
548         boolean written = false;
549         while (!written){
550             synchronized (CircularObjectBuffer.this){
551                 if (inputDone) throw new IllegalStateException JavaDoc("CircularObjectBuffer.done() has been called, CircularObjectBuffer.write() failed.");
552                 int spaceLeft = spaceLeft();
553                 while (infinite && spaceLeft < 1){
554                     resize();
555                     spaceLeft = spaceLeft();
556                 }
557                 if (!blockingWrite && spaceLeft < 1) throw new BufferOverflowException("CircularObjectBuffer is full; cannot write 1 Object");
558                 if (spaceLeft > 0){
559                     buffer[writePosition] = o;
560                     writePosition++;
561                     if (writePosition == buffer.length) {
562                         writePosition = 0;
563                     }
564                     written = true;
565                 }
566             }
567             if (!written){
568                 Thread.sleep(100);
569             }
570         }
571     }
572 }
573
Popular Tags