KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > derby > impl > store > access > sort > MergeSort


1 /*
2
3    Derby - Class org.apache.derby.impl.store.access.sort.MergeSort
4
5    Licensed to the Apache Software Foundation (ASF) under one or more
6    contributor license agreements. See the NOTICE file distributed with
7    this work for additional information regarding copyright ownership.
8    The ASF licenses this file to you under the Apache License, Version 2.0
9    (the "License"); you may not use this file except in compliance with
10    the License. You may obtain a copy of the License at
11
12       http://www.apache.org/licenses/LICENSE-2.0
13
14    Unless required by applicable law or agreed to in writing, software
15    distributed under the License is distributed on an "AS IS" BASIS,
16    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17    See the License for the specific language governing permissions and
18    limitations under the License.
19
20  */

21
22 package org.apache.derby.impl.store.access.sort;
23
24 import org.apache.derby.iapi.reference.SQLState;
25
26 import org.apache.derby.iapi.services.io.FormatableBitSet;
27
28 import org.apache.derby.iapi.services.io.Storable;
29 import org.apache.derby.iapi.services.sanity.SanityManager;
30 import org.apache.derby.iapi.error.StandardException;
31 import org.apache.derby.iapi.store.access.conglomerate.ScanControllerRowSource;
32 import org.apache.derby.iapi.store.access.conglomerate.Sort;
33 import org.apache.derby.iapi.store.access.conglomerate.SortFactory;
34 import org.apache.derby.iapi.store.access.conglomerate.TransactionManager;
35 import org.apache.derby.iapi.types.CloneableObject;
36 import org.apache.derby.iapi.store.access.ColumnOrdering;
37 import org.apache.derby.iapi.store.access.ConglomerateController;
38 import org.apache.derby.iapi.store.access.Qualifier;
39 import org.apache.derby.iapi.store.access.RowUtil;
40 import org.apache.derby.iapi.store.access.ScanController;
41 import org.apache.derby.iapi.store.access.SortObserver;
42 import org.apache.derby.iapi.store.access.SortController;
43 import org.apache.derby.iapi.store.access.TransactionController;
44
45 import org.apache.derby.iapi.store.raw.StreamContainerHandle;
46 import org.apache.derby.iapi.store.raw.RawStoreFactory;
47 import org.apache.derby.iapi.store.raw.Transaction;
48
49 import org.apache.derby.iapi.types.DataValueDescriptor;
50
51 import org.apache.derby.iapi.types.Orderable;
52 import org.apache.derby.iapi.types.RowLocation;
53
54 import java.util.Enumeration JavaDoc;
55 import java.util.Properties JavaDoc;
56 import java.util.Vector JavaDoc;
57
58 /**
59
60   A sort implementation which does the sort in-memory if it can,
61   but which can do an external merge sort so that it can sort an
62   arbitrary number of rows.
63
64 **/

