KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > cayenne > access > DataPort


1 /*****************************************************************
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  ****************************************************************/

19
20 package org.apache.cayenne.access;
21
22 import java.util.ArrayList JavaDoc;
23 import java.util.Collection JavaDoc;
24 import java.util.Collections JavaDoc;
25 import java.util.Iterator JavaDoc;
26 import java.util.List JavaDoc;
27 import java.util.Map JavaDoc;
28
29 import org.apache.cayenne.CayenneException;
30 import org.apache.cayenne.access.util.IteratedSelectObserver;
31 import org.apache.cayenne.map.DbEntity;
32 import org.apache.cayenne.map.DerivedDbEntity;
33 import org.apache.cayenne.query.InsertBatchQuery;
34 import org.apache.cayenne.query.Query;
35 import org.apache.cayenne.query.SQLTemplate;
36 import org.apache.cayenne.query.SelectQuery;
37
38 /**
39  * An engine to port data between two DataNodes. These nodes can potentially connect to
40  * databases from different vendors. The only assumption is that all of the DbEntities
41  * (tables) being ported are present in both source and destination databases and are
42  * adequately described by Cayenne mapping.
43  * <p>
44  * DataPort implements a Cayenne-based algorithm to read data from source DataNode and
45  * write to destination DataNode. It uses DataPortDelegate interface to externalize
46  * various things, such as determining what entities to port (include/exclude from port
47  * based on some criteria), logging the progress of port operation, qualifying the
48  * queries, etc.
49  * </p>
50  *
51  * @since 1.2: Prior to 1.2 DataPort classes were a part of cayenne-examples package.
52  * @author Andrus Adamchik
53  */

