KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ziclix > python > sql > pipe > Pipe


1 /*
2  * Jython Database Specification API 2.0
3  *
4  * $Id: Pipe.java,v 1.4 2005/02/23 04:26:19 bzimmer Exp $
5  *
6  * Copyright (c) 2001 brian zimmer <bzimmer@ziclix.com>
7  *
8  */

9 package com.ziclix.python.sql.pipe;
10
11 import org.python.core.*;
12 import com.ziclix.python.sql.*;
13 import com.ziclix.python.sql.util.*;
14
15 /**
16  * Manager for a Sink and Source. The Pipe creates a Queue through which the Source
17  * can feed data to the Sink. Both Sink and Source run in their own thread and
18  * are completely independent of the other. When the Source pushes None onto the
19  * Queue, the piping is stopped and the Sink finishes processing all the remaining
20  * data. This class is especially useful for loading/copying data from one database
21  * or table to another.
22  *
23  * @author brian zimmer
24  * @version $Revision: 1.4 $
25  */

26 public class Pipe {
27
28     /**
29      * Default empty constructor.
30      */

31     public Pipe() {
32     }
33
34     /**
35      * Start the processing of the Source->Sink.
36      *
37      * @param source the data generator
38      * @param sink the consumer of the data
39      * @return the number of rows seen (this includes the header row)
40      */

41     public PyObject pipe(Source source, Sink sink) {
42
43         Queue queue = new Queue();
44         SourceRunner sourceRunner = new SourceRunner(queue, source);
45         SinkRunner sinkRunner = new SinkRunner(queue, sink);
46
47         sourceRunner.start();
48         sinkRunner.start();
49
50         try {
51             sourceRunner.join();
52         } catch (InterruptedException JavaDoc e) {
53             queue.close();
54
55             throw zxJDBC.makeException(e);
56         }
57
58         try {
59             sinkRunner.join();
60         } catch (InterruptedException JavaDoc e) {
61             queue.close();
62
63             throw zxJDBC.makeException(e);
64         }
65
66         /*
67          * This is interesting territory. I originally tried to store the the Throwable in the Thread instance
68          * and then re-throw it here, but whenever I tried, I would get an NPE in the construction of the
69          * PyTraceback required for the PyException. I tried calling .fillInStackTrace() but that didn't work
70          * either. So I'm left with getting the String representation and throwing that. At least it gives
71          * the relevant error messages, but the stack is lost. This might have something to do with a Java
72          * issue I don't completely understand, such as what happens for an Exception whose Thread is no longer
73          * running? Anyways, if anyone knows what to do I would love to hear about it.
74          */

75         if (sourceRunner.threwException()) {
76             throw zxJDBC.makeException(sourceRunner.getException().toString());
77         }
78
79         if (sinkRunner.threwException()) {
80             throw zxJDBC.makeException(sinkRunner.getException().toString());
81         }
82
83         // if the source count is -1, no rows were queried
84
if (sinkRunner.getCount() == 0) {
85             return Py.newInteger(0);
86         }
87
88         // Assert that both sides handled the same number of rows. I know doing the check up front kinda defeats
89
// the purpose of the assert, but there's no need to create the buffer if I don't need it and I still
90
// want to throw the AssertionError if required
91
if ((sourceRunner.getCount() - sinkRunner.getCount()) != 0) {
92             Integer JavaDoc[] counts = {new Integer JavaDoc(sourceRunner.getCount()),
93                                 new Integer JavaDoc(sinkRunner.getCount())};
94             String JavaDoc msg = zxJDBC.getString("inconsistentRowCount", counts);
95
96             Py.assert_(Py.Zero, Py.newString(msg));
97         }
98
99         return Py.newInteger(sinkRunner.getCount());
100     }
101 }
102
103 /**
104  * Class PipeRunner
105  *
106  * @author
107  * @author last modified by $Author: bzimmer $
108  * @version $Revision: 1.4 $
109  * @date $today.date$
110  * @date last modified on $Date: 2005/02/23 04:26:19 $
111  * @copyright 2001 brian zimmer
112  */

