aokolnychyi commented on code in PR #8755: URL: https://github.com/apache/iceberg/pull/8755#discussion_r1440336894
########## data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.deletes.Deletes; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.deletes.PositionDeleteIndexUtil; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.orc.OrcRowReader; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetValueReader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.math.LongMath; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.CharSequenceMap; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.Tasks; +import org.apache.iceberg.util.ThreadPools; +import org.apache.orc.TypeDescription; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BaseDeleteLoader implements DeleteLoader { + + private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteLoader.class); + private static final Schema POS_DELETE_SCHEMA = DeleteSchemaUtil.pathPosSchema(); + + private final Function<DeleteFile, InputFile> loadInputFile; + private final ExecutorService workerPool; + + public BaseDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) { + this(loadInputFile, ThreadPools.getDeleteWorkerPool()); + } + + public BaseDeleteLoader( + Function<DeleteFile, InputFile> loadInputFile, ExecutorService workerPool) { + this.loadInputFile = loadInputFile; + this.workerPool = workerPool; + } + + /** + * Checks if the given number of bytes can be cached. + * + * <p>Implementations should override this method if they support caching. It is also recommended + * to use the provided size as a guideline to decide whether the value is eligible for caching. + * For instance, it may be beneficial to discard values that are too large to optimize the cache + * performance and utilization. + */ + protected boolean canCache(long size) { + return false; + } + + /** + * Gets the cached value for the key or populates the cache with a new mapping. + * + * <p>If the value for the specified key is in the cache, it should be returned. If the value is + * not in the cache, implementations should compute the value using the provided supplier, cache + * it, and then return it. + * + * <p>This method will be called only if {@link #canCache(long)} returned true. + */ + protected <V> V get(String key, Supplier<V> valueSupplier, long valueSize) { + throw new UnsupportedOperationException(getClass().getName() + " does not support caching"); + } + + @Override + public StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, Schema projection) { + Iterable<Iterable<StructLike>> deletes = + execute(deleteFiles, deleteFile -> getOrLoadEqDeletes(deleteFile, projection)); + StructLikeSet deleteSet = StructLikeSet.create(projection.asStruct()); + Iterables.addAll(deleteSet, Iterables.concat(deletes)); + return deleteSet; + } + + private Iterable<StructLike> getOrLoadEqDeletes(DeleteFile deleteFile, Schema projection) { + long estimatedSize = estimateEqDeletesSize(deleteFile, projection); + if (canCache(estimatedSize)) { + String cacheKey = deleteFile.path().toString(); + return get(cacheKey, () -> loadEqDeletes(deleteFile, projection), estimatedSize); + } else { + return loadEqDeletes(deleteFile, projection); + } + } + + private Iterable<StructLike> loadEqDeletes(DeleteFile deleteFile, Schema projection) { + CloseableIterable<Record> deletes = openDeletes(deleteFile, projection); + CloseableIterable<Record> copiedDeletes = CloseableIterable.transform(deletes, Record::copy); + CloseableIterable<StructLike> copiedDeletesAsStructs = toStructs(copiedDeletes, projection); + return materialize(copiedDeletesAsStructs); + } + + private CloseableIterable<StructLike> toStructs( + CloseableIterable<Record> records, Schema schema) { + InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct()); + return CloseableIterable.transform(records, wrapper::copyFor); + } + + private <T> Iterable<T> materialize(CloseableIterable<T> iterable) { Review Comment: This essentially materializes the iterable and closes the resources so that the actual value can be cached. We cannot cache the iterable directly cause it is lazy. Let me think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org