package com.hp.hpl.jena.graph.query;

import EDU.oswego.cs.dl.util.concurrent.*;
import com.hp.hpl.jena.shared.*;

import java.util.*;

/**
    A Pipe implemented on top of a BoundedBuffer of capacity 5. Domain
    elements are written with <code>put</code> and read back with
    <code>hasNext</code>/<code>get</code>. The end of the stream is signalled
    by writing a <code>Finished</code> marker into the same buffer, optionally
    carrying an exception that is rethrown on the reading side.
*/
public class BufferPipe implements Pipe
    {
    /** true until the reading side has seen a Finished marker */
    private boolean open = true;

    /** carries both Domain elements and the terminating Finished marker */
    private BoundedBuffer buffer = new BoundedBuffer( 5 );

    /** the element fetched by hasNext() and not yet handed out by get() */
    private Object pending = null;

    /**
        The end-of-stream marker placed in the buffer by <code>close</code>.
        It may carry the exception that caused the stream to end early.
    */
    public static class Finished
        {
        protected RuntimeException e;

        /** A marker for a normal end of stream; getCause() returns null. */
        public Finished() {}

        /**
            A marker for an abnormal end of stream; the supplied exception
            is wrapped in a QueryStageException.
        */
        public Finished( Exception e ) { this.e = new QueryStageException( e ); }

        /** Answer the wrapped exception, or null for a normal end of stream. */
        public RuntimeException getCause() { return e; }
        }

    /** the shared marker used for a normal close() */
    private static final Finished finished = new Finished();

    public BufferPipe()
        { }

    /**
        Take the next object (a Domain or a Finished marker) from the buffer.
        Any exception raised by the buffer is rethrown wrapped in a
        BoundedBufferTakeException.
    */
    private Object fetch()
        {
        try { return buffer.take(); }
        catch (Exception e) { throw new BoundedBufferTakeException( e ); }
        }

    /**
        Put an arbitrary object into the buffer. Any exception raised by the
        buffer is rethrown wrapped in a BoundedBufferPutException.
    */
    private void putAny( Object d )
        {
        try { buffer.put( d ); return; }
        catch (Exception e) { throw new BoundedBufferPutException( e ); }
        }

    /** Append the Domain <code>d</code> to the pipe. */
    public void put( Domain d )
        { putAny( d ); }

    /** Signal a normal end of stream to the reader. */
    public void close()
        { putAny( finished ); }

    /**
        Signal an abnormal end of stream: the reader's hasNext() will throw
        a QueryStageException wrapping <code>e</code> when it reaches the marker.
    */
    public void close( Exception e )
        { putAny( new Finished( e ) ); }

    /**
        Answer true iff there is another Domain to <code>get</code>. If no
        element is pending, one is taken from the buffer (presumably blocking
        until the writer supplies one -- see BoundedBuffer.take).

        When a Finished marker is read the pipe is marked closed, so every
        later call answers false. If the marker carries a cause, that cause
        is thrown after the pipe has been closed; previously the marker was
        left pending with the pipe still open, so a retry answered true and
        get() failed with a ClassCastException.

        @throws RuntimeException the cause carried by a Finished marker
        @throws BoundedBufferTakeException if taking from the buffer fails
    */
    public boolean hasNext()
        {
        if (open == false) return false;
        if (pending != null) return true;
        pending = fetch();
        if (pending instanceof Finished)
            {
            RuntimeException cause = ((Finished) pending).getCause();
            // close first so the pipe is in a consistent state even if we throw
            open = false;
            pending = null;
            if (cause != null) throw cause;
            }
        return open;
        }

    /**
        Answer the next Domain and clear the pending slot.

        @throws NoSuchElementException if the pipe has no further elements
    */
    public Domain get()
        {
        if (hasNext() == false) throw new NoSuchElementException();
        try { return (Domain) pending; } finally { pending = null; }
        }

    /**
        Exception thrown when taking an element from the underlying
        BoundedBuffer fails; wraps the original exception.
    */
    public static class BoundedBufferTakeException extends JenaException
        { BoundedBufferTakeException( Exception e ) { super( e ); } }

    /**
        Exception thrown when putting an element into the underlying
        BoundedBuffer fails; wraps the original exception.
    */
    public static class BoundedBufferPutException extends JenaException
        { BoundedBufferPutException( Exception e ) { super( e ); } }
    }