54 public class DataPort {
55
56     public static final int INSERT_BATCH_SIZE = 1000;
57
58     protected DataNode sourceNode;
59     protected DataNode destinationNode;
60     protected Collection JavaDoc entities;
61     protected boolean cleaningDestination;
62     protected DataPortDelegate delegate;
63     protected int insertBatchSize;
64
65     public DataPort() {
66         this.insertBatchSize = INSERT_BATCH_SIZE;
67     }
68
69     /**
70      * Creates a new DataPort instance, setting its delegate.
71      */

72     public DataPort(DataPortDelegate delegate) {
73         this.delegate = delegate;
74     }
75
76     /**
77      * Runs DataPort. The instance must be fully configured by the time this method is
78      * invoked, having its delegate, source and destinatio nodes, and a list of entities
79      * set up.
80      */

81     public void execute() throws CayenneException {
82         // sanity check
83
if (sourceNode == null) {
84             throw new CayenneException("Can't port data, source node is null.");
85         }
86
87         if (destinationNode == null) {
88             throw new CayenneException("Can't port data, destination node is null.");
89         }
90
91         // the simple equality check may actually detect problems with misconfigred nodes
92
// it is not as dumb as it may look at first
93
if (sourceNode == destinationNode) {
94             throw new CayenneException(
95                     "Can't port data, source and target nodes are the same.");
96         }
97
98         if (entities == null || entities.isEmpty()) {
99             return;
100         }
101
102         // sort entities for insertion
103
List JavaDoc sorted = new ArrayList JavaDoc(entities);
104         destinationNode.getEntitySorter().sortDbEntities(sorted, false);
105
106         if (cleaningDestination) {
107             // reverse insertion order for deletion
108
List JavaDoc entitiesInDeleteOrder = new ArrayList JavaDoc(sorted.size());
109             entitiesInDeleteOrder.addAll(sorted);
110             Collections.reverse(entitiesInDeleteOrder);
111             processDelete(entitiesInDeleteOrder);
112         }
113
114         processInsert(sorted);
115     }
116
117     /**
118      * Cleans up destination tables data.
119      */

120     protected void processDelete(List JavaDoc entities) {
121         // Allow delegate to modify the list of entities
122
// any way it wants. For instance delegate may filter
123
// or sort the list (though it doesn't have to, and can simply
124
// pass through the original list).
125
if (delegate != null) {
126             entities = delegate.willCleanData(this, entities);
127         }
128
129         if (entities == null || entities.isEmpty()) {
130             return;
131         }
132
133         // Using QueryResult as observer for the data cleanup.
134
// This allows to collect query statistics and pass it to the delegate.
135
QueryResult observer = new QueryResult();
136
137         // Delete data from entities one by one
138
Iterator JavaDoc it = entities.iterator();
139         while (it.hasNext()) {
140             DbEntity entity = (DbEntity) it.next();
141
142             // skip derived DbEntities. Should we consult delegate ?
143
// Using derived entities may allow things like materialized views....
144
if (entity instanceof DerivedDbEntity) {
145                 continue;
146             }
147
148             Query query = new SQLTemplate(entity, "DELETE FROM "
149                     + entity.getFullyQualifiedName());
150
151             // notify delegate that delete is about to happen
152
if (delegate != null) {
153                 query = delegate.willCleanData(this, entity, query);
154             }
155
156             // perform delete query
157
observer.clear();
158             destinationNode.performQueries(Collections.singletonList(query), observer);
159
160             // notify delegate that delete just happened
161
if (delegate != null) {
162                 // observer will store query statistics
163
int count = observer.getFirstUpdateCount(query);
164                 delegate.didCleanData(this, entity, count);
165             }
166         }
167     }
168
169     /**
170      * Reads source data from source, saving it to destination.
171      */

172     protected void processInsert(List JavaDoc entities) throws CayenneException {
173         // Allow delegate to modify the list of entities
174
// any way it wants. For instance delegate may filter
175
// or sort the list (though it doesn't have to, and can simply
176
// pass through the original list).
177
if (delegate != null) {
178             entities = delegate.willCleanData(this, entities);
179         }
180
181         if (entities == null || entities.isEmpty()) {
182             return;
183         }
184
185         // Create an observer for to get the iterated result
186
// instead of getting each table as a list
187
IteratedSelectObserver observer = new IteratedSelectObserver();
188
189         // Using QueryResult as observer for the data insert.
190
// This allows to collect query statistics and pass it to the delegate.
191
QueryResult insertObserver = new QueryResult();
192
193         // process ordered list of entities one by one
194
Iterator JavaDoc it = entities.iterator();
195         while (it.hasNext()) {
196             insertObserver.clear();
197
198             DbEntity entity = (DbEntity) it.next();
199
200             // skip derived DbEntities...
201
if (entity instanceof DerivedDbEntity) {
202                 continue;
203             }
204
205             SelectQuery select = new SelectQuery(entity);
206             select.setFetchingDataRows(true);
207
208             // delegate is allowed to substitute query
209
Query query = (delegate != null) ? delegate.willPortEntity(
210                     this,
211                     entity,
212                     select) : select;
213
214             sourceNode.performQueries(Collections.singletonList(query), observer);
215             ResultIterator result = observer.getResultIterator();
216             InsertBatchQuery insert = new InsertBatchQuery(entity, INSERT_BATCH_SIZE);
217
218             try {
219
220                 // Split insertions into the same table into batches.
221
// This will allow to process tables of arbitrary size
222
// and not run out of memory.
223
int currentRow = 0;
224
225                 // even if we don't use intermediate batch commits, we still need to
226
// estimate batch insert size
227
int batchSize = insertBatchSize > 0 ? insertBatchSize : INSERT_BATCH_SIZE;
228
229                 while (result.hasNextRow()) {
230                     if (insertBatchSize > 0
231                             && currentRow > 0
232                             && currentRow % insertBatchSize == 0) {
233                         // end of the batch detected... commit and start a new insert
234
// query
235
destinationNode.performQueries(
236                                 Collections.singletonList(insert),
237                                 insertObserver);
238                         insert = new InsertBatchQuery(entity, batchSize);
239                         insertObserver.clear();
240                     }
241
242                     currentRow++;
243
244                     Map JavaDoc nextRow = result.nextDataRow();
245                     insert.add(nextRow);
246                 }
247
248                 // commit remaining batch if needed
249
if (insert.size() > 0) {
250                     destinationNode.performQueries(
251                             Collections.singletonList(insert),
252                             insertObserver);
253                 }
254
255                 if (delegate != null) {
256                     delegate.didPortEntity(this, entity, currentRow);
257                 }
258             }
259             finally {
260                 try {
261                     // don't forget to close ResultIterator
262
result.close();
263                 }
264                 catch (CayenneException ex) {
265                 }
266             }
267         }
268     }
269
270     public Collection JavaDoc getEntities() {
271         return entities;
272     }
273
274     public DataNode getSourceNode() {
275         return sourceNode;
276     }
277
278     public DataNode getDestinationNode() {
279         return destinationNode;
280     }
281
282     /**
283      * Sets the initial list of entities to process. This list can be later modified by
284      * the delegate.
285      */

286     public void setEntities(Collection JavaDoc entities) {
287         this.entities = entities;
288     }
289
290     /**
291      * Sets the DataNode serving as a source of the ported data.
292      */

293     public void setSourceNode(DataNode sourceNode) {
294         this.sourceNode = sourceNode;
295     }
296
297     /**
298      * Sets the DataNode serving as a destination of the ported data.
299      */

300     public void setDestinationNode(DataNode destinationNode) {
301         this.destinationNode = destinationNode;
302     }
303
304     /**
305      * Returns previously initialized DataPortDelegate object.
306      */

307     public DataPortDelegate getDelegate() {
308         return delegate;
309     }
310
311     public void setDelegate(DataPortDelegate delegate) {
312         this.delegate = delegate;
313     }
314
315     /**
316      * Returns true if a DataPort was configured to delete all data from the destination
317      * tables.
318      */

319     public boolean isCleaningDestination() {
320         return cleaningDestination;
321     }
322
323     /**
324      * Defines whether DataPort should delete all data from destination tables before
325      * doing the port.
326      */

327     public void setCleaningDestination(boolean cleaningDestination) {
328         this.cleaningDestination = cleaningDestination;
329     }
330
331     public int getInsertBatchSize() {
332         return insertBatchSize;
333     }
334
335     /**
336      * Sets a parameter used for tuning insert batches. If set to a value greater than
337      * zero, DataPort will commit every N rows. If set to value less or equal to zero,
338      * DataPort will commit only once at the end of the insert.
339      */

340     public void setInsertBatchSize(int insertBatchSize) {
341         this.insertBatchSize = insertBatchSize;
342     }
343 }
344
Popular Tags