1 21 22 package org.apache.derby.impl.store.access.sort; 23 24 import org.apache.derby.iapi.reference.SQLState; 25 26 import org.apache.derby.iapi.services.io.FormatableBitSet; 27 28 import org.apache.derby.iapi.services.io.Storable; 29 import org.apache.derby.iapi.services.sanity.SanityManager; 30 import org.apache.derby.iapi.error.StandardException; 31 import org.apache.derby.iapi.store.access.conglomerate.ScanControllerRowSource; 32 import org.apache.derby.iapi.store.access.conglomerate.Sort; 33 import org.apache.derby.iapi.store.access.conglomerate.SortFactory; 34 import org.apache.derby.iapi.store.access.conglomerate.TransactionManager; 35 import org.apache.derby.iapi.types.CloneableObject; 36 import org.apache.derby.iapi.store.access.ColumnOrdering; 37 import org.apache.derby.iapi.store.access.ConglomerateController; 38 import org.apache.derby.iapi.store.access.Qualifier; 39 import org.apache.derby.iapi.store.access.RowUtil; 40 import org.apache.derby.iapi.store.access.ScanController; 41 import org.apache.derby.iapi.store.access.SortObserver; 42 import org.apache.derby.iapi.store.access.SortController; 43 import org.apache.derby.iapi.store.access.TransactionController; 44 45 import org.apache.derby.iapi.store.raw.StreamContainerHandle; 46 import org.apache.derby.iapi.store.raw.RawStoreFactory; 47 import org.apache.derby.iapi.store.raw.Transaction; 48 49 import org.apache.derby.iapi.types.DataValueDescriptor; 50 51 import org.apache.derby.iapi.types.Orderable; 52 import org.apache.derby.iapi.types.RowLocation; 53 54 import java.util.Enumeration ; 55 import java.util.Properties ; 56 import java.util.Vector ; 57 58 65 66 public final class MergeSort implements Sort 67 { 68 69 72 73 75 static final int STATE_CLOSED = 0; 76 77 79 static final int STATE_INITIALIZED = 1; 80 81 83 static final int STATE_INSERTING = 2; 84 85 87 static final int STATE_DONE_INSERTING = 3; 88 89 91 static final int STATE_SCANNING = 4; 92 93 95 static final int STATE_DONE_SCANNING = 5; 96 97 101 protected int state = STATE_CLOSED; 102 103 107 protected DataValueDescriptor[] template; 108 109 116 protected ColumnOrdering columnOrdering[]; 117 118 123 protected int columnOrderingMap[]; 124 125 128 protected boolean columnOrderingAscendingMap[]; 129 130 133 protected SortObserver sortObserver; 134 135 139 protected boolean alreadyInOrder; 140 141 145 protected MergeInserter inserter = null; 146 147 151 protected Scan scan = null; 152 153 158 protected Vector mergeRuns = null; 159 160 165 protected SortBuffer sortBuffer = null; 166 167 170 protected int sortBufferMax; 171 172 175 protected int sortBufferMin; 176 177 180 static Properties properties = null; 181 182 186 static 187 { 188 properties = new Properties (); 189 properties.put(RawStoreFactory.STREAM_FILE_BUFFER_SIZE_PARAMETER, "16384"); 190 } 191 192 195 196 203 public SortController open(TransactionManager tran) 204 throws StandardException 205 { 206 if (SanityManager.DEBUG) 207 SanityManager.ASSERT(state == STATE_INITIALIZED); 208 209 state = STATE_INSERTING; 211 212 this.inserter = new MergeInserter(); 215 if (this.inserter.initialize(this, tran) == false) 216 { 217 throw StandardException.newException(SQLState.SORT_COULD_NOT_INIT); 218 } 219 220 return this.inserter; 221 } 222 223 227 228 public ScanController openSortScan( 229 TransactionManager tran, 230 boolean hold) 231 throws StandardException 232 { 233 if (SanityManager.DEBUG) 234 SanityManager.ASSERT(state == STATE_DONE_INSERTING); 235 236 if (mergeRuns == null || mergeRuns.size() == 0) 237 { 238 scan = new SortBufferScan(this, tran, sortBuffer, hold); 241 242 sortBuffer = null; 244 } 245 else 246 { 247 long containerId = createMergeRun(tran, sortBuffer); 249 mergeRuns.addElement(new Long (containerId)); 250 251 if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN || 255 mergeRuns.size() > sortBuffer.capacity()) 256 multiStageMerge(tran); 257 258 MergeScan mscan = 261 new MergeScan( 262 this, tran, sortBuffer, mergeRuns, sortObserver, hold); 263 264 if (!mscan.init(tran)) 265 { 266 throw StandardException.newException( 267 SQLState.SORT_COULD_NOT_INIT); 268 } 269 scan = mscan; 270 271 sortBuffer = null; 273 mergeRuns = null; 274 } 275 276 this.state = STATE_SCANNING; 278 279 return scan; 280 } 281 282 286 public ScanControllerRowSource openSortRowSource(TransactionManager tran) 287 throws StandardException 288 { 289 if (SanityManager.DEBUG) 290 SanityManager.ASSERT(state == STATE_DONE_INSERTING); 291 292 ScanControllerRowSource rowSource = null; 293 294 if (mergeRuns == null || mergeRuns.size() == 0) 295 { 296 scan = new SortBufferRowSource(sortBuffer, tran, sortObserver, false, sortBufferMax); 299 rowSource = (ScanControllerRowSource)scan; 300 301 sortBuffer = null; 303 } 304 else 305 { 306 long containerId = createMergeRun(tran, sortBuffer); 308 mergeRuns.addElement(new Long (containerId)); 309 310 if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN || 314 mergeRuns.size() > sortBuffer.capacity()) 315 multiStageMerge(tran); 316 317 MergeScanRowSource msRowSource = 320 new MergeScanRowSource(this, tran, sortBuffer, mergeRuns, sortObserver, false); 321 if (!msRowSource.init(tran)) 322 { 323 throw StandardException.newException( 324 SQLState.SORT_COULD_NOT_INIT); 325 } 326 scan = msRowSource; 327 rowSource = msRowSource; 328 329 sortBuffer = null; 331 mergeRuns = null; 332 } 333 334 this.state = STATE_SCANNING; 336 337 return rowSource; 338 } 339 340 341 342 346 public void drop(TransactionController tran) 347 throws StandardException 348 { 349 if (inserter != null) 354 inserter.close(); 355 inserter = null; 356 357 if (scan != null) 360 { 361 scan.close(); 362 scan = null; 363 } 364 365 if (sortBuffer != null) 367 { 368 sortBuffer.close(); 369 sortBuffer = null; 370 } 371 372 template = null; 374 columnOrdering = null; 375 sortObserver = null; 376 377 dropMergeRuns((TransactionManager)tran); 379 380 state = STATE_CLOSED; 382 } 383 384 385 388 389 395 protected boolean checkColumnOrdering( 396 DataValueDescriptor[] template, 397 ColumnOrdering columnOrdering[]) 398 { 399 int templateNColumns = template.length; 401 boolean seen[] = new boolean[templateNColumns]; 402 403 for (int i = 0; i < columnOrdering.length; i++) 405 { 406 int colid = columnOrdering[i].getColumnId(); 407 408 if (colid < 0 || colid >= templateNColumns) 410 return false; 411 412 if (seen[colid]) 414 return false; 415 seen[colid] = true; 416 417 Object columnVal = 418 RowUtil.getColumn(template, (FormatableBitSet) null, colid); 419 420 if (!(columnVal instanceof Orderable)) 421 return false; 422 } 423 424 return true; 425 } 426 427 434 void checkColumnTypes(DataValueDescriptor[] row) 435 throws StandardException 436 { 437 int nCols = row.length; 438 if (template.length != nCols) 439 { 440 if (SanityManager.DEBUG) 441 { 442 SanityManager.THROWASSERT( 443 "template.length (" + template.length + 444 ") expected to be = to nCols (" + 445 nCols + ")"); 446 } 447 throw StandardException.newException( 448 SQLState.SORT_TYPE_MISMATCH); 449 } 450 451 if (SanityManager.DEBUG) 452 { 453 for (int colid = 0; colid < nCols; colid++) 454 { 455 Object col1 = row[colid]; 456 Object col2 = template[colid]; 457 if (col1 == null) 458 { 459 SanityManager.THROWASSERT( 460 "col[" + colid + "] is null"); 461 } 462 463 if (!(col1 instanceof CloneableObject)) 464 { 465 SanityManager.THROWASSERT( 466 "col[" + colid + "] (" +col1.getClass().getName()+ 467 ") is not a CloneableObject."); 468 } 469 470 if (col1.getClass() != col2.getClass()) 471 { 472 SanityManager.THROWASSERT( 473 "col1.getClass() (" + col1.getClass() + 474 ") expected to be the same as col2.getClass() (" + 475 col2.getClass() + ")"); 476 } 477 } 478 } 479 } 480 481 int compare( 482 DataValueDescriptor[] r1, 483 DataValueDescriptor[] r2) 484 throws StandardException 485 { 486 int colsToCompare = this.columnOrdering.length; 488 int r; 489 490 for (int i = 0; i < colsToCompare; i++) 493 { 494 int colid = this.columnOrderingMap[i]; 496 497 if ((r = r1[colid].compare(r2[colid])) 500 != 0) 501 { 502 if (this.columnOrderingAscendingMap[i]) 503 return r; 504 else 505 return -r; 506 } 507 } 508 509 return 0; 512 } 513 514 517 public void initialize( 518 DataValueDescriptor[] template, 519 ColumnOrdering columnOrdering[], 520 SortObserver sortObserver, 521 boolean alreadyInOrder, 522 long estimatedRows, 523 int sortBufferMax) 524 throws StandardException 525 { 526 if (SanityManager.DEBUG) 527 { 528 SanityManager.ASSERT(state == STATE_CLOSED); 529 } 530 531 if (SanityManager.DEBUG) 533 { 534 SanityManager.ASSERT(checkColumnOrdering(template, columnOrdering), 535 "column ordering error"); 536 } 537 538 this.template = template; 540 this.columnOrdering = columnOrdering; 541 this.sortObserver = sortObserver; 542 this.alreadyInOrder = alreadyInOrder; 543 544 columnOrderingMap = new int[columnOrdering.length]; 547 columnOrderingAscendingMap = new boolean[columnOrdering.length]; 548 for (int i = 0; i < columnOrdering.length; i++) 549 { 550 columnOrderingMap[i] = columnOrdering[i].getColumnId(); 551 columnOrderingAscendingMap[i] = columnOrdering[i].getIsAscending(); 552 } 553 554 this.inserter = null; 556 this.scan = null; 557 558 this.mergeRuns = null; 560 this.sortBuffer = null; 561 this.sortBufferMax = sortBufferMax; 562 563 if (estimatedRows > sortBufferMax) 564 sortBufferMin = sortBufferMax; 565 else 566 sortBufferMin = (int)estimatedRows; 567 if (SanityManager.DEBUG) 568 { 569 if (SanityManager.DEBUG_ON("testSort")) 570 sortBufferMin = sortBufferMax; 571 } 572 573 this.state = STATE_INITIALIZED; 574 } 575 576 579 void doneInserting(MergeInserter inserter, 580 SortBuffer sortBuffer, Vector mergeRuns) 581 { 582 if (SanityManager.DEBUG) 583 { 584 SanityManager.ASSERT(state == STATE_INSERTING); 585 } 586 587 this.sortBuffer = sortBuffer; 588 this.mergeRuns = mergeRuns; 589 this.inserter = null; 590 591 this.state = STATE_DONE_INSERTING; 592 } 593 594 void doneScanning(Scan scan, SortBuffer sortBuffer) 595 { 596 if (SanityManager.DEBUG) 597 { 598 600 if (this.scan != scan) 601 SanityManager.THROWASSERT("this.scan = " + this.scan 602 + " scan = " + scan); 603 } 604 605 this.sortBuffer = sortBuffer; 606 this.scan = null; 607 608 this.state = STATE_DONE_SCANNING; 609 } 610 611 void doneScanning(Scan scan, SortBuffer sortBuffer, 612 Vector mergeRuns) 613 { 614 this.mergeRuns = mergeRuns; 615 616 doneScanning(scan, sortBuffer); 617 } 618 619 620 625 void dropMergeRuns(TransactionManager tran) 626 { 627 if (mergeRuns != null) 628 { 629 Enumeration e = mergeRuns.elements(); 630 631 try 632 { 633 Transaction rawTran = tran.getRawStoreXact(); 634 long segmentId = StreamContainerHandle.TEMPORARY_SEGMENT; 635 636 while (e.hasMoreElements()) 637 { 638 long containerId = ((Long ) e.nextElement()).longValue(); 639 rawTran.dropStreamContainer(segmentId, containerId); 640 } 641 } 642 catch (StandardException se) 643 { 644 } 647 mergeRuns = null; 648 } 649 } 650 651 672 673 private void multiStageMerge(TransactionManager tran) 674 throws StandardException 675 { 676 Enumeration e; 677 int maxMergeRuns = sortBuffer.capacity(); 679 680 if (maxMergeRuns > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN) 681 maxMergeRuns = ExternalSortFactory.DEFAULT_MAX_MERGE_RUN; 682 683 Vector subset; 684 Vector leftovers; 685 686 while (mergeRuns.size() > maxMergeRuns) 687 { 688 subset = new Vector (maxMergeRuns); 691 leftovers = new Vector (mergeRuns.size() - maxMergeRuns); 692 e = mergeRuns.elements(); 693 while (e.hasMoreElements()) 694 { 695 Long containerId = (Long ) e.nextElement(); 696 if (subset.size() < maxMergeRuns) 697 subset.addElement(containerId); 698 else 699 leftovers.addElement(containerId); 700 } 701 702 714 715 mergeRuns = leftovers; 716 717 MergeScanRowSource msRowSource = 719 new MergeScanRowSource(this, tran, sortBuffer, subset, sortObserver, false); 720 721 if (!msRowSource.init(tran)) 722 { 723 throw StandardException.newException( 724 SQLState.SORT_COULD_NOT_INIT); 725 } 726 727 Transaction rawTran = tran.getRawStoreXact(); 731 int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT; 732 long id = rawTran.addAndLoadStreamContainer(segmentId, 733 properties, msRowSource); 734 735 mergeRuns.addElement(new Long (id)); 736 737 e = subset.elements(); 739 while (e.hasMoreElements()) 740 { 741 Long containerId = (Long ) e.nextElement(); 742 rawTran.dropStreamContainer(segmentId, containerId.longValue()); 743 } 744 } 745 } 746 747 753 long createMergeRun(TransactionManager tran, SortBuffer sortBuffer) 754 throws StandardException 755 { 756 SortBufferRowSource rowSource = 759 new SortBufferRowSource(sortBuffer, (TransactionManager)null, sortObserver, true, sortBufferMax); 760 761 Transaction rawTran = tran.getRawStoreXact(); int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT; 764 long id = rawTran.addAndLoadStreamContainer(segmentId, 765 properties, rowSource); 766 767 rowSource = null; 770 771 return id; 772 } 773 } 774 | Popular Tags |