package com.sleepycat.je;

import java.util.Arrays;
import java.util.Comparator;
import java.util.logging.Level;

import com.sleepycat.je.dbi.GetMode;
import com.sleepycat.je.dbi.CursorImpl.SearchMode;
import com.sleepycat.je.txn.Locker;
import com.sleepycat.je.utilint.DatabaseUtil;

/**
 * A cursor that walks a set of secondary cursors in lock-step and returns
 * only the candidates that every secondary cursor can locate.
 *
 * The join cursor owns one cursor on the primary database plus one private
 * cursor per secondary cursor supplied at construction time; all of them are
 * released by {@link #close()}.
 */
public class JoinCursor {

    /* Private copy of the caller's configuration, or JoinConfig.DEFAULT. */
    private JoinConfig config;
    /* The primary database this join reads from. */
    private Database priDb;
    /* Cursor on the primary database; set to null once closed. */
    private Cursor priCursor;
    /* Private secondary cursors, in the order used by the join. */
    private Cursor[] secCursors;
    /* Per-cursor entries, allocated lazily on first use in retrieveNext. */
    private DatabaseEntry[] cursorScratchEntries;
    /* Shared throw-away entry for values the join does not need to keep. */
    private DatabaseEntry scratchEntry;

    /**
     * Creates a join cursor over the given secondary cursors.
     *
     * Unless {@code config.getNoSort()} is true, the cursors are ordered
     * ascending by the value of {@code countInternal(READ_UNCOMMITTED)}
     * before being copied. The caller's array is never reordered.
     *
     * @param locker passed to the constructor of the primary cursor.
     * @param primaryDb the primary database.
     * @param cursors the secondary cursors to join; must not be null.
     * @param configParam join configuration, or null for
     * {@code JoinConfig.DEFAULT}.
     *
     * @throws DatabaseException if a cursor cannot be created; any cursors
     * already created by this constructor are closed before the exception
     * propagates.
     */
    JoinCursor(Locker locker,
               Database primaryDb,
               final Cursor[] cursors,
               JoinConfig configParam)
        throws DatabaseException {

        priDb = primaryDb;
        config = (configParam != null) ? configParam.cloneConfig()
                                       : JoinConfig.DEFAULT;
        scratchEntry = new DatabaseEntry();
        cursorScratchEntries = new DatabaseEntry[cursors.length];
        Cursor[] sortedCursors = new Cursor[cursors.length];
        System.arraycopy(cursors, 0, sortedCursors, 0, cursors.length);

        if (!config.getNoSort()) {

            /*
             * Sort ascending by count.  Collect the counts up front so that
             * countInternal is called only once per cursor rather than once
             * per comparison.
             */
            final int[] counts = new int[cursors.length];
            for (int i = 0; i < cursors.length; i += 1) {
                counts[i] = cursors[i].countInternal
                    (LockMode.READ_UNCOMMITTED);
                assert counts[i] >= 0;
            }
            Arrays.sort(sortedCursors, new Comparator() {
                public int compare(Object o1, Object o2) {
                    int count1 = -1;
                    int count2 = -1;

                    /*
                     * Scan for the objects in cursors, not sortedCursors,
                     * since sortedCursors is being sorted in place.  The two
                     * lookups are independent (no else) and take the first
                     * match, so that comparing a cursor with itself, or
                     * with a second occurrence of the same cursor in the
                     * array, yields zero as the Comparator contract requires.
                     */
                    for (int i = 0; i < cursors.length &&
                                    (count1 < 0 || count2 < 0); i += 1) {
                        if (count1 < 0 && cursors[i] == o1) {
                            count1 = counts[i];
                        }
                        if (count2 < 0 && cursors[i] == o2) {
                            count2 = counts[i];
                        }
                    }
                    assert count1 >= 0 && count2 >= 0;
                    return (count1 < count2) ? (-1)
                                             : ((count1 > count2) ? 1 : 0);
                }
            });
        }

        /*
         * Open the primary cursor and a private cursor for each secondary.
         * On failure, release whatever was opened and rethrow.
         */
        try {
            priCursor = new Cursor(priDb, locker, null);
            secCursors = new Cursor[cursors.length];
            for (int i = 0; i < cursors.length; i += 1) {
                /* NOTE(review): 'true' presumably keeps the position of the
                   source cursor -- confirm against Cursor(Cursor, boolean). */
                secCursors[i] = new Cursor(sortedCursors[i], true);
            }
        } catch (DatabaseException e) {
            close(e); /* Always rethrows e, since e is non-null. */
        }
    }

