1 9 package com.ziclix.python.sql.pipe.db; 10 11 import com.ziclix.python.sql.PyConnection; 12 import com.ziclix.python.sql.pipe.Sink; 13 import com.ziclix.python.sql.zxJDBC; 14 import org.python.core.Py; 15 import org.python.core.PyDictionary; 16 import org.python.core.PyList; 17 import org.python.core.PyObject; 18 import org.python.core.PyString; 19 20 import java.util.HashSet ; 21 import java.util.Set ; 22 23 29 public class DBSink extends BaseDB implements Sink { 30 31 34 protected PyObject sql; 35 36 39 protected Set exclude; 40 41 44 protected PyList rows; 45 46 49 protected int batchsize; 50 51 54 protected PyObject bindings; 55 56 59 protected PyDictionary indexedBindings; 60 61 71 public DBSink(PyConnection connection, Class dataHandler, String tableName, PyObject exclude, PyObject bindings, int batchsize) { 72 73 super(connection, dataHandler, tableName); 74 75 this.sql = Py.None; 76 this.rows = new PyList(); 77 this.bindings = bindings; 78 this.batchsize = batchsize; 79 this.exclude = new HashSet (); 80 this.indexedBindings = new PyDictionary(); 81 82 if (exclude != Py.None) { 83 for (int i = 0; i < exclude.__len__(); i++) { 84 PyObject lowered = Py.newString(((PyString) exclude.__getitem__(i)).lower()); 85 86 this.exclude.add(lowered); 87 } 88 } 89 } 90 91 94 protected boolean excluded(PyObject key) { 95 96 PyObject lowered = Py.newString(((PyString) key).lower()); 97 98 return this.exclude.contains(lowered); 99 } 100 101 104 protected void createSql(PyObject row) { 105 106 if ((row == Py.None) || (row.__len__() == 0)) { 108 109 throw zxJDBC.makeException(zxJDBC.getString("noColInfo")); 111 } 112 113 int index = 0, len = row.__len__(); 114 PyObject entry = Py.None, col = Py.None, pyIndex = Py.None; 115 StringBuffer sb = new StringBuffer ("insert into ").append(this.tableName).append(" ("); 116 117 122 for (int i = 0; i < len - 1; i++) { 123 entry = row.__getitem__(i); 124 col = entry.__getitem__(0); 125 126 if (!this.excluded(col)) { 127 128 sb.append(col).append(","); 130 131 pyIndex = Py.newInteger(index++); 133 134 try { 135 this.indexedBindings.__setitem__(pyIndex, this.bindings.__getitem__(col)); 136 } catch (Exception e) { 137 138 this.indexedBindings.__setitem__(pyIndex, entry.__getitem__(1)); 140 } 141 } 142 } 143 144 entry = row.__getitem__(len - 1); 145 col = entry.__getitem__(0); 146 147 if (!this.excluded(col)) { 148 sb.append(col); 149 150 pyIndex = Py.newInteger(index++); 151 152 try { 153 this.indexedBindings.__setitem__(pyIndex, this.bindings.__getitem__(col)); 154 } catch (Exception e) { 155 156 this.indexedBindings.__setitem__(pyIndex, entry.__getitem__(1)); 158 } 159 } 160 161 sb.append(") values ("); 162 163 for (int i = 1; i < len; i++) { 164 sb.append("?,"); 165 } 166 167 sb.append("?)"); 168 169 if (index == 0) { 170 throw zxJDBC.makeException(zxJDBC.ProgrammingError, zxJDBC.getString("excludedAllCols")); 171 } 172 173 this.sql = Py.newString(sb.toString()); 174 } 175 176 179 public void row(PyObject row) { 180 181 if (this.sql != Py.None) { 182 if (this.batchsize <= 0) { 183 184 this.cursor.execute(this.sql, row, this.indexedBindings, Py.None); 186 this.connection.commit(); 187 } else { 188 this.rows.append(row); 189 190 int len = rows.__len__(); 191 192 if (len % this.batchsize == 0) { 193 this.cursor.execute(this.sql, this.rows, this.indexedBindings, Py.None); 194 this.connection.commit(); 195 196 this.rows = new PyList(); 197 } 198 } 199 } else { 200 this.createSql(row); 201 } 202 } 203 204 207 public void start() { 208 } 209 210 213 public void end() { 214 215 try { 217 int len = this.rows.__len__(); 218 219 if (len > 0) { 220 this.cursor.execute(this.sql, this.rows, this.indexedBindings, Py.None); 221 this.connection.commit(); 222 } 223 } finally { 224 this.cursor.close(); 225 } 226 } 227 } 228 | Popular Tags |