113 abstract class PipeRunner extends Thread JavaDoc {
114
115     /**
116      * Field counter
117      */

118     protected int counter;
119
120     /**
121      * Field queue
122      */

123     protected Queue queue;
124
125     /**
126      * Field exception
127      */

128     protected Throwable JavaDoc exception;
129
130     /**
131      * Constructor PipeRunner
132      *
133      * @param Queue queue
134      */

135     public PipeRunner(Queue queue) {
136
137         this.counter = 0;
138         this.queue = queue;
139         this.exception = null;
140     }
141
142     /**
143      * The total number of rows handled.
144      */

145     public int getCount() {
146         return this.counter;
147     }
148
149     /**
150      * Method run
151      */

152     public void run() {
153
154         try {
155             this.pipe();
156         } catch (QueueClosedException e) {
157
158             /*
159              * thrown by a closed queue when any operation is performed. we know
160              * at this point that nothing else can happen to the queue and that
161              * both producer and consumer will stop since one closed the queue
162              * by throwing an exception (below) and the other is here.
163              */

164             return;
165         } catch (Throwable JavaDoc e) {
166             this.exception = e.fillInStackTrace();
167
168             this.queue.close();
169         }
170     }
171
172     /**
173      * Handle the source/destination specific copying.
174      */

175     abstract protected void pipe() throws InterruptedException JavaDoc;
176
177     /**
178      * Return true if the thread terminated because of an uncaught exception.
179      */

180     public boolean threwException() {
181         return this.exception != null;
182     }
183
184     /**
185      * Return the uncaught exception.
186      */

187     public Throwable JavaDoc getException() {
188         return this.exception;
189     }
190 }
191
192 /**
193  * Class SourceRunner
194  *
195  * @author
196  * @author last modified by $Author: bzimmer $
197  * @version $Revision: 1.4 $
198  * @date $today.date$
199  * @date last modified on $Date: 2005/02/23 04:26:19 $
200  * @copyright 2001 brian zimmer
201  */

202 class SourceRunner extends PipeRunner {
203
204     /**
205      * Field source
206      */

207     protected Source source;
208
209     /**
210      * Constructor SourceRunner
211      *
212      * @param Queue queue
213      * @param Source source
214      */

215     public SourceRunner(Queue queue, Source source) {
216
217         super(queue);
218
219         this.source = source;
220     }
221
222     /**
223      * Method pipe
224      *
225      * @throws InterruptedException
226      */

227     protected void pipe() throws InterruptedException JavaDoc {
228
229         PyObject row = Py.None;
230
231         this.source.start();
232
233         try {
234             while ((row = this.source.next()) != Py.None) {
235                 this.queue.enqueue(row);
236
237                 this.counter++;
238             }
239         } finally {
240             try {
241                 this.queue.enqueue(Py.None);
242             } finally {
243                 this.source.end();
244             }
245         }
246     }
247 }
248
249 /**
250  * Class SinkRunner
251  *
252  * @author
253  * @author last modified by $Author: bzimmer $
254  * @version $Revision: 1.4 $
255  * @date $today.date$
256  * @date last modified on $Date: 2005/02/23 04:26:19 $
257  * @copyright 2001 brian zimmer
258  */

259 class SinkRunner extends PipeRunner {
260
261     /**
262      * Field sink
263      */

264     protected Sink sink;
265
266     /**
267      * Constructor SinkRunner
268      *
269      * @param Queue queue
270      * @param Sink sink
271      */

272     public SinkRunner(Queue queue, Sink sink) {
273
274         super(queue);
275
276         this.sink = sink;
277     }
278
279     /**
280      * Method pipe
281      *
282      * @throws InterruptedException
283      */

284     protected void pipe() throws InterruptedException JavaDoc {
285
286         PyObject row = Py.None;
287
288         this.sink.start();
289
290         try {
291             while ((row = (PyObject) this.queue.dequeue()) != Py.None) {
292                 this.sink.row(row);
293
294                 this.counter++;
295             }
296         } finally {
297             this.sink.end();
298         }
299     }
300 }
301
Popular Tags