65
66 public final class MergeSort implements Sort
67 {
68
69     /*
70      * Fields
71      */

72
73     /**
74     **/

75     static final int STATE_CLOSED = 0;
76
77     /**
78     **/

79     static final int STATE_INITIALIZED = 1;
80
81     /**
82     **/

83     static final int STATE_INSERTING = 2;
84
85     /**
86     **/

87     static final int STATE_DONE_INSERTING = 3;
88
89     /**
90     **/

91     static final int STATE_SCANNING = 4;
92
93     /**
94     **/

95     static final int STATE_DONE_SCANNING = 5;
96
97     /**
98     Maintains the current state of the sort as defined in
99     the preceding values. Sorts start off and end up closed.
100     **/

101     protected int state = STATE_CLOSED;
102
103     /**
104     The template as passed in on create. Valid when the state
105     is INITIALIZED through SCANNING, null otherwise.
106     **/

107     protected DataValueDescriptor[] template;
108
109     /**
110     The column ordering as passed in on create. Valid when
111     the state is INITIALIZED through SCANNING, null otherwise.
112     May be null if there is no column ordering - this means
113     that all rows are considered to be duplicates, and the
114     sort will only emit a single row.
115     **/

116     protected ColumnOrdering columnOrdering[];
117
118     /**
119     A lookup table to speed up lookup of a column associated with the i'th
120     column to compare. To find the column id to compare as the i'th column
121     look in columnOrderingMap[i].
122     **/

123     protected int columnOrderingMap[];
124
125     /**
126     A lookup table to speed up lookup of Ascending state of a column,
127     **/

128     protected boolean columnOrderingAscendingMap[];
129
130     /**
131     The sort observer. May be null. Used as a callback.
132     **/

133     protected SortObserver sortObserver;
134
135     /**
136     Whether the rows are expected to be in order on insert,
137     as passed in on create.
138     **/

139     protected boolean alreadyInOrder;
140
141     /**
142     The inserter that's being used to insert rows into the sort.
143     This field is only valid when the state is INSERTING.
144     **/

145     protected MergeInserter inserter = null;
146
147     /**
148     The scan that's being used to return rows from the sort.
149     This field is only valid when the state is SCANNING.
150     **/

151     protected Scan scan = null;
152
153     /**
154     A vector of merge runs, produced by the MergeInserter.
155     Might be null if no merge runs were produced.
156     It is a vector of container ids.
157     **/

158     protected Vector JavaDoc mergeRuns = null;
159
160     /**
161     An ordered set of the leftover rows that didn't go
162     in the last merge run (might be all the rows if there
163     are no merge runs).
164     **/

165     protected SortBuffer sortBuffer = null;
166
167     /**
168     The maximum number of entries a sort buffer can hold.
169     **/

170     protected int sortBufferMax;
171
172     /**
173     The minimum number of entries a sort buffer can hold.
174     **/

175     protected int sortBufferMin;
176
177     /**
178     Properties for mergeSort
179     **/

180     static Properties JavaDoc properties = null;
181
182     /**
183     Static initializer for MergeSort, to initialize once the properties
184     for the sortBuffer.
185     **/

186     static
187     {
188         properties = new Properties JavaDoc();
189         properties.put(RawStoreFactory.STREAM_FILE_BUFFER_SIZE_PARAMETER, "16384");
190     }
191
192     /*
193      * Methods of Sort
194      */

195
196     /**
197     Open a sort controller.
198     <p>
199     This implementation only supports a single sort controller
200     per sort.
201     @see Sort#open
202     **/

203     public SortController open(TransactionManager tran)
204         throws StandardException
205     {
206         if (SanityManager.DEBUG)
207             SanityManager.ASSERT(state == STATE_INITIALIZED);
208
209         // Ready to start inserting rows.
210
state = STATE_INSERTING;
211
212         // Create and initialize an inserter. When the caller
213
// closes it, it will call back to inserterIsClosed().
214
this.inserter = new MergeInserter();
215         if (this.inserter.initialize(this, tran) == false)
216         {
217             throw StandardException.newException(SQLState.SORT_COULD_NOT_INIT);
218         }
219
220         return this.inserter;
221     }
222
223     /**
224     Open a scan controller.
225     @see Sort#openSortScan
226     **/

227
228     public ScanController openSortScan(
229     TransactionManager tran,
230     boolean hold)
231             throws StandardException
232     {
233         if (SanityManager.DEBUG)
234             SanityManager.ASSERT(state == STATE_DONE_INSERTING);
235
236         if (mergeRuns == null || mergeRuns.size() == 0)
237         {
238             // There were no merge runs so we can just return
239
// the rows from the sort buffer.
240
scan = new SortBufferScan(this, tran, sortBuffer, hold);
241
242             // The scan now owns the sort buffer
243
sortBuffer = null;
244         }
245         else
246         {
247             // Dump the rows in the sort buffer to a merge run.
248
long containerId = createMergeRun(tran, sortBuffer);
249             mergeRuns.addElement(new Long JavaDoc(containerId));
250
251             // If there are more merge runs than we can sort
252
// at once with our sort buffer, we have to reduce
253
// the number of merge runs
254
if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN ||
255                 mergeRuns.size() > sortBuffer.capacity())
256                     multiStageMerge(tran);
257
258             // There are now few enough merge runs to sort
259
// at once, so create a scan for them.
260
MergeScan mscan =
261                 new MergeScan(
262                     this, tran, sortBuffer, mergeRuns, sortObserver, hold);
263
264             if (!mscan.init(tran))
265             {
266                 throw StandardException.newException(
267                         SQLState.SORT_COULD_NOT_INIT);
268             }
269             scan = mscan;
270
271             // The scan now owns the sort buffer and merge runs.
272
sortBuffer = null;
273             mergeRuns = null;
274         }
275
276         // Ready to start retrieving rows.
277
this.state = STATE_SCANNING;
278
279         return scan;
280     }
281
282     /**
283     Open a row source to get rows out of the sorter.
284     @see Sort#openSortRowSource
285     **/

