1 5 6 package com.hp.hpl.jena.rdf.arp; 7 8 9 import org.xml.sax.*; 10 11 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; 12 13 import com.hp.hpl.jena.shared.JenaException; 14 15 19 class PushMePullYouPipe extends TokenPipe { 20 21 volatile private Throwable brokenPipe = null; 22 23 private boolean open = true; 24 25 private boolean overflow = false; 26 27 private BoundedBuffer buffer = new BoundedBuffer(100); 28 29 private Object pending = null; 30 31 private static final String finished = "<finished>"; 32 33 private static final RuntimeException naturalEnd = new RuntimeException (); 34 35 final private Thread puller; 36 37 38 PushMePullYouPipe(final ARPRunnable puller) { 39 this.puller = 40 new Thread () { 41 public void run() { 42 try { 44 puller.run(); 45 naturalDeath(); 46 } catch (Throwable e) { 47 setException(e); 48 } finally { 49 } 50 } 51 52 }; 53 54 } 55 void start() { 56 this.puller.start(); 57 } 58 62 private Object fetch() { 63 try { 64 return buffer.take(); 65 } catch (Exception e) { 66 throw new BoundedBufferTakeException(e); 67 } 68 } 69 70 74 private void putAny(Object d) throws SAXParseException { 75 try { 76 do { 77 if (d != finished) 78 isPipeBroken(); 79 } while (!buffer.offer(d, 100)); 80 } catch (InterruptedException e) { 81 throw new BoundedBufferPutException(e); 82 } 83 } 84 85 private void isPipeBroken() throws SAXParseException { 86 if (brokenPipe != null) { 87 if ( brokenPipe == naturalEnd ){ 88 SAXParseException ee = new SAXParseException("RDF parsing finished, additional XML events",getLocator()); 89 overflow = true; 92 throw ee; 93 } 94 try { 95 throw brokenPipe; 96 } catch (RuntimeException e) { 97 throw e; 98 } catch (Error e) { 99 throw e; 100 } catch (SAXParseException e) { 101 throw e; 102 } catch (Exception e) { 103 throw new WrappedException(e); 104 105 } catch (Throwable t) { 106 throw new RuntimeException ("Exception from RDF thread.",t); 107 } 108 119 } 120 } 121 122 public void putNextToken(Token d) throws SAXParseException { 123 putAny(d); 124 } 125 126 public void close() throws SAXParseException { 127 putAny(finished); 128 try { 129 puller.join(); 130 } catch (InterruptedException e) { 131 132 } 133 if (brokenPipe != naturalEnd) 134 isPipeBroken(); 135 138 } 139 140 private boolean hasNext() { 141 if (open) { 142 if (pending == null) { 143 pending = fetch(); 144 if (pending == finished) 145 open = false; 146 return open; 147 } else 148 return true; 149 } else 150 return false; 151 } 152 153 boolean exactlyExhausted() { 154 return !(overflow||hasNext()); 155 } 156 157 public Token getNextToken() { 158 if (hasNext() == false) 159 return new Token(RDFParserConstants.EOF, null); 160 try { 161 return (Token) pending; 162 } finally { 163 pending = null; 164 } 165 } 166 167 void naturalDeath() { 168 setException(naturalEnd); 169 } 170 171 void setException(Throwable t) { 172 brokenPipe = t; 173 } 174 175 178 static class BoundedBufferTakeException extends JenaException { 179 BoundedBufferTakeException(Exception e) { 180 super(e); 181 } 182 } 183 184 187 static class BoundedBufferPutException extends JenaException { 188 BoundedBufferPutException(Exception e) { 189 super(e); 190 } 191 } 192 193 } 194 195 220 221 | Popular Tags |