KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > hp > hpl > jena > rdf > arp > PushMePullYouPipe


1 /*
2  * (c) Copyright 2004, 2005 Hewlett-Packard Development Company, LP
3  * [See end of file]
4  */

5
6 package com.hp.hpl.jena.rdf.arp;
7
8
9 import org.xml.sax.*;
10
11 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
12
13 import com.hp.hpl.jena.shared.JenaException;
14
15 /**
16  * @author Jeremy J. Carroll
17  *
18  */

19 class PushMePullYouPipe extends TokenPipe {
20
21     volatile private Throwable JavaDoc brokenPipe = null;
22
23     private boolean open = true;
24     
25     private boolean overflow = false;
26
27     private BoundedBuffer buffer = new BoundedBuffer(100);
28
29     private Object JavaDoc pending = null;
30
31     private static final String JavaDoc finished = "<finished>";
32
33     private static final RuntimeException JavaDoc naturalEnd = new RuntimeException JavaDoc();
34
35     final private Thread JavaDoc puller;
36
37     
38     PushMePullYouPipe(final ARPRunnable puller) {
39         this.puller =
40                 new Thread JavaDoc() {
41             public void run() {
42                 // pipe.pullerSleep();
43
try {
44                     puller.run();
45                     naturalDeath();
46                 } catch (Throwable JavaDoc e) {
47                     setException(e);
48                 } finally {
49                 }
50             }
51
52         };
53         
54     }
55     void start() {
56        this.puller.start();
57     }
58     /**
59      * get something from the pipe; take care of BoundedBuffer's checked
60      * exceptions
61      */

62     private Object JavaDoc fetch() {
63         try {
64             return buffer.take();
65         } catch (Exception JavaDoc e) {
66             throw new BoundedBufferTakeException(e);
67         }
68     }
69
70     /**
71      * put something into the pipe; take care of BoundedBuffer's checked
72      * exceptions
73      */

74     private void putAny(Object JavaDoc d) throws SAXParseException {
75         try {
76             do {
77                 if (d != finished)
78                     isPipeBroken();
79             } while (!buffer.offer(d, 100));
80         } catch (InterruptedException JavaDoc e) {
81             throw new BoundedBufferPutException(e);
82         }
83     }
84
85     private void isPipeBroken() throws SAXParseException {
86         if (brokenPipe != null) {
87             if ( brokenPipe == naturalEnd ){
88                 SAXParseException ee = new SAXParseException("RDF parsing finished, additional XML events",getLocator());
89                 //System.err.println(
90
// ParseException.formatMessage(ee));
91
overflow = true;
92                 throw ee;
93             }
94             try {
95                 throw brokenPipe;
96             } catch (RuntimeException JavaDoc e) {
97                 throw e;
98             } catch (Error JavaDoc e) {
99                 throw e;
100             } catch (SAXParseException e) {
101                 throw e;
102             } catch (Exception JavaDoc e) {
103                 throw new WrappedException(e);
104
105             } catch (Throwable JavaDoc t) {
106                 throw new RuntimeException JavaDoc("Exception from RDF thread.",t);
107             }
108             /*
109             pipe().naturalDeath();
110         } catch (WrappedException wrapped) {
111             pipe().setException(wrapped.inner);
112         } catch (ParseException parse) {
113             pipe().setException(parse.rootCause());
114         } catch (RuntimeException e) {
115             pipe().setException(e);
116         } catch (Error e) {
117             pipe().setException(e);
118         */

119         }
120     }
121
122     public void putNextToken(Token d) throws SAXParseException {
123         putAny(d);
124     }
125
126     public void close() throws SAXParseException {
127         putAny(finished);
128         try {
129             puller.join();
130         } catch (InterruptedException JavaDoc e) {
131
132         }
133         if (brokenPipe != naturalEnd)
134             isPipeBroken();
135         /* Note check that pipe is exhausted is done
136          * on exactlyExhausted ....
137          */

138     }
139
140     private boolean hasNext() {
141         if (open) {
142             if (pending == null) {
143                 pending = fetch();
144                 if (pending == finished)
145                     open = false;
146                 return open;
147             } else
148                 return true;
149         } else
150             return false;
151     }
152     
153     boolean exactlyExhausted() {
154         return !(overflow||hasNext());
155     }
156
157     public Token getNextToken() {
158         if (hasNext() == false)
159             return new Token(RDFParserConstants.EOF, null);
160         try {
161             return (Token) pending;
162         } finally {
163             pending = null;
164         }
165     }
166
167     void naturalDeath() {
168         setException(naturalEnd);
169     }
170
171     void setException(Throwable JavaDoc t) {
172         brokenPipe = t;
173     }
174
175     /**
176      * Exception to throw if a <code>take</code> throws an exception.
177      */

178     static class BoundedBufferTakeException extends JenaException {
179         BoundedBufferTakeException(Exception JavaDoc e) {
180             super(e);
181         }
182     }
183
184     /**
185      * Exception to throw if a <code>put</code> throws an exception.
186      */

187     static class BoundedBufferPutException extends JenaException {
188         BoundedBufferPutException(Exception JavaDoc e) {
189             super(e);
190         }
191     }
192
193 }
194
195 /*
196  * (c) Copyright 2004, 2005 Hewlett-Packard Development Company, LP All rights
197  * reserved.
198  *
199  * Redistribution and use in source and binary forms, with or without
200  * modification, are permitted provided that the following conditions are met:
201  * 1. Redistributions of source code must retain the above copyright notice,
202  * this list of conditions and the following disclaimer. 2. Redistributions in
203  * binary form must reproduce the above copyright notice, this list of
204  * conditions and the following disclaimer in the documentation and/or other
205  * materials provided with the distribution. 3. The name of the author may not
206  * be used to endorse or promote products derived from this software without
207  * specific prior written permission.
208  *
209  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
210  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
211  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
212  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
213  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
214  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
215  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
216  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
217  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
218  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
219  */

220
221
Popular Tags