286     public ScanControllerRowSource openSortRowSource(TransactionManager tran)
287             throws StandardException
288     {
289         if (SanityManager.DEBUG)
290             SanityManager.ASSERT(state == STATE_DONE_INSERTING);
291
292         ScanControllerRowSource rowSource = null;
293
294         if (mergeRuns == null || mergeRuns.size() == 0)
295         {
296             // There were no merge runs so we can just return
297
// the rows from the sort buffer.
298
scan = new SortBufferRowSource(sortBuffer, tran, sortObserver, false, sortBufferMax);
299             rowSource = (ScanControllerRowSource)scan;
300
301             // The scan now owns the sort buffer
302
sortBuffer = null;
303         }
304         else
305         {
306             // Dump the rows in the sort buffer to a merge run.
307
long containerId = createMergeRun(tran, sortBuffer);
308             mergeRuns.addElement(new Long JavaDoc(containerId));
309
310             // If there are more merge runs than we can sort
311
// at once with our sort buffer, we have to reduce
312
// the number of merge runs
313
if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN ||
314                 mergeRuns.size() > sortBuffer.capacity())
315                 multiStageMerge(tran);
316
317             // There are now few enough merge runs to sort
318
// at once, so create a rowSource for them.
319
MergeScanRowSource msRowSource =
320                 new MergeScanRowSource(this, tran, sortBuffer, mergeRuns, sortObserver, false);
321             if (!msRowSource.init(tran))
322             {
323                 throw StandardException.newException(
324                         SQLState.SORT_COULD_NOT_INIT);
325             }
326             scan = msRowSource;
327             rowSource = msRowSource;
328
329             // The scan now owns the sort buffer and merge runs.
330
sortBuffer = null;
331             mergeRuns = null;
332         }
333
334         // Ready to start retrieving rows.
335
this.state = STATE_SCANNING;
336
337         return rowSource;
338     }
339
340
341
342     /**
343     Drop the sort.
344     @see Sort#drop
345     **/

346     public void drop(TransactionController tran)
347         throws StandardException
348     {
349         // Make sure the inserter is closed. Note this
350
// will cause the callback to doneInserting()
351
// which will give us any in-progress merge
352
// runs, if there are any.
353
if (inserter != null)
354             inserter.close();
355         inserter = null;
356
357         // Make sure the scan is closed, if there is one.
358
// This will cause the callback to doneScanning().
359
if (scan != null)
360         {
361             scan.close();
362             scan = null;
363         }
364
365         // If we have a row set, get rid of it.
366
if (sortBuffer != null)
367         {
368             sortBuffer.close();
369             sortBuffer = null;
370         }
371
372         // Clean out the rest of the objects.
373
template = null;
374         columnOrdering = null;
375         sortObserver = null;
376
377         // If there are any merge runs, drop them.
378
dropMergeRuns((TransactionManager)tran);
379
380         // Whew!
381
state = STATE_CLOSED;
382     }
383
384
385     /*
386      * Methods of MergeSort. Arranged alphabetically.
387      */

388
389     /**
390     Check the column ordering against the template, making
391     sure that each column is present in the template,
392     implements Orderable, and is not mentioned more than
393     once. Intended to be called as part of a sanity check.
394     **/

