KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > derby > impl > sql > execute > GroupedAggregateResultSet


1 /*
2
3    Derby - Class org.apache.derby.impl.sql.execute.GroupedAggregateResultSet
4
5    Licensed to the Apache Software Foundation (ASF) under one or more
6    contributor license agreements. See the NOTICE file distributed with
7    this work for additional information regarding copyright ownership.
8    The ASF licenses this file to you under the Apache License, Version 2.0
9    (the "License"); you may not use this file except in compliance with
10    the License. You may obtain a copy of the License at
11
12       http://www.apache.org/licenses/LICENSE-2.0
13
14    Unless required by applicable law or agreed to in writing, software
15    distributed under the License is distributed on an "AS IS" BASIS,
16    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17    See the License for the specific language governing permissions and
18    limitations under the License.
19
20  */

21
22 package org.apache.derby.impl.sql.execute;
23
24 import org.apache.derby.iapi.services.monitor.Monitor;
25
26 import org.apache.derby.iapi.services.sanity.SanityManager;
27
28 import org.apache.derby.iapi.services.stream.HeaderPrintWriter;
29 import org.apache.derby.iapi.services.stream.InfoStreams;
30
31 import org.apache.derby.iapi.services.io.Formatable;
32
33 import org.apache.derby.iapi.sql.execute.CursorResultSet;
34 import org.apache.derby.iapi.sql.Activation;
35 import org.apache.derby.iapi.sql.ResultSet;
36 import org.apache.derby.iapi.sql.execute.ExecRow;
37 import org.apache.derby.iapi.sql.execute.ExecIndexRow;
38 import org.apache.derby.iapi.sql.execute.NoPutResultSet;
39
40 import org.apache.derby.iapi.store.access.ColumnOrdering;
41 import org.apache.derby.iapi.types.DataValueDescriptor;
42 import org.apache.derby.iapi.store.access.SortObserver;
43 import org.apache.derby.iapi.store.access.TransactionController;
44 import org.apache.derby.iapi.store.access.SortController;
45 import org.apache.derby.iapi.store.access.ScanController;
46
47 import org.apache.derby.iapi.services.loader.GeneratedMethod;
48
49 import org.apache.derby.iapi.sql.execute.ExecutionFactory;
50 import org.apache.derby.iapi.sql.execute.ExecutionContext;
51 import org.apache.derby.iapi.sql.conn.LanguageConnectionContext;
52
53 import org.apache.derby.iapi.error.StandardException;
54
55 import org.apache.derby.iapi.types.RowLocation;
56
57 import org.apache.derby.iapi.services.io.FormatableArrayHolder;
58
59 import java.util.Properties JavaDoc;
60 import java.util.Vector JavaDoc;
61 import java.util.Enumeration JavaDoc;
62
63 /**
64  * This ResultSet evaluates grouped, non distinct aggregates.
65  * It will scan the entire source result set and calculate
66  * the grouped aggregates when scanning the source during the
67  * first call to next().
68  *
69  * @author jerry (broken out from SortResultSet)
70  */

