KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > ziclix > python > sql > pipe > db > DBSink


1 /*
2  * Jython Database Specification API 2.0
3  *
4  * $Id: DBSink.java,v 1.4 2005/02/23 04:26:20 bzimmer Exp $
5  *
6  * Copyright (c) 2001 brian zimmer <bzimmer@ziclix.com>
7  *
8  */

9 package com.ziclix.python.sql.pipe.db;
10
11 import com.ziclix.python.sql.PyConnection;
12 import com.ziclix.python.sql.pipe.Sink;
13 import com.ziclix.python.sql.zxJDBC;
14 import org.python.core.Py;
15 import org.python.core.PyDictionary;
16 import org.python.core.PyList;
17 import org.python.core.PyObject;
18 import org.python.core.PyString;
19
20 import java.util.HashSet JavaDoc;
21 import java.util.Set JavaDoc;
22
23 /**
24  * A database consumer. All data transferred will be inserted into the appropriate table.
25  *
26  * @author brian zimmer
27  * @version $Revision: 1.4 $
28  */

29 public class DBSink extends BaseDB implements Sink {
30
31     /**
32      * Field sql
33      */

34     protected PyObject sql;
35
36     /**
37      * Field exclude
38      */

39     protected Set JavaDoc exclude;
40
41     /**
42      * Field rows
43      */

44     protected PyList rows;
45
46     /**
47      * Field batchsize
48      */

49     protected int batchsize;
50
51     /**
52      * Field bindings
53      */

54     protected PyObject bindings;
55
56     /**
57      * Field indexedBindings
58      */

59     protected PyDictionary indexedBindings;
60
61     /**
62      * Constructor for handling the consumption of data.
63      *
64      * @param connection the database connection
65      * @param dataHandler a custom DataHandler for the cursor, can be None
66      * @param tableName the table to insert the data
67      * @param exclude the columns to be excluded from insertion on the destination, all if None
68      * @param bindings the optional bindings for the destination, this allows morphing of types during the copy
69      * @param batchsize the optional batchsize for the inserts
70      */

71     public DBSink(PyConnection connection, Class JavaDoc dataHandler, String JavaDoc tableName, PyObject exclude, PyObject bindings, int batchsize) {
72
73         super(connection, dataHandler, tableName);
74
75         this.sql = Py.None;
76         this.rows = new PyList();
77         this.bindings = bindings;
78         this.batchsize = batchsize;
79         this.exclude = new HashSet JavaDoc();
80         this.indexedBindings = new PyDictionary();
81
82         if (exclude != Py.None) {
83             for (int i = 0; i < exclude.__len__(); i++) {
84                 PyObject lowered = Py.newString(((PyString) exclude.__getitem__(i)).lower());
85
86                 this.exclude.add(lowered);
87             }
88         }
89     }
90
91     /**
92      * Return true if the key (converted to lowercase) is not found in the exclude list.
93      */

94     protected boolean excluded(PyObject key) {
95
96         PyObject lowered = Py.newString(((PyString) key).lower());
97
98         return this.exclude.contains(lowered);
99     }
100
101     /**
102      * Create the insert statement given the header row.
103      */

104     protected void createSql(PyObject row) {
105
106         // this should be the column info
107
if ((row == Py.None) || (row.__len__() == 0)) {
108
109             // if there are no columns, what's the point?
110
throw zxJDBC.makeException(zxJDBC.getString("noColInfo"));
111         }
112
113         int index = 0, len = row.__len__();
114         PyObject entry = Py.None, col = Py.None, pyIndex = Py.None;
115         StringBuffer JavaDoc sb = new StringBuffer JavaDoc("insert into ").append(this.tableName).append(" (");
116
117         /*
118          * Iterate through the columns and pull out the names for use in the insert
119          * statement and the types for use in the bindings. The tuple is of the form
120          * (column name, column type).
121          */

122         for (int i = 0; i < len - 1; i++) {
123             entry = row.__getitem__(i);
124             col = entry.__getitem__(0);
125
126             if (!this.excluded(col)) {
127
128                 // add to the list
129
sb.append(col).append(",");
130
131                 // add the binding
132
pyIndex = Py.newInteger(index++);
133
134                 try {
135                     this.indexedBindings.__setitem__(pyIndex, this.bindings.__getitem__(col));
136                 } catch (Exception JavaDoc e) {
137
138                     // either a KeyError or this.bindings is None or null
139
this.indexedBindings.__setitem__(pyIndex, entry.__getitem__(1));
140                 }
141             }
142         }
143
144         entry = row.__getitem__(len - 1);
145         col = entry.__getitem__(0);
146
147         if (!this.excluded(col)) {
148             sb.append(col);
149
150             pyIndex = Py.newInteger(index++);
151
152             try {
153                 this.indexedBindings.__setitem__(pyIndex, this.bindings.__getitem__(col));
154             } catch (Exception JavaDoc e) {
155
156                 // either a KeyError or this.bindings is None or null
157
this.indexedBindings.__setitem__(pyIndex, entry.__getitem__(1));
158             }
159         }
160
161         sb.append(") values (");
162
163         for (int i = 1; i < len; i++) {
164             sb.append("?,");
165         }
166
167         sb.append("?)");
168
169         if (index == 0) {
170             throw zxJDBC.makeException(zxJDBC.ProgrammingError, zxJDBC.getString("excludedAllCols"));
171         }
172
173         this.sql = Py.newString(sb.toString());
174     }
175
176     /**
177      * Handle the row. Insert the data into the correct table and columns. No updates are done.
178      */

179     public void row(PyObject row) {
180
181         if (this.sql != Py.None) {
182             if (this.batchsize <= 0) {
183
184                 // no batching, just go ahead each time
185
this.cursor.execute(this.sql, row, this.indexedBindings, Py.None);
186                 this.connection.commit();
187             } else {
188                 this.rows.append(row);
189
190                 int len = rows.__len__();
191
192                 if (len % this.batchsize == 0) {
193                     this.cursor.execute(this.sql, this.rows, this.indexedBindings, Py.None);
194                     this.connection.commit();
195
196                     this.rows = new PyList();
197                 }
198             }
199         } else {
200             this.createSql(row);
201         }
202     }
203
204     /**
205      * Method start
206      */

207     public void start() {
208     }
209
210     /**
211      * Handles flushing any buffers and closes the cursor.
212      */

213     public void end() {
214
215         // finish what we started
216
try {
217             int len = this.rows.__len__();
218
219             if (len > 0) {
220                 this.cursor.execute(this.sql, this.rows, this.indexedBindings, Py.None);
221                 this.connection.commit();
222             }
223         } finally {
224             this.cursor.close();
225         }
226     }
227 }
228
Popular Tags