1 9 package com.ziclix.python.sql.pipe; 10 11 import org.python.core.*; 12 import com.ziclix.python.sql.*; 13 import com.ziclix.python.sql.util.*; 14 15 26 public class Pipe { 27 28 31 public Pipe() { 32 } 33 34 41 public PyObject pipe(Source source, Sink sink) { 42 43 Queue queue = new Queue(); 44 SourceRunner sourceRunner = new SourceRunner(queue, source); 45 SinkRunner sinkRunner = new SinkRunner(queue, sink); 46 47 sourceRunner.start(); 48 sinkRunner.start(); 49 50 try { 51 sourceRunner.join(); 52 } catch (InterruptedException e) { 53 queue.close(); 54 55 throw zxJDBC.makeException(e); 56 } 57 58 try { 59 sinkRunner.join(); 60 } catch (InterruptedException e) { 61 queue.close(); 62 63 throw zxJDBC.makeException(e); 64 } 65 66 75 if (sourceRunner.threwException()) { 76 throw zxJDBC.makeException(sourceRunner.getException().toString()); 77 } 78 79 if (sinkRunner.threwException()) { 80 throw zxJDBC.makeException(sinkRunner.getException().toString()); 81 } 82 83 if (sinkRunner.getCount() == 0) { 85 return Py.newInteger(0); 86 } 87 88 if ((sourceRunner.getCount() - sinkRunner.getCount()) != 0) { 92 Integer [] counts = {new Integer (sourceRunner.getCount()), 93 new Integer (sinkRunner.getCount())}; 94 String msg = zxJDBC.getString("inconsistentRowCount", counts); 95 96 Py.assert_(Py.Zero, Py.newString(msg)); 97 } 98 99 return Py.newInteger(sinkRunner.getCount()); 100 } 101 } 102 103 113 abstract class PipeRunner extends Thread { 114 115 118 protected int counter; 119 120 123 protected Queue queue; 124 125 128 protected Throwable exception; 129 130 135 public PipeRunner(Queue queue) { 136 137 this.counter = 0; 138 this.queue = queue; 139 this.exception = null; 140 } 141 142 145 public int getCount() { 146 return this.counter; 147 } 148 149 152 public void run() { 153 154 try { 155 this.pipe(); 156 } catch (QueueClosedException e) { 157 158 164 return; 165 } catch (Throwable e) { 166 this.exception = e.fillInStackTrace(); 167 168 this.queue.close(); 169 } 170 } 171 172 175 abstract protected void pipe() throws InterruptedException ; 176 177 180 public boolean threwException() { 181 return this.exception != null; 182 } 183 184 187 public Throwable getException() { 188 return this.exception; 189 } 190 } 191 192 202 class SourceRunner extends PipeRunner { 203 204 207 protected Source source; 208 209 215 public SourceRunner(Queue queue, Source source) { 216 217 super(queue); 218 219 this.source = source; 220 } 221 222 227 protected void pipe() throws InterruptedException { 228 229 PyObject row = Py.None; 230 231 this.source.start(); 232 233 try { 234 while ((row = this.source.next()) != Py.None) { 235 this.queue.enqueue(row); 236 237 this.counter++; 238 } 239 } finally { 240 try { 241 this.queue.enqueue(Py.None); 242 } finally { 243 this.source.end(); 244 } 245 } 246 } 247 } 248 249 259 class SinkRunner extends PipeRunner { 260 261 264 protected Sink sink; 265 266 272 public SinkRunner(Queue queue, Sink sink) { 273 274 super(queue); 275 276 this.sink = sink; 277 } 278 279 284 protected void pipe() throws InterruptedException { 285 286 PyObject row = Py.None; 287 288 this.sink.start(); 289 290 try { 291 while ((row = (PyObject) this.queue.dequeue()) != Py.None) { 292 this.sink.row(row); 293 294 this.counter++; 295 } 296 } finally { 297 this.sink.end(); 298 } 299 } 300 } 301 | Popular Tags |