KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectstyle > cayenne > access > DataPort


1 /* ====================================================================
2  *
3  * The ObjectStyle Group Software License, version 1.1
4  * ObjectStyle Group - http://objectstyle.org/
5  *
6  * Copyright (c) 2002-2005, Andrei (Andrus) Adamchik and individual authors
7  * of the software. All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  *
13  * 1. Redistributions of source code must retain the above copyright
14  * notice, this list of conditions and the following disclaimer.
15  *
16  * 2. Redistributions in binary form must reproduce the above copyright
17  * notice, this list of conditions and the following disclaimer in
18  * the documentation and/or other materials provided with the
19  * distribution.
20  *
21  * 3. The end-user documentation included with the redistribution, if any,
22  * must include the following acknowlegement:
23  * "This product includes software developed by independent contributors
24  * and hosted on ObjectStyle Group web site (http://objectstyle.org/)."
25  * Alternately, this acknowlegement may appear in the software itself,
26  * if and wherever such third-party acknowlegements normally appear.
27  *
28  * 4. The names "ObjectStyle Group" and "Cayenne" must not be used to endorse
29  * or promote products derived from this software without prior written
30  * permission. For written permission, email
31  * "andrus at objectstyle dot org".
32  *
33  * 5. Products derived from this software may not be called "ObjectStyle"
34  * or "Cayenne", nor may "ObjectStyle" or "Cayenne" appear in their
35  * names without prior written permission.
36  *
37  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
38  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
39  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
40  * DISCLAIMED. IN NO EVENT SHALL THE OBJECTSTYLE GROUP OR
41  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
42  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
43  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
44  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
45  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
46  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
47  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
48  * SUCH DAMAGE.
49  * ====================================================================
50  *
51  * This software consists of voluntary contributions made by many
52  * individuals and hosted on ObjectStyle Group web site. For more
53  * information on the ObjectStyle Group, please see
54  * <http://objectstyle.org/>.
55  */

56 package org.objectstyle.cayenne.access;
57
58 import java.util.ArrayList JavaDoc;
59 import java.util.Collection JavaDoc;
60 import java.util.Collections JavaDoc;
61 import java.util.Iterator JavaDoc;
62 import java.util.List JavaDoc;
63 import java.util.Map JavaDoc;
64
65 import org.objectstyle.cayenne.CayenneException;
66 import org.objectstyle.cayenne.access.util.IteratedSelectObserver;
67 import org.objectstyle.cayenne.map.DbEntity;
68 import org.objectstyle.cayenne.map.DerivedDbEntity;
69 import org.objectstyle.cayenne.query.GenericSelectQuery;
70 import org.objectstyle.cayenne.query.InsertBatchQuery;
71 import org.objectstyle.cayenne.query.Query;
72 import org.objectstyle.cayenne.query.SQLTemplate;
73 import org.objectstyle.cayenne.query.SelectQuery;
74
75 /**
76  * An engine to port data between two DataNodes. These nodes can potentially connect to
77  * databases from different vendors. The only assumption is that all of the DbEntities
78  * (tables) being ported are present in both source and destination databases and are
79  * adequately described by Cayenne mapping.
80  * <p>
81  * DataPort implements a Cayenne-based algorithm to read data from source DataNode and
82  * write to destination DataNode. It uses DataPortDelegate interface to externalize
83  * various things, such as determining what entities to port (include/exclude from port
84  * based on some criteria), logging the progress of port operation, qualifying the
85  * queries, etc.
86  * </p>
87  *
88  * @since 1.2: Prior to 1.2 DataPort classes were a part of cayenne-examples package.
89  * @author Andrei Adamchik
90  */