395     protected boolean checkColumnOrdering(
396     DataValueDescriptor[] template,
397     ColumnOrdering columnOrdering[])
398     {
399         // Allocate an array to check that each column mentioned only once.
400
int templateNColumns = template.length;
401         boolean seen[] = new boolean[templateNColumns];
402
403         // Check each column ordering.
404
for (int i = 0; i < columnOrdering.length; i++)
405         {
406             int colid = columnOrdering[i].getColumnId();
407
408             // Check that the column id is valid.
409
if (colid < 0 || colid >= templateNColumns)
410                 return false;
411             
412             // Check that the column isn't mentioned more than once.
413
if (seen[colid])
414                 return false;
415             seen[colid] = true;
416
417             Object JavaDoc columnVal =
418                 RowUtil.getColumn(template, (FormatableBitSet) null, colid);
419
420             if (!(columnVal instanceof Orderable))
421                 return false;
422         }
423
424         return true;
425     }
426
427     /**
428     Check that the columns in the row agree with the columns
429     in the template, both in number and in type.
430     <p>
431     XXX (nat) Currently checks that the classes implementing
432     each column are the same -- is this right?
433     **/

434     void checkColumnTypes(DataValueDescriptor[] row)
435         throws StandardException
436     {
437         int nCols = row.length;
438         if (template.length != nCols)
439         {
440             if (SanityManager.DEBUG)
441             {
442                 SanityManager.THROWASSERT(
443                     "template.length (" + template.length +
444                     ") expected to be = to nCols (" +
445                     nCols + ")");
446             }
447             throw StandardException.newException(
448                     SQLState.SORT_TYPE_MISMATCH);
449         }
450
451         if (SanityManager.DEBUG)
452         {
453             for (int colid = 0; colid < nCols; colid++)
454             {
455                 Object JavaDoc col1 = row[colid];
456                 Object JavaDoc col2 = template[colid];
457                 if (col1 == null)
458                 {
459                     SanityManager.THROWASSERT(
460                         "col[" + colid + "] is null");
461                 }
462                         
463                 if (!(col1 instanceof CloneableObject))
464                 {
465                     SanityManager.THROWASSERT(
466                         "col[" + colid + "] (" +col1.getClass().getName()+
467                         ") is not a CloneableObject.");
468                 }
469
470                 if (col1.getClass() != col2.getClass())
471                 {
472                     SanityManager.THROWASSERT(
473                         "col1.getClass() (" + col1.getClass() +
474                         ") expected to be the same as col2.getClass() (" +
475                         col2.getClass() + ")");
476                 }
477             }
478         }
479     }
480
481     int compare(
482     DataValueDescriptor[] r1,
483     DataValueDescriptor[] r2)
484         throws StandardException
485     {
486         // Get the number of columns we have to compare.
487
int colsToCompare = this.columnOrdering.length;
488         int r;
489
490         // Compare the columns specified in the column
491
// ordering array.
492
for (int i = 0; i < colsToCompare; i++)
493         {
494             // Get columns to compare.
495
int colid = this.columnOrderingMap[i];
496
497             // If the columns don't compare equal, we're done.
498
// Return the sense of the comparison.
499
if ((r = r1[colid].compare(r2[colid]))
500                     != 0)
501             {
502                 if (this.columnOrderingAscendingMap[i])
503                     return r;
504                 else
505                     return -r;
506             }
507         }
508
509         // We made it through all the columns, and they must have
510
// all compared equal. So return that the rows compare equal.
511
return 0;
512     }
513
514     /**
515     Go from the CLOSED to the INITIALIZED state.
516     **/