71 class GroupedAggregateResultSet extends GenericAggregateResultSet
72     implements CursorResultSet {
73
74     /* Run time statistics variables */
75     public int rowsInput;
76     public int rowsReturned;
77
78     // set in constructor and not altered during
79
// life of object.
80
private ColumnOrdering[] order;
81     private ExecIndexRow sortTemplateRow;
82     public boolean hasDistinctAggregate; // true if distinct aggregate
83
public boolean isInSortedOrder; // true if source results in sorted order
84
private int maxRowSize;
85
86     // set in open and not modified thereafter
87
private ScanController scanController;
88
89     // Cache ExecIndexRow
90
private ExecIndexRow sourceExecIndexRow;
91
92     private ExecIndexRow sortResultRow;
93
94     // In order group bys
95
private ExecIndexRow currSortedRow;
96     private boolean nextCalled;
97
98     // used to track and close sorts
99
private long distinctAggSortId;
100     private boolean dropDistinctAggSort;
101     private long genericSortId;
102     private boolean dropGenericSort;
103     private TransactionController tc;
104
105     // RTS
106
public Properties JavaDoc sortProperties = new Properties JavaDoc();
107
108     /**
109      * Constructor
110      *
111      * @param s input result set
112      * @param isInSortedOrder true if the source results are in sorted order
113      * @param aggregateItem indicates the number of the
114      * SavedObject off of the PreparedStatement that holds the
115      * AggregatorInfoList used by this routine.
116      * @param orderingItem indicates the number of the
117      * SavedObject off of the PreparedStatement that holds the
118      * ColumOrdering array used by this routine
119      * @param a activation
120      * @param ra generated method to build an empty
121      * output row
122      * @param maxRowSize approx row size, passed to sorter
123      * @param resultSetNumber The resultSetNumber for this result set
124      *
125      * @exception StandardException Thrown on error
126      */

127     GroupedAggregateResultSet(NoPutResultSet s,
128                     boolean isInSortedOrder,
129                     int aggregateItem,
130                     int orderingItem,
131                     Activation a,
132                     GeneratedMethod ra,
133                     int maxRowSize,
134                     int resultSetNumber,
135                     double optimizerEstimatedRowCount,
136                     double optimizerEstimatedCost) throws StandardException
137     {
138         super(s, aggregateItem, a, ra, resultSetNumber, optimizerEstimatedRowCount, optimizerEstimatedCost);
139         this.isInSortedOrder = isInSortedOrder;
140         sortTemplateRow = getExecutionFactory().getIndexableRow((ExecRow) rowAllocator.invoke(activation));
141         order = (ColumnOrdering[])
142                     ((FormatableArrayHolder)
143                         (a.getPreparedStatement().getSavedObject(orderingItem)))
144                     .getArray(ColumnOrdering.class);
145
146         if (SanityManager.DEBUG)
147         {
148             SanityManager.DEBUG("AggregateTrace","execution time: "+
149                     a.getPreparedStatement().getSavedObject(aggregateItem));
150         }
151
152         constructorTime += getElapsedMillis(beginTime);
153     }
154
155
156     ///////////////////////////////////////////////////////////////////////////////
157
//
158
// ResultSet interface (leftover from NoPutResultSet)
159
//
160
///////////////////////////////////////////////////////////////////////////////
161

162     /**
163      * Open the scan. Load the sorter and prepare to get
164      * rows from it.
165      *
166      * @exception StandardException thrown if cursor finished.
167      */

168     public void openCore() throws StandardException
169     {
170         beginTime = getCurrentTimeMillis();
171         // REVISIT: through the direct DB API, this needs to be an
172
// error, not an ASSERT; users can open twice. Only through JDBC
173
// is access to open controlled and ensured valid.
174
if (SanityManager.DEBUG)
175             SanityManager.ASSERT( ! isOpen, "GroupedAggregateResultSet already open");
176
177         sortResultRow = getExecutionFactory().getIndexableRow(sortTemplateRow.getClone());
178         sourceExecIndexRow = getExecutionFactory().getIndexableRow(sortTemplateRow.getClone());
179
180         source.openCore();
181
182         /* If this is an in-order group by then we do not need the sorter.
183          * (We can do the aggregation ourselves.)
184          * We save a clone of the first row so that subsequent next()s
185          * do not overwrite the saved row.
186          */

187         if (isInSortedOrder)
188         {
189             currSortedRow = getNextRowFromRS();
190             if (currSortedRow != null)
191             {
192                 currSortedRow = (ExecIndexRow) currSortedRow.getClone();
193                 initializeVectorAggregation(currSortedRow);
194             }
195         }
196         else
197         {
198             /*
199             ** Load up the sorter
200             */

201             scanController = loadSorter();
202         }
203
204         isOpen = true;
205         numOpens++;
206
207         openTime += getElapsedMillis(beginTime);
208     }
209
210     /**
211      * Load up the sorter. Feed it every row from the
212      * source scan. If we have a vector aggregate, initialize
213      * the aggregator for each source row. When done, close
214      * the source scan and open the sort. Return the sort
215      * scan controller.
216      *
217      * @exception StandardException thrown on failure.
218      *
219      * @return the sort controller
220      */

221     private ScanController loadSorter()
222         throws StandardException
223     {
224         SortController sorter;
225         long sortId;
226         ExecRow sourceRow;
227         ExecRow inputRow;
228         int inputRowCountEstimate = (int) optimizerEstimatedRowCount;
229         boolean inOrder = isInSortedOrder;
230
231         tc = getTransactionController();
232
233         ColumnOrdering[] currentOrdering = order;
234
235         /*
236         ** Do we have any distinct aggregates? If so, we'll need
237         ** a separate sort. We use all of the sorting columns and
238         ** drop the aggregation on the distinct column. Then
239         ** we'll feed this into the sorter again w/o the distinct
240         ** column in the ordering list.
241         */

242         if (aggInfoList.hasDistinct())
243         {
244             hasDistinctAggregate = true;
245             
246             GenericAggregator[] aggsNoDistinct = getSortAggregators(aggInfoList, true,
247                         activation.getLanguageConnectionContext(), source);
248             SortObserver sortObserver = new AggregateSortObserver(true, aggsNoDistinct, aggregates,
249                                                                   sortTemplateRow);
250
251             sortId = tc.createSort((Properties JavaDoc)null,
252                     sortTemplateRow.getRowArray(),
253                     order,
254                     sortObserver,
255                     false, // not in order
256
inputRowCountEstimate, // est rows, -1 means no idea
257
maxRowSize // est rowsize
258
);
259             sorter = tc.openSort(sortId);
260             distinctAggSortId = sortId;
261             dropDistinctAggSort = true;
262                 
263             while ((sourceRow = source.getNextRowCore())!=null)
264             {
265                 sorter.insert(sourceRow.getRowArray());
266                 rowsInput++;
267             }
268
269             /*
270             ** End the sort and open up the result set
271             */

272             source.close();
273             sortProperties = sorter.getSortInfo().getAllSortInfo(sortProperties);
274             sorter.close();
275
276             scanController =
277                 tc.openSortScan(sortId, activation.getResultSetHoldability());
278             
279             /*
280             ** Aggs are initialized and input rows
281             ** are in order. All we have to do is
282             ** another sort to remove (merge) the
283             ** duplicates in the distinct column
284             */

285             inOrder = true;
286             inputRowCountEstimate = rowsInput;
287     
288             /*
289             ** Drop the last column from the ordering. The
290             ** last column is the distinct column. Don't
291             ** pay any attention to the fact that the ordering
292             ** object's name happens to correspond to a techo
293             ** band from the 80's.
294             **
295             ** If there aren't any ordering columns other
296             ** than the distinct (i.e. for scalar distincts)
297             ** just skip the 2nd sort altogether -- we'll
298             ** do the aggregate merge ourselves rather than
299             ** force a 2nd sort.
300             */

301             if (order.length == 1)
302             {
303                 return scanController;
304             }
305
306             ColumnOrdering[] newOrder = new ColumnOrdering[order.length - 1];
307             System.arraycopy(order, 0, newOrder, 0, order.length - 1);
308             currentOrdering = newOrder;
309         }
310
311         SortObserver sortObserver = new AggregateSortObserver(true, aggregates, aggregates,
312                                                               sortTemplateRow);
313
314         sortId = tc.createSort((Properties JavaDoc)null,
315                         sortTemplateRow.getRowArray(),
316                         currentOrdering,
317                         sortObserver,
318                         inOrder,
319                         inputRowCountEstimate, // est rows
320
maxRowSize // est rowsize
321
);
322         sorter = tc.openSort(sortId);
323         genericSortId = sortId;
324         dropGenericSort = true;
325     
326         /* The sorter is responsible for doing the cloning */
327         while ((inputRow = getNextRowFromRS()) != null)
328         {
329             sorter.insert(inputRow.getRowArray());
330         }
331         source.close();
332         sortProperties = sorter.getSortInfo().getAllSortInfo(sortProperties);
333         sorter.close();
334
335         return tc.openSortScan(sortId, activation.getResultSetHoldability());
336     }
337
338
339     /**
340      * Return the next row.
341      *
342      * @exception StandardException thrown on failure.
343      * @exception StandardException ResultSetNotOpen thrown if not yet open.
344      *
345      * @return the next row in the result
346      */

347     public ExecRow getNextRowCore() throws StandardException
348     {
349         if (!isOpen)
350         {
351             return null;
352         }
353
354         beginTime = getCurrentTimeMillis();
355
356         // In order group by
357
if (isInSortedOrder)
358         {
359             // No rows, no work to do
360
if (currSortedRow == null)
361             {
362                 nextTime += getElapsedMillis(beginTime);
363                 return null;
364             }
365
366             ExecIndexRow nextRow = getNextRowFromRS();
367
368             /* Drain and merge rows until we find new distinct values for the grouping columns. */
369             while (nextRow != null)
370             {
371                 /* We found a new set of values for the grouping columns.
372                  * Update the current row and return this group.
373                  */

374                 if (! sameGroupingValues(currSortedRow, nextRow))
375                 {
376                     ExecIndexRow result = currSortedRow;
377
378                     /* Save a clone of the new row so that it doesn't get overwritten */
379                     currSortedRow = (ExecIndexRow) nextRow.getClone();
380                     initializeVectorAggregation(currSortedRow);
381
382                     nextTime += getElapsedMillis(beginTime);
383                     rowsReturned++;
384                     return finishAggregation(result);
385                 }
386                 else
387                 {
388                     /* Same group - initialize the new row and then merge the aggregates */
389                     initializeVectorAggregation(nextRow);
390                     mergeVectorAggregates(nextRow, currSortedRow);
391                 }
392
393                 // Get the next row
394
nextRow = getNextRowFromRS();
395             }
396
397             // We've drained the source, so no more rows to return
398
ExecIndexRow result = currSortedRow;
399             currSortedRow = null;
400             nextTime += getElapsedMillis(beginTime);
401             return finishAggregation(result);
402         }
403         else
404         {
405             ExecIndexRow sortResult = null;
406
407             if ((sortResult = getNextRowFromRS()) != null)
408             {
409                 setCurrentRow(sortResult);
410             }
411
412             /*
413             ** Only finish the aggregation
414             ** if we have a return row. We don't generate
415             ** a row on a vector aggregate unless there was
416             ** a group.
417             */

418             if (sortResult != null)
419             {
420                 sortResult = finishAggregation(sortResult);
421                 currentRow = sortResult;
422             }
423
424             if (sortResult != null)
425             {
426                 rowsReturned++;
427             }
428
429             nextTime += getElapsedMillis(beginTime);
430             return sortResult;
431         }
432     }
433
434     /**
435      * Return whether or not the new row has the same values for the
436      * grouping columns as the current row. (This allows us to process in-order
437      * group bys without a sorter.)
438      *
439      * @param currRow The current row.
440      * @param newRow The new row.
441      *
442      * @return Whether or not to filter out the new row has the same values for the
443      * grouping columns as the current row.
444      *
445      * @exception StandardException thrown on failure to get row location
446      */

447     private boolean sameGroupingValues(ExecRow currRow, ExecRow newRow)
448         throws StandardException
449     {
450         for (int index = 0; index < order.length; index++)
451         {
452             DataValueDescriptor currOrderable = currRow.getColumn(order[index].getColumnId() + 1);
453             DataValueDescriptor newOrderable = newRow.getColumn(order[index].getColumnId() + 1);
454             if (! (currOrderable.compare(DataValueDescriptor.ORDER_OP_EQUALS, newOrderable, true, true)))
455             {
456                 return false;
457             }
458         }
459         return true;
460     }
461
462     /**
463      * If the result set has been opened,
464      * close the open scan.
465      *
466      * @exception StandardException thrown on error
467      */

468     public void close() throws StandardException
469     {
470         beginTime = getCurrentTimeMillis();
471         if ( isOpen )
472         {
473             // we don't want to keep around a pointer to the
474
// row ... so it can be thrown away.
475
// REVISIT: does this need to be in a finally
476
// block, to ensure that it is executed?
477
clearCurrentRow();
478
479             sortResultRow = null;
480             sourceExecIndexRow = null;
481             closeSource();
482
483             if (dropDistinctAggSort)
484             {
485                 tc.dropSort(distinctAggSortId);
486                 dropDistinctAggSort = false;
487             }
488
489             if (dropGenericSort)
490             {
491                 tc.dropSort(genericSortId);
492                 dropGenericSort = false;
493             }
494             super.close();
495         }
496         else
497             if (SanityManager.DEBUG)
498                 SanityManager.DEBUG("CloseRepeatInfo","Close of SortResultSet repeated");
499
500         closeTime += getElapsedMillis(beginTime);
501
502         isOpen = false;
503     }
504
505     /**
506      * Return the total amount of time spent in this ResultSet
507      *
508      * @param type CURRENT_RESULTSET_ONLY - time spent only in this ResultSet
509      * ENTIRE_RESULTSET_TREE - time spent in this ResultSet and below.
510      *
511      * @return long The total amount of time spent (in milliseconds).
512      */

513     public long getTimeSpent(int type)
514     {
515         long totTime = constructorTime + openTime + nextTime +
516                         closeTime;
517
518         if (type == NoPutResultSet.CURRENT_RESULTSET_ONLY)
519         {
520             return totTime - originalSource.getTimeSpent(ENTIRE_RESULTSET_TREE);
521         }
522         else
523         {
524             return totTime;
525         }
526     }
527
528     ///////////////////////////////////////////////////////////////////////////////
529
//
530
// CursorResultSet interface
531
//
532
///////////////////////////////////////////////////////////////////////////////
533

534     /**
535      * This result set has its row location from
536      * the last fetch done. If the cursor is closed,
537      * a null is returned.
538      *
539      * @see CursorResultSet
540      *
541      * @return the row location of the current cursor row.
542      * @exception StandardException thrown on failure to get row location
543      */

544     public RowLocation getRowLocation() throws StandardException
545     {
546         if (! isOpen) return null;
547
548         // REVISIT: could we reuse the same rowlocation object
549
// across several calls?
550
RowLocation rl;
551         rl = scanController.newRowLocationTemplate();
552         scanController.fetchLocation(rl);
553         return rl;
554     }
555
556     /**
557      * This result set has its row from the last fetch done.
558      * If the cursor is closed, a null is returned.
559      *
560      * @see CursorResultSet
561      *
562      * @return the last row returned;
563      * @exception StandardException thrown on failure.
564      */

565     /* RESOLVE - this should return activation.getCurrentRow(resultSetNumber),
566      * once there is such a method. (currentRow is redundant)
567      */

568     public ExecRow getCurrentRow() throws StandardException
569     {
570         if (SanityManager.DEBUG)
571             SanityManager.ASSERT(isOpen, "SortResultSet expected to be open");
572
573         return currentRow;
574     }
575
576     ///////////////////////////////////////////////////////////////////////////////
577
//
578
// SCAN ABSTRACTION UTILITIES
579
//
580
///////////////////////////////////////////////////////////////////////////////
581
/**
582      * Get the next output row for processing
583      */

584     private ExecIndexRow getNextRowFromRS()
585         throws StandardException
586     {
587         return (scanController == null) ?
588             getRowFromResultSet() :
589             getRowFromSorter();
590     }
591
592     /**
593      * Get a row from the input result set.
594      */

595     private ExecIndexRow getRowFromResultSet()
596         throws StandardException
597     {
598         ExecRow sourceRow;
599         ExecIndexRow inputRow = null;
600
601         if ((sourceRow = source.getNextRowCore()) != null)
602         {
603             rowsInput++;
604             sourceExecIndexRow.execRowToExecIndexRow(sourceRow);
605             inputRow = sourceExecIndexRow;
606         }
607
608         return inputRow;
609     }
610
611
612     /**
613      * Get a row from the sorter. Side effects:
614      * sets currentRow.
615      */

616     private ExecIndexRow getRowFromSorter()
617         throws StandardException
618     {
619         ExecIndexRow inputRow = null;
620         
621         if (scanController.next())
622         {
623             // REMIND: HACKALERT we are assuming that result will
624
// point to what sortResult is manipulating when
625
// we complete the fetch.
626
currentRow = sortResultRow;
627
628             inputRow = getExecutionFactory().getIndexableRow(currentRow);
629
630             scanController.fetch(inputRow.getRowArray());
631         }
632         return inputRow;
633     }
634
635     /**
636      * Close the source of whatever we have been scanning.
637      *
638      * @exception StandardException thrown on error
639      */

640     public void closeSource() throws StandardException
641     {
642         if (scanController == null)
643         {
644             /*
645             ** NOTE: do not null out source, we
646             ** may be opened again, in which case
647             ** we will open source again.
648             */

649             source.close();
650         }
651         else
652         {
653             scanController.close();
654             scanController = null;
655         }
656     }
657
658     ///////////////////////////////////////////////////////////////////////////////
659
//
660
// AGGREGATION UTILITIES
661
//
662
///////////////////////////////////////////////////////////////////////////////
663
/**
664      * Run the aggregator initialization method for
665      * each aggregator in the row. Accumulate the
666      * input column. WARNING: initializiation performs
667      * accumulation -- no need to accumulate a row
668      * that has been passed to initialization.
669      *
670      * @param row the row to initialize
671      *
672      * @exception standard cloudscape exception
673      */

674     private void initializeVectorAggregation(ExecRow row)
675         throws StandardException
676     {
677         int size = aggregates.length;
678
679         if (SanityManager.DEBUG)
680         {
681             SanityManager.ASSERT(row != null,
682                     "Null row passed to initializeVectorAggregation");
683         }
684
685         for (int i = 0; i < size; i++)
686         {
687             GenericAggregator currAggregate = aggregates[i];
688
689             // initialize the aggregator
690
currAggregate.initialize(row);
691
692             // get the first value, accumulate it into itself
693
currAggregate.accumulate(row, row);
694         }
695     }
696
697     /**
698      * Run the aggregator merge method for
699      * each aggregator in the row.
700      *
701      * @param newRow the row to merge
702      * @param currRow the row to merge into
703      *
704      * @exception standard cloudscape exception
705      */

706     private void mergeVectorAggregates(ExecRow newRow, ExecRow currRow)
707         throws StandardException
708     {
709         for (int i = 0; i < aggregates.length; i++)
710         {
711             GenericAggregator currAggregate = aggregates[i];
712
713             // merge the aggregator
714
currAggregate.merge(newRow, currRow);
715         }
716     }
717
718 }
719
Popular Tags