91 public class DataPort {
92
93     public static final int INSERT_BATCH_SIZE = 1000;
94
95     protected DataNode sourceNode;
96     protected DataNode destinationNode;
97     protected Collection JavaDoc entities;
98     protected boolean cleaningDestination;
99     protected DataPortDelegate delegate;
100     protected int insertBatchSize;
101
102     public DataPort() {
103         this.insertBatchSize = INSERT_BATCH_SIZE;
104     }
105
106     /**
107      * Creates a new DataPort instance, setting its delegate.
108      */

109     public DataPort(DataPortDelegate delegate) {
110         this.delegate = delegate;
111     }
112
113     /**
114      * Runs DataPort. The instance must be fully configured by the time this method is
115      * invoked, having its delegate, source and destinatio nodes, and a list of entities
116      * set up.
117      */

118     public void execute() throws CayenneException {
119         // sanity check
120
if (sourceNode == null) {
121             throw new CayenneException("Can't port data, source node is null.");
122         }
123
124         if (destinationNode == null) {
125             throw new CayenneException("Can't port data, destination node is null.");
126         }
127
128         // the simple equality check may actually detect problems with misconfigred nodes
129
// it is not as dumb as it may look at first
130
if (sourceNode == destinationNode) {
131             throw new CayenneException(
132                     "Can't port data, source and target nodes are the same.");
133         }
134
135         if (entities == null || entities.isEmpty()) {
136             return;
137         }
138
139         // sort entities for insertion
140
List JavaDoc sorted = new ArrayList JavaDoc(entities);
141         destinationNode.getEntitySorter().sortDbEntities(sorted, false);
142
143         if (cleaningDestination) {
144             // reverse insertion order for deletion
145
List JavaDoc entitiesInDeleteOrder = new ArrayList JavaDoc(sorted.size());
146             entitiesInDeleteOrder.addAll(sorted);
147             Collections.reverse(entitiesInDeleteOrder);
148             processDelete(entitiesInDeleteOrder);
149         }
150
151         processInsert(sorted);
152     }
153
154     /**
155      * Cleans up destination tables data.
156      */

157     protected void processDelete(List JavaDoc entities) {
158         // Allow delegate to modify the list of entities
159
// any way it wants. For instance delegate may filter
160
// or sort the list (though it doesn't have to, and can simply
161
// pass through the original list).
162
if (delegate != null) {
163             entities = delegate.willCleanData(this, entities);
164         }
165
166         if (entities == null || entities.isEmpty()) {
167             return;
168         }
169
170         // Using QueryResult as observer for the data cleanup.
171
// This allows to collect query statistics and pass it to the delegate.
172
QueryResult observer = new QueryResult();
173
174         // Delete data from entities one by one
175
Iterator JavaDoc it = entities.iterator();
176         while (it.hasNext()) {
177             DbEntity entity = (DbEntity) it.next();
178
179             // skip derived DbEntities. Should we consult delegate ?
180
// Using derived entities may allow things like materialized views....
181
if (entity instanceof DerivedDbEntity) {
182                 continue;
183             }
184
185             Query query = new SQLTemplate(entity, "DELETE FROM "
186                     + entity.getFullyQualifiedName(), false);
187
188             // notify delegate that delete is about to happen
189
if (delegate != null) {
190                 query = delegate.willCleanData(this, entity, query);
191             }
192
193             // perform delete query
194
observer.clear();
195             destinationNode.performQueries(Collections.singletonList(query), observer);
196
197             // notify delegate that delete just happened
198
if (delegate != null) {
199                 // observer will store query statistics
200
int count = observer.getFirstUpdateCount(query);
201                 delegate.didCleanData(this, entity, count);
202             }
203         }
204     }
205
206     /**
207      * Reads source data from source, saving it to destination.
208      */

209     protected void processInsert(List JavaDoc entities) throws CayenneException {
210         // Allow delegate to modify the list of entities
211
// any way it wants. For instance delegate may filter
212
// or sort the list (though it doesn't have to, and can simply
213
// pass through the original list).
214
if (delegate != null) {
215             entities = delegate.willCleanData(this, entities);
216         }
217
218         if (entities == null || entities.isEmpty()) {
219             return;
220         }
221
222         // Create an observer for to get the iterated result
223
// instead of getting each table as a list
224
IteratedSelectObserver observer = new IteratedSelectObserver();
225
226         // Using QueryResult as observer for the data insert.
227
// This allows to collect query statistics and pass it to the delegate.
228
QueryResult insertObserver = new QueryResult();
229
230         // process ordered list of entities one by one
231
Iterator JavaDoc it = entities.iterator();
232         while (it.hasNext()) {
233             insertObserver.clear();
234
235             DbEntity entity = (DbEntity) it.next();
236
237             // skip derived DbEntities...
238
if (entity instanceof DerivedDbEntity) {
239                 continue;
240             }
241
242             SelectQuery select = new SelectQuery(entity);
243             select.setFetchingDataRows(true);
244
245             // delegate is allowed to substitute query
246
GenericSelectQuery query = (delegate != null) ? delegate.willPortEntity(this,
247                     entity,
248                     select) : select;
249
250             sourceNode.performQueries(Collections.singletonList(query), observer);
251             ResultIterator result = observer.getResultIterator();
252             InsertBatchQuery insert = new InsertBatchQuery(entity, INSERT_BATCH_SIZE);
253
254             try {
255
256                 // Split insertions into the same table into batches.
257
// This will allow to process tables of arbitrary size
258
// and not run out of memory.
259
int currentRow = 0;
260
261                 // even if we don't use intermediate batch commits, we still need to
262
// estimate batch insert size
263
int batchSize = insertBatchSize > 0 ? insertBatchSize : INSERT_BATCH_SIZE;
264
265                 while (result.hasNextRow()) {
266                     if (insertBatchSize > 0
267                             && currentRow > 0
268                             && currentRow % insertBatchSize == 0) {
269                         // end of the batch detected... commit and start a new insert
270
// query
271
destinationNode.performQueries(Collections.singletonList(insert),
272                                 insertObserver);
273                         insert = new InsertBatchQuery(entity, batchSize);
274                         insertObserver.clear();
275                     }
276
277                     currentRow++;
278
279                     Map JavaDoc nextRow = result.nextDataRow();
280                     insert.add(nextRow);
281                 }
282
283                 // commit remaining batch if needed
284
if (insert.size() > 0) {
285                     destinationNode.performQueries(Collections.singletonList(insert),
286                             insertObserver);
287                 }
288
289                 if (delegate != null) {
290                     delegate.didPortEntity(this, entity, currentRow);
291                 }
292             }
293             finally {
294                 try {
295                     // don't forget to close ResultIterator
296
result.close();
297                 }
298                 catch (CayenneException ex) {
299                 }
300             }
301         }
302     }
303
304     public Collection JavaDoc getEntities() {
305         return entities;
306     }
307
308     public DataNode getSourceNode() {
309         return sourceNode;
310     }
311
312     public DataNode getDestinationNode() {
313         return destinationNode;
314     }
315
316     /**
317      * Sets the initial list of entities to process. This list can be later modified by
318      * the delegate.
319      */

320     public void setEntities(Collection JavaDoc entities) {
321         this.entities = entities;
322     }
323
324     /**
325      * Sets the DataNode serving as a source of the ported data.
326      */

327     public void setSourceNode(DataNode sourceNode) {
328         this.sourceNode = sourceNode;
329     }
330
331     /**
332      * Sets the DataNode serving as a destination of the ported data.
333      */

334     public void setDestinationNode(DataNode destinationNode) {
335         this.destinationNode = destinationNode;
336     }
337
338     /**
339      * Returns previously initialized DataPortDelegate object.
340      */

341     public DataPortDelegate getDelegate() {
342         return delegate;
343     }
344
345     public void setDelegate(DataPortDelegate delegate) {
346         this.delegate = delegate;
347     }
348
349     /**
350      * Returns true if a DataPort was configured to delete all data from the destination
351      * tables.
352      */

353     public boolean isCleaningDestination() {
354         return cleaningDestination;
355     }
356
357     /**
358      * Defines whether DataPort should delete all data from destination tables before
359      * doing the port.
360      */

361     public void setCleaningDestination(boolean cleaningDestination) {
362         this.cleaningDestination = cleaningDestination;
363     }
364
365     public int getInsertBatchSize() {
366         return insertBatchSize;
367     }
368
369     /**
370      * Sets a parameter used for tuning insert batches. If set to a value greater than
371      * zero, DataPort will commit every N rows. If set to value less or equal to zero,
372      * DataPort will commit only once at the end of the insert.
373      */

374     public void setInsertBatchSize(int insertBatchSize) {
375         this.insertBatchSize = insertBatchSize;
376     }
377 }
Popular Tags