517     public void initialize(
518     DataValueDescriptor[] template,
519     ColumnOrdering columnOrdering[],
520     SortObserver sortObserver,
521     boolean alreadyInOrder,
522     long estimatedRows,
523     int sortBufferMax)
524         throws StandardException
525     {
526         if (SanityManager.DEBUG)
527         {
528             SanityManager.ASSERT(state == STATE_CLOSED);
529         }
530
531         // Make sure the column ordering makes sense
532
if (SanityManager.DEBUG)
533         {
534             SanityManager.ASSERT(checkColumnOrdering(template, columnOrdering),
535                 "column ordering error");
536         }
537
538         // Set user-defined parameters.
539
this.template = template;
540         this.columnOrdering = columnOrdering;
541         this.sortObserver = sortObserver;
542         this.alreadyInOrder = alreadyInOrder;
543
544         // Cache results of columnOrdering calls, results are not allowed
545
// to change throughout a sort.
546
columnOrderingMap = new int[columnOrdering.length];
547         columnOrderingAscendingMap = new boolean[columnOrdering.length];
548         for (int i = 0; i < columnOrdering.length; i++)
549         {
550             columnOrderingMap[i] = columnOrdering[i].getColumnId();
551             columnOrderingAscendingMap[i] = columnOrdering[i].getIsAscending();
552         }
553
554         // No inserter or scan yet.
555
this.inserter = null;
556         this.scan = null;
557
558         // We don't have any merge runs.
559
this.mergeRuns = null;
560         this.sortBuffer = null;
561         this.sortBufferMax = sortBufferMax;
562
563         if (estimatedRows > sortBufferMax)
564             sortBufferMin = sortBufferMax;
565         else
566             sortBufferMin = (int)estimatedRows;
567         if (SanityManager.DEBUG)
568         {
569             if (SanityManager.DEBUG_ON("testSort"))
570                 sortBufferMin = sortBufferMax;
571         }
572
573         this.state = STATE_INITIALIZED;
574     }
575
576     /**
577     An inserter is closing.
578     **/

579     void doneInserting(MergeInserter inserter,
580         SortBuffer sortBuffer, Vector JavaDoc mergeRuns)
581     {
582         if (SanityManager.DEBUG)
583         {
584             SanityManager.ASSERT(state == STATE_INSERTING);
585         }
586
587         this.sortBuffer = sortBuffer;
588         this.mergeRuns = mergeRuns;
589         this.inserter = null;
590
591         this.state = STATE_DONE_INSERTING;
592     }
593
594     void doneScanning(Scan scan, SortBuffer sortBuffer)
595     {
596         if (SanityManager.DEBUG)
597         {
598             // Make sure the scan we're getting back is the one we gave out
599

600             if (this.scan != scan)
601                 SanityManager.THROWASSERT("this.scan = " + this.scan
602                                           + " scan = " + scan);
603         }
604
605         this.sortBuffer = sortBuffer;
606         this.scan = null;
607
608         this.state = STATE_DONE_SCANNING;
609     }
610
611     void doneScanning(Scan scan, SortBuffer sortBuffer,
612         Vector JavaDoc mergeRuns)
613     {
614         this.mergeRuns = mergeRuns;
615
616         doneScanning(scan, sortBuffer);
617     }
618
619
620     /**
621     Get rid of the merge runs, if there are any.
622     Must not cause any errors because it's called
623     during error processing.
624     **/

625     void dropMergeRuns(TransactionManager tran)
626     {
627         if (mergeRuns != null)
628         {
629             Enumeration JavaDoc e = mergeRuns.elements();
630
631             try
632             {
633                 Transaction rawTran = tran.getRawStoreXact();
634                 long segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
635
636                 while (e.hasMoreElements())
637                 {
638                     long containerId = ((Long JavaDoc) e.nextElement()).longValue();
639                     rawTran.dropStreamContainer(segmentId, containerId);
640                 }
641             }
642             catch (StandardException se)
643             {
644                 // Ignore problems with dropping, worst case
645
// the raw store will clean up at reboot.
646
}
647             mergeRuns = null;
648         }
649     }
650
651     /* DEBUG (nat)
652     void printRunInfo(TransactionController tran)
653         throws StandardException
654     {
655         java.util.Enumeration e = mergeRuns.elements();
656         while (e.hasMoreElements())
657         {
658             long conglomid = ((Long) e.nextElement()).longValue();
659             ScanController sc = tran.openScan(conglomid, false,
660                                     false, null, null, 0, null,
661                                     null, 0);
662             System.out.println("Merge run: conglomid=" + conglomid);
663             while (sc.next())
664             {
665                 sc.fetch(template);
666                 System.out.println(template);
667             }
668             sc.close();
669         }
670     }
671     */