    /**
     * Closes the primary cursor and all private secondary cursors.
     *
     * @throws DatabaseException with the message "Already closed" if this
     * join cursor was closed before, or the first exception raised while
     * closing the underlying cursors.
     */
    public void close()
        throws DatabaseException {

        if (priCursor == null) {
            throw new DatabaseException("Already closed");
        }
        close(null);
    }

    /**
     * Closes every underlying cursor that is still open, continuing past
     * individual failures so that all cursors get a close attempt.
     *
     * @param firstException an exception that has already occurred, or null.
     * If non-null it takes precedence over any exception raised here.
     *
     * @throws DatabaseException firstException if it is non-null, otherwise
     * the first exception raised while closing a cursor.
     */
    private void close(DatabaseException firstException)
        throws DatabaseException {

        if (priCursor != null) {
            try {
                priCursor.close();
            } catch (DatabaseException e) {
                if (firstException == null) {
                    firstException = e;
                }
            }
            priCursor = null;
        }

        /*
         * secCursors is still null when the constructor failed while
         * creating the primary cursor; skipping the loop in that case keeps
         * a NullPointerException from masking firstException.
         */
        if (secCursors != null) {
            for (int i = 0; i < secCursors.length; i += 1) {
                if (secCursors[i] != null) {
                    try {
                        secCursors[i].close();
                    } catch (DatabaseException e) {
                        if (firstException == null) {
                            firstException = e;
                        }
                    }
                    secCursors[i] = null;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    /**
     * Returns the internal array of private secondary cursors, in the order
     * used by the join.  The array is not copied; its elements are null
     * after the join cursor has been closed.
     */
    Cursor[] getSortedCursors() {
        return secCursors;
    }

    /**
     * Returns the primary database passed to the constructor.
     */
    public Database getDatabase() {

        return priDb;
    }

    /**
     * Returns a copy of this cursor's configuration, so changes made to the
     * returned object do not affect this join cursor.
     */
    public JoinConfig getConfig() {

        return config.cloneConfig();
    }

    /**
     * Returns the key of the next record that satisfies the join, without
     * reading the record from the primary database.
     *
     * @param key receives the key of the matching record; must not be null.
     * @param lockMode passed through to the underlying cursor operations.
     *
     * @return OperationStatus.SUCCESS if a match was found, otherwise the
     * status reported by the first secondary cursor.
     *
     * @throws DatabaseException if an underlying operation fails.
     */
    public OperationStatus getNext(DatabaseEntry key,
                                   LockMode lockMode)
        throws DatabaseException {

        priCursor.checkEnv();
        DatabaseUtil.checkForNullDbt(key, "key", false);
        priCursor.trace(Level.FINEST, "JoinCursor.getNext(key): ", lockMode);

        return retrieveNext(key, null, lockMode);
    }

    /**
     * Returns the key and data of the next record that satisfies the join.
     *
     * @param key receives the key of the matching record; must not be null.
     * @param data receives the data found by the primary cursor for that
     * key; must not be null.
     * @param lockMode passed through to the underlying cursor operations.
     *
     * @return OperationStatus.SUCCESS if a match was found, otherwise the
     * status reported by the first secondary cursor.
     *
     * @throws DatabaseException if an underlying operation fails, or with
     * the message "Secondary corrupt" if a joined key cannot be found by the
     * primary cursor.
     */
    public OperationStatus getNext(DatabaseEntry key,
                                   DatabaseEntry data,
                                   LockMode lockMode)
        throws DatabaseException {

        priCursor.checkEnv();
        DatabaseUtil.checkForNullDbt(key, "key", false);
        DatabaseUtil.checkForNullDbt(data, "data", false);
        priCursor.trace(Level.FINEST, "JoinCursor.getNext(key,data): ",
                        lockMode);

        return retrieveNext(key, data, lockMode);
    }

    /**
     * Internal version of getNext(), with an optional data param.
     *
     * Each pass of the outer loop takes one candidate from the first
     * secondary cursor and probes every other secondary cursor for it.  A
     * candidate rejected by any cursor is dropped and the next candidate is
     * tried; a candidate accepted by all cursors is returned.
     *
     * @param keyParam receives the accepted candidate.
     * @param dataParam if non-null, the candidate is also looked up with the
     * primary cursor and this entry is passed as the data argument.
     * @param lockMode passed through to every cursor operation.
     *
     * @return OperationStatus.SUCCESS on a match, otherwise the non-SUCCESS
     * status returned by the first secondary cursor.
     *
     * @throws DatabaseException if an underlying operation fails, or with
     * the message "Secondary corrupt" if the primary lookup does not succeed.
     */
    private OperationStatus retrieveNext(DatabaseEntry keyParam,
                                         DatabaseEntry dataParam,
                                         LockMode lockMode)
        throws DatabaseException {

        outerLoop: while (true) {

            /* Process the first cursor to get a candidate key. */
            Cursor secCursor = secCursors[0];
            DatabaseEntry candidateKey = cursorScratchEntries[0];
            OperationStatus status;
            if (candidateKey == null) {
                /* First call: use the record the cursor is already on. */
                candidateKey = new DatabaseEntry();
                cursorScratchEntries[0] = candidateKey;
                status = secCursor.getCurrentInternal(scratchEntry,
                                                      candidateKey,
                                                      lockMode);
            } else {
                /* Later calls: advance with GetMode.NEXT_DUP. */
                status = secCursor.retrieveNext(scratchEntry, candidateKey,
                                                lockMode,
                                                GetMode.NEXT_DUP);
            }
            if (status != OperationStatus.SUCCESS) {
                /* No more candidate keys. */
                return status;
            }

            /* Process the remaining cursors to look for the candidate. */
            for (int i = 1; i < secCursors.length; i += 1) {
                secCursor = secCursors[i];
                DatabaseEntry secKey = cursorScratchEntries[i];
                if (secKey == null) {
                    /* First use of this cursor: remember its current key. */
                    secKey = new DatabaseEntry();
                    cursorScratchEntries[i] = secKey;
                    status = secCursor.getCurrentInternal(secKey, scratchEntry,
                                                          lockMode);
                    assert status == OperationStatus.SUCCESS;
                }
                /* Search with a copy of the reference to the saved key, so
                   the saved entry itself is not handed to search(). */
                scratchEntry.setData(secKey.getData(), secKey.getOffset(),
                                     secKey.getSize());
                status = secCursor.search(scratchEntry, candidateKey, lockMode,
                                          SearchMode.BOTH);
                if (status != OperationStatus.SUCCESS) {
                    /* Rejected by this cursor; try the next candidate. */
                    continue outerLoop;
                }
            }

            /* The candidate was accepted by every cursor. */
            if (dataParam != null) {
                status = priCursor.search(candidateKey, dataParam,
                                          lockMode, SearchMode.SET);
                if (status != OperationStatus.SUCCESS) {
                    throw new DatabaseException("Secondary corrupt");
                }
            }
            keyParam.setData(candidateKey.getData(), candidateKey.getOffset(),
                             candidateKey.getSize());
            return OperationStatus.SUCCESS;
        }
    }
}