szehon-ho commented on code in PR #9251: URL: https://github.com/apache/iceberg/pull/9251#discussion_r1424766968
########## core/src/main/java/org/apache/iceberg/DeleteFileIndex.java: ########## @@ -582,93 +513,187 @@ private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestRea } } - // a group of indexed delete files sorted by the sequence number they apply to - private static class DeleteFileGroup { - private final long[] seqs; - private final IndexedDeleteFile[] files; - - DeleteFileGroup(IndexedDeleteFile[] files) { - this.seqs = Arrays.stream(files).mapToLong(IndexedDeleteFile::applySequenceNumber).toArray(); - this.files = files; + private static int findStartIndex(long[] seqs, long seq) { + int pos = Arrays.binarySearch(seqs, seq); + int start; + if (pos < 0) { + // the sequence number was not found, where it would be inserted is -(pos + 1) + start = -(pos + 1); + } else { + // the sequence number was found, but may not be the first + // find the first delete file with the given sequence number by decrementing the position + start = pos; + while (start > 0 && seqs[start - 1] >= seq) { + start -= 1; + } } - DeleteFileGroup(long[] seqs, IndexedDeleteFile[] files) { - this.seqs = seqs; - this.files = files; + return start; + } + + private static DeleteFile[] concat(DeleteFile[]... 
deletes) { + return ArrayUtil.concat(DeleteFile.class, deletes); + } + + // a group of position delete files sorted by the sequence number they apply to + private static class PositionDeletes { + private static final Comparator<DeleteFile> SEQ_COMPARATOR = + Comparator.comparingLong(DeleteFile::dataSequenceNumber); + + private long[] seqs = null; + private DeleteFile[] files = null; + private volatile List<DeleteFile> buffer = Lists.newArrayList(); + + public void add(DeleteFile file) { + buffer.add(file); } public DeleteFile[] filter(long seq) { - int start = findStartIndex(seq); + indexIfNeeded(); + + int start = findStartIndex(seqs, seq); if (start >= files.length) { return NO_DELETES; } - DeleteFile[] matchingFiles = new DeleteFile[files.length - start]; - - for (int index = start; index < files.length; index++) { - matchingFiles[index - start] = files[index].wrapped(); + if (start == 0) { + return files; } + int matchingFilesCount = files.length - start; + DeleteFile[] matchingFiles = new DeleteFile[matchingFilesCount]; + System.arraycopy(files, start, matchingFiles, 0, matchingFilesCount); return matchingFiles; } - public Stream<IndexedDeleteFile> limit(long seq) { - int start = findStartIndex(seq); - return Arrays.stream(files, start, files.length); + public Iterable<DeleteFile> referencedDeleteFiles() { + indexIfNeeded(); + return Arrays.asList(files); } - private int findStartIndex(long seq) { - int pos = Arrays.binarySearch(seqs, seq); - int start; - if (pos < 0) { - // the sequence number was not found, where it would be inserted is -(pos + 1) - start = -(pos + 1); - } else { - // the sequence number was found, but may not be the first - // find the first delete file with the given sequence number by decrementing the position - start = pos; - while (start > 0 && seqs[start - 1] >= seq) { - start -= 1; + public boolean isEmpty() { + indexIfNeeded(); + return files.length == 0; + } + + private void indexIfNeeded() { + if (buffer != null) { + synchronized 
(this) { + if (buffer != null) { + this.files = indexFiles(buffer); + this.seqs = indexSeqs(files); + this.buffer = null; + } + } + } + } + + private static DeleteFile[] indexFiles(List<DeleteFile> list) { + DeleteFile[] array = list.toArray(NO_DELETES); Review Comment: I personally feel it's clearer to use DeleteFile::new, as it looks strange to use NO_DELETES ########## core/src/main/java/org/apache/iceberg/util/ArrayUtil.java: ########## @@ -320,4 +320,29 @@ public static boolean isStrictlyAscending(long[] array) { return true; } + + @SuppressWarnings("unchecked") + public static <T> T[] concat(Class<T> type, T[]... arrays) { Review Comment: for reference, what's the reason for not using a library like Apache Commons? ########## core/src/main/java/org/apache/iceberg/DeleteFileIndex.java: ########## @@ -582,93 +513,187 @@ private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestRea } } - // a group of indexed delete files sorted by the sequence number they apply to - private static class DeleteFileGroup { - private final long[] seqs; - private final IndexedDeleteFile[] files; - - DeleteFileGroup(IndexedDeleteFile[] files) { - this.seqs = Arrays.stream(files).mapToLong(IndexedDeleteFile::applySequenceNumber).toArray(); - this.files = files; + private static int findStartIndex(long[] seqs, long seq) { + int pos = Arrays.binarySearch(seqs, seq); + int start; + if (pos < 0) { + // the sequence number was not found, where it would be inserted is -(pos + 1) + start = -(pos + 1); + } else { + // the sequence number was found, but may not be the first + // find the first delete file with the given sequence number by decrementing the position + start = pos; + while (start > 0 && seqs[start - 1] >= seq) { + start -= 1; + } } - DeleteFileGroup(long[] seqs, IndexedDeleteFile[] files) { - this.seqs = seqs; - this.files = files; + return start; + } + + private static DeleteFile[] concat(DeleteFile[]... 
deletes) { + return ArrayUtil.concat(DeleteFile.class, deletes); + } + + // a group of position delete files sorted by the sequence number they apply to + private static class PositionDeletes { + private static final Comparator<DeleteFile> SEQ_COMPARATOR = + Comparator.comparingLong(DeleteFile::dataSequenceNumber); + + private long[] seqs = null; + private DeleteFile[] files = null; + private volatile List<DeleteFile> buffer = Lists.newArrayList(); Review Comment: This should be init'ed to null, to allow double-checked locking to work, right? ########## core/src/main/java/org/apache/iceberg/DeleteFileIndex.java: ########## @@ -474,68 +421,52 @@ private Collection<DeleteFile> loadDeleteFiles() { DeleteFileIndex build() { Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles(); - boolean useColumnStatsFiltering = false; + EqualityDeletes globalDeletes = new EqualityDeletes(); + PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById); + PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById); + CharSequenceMap<PositionDeletes> posDeletesByPath = CharSequenceMap.create(); Review Comment: I guess it's possible we'll hit memory issues with using a path map; for example, if we have a million data files, it can be ~100 MB, if I calculate correctly. (We use DeleteFileIndex centrally, if I understand correctly.) I wonder if we considered doing some common-prefix encoding, or other encoding, of paths anywhere. 
Not sure if it's worth it, given the typical memory size of the planner ########## core/src/main/java/org/apache/iceberg/DeleteFileIndex.java: ########## @@ -582,93 +513,187 @@ private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestRea } } - // a group of indexed delete files sorted by the sequence number they apply to - private static class DeleteFileGroup { - private final long[] seqs; - private final IndexedDeleteFile[] files; - - DeleteFileGroup(IndexedDeleteFile[] files) { - this.seqs = Arrays.stream(files).mapToLong(IndexedDeleteFile::applySequenceNumber).toArray(); - this.files = files; + private static int findStartIndex(long[] seqs, long seq) { + int pos = Arrays.binarySearch(seqs, seq); + int start; + if (pos < 0) { + // the sequence number was not found, where it would be inserted is -(pos + 1) + start = -(pos + 1); + } else { + // the sequence number was found, but may not be the first + // find the first delete file with the given sequence number by decrementing the position + start = pos; + while (start > 0 && seqs[start - 1] >= seq) { + start -= 1; + } } - DeleteFileGroup(long[] seqs, IndexedDeleteFile[] files) { - this.seqs = seqs; - this.files = files; + return start; + } + + private static DeleteFile[] concat(DeleteFile[]... 
deletes) { + return ArrayUtil.concat(DeleteFile.class, deletes); + } + + // a group of position delete files sorted by the sequence number they apply to + private static class PositionDeletes { + private static final Comparator<DeleteFile> SEQ_COMPARATOR = + Comparator.comparingLong(DeleteFile::dataSequenceNumber); + + private long[] seqs = null; + private DeleteFile[] files = null; + private volatile List<DeleteFile> buffer = Lists.newArrayList(); + + public void add(DeleteFile file) { + buffer.add(file); } public DeleteFile[] filter(long seq) { - int start = findStartIndex(seq); + indexIfNeeded(); + + int start = findStartIndex(seqs, seq); Review Comment: It took a bit of time to figure out; can we comment on the return value of findStartIndex (either here, or in that method's javadoc) to clarify this? I guess it's from before, but now with the new code we explicitly return nothing if start is 0, as well as beyond files.length, which seems counter-intuitive. ########## core/src/main/java/org/apache/iceberg/DeleteFileIndex.java: ########## @@ -474,68 +421,52 @@ private Collection<DeleteFile> loadDeleteFiles() { DeleteFileIndex build() { Iterable<DeleteFile> files = deleteFiles != null ? 
filterDeleteFiles() : loadDeleteFiles(); - boolean useColumnStatsFiltering = false; + EqualityDeletes globalDeletes = new EqualityDeletes(); + PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById); + PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById); + CharSequenceMap<PositionDeletes> posDeletesByPath = CharSequenceMap.create(); - // build a map from (specId, partition) to delete file entries - Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap(); - ListMultimap<Pair<Integer, StructLikeWrapper>, IndexedDeleteFile> deleteFilesByPartition = - Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList); for (DeleteFile file : files) { - int specId = file.specId(); - PartitionSpec spec = specsById.get(specId); - StructLikeWrapper wrapper = - wrappersBySpecId - .computeIfAbsent(specId, id -> StructLikeWrapper.forType(spec.partitionType())) - .copyFor(file.partition()); - IndexedDeleteFile indexedFile = new IndexedDeleteFile(spec, file); - deleteFilesByPartition.put(Pair.of(specId, wrapper), indexedFile); - - if (!useColumnStatsFiltering) { - useColumnStatsFiltering = indexedFile.hasLowerAndUpperBounds(); + CharSequence path = ContentFileUtil.referencedDataFile(file); + if (path != null) { + PositionDeletes deletes = posDeletesByPath.computeIfAbsent(path, PositionDeletes::new); + deletes.add(file); + } else { + int specId = file.specId(); + PartitionSpec spec = specsById.get(specId); + StructLike partition = file.partition(); + if (file.content() == FileContent.POSITION_DELETES) { + PositionDeletes deletes = fetchPosDeletes(posDeletesByPartition, specId, partition); Review Comment: fetch sounds like it will load from disk, I think removing the helper method in favor of the underlying code is clearer, or renaming the method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org