672
673     private void multiStageMerge(TransactionManager tran)
674         throws StandardException
675     {
676         Enumeration JavaDoc e;
677         //int iterations = 0; // DEBUG (nat)
678
int maxMergeRuns = sortBuffer.capacity();
679
680         if (maxMergeRuns > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN)
681             maxMergeRuns = ExternalSortFactory.DEFAULT_MAX_MERGE_RUN;
682
683         Vector JavaDoc subset;
684         Vector JavaDoc leftovers;
685
686         while (mergeRuns.size() > maxMergeRuns)
687         {
688             // Move maxMergeRuns elements from the merge runs
689
// vector into a subset, leaving the rest.
690
subset = new Vector JavaDoc(maxMergeRuns);
691             leftovers = new Vector JavaDoc(mergeRuns.size() - maxMergeRuns);
692             e = mergeRuns.elements();
693             while (e.hasMoreElements())
694             {
695                 Long JavaDoc containerId = (Long JavaDoc) e.nextElement();
696                 if (subset.size() < maxMergeRuns)
697                     subset.addElement(containerId);
698                 else
699                     leftovers.addElement(containerId);
700             }
701
702             /* DEBUG (nat)
703             iterations++;
704                 System.out.println(subset.size() + " elements in subset");
705                 System.out.println(leftovers.size() + " elements in leftovers");
706                 System.out.println(mergeRuns.size() + " elements in mergeRuns");
707                 System.out.println("maxMergeRuns is " + maxMergeRuns);
708                 System.out.println("iterations = " + iterations);
709             if (subset.size() == 0)
710             {
711                 System.exit(1);
712             }
713             */

714
715             mergeRuns = leftovers;
716
717             // Open a merge scan on the subset.
718
MergeScanRowSource msRowSource =
719                 new MergeScanRowSource(this, tran, sortBuffer, subset, sortObserver, false);
720
721             if (!msRowSource.init(tran))
722             {
723                 throw StandardException.newException(
724                         SQLState.SORT_COULD_NOT_INIT);
725             }
726
727             // Create and open another temporary stream conglomerate
728
// which will become
729
// a merge run made up with the merged runs from the subset.
730
Transaction rawTran = tran.getRawStoreXact();
731             int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
732             long id = rawTran.addAndLoadStreamContainer(segmentId,
733                 properties, msRowSource);
734
735             mergeRuns.addElement(new Long JavaDoc(id));
736
737             // Drop the conglomerates in the merge subset
738
e = subset.elements();
739             while (e.hasMoreElements())
740             {
741                 Long JavaDoc containerId = (Long JavaDoc) e.nextElement();
742                 rawTran.dropStreamContainer(segmentId, containerId.longValue());
743             }
744         }
745     }
746
747     /**
748     Remove all the rows from the sort buffer and store them
749     in a temporary conglomerate. The temporary conglomerate
750     is a "merge run". Returns the container id of the
751     merge run.
752     **/

753     long createMergeRun(TransactionManager tran, SortBuffer sortBuffer)
754         throws StandardException
755     {
756         // this sort buffer is not a scan and is not tracked by any
757
// TransactionManager.
758
SortBufferRowSource rowSource =
759             new SortBufferRowSource(sortBuffer, (TransactionManager)null, sortObserver, true, sortBufferMax);
760
761         // Create a temporary stream conglomerate...
762
Transaction rawTran = tran.getRawStoreXact(); // get raw transaction
763
int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
764         long id = rawTran.addAndLoadStreamContainer(segmentId,
765             properties, rowSource);
766
767         // Don't close the sortBuffer, we just emptied it, the caller may reuse
768
// that sortBuffer for the next run.
769
rowSource = null;
770
771         return id;
772     }
773 }
774
Popular Tags