KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > derby > impl > store > access > sort > MergeScan


/*

   Derby - Class org.apache.derby.impl.store.access.sort.MergeScan

   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to you under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

 */

21
22 package org.apache.derby.impl.store.access.sort;
23
24 import java.util.Enumeration JavaDoc;
25 import java.util.Vector JavaDoc;
26
27 import org.apache.derby.iapi.services.sanity.SanityManager;
28 import org.apache.derby.iapi.services.io.Storable;
29 import org.apache.derby.iapi.error.StandardException;
30 import org.apache.derby.iapi.store.access.conglomerate.TransactionManager;
31 import org.apache.derby.iapi.store.access.conglomerate.ScanManager;
32 import org.apache.derby.iapi.store.access.ScanController;
33 import org.apache.derby.iapi.store.access.SortObserver;
34 import org.apache.derby.iapi.store.access.TransactionController;
35 import org.apache.derby.iapi.store.raw.StreamContainerHandle;
36 import org.apache.derby.iapi.store.raw.Transaction;
37
38 import org.apache.derby.iapi.types.DataValueDescriptor;
39
40 /**
41     A sort scan that is capable of merging as many merge runs
42     as will fit in the passed-in sort buffer.
43 **/

44
45 public class MergeScan extends SortScan
46 {
47     /**
48     The sort buffer we will use.
49     **/

50     protected SortBuffer sortBuffer;
51
52     /**
53     The merge runs.
54     **/

55     protected Vector JavaDoc mergeRuns;
56
57     /**
58     Array of scan controllers for the merge runs.
59     Entries in the array become null as the last
60     row is pulled out and the scan is closed.
61     **/

62     protected StreamContainerHandle openScans[];
63
64     private SortObserver sortObserver;
65
66     /*
67      * Constructors.
68      */

69
70     MergeScan(
71     MergeSort sort,
72     TransactionManager tran,
73     SortBuffer sortBuffer,
74     Vector JavaDoc mergeRuns,
75     SortObserver sortObserver,
76     boolean hold)
77     {
78         super(sort, tran, hold);
79         this.sortBuffer = sortBuffer;
80         this.mergeRuns = mergeRuns;
81         this.tran = tran;
82         this.sortObserver = sortObserver;
83     }
84
85     /*
86      * Methods of MergeSortScan
87      */

88
89     /**
90     Move to the next position in the scan.
91     @see ScanController#next
92     **/

93     public boolean next()
94         throws StandardException
95     {
96         current = sortBuffer.removeFirst();
97         if (current != null)
98             mergeARow(sortBuffer.getLastAux());
99         return (current != null);
100     }
101
102     /**
103     Close the scan.
104     @see ScanController#close
105     **/

106     public void close()
107     {
108         if (openScans != null)
109         {
110             for (int i = 0; i < openScans.length; i++)
111             {
112                 if (openScans[i] != null)
113                 {
114                     openScans[i].close();
115                 }
116                 openScans[i] = null;
117             }
118             openScans = null;
119         }
120
121         // Hand sort buffer and remaining merge runs to sort.
122
if (super.sort != null)
123         {
124             sort.doneScanning(this, sortBuffer, mergeRuns);
125             sortBuffer = null;
126             mergeRuns = null;
127         }
128
129         // Sets sort to null
130
super.close();
131     }
132
133     /**
134     Close the scan.
135     @see ScanManager#closeForEndTransaction
136     **/

137     public boolean closeForEndTransaction(boolean closeHeldScan)
138     {
139         if (!hold || closeHeldScan)
140         {
141             close();
142             return(true);
143         }
144         else
145         {
146             return(false);
147         }
148     }
149
150     /*
151      * Methods of MergeScan
152      */

153
154     /**
155     Initialize the scan, returning false if there
156     was some error.
157     **/

158     public boolean init(TransactionManager tran)
159         throws StandardException
160     {
161         if (SanityManager.DEBUG)
162         {
163             // We really expect to have at least one
164
// merge run.
165
SanityManager.ASSERT(mergeRuns != null);
166             SanityManager.ASSERT(mergeRuns.size() > 0);
167
168             // This sort scan also expects that the
169
// caller has ensured that the sort buffer
170
// capacity will hold a row from all the
171
// merge runs.
172
SanityManager.ASSERT(sortBuffer.capacity() >= mergeRuns.size());
173         }
174
175         // Clear the sort buffer.
176
sortBuffer.reset();
177
178         // Create an array to hold a scan controller
179
// for each merge run.
180
openScans = new StreamContainerHandle[mergeRuns.size()];
181         if (openScans == null)
182             return false;
183
184         // Open a scan on each merge run.
185
int scanindex = 0;
186         Enumeration JavaDoc e = mergeRuns.elements();
187         while (e.hasMoreElements())
188         {
189             // get the container id
190
long id = ((Long JavaDoc) e.nextElement()).longValue();
191
192             Transaction rawTran = tran.getRawStoreXact(); // get raw transaction
193
int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;
194             openScans[scanindex++] =
195                 rawTran.openStreamContainer(segmentId, id, hold);
196         }
197
198         // Load the initial rows.
199
for (scanindex = 0; scanindex < openScans.length; scanindex++)
200             mergeARow(scanindex);
201
202         // Success!
203
return true;
204     }
205
206     /**
207     Insert rows while we keep getting duplicates
208     from the merge run whose scan is in the
209     open scan array entry indexed by scanindex.
210     **/

211     void mergeARow(int scanindex)
212         throws StandardException
213     {
214         if (SanityManager.DEBUG)
215         {
216             // Unless there's a bug, the scan index will refer
217
// to an open scan. That's because we never put
218
// a scan index for a closed scan into the sort
219
// buffer (via setNextAux).
220
SanityManager.ASSERT(openScans[scanindex] != null);
221         }
222
223         DataValueDescriptor[] row;
224
225         // Read rows from the merge run and stuff them into the
226
// sort buffer for as long as we encounter duplicates.
227
do
228         {
229             row = sortObserver.getArrayClone();
230
231             // Fetch the row from the merge run.
232
if (!openScans[scanindex].fetchNext(row))
233             {
234                 // If we're out of rows in the merge run, close the scan.
235

236                 openScans[scanindex].close();
237                 openScans[scanindex] = null;
238                 return;
239             }
240
241             // Save the index of this merge run with
242
// the row we're putting in the sort buffer.
243
sortBuffer.setNextAux(scanindex);
244         }
245         while (sortBuffer.insert(row) == SortBuffer.INSERT_DUPLICATE);
246     }
247 }
248
Popular Tags