szehon-ho commented on code in PR #8755:
URL: https://github.com/apache/iceberg/pull/8755#discussion_r1433431468


##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -452,6 +454,68 @@ private static void checkSchemaCompatibility(
     }
   }
 
+  /**
+   * Estimates the number of bytes a value for a given field may occupy in 
memory.
+   *
+   * <p>This method approximates the memory size based on the internal Java 
representation defined
+   * by {@link Type.TypeID}. It is important to note that the actual size 
might differ from this
+   * estimation. The method is designed to handle a variety of data types, 
including primitive
+   * types, strings, and nested types such as structs, maps, and lists.
+   *
+   * @param field a field for which to estimate the size
+   * @return the estimated size in bytes of the field's value in memory
+   */
+  public static long defaultSize(Types.NestedField field) {
+    return defaultSize(field.type());
+  }
+
+  private static long defaultSize(Type type) {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        // the size of a boolean variable is virtual machine dependent
+        // it is common to believe booleans occupy 1 byte in most JVMs
+        return 1;
+      case INTEGER:
+      case FLOAT:
+      case DATE:
+        // ints and floats occupy 4 bytes
+        // dates are internally represented as ints
+        return 4;
+      case LONG:
+      case DOUBLE:
+      case TIME:
+      case TIMESTAMP:
+        // longs and doubles occupy 8 bytes
+        // times and timestamps are internally represented as longs
+        return 8;
+      case STRING:
+        // 12 (header) + 12 (fields) + 16 (array overhead) + 20 (10 chars, 2 
bytes each) = 60 bytes
+        return 60;

Review Comment:
   Are we just picking 10 chars as a rule of thumb?



##########
core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+class EmptyPositionDeleteIndex implements PositionDeleteIndex {
+
+  private static final EmptyPositionDeleteIndex INSTANCE = new 
EmptyPositionDeleteIndex();
+
+  private EmptyPositionDeleteIndex() {}
+
+  static EmptyPositionDeleteIndex get() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void delete(long position) {
+    throw new UnsupportedOperationException("Cannot modify " + 
getClass().getName());
+  }
+
+  @Override
+  public void delete(long posStart, long posEnd) {
+    throw new UnsupportedOperationException("Cannot modify " + 
getClass().getName());
+  }
+
+  @Override
+  public boolean isDeleted(long position) {
+    return false;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "PositionDeleteIndex{}";

Review Comment:
   Should it be EmptyPositionDeleteIndex?



##########
data/src/main/java/org/apache/iceberg/data/DeleteFilter.java:
##########
@@ -224,14 +223,10 @@ public Predicate<T> eqDeletedRowFilter() {
   }
 
   public PositionDeleteIndex deletedRowPositions() {
-    if (posDeletes.isEmpty()) {
-      return null;
+    if (deleteRowPositions == null && !posDeletes.isEmpty()) {

Review Comment:
   Do we need to make this deleteRowPositions use double check locking too, as 
we are doing it for posDeletes?



##########
data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.deletes.PositionDeleteIndexUtil;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeleteLoader implements DeleteLoader {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseDeleteLoader.class);
+  private static final Schema POS_DELETE_SCHEMA = 
DeleteSchemaUtil.pathPosSchema();
+
+  private final Function<DeleteFile, InputFile> loadInputFile;
+  private final ExecutorService workerPool;
+
+  public BaseDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+    this(loadInputFile, ThreadPools.getDeleteWorkerPool());
+  }
+
+  public BaseDeleteLoader(
+      Function<DeleteFile, InputFile> loadInputFile, ExecutorService 
workerPool) {
+    this.loadInputFile = loadInputFile;
+    this.workerPool = workerPool;
+  }
+
+  /**
+   * Checks if the given number of bytes can be cached.
+   *
+   * <p>Implementations should override this method if they support caching. 
It is also recommended
+   * to use the provided size as a guideline to decide whether the value is 
eligible for caching.
+   * For instance, it may be beneficial to discard values that are too large 
to optimize the cache
+   * performance and utilization.
+   */
+  protected boolean canCache(long size) {
+    return false;
+  }
+
+  /**
+   * Gets the cached value for the key or populates the cache with a new 
mapping.
+   *
+   * <p>If the value for the specified key is in the cache, it should be 
returned. If the value is
+   * not in the cache, implementations should compute the value using the 
provided supplier, cache
+   * it, and then return it.
+   *
+   * <p>This method will be called only if {@link #canCache(long)} returned 
true.
+   */
+  protected <V> V get(String key, Supplier<V> valueSupplier, long valueSize) {
+    throw new UnsupportedOperationException(getClass().getName() + " does not 
support caching");
+  }
+
+  @Override
+  public StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, 
Schema projection) {
+    Iterable<Iterable<StructLike>> deletes =
+        execute(deleteFiles, deleteFile -> getOrLoadEqDeletes(deleteFile, 
projection));
+    StructLikeSet deleteSet = StructLikeSet.create(projection.asStruct());
+    Iterables.addAll(deleteSet, Iterables.concat(deletes));
+    return deleteSet;
+  }
+
+  private Iterable<StructLike> getOrLoadEqDeletes(DeleteFile deleteFile, 
Schema projection) {
+    long estimatedSize = estimateEqDeletesSize(deleteFile, projection);
+    if (canCache(estimatedSize)) {
+      String cacheKey = deleteFile.path().toString();
+      return get(cacheKey, () -> loadEqDeletes(deleteFile, projection), 
estimatedSize);
+    } else {
+      return loadEqDeletes(deleteFile, projection);
+    }
+  }
+
+  private Iterable<StructLike> loadEqDeletes(DeleteFile deleteFile, Schema 
projection) {
+    CloseableIterable<Record> deletes = openDeletes(deleteFile, projection);
+    CloseableIterable<Record> copiedDeletes = 
CloseableIterable.transform(deletes, Record::copy);
+    CloseableIterable<StructLike> copiedDeletesAsStructs = 
toStructs(copiedDeletes, projection);
+    return materialize(copiedDeletesAsStructs);
+  }
+
+  private CloseableIterable<StructLike> toStructs(
+      CloseableIterable<Record> records, Schema schema) {
+    InternalRecordWrapper wrapper = new 
InternalRecordWrapper(schema.asStruct());
+    return CloseableIterable.transform(records, wrapper::copyFor);
+  }
+
+  private <T> Iterable<T> materialize(CloseableIterable<T> iterable) {
+    try (CloseableIterable<T> closeableIterable = iterable) {
+      return ImmutableList.copyOf(closeableIterable);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close iterable", e);
+    }
+  }
+
+  @Override
+  public PositionDeleteIndex loadPositionDeletes(
+      Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
+    Iterable<PositionDeleteIndex> deletes =
+        execute(deleteFiles, deleteFile -> getOrLoadPosDeletes(deleteFile, 
filePath));
+    return PositionDeleteIndexUtil.merge(deletes);
+  }
+
+  private PositionDeleteIndex getOrLoadPosDeletes(DeleteFile deleteFile, 
CharSequence filePath) {
+    long estimatedSize = estimatePosDeletesSize(deleteFile);
+    if (canCache(estimatedSize)) {
+      String cacheKey = deleteFile.path().toString();
+      CharSequenceMap<PositionDeleteIndex> indexes =
+          get(cacheKey, () -> loadPosDeletes(deleteFile), estimatedSize);
+      return indexes.getOrDefault(filePath, PositionDeleteIndex.empty());
+    } else {
+      return loadPosDeletes(deleteFile, filePath);
+    }
+  }
+
+  private CharSequenceMap<PositionDeleteIndex> loadPosDeletes(DeleteFile 
deleteFile) {

Review Comment:
   Same comment, loadPosDeletesInternal?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An executor cache for optimizing tasks by reducing the computation and IO 
overhead.
+ *
+ * <p>The cache is configurable and enabled through Spark SQL properties. Its 
key features include
+ * setting limits on the total cache size and maximum size for individual 
entries. Additionally, it
+ * implements automatic eviction of entries after a specified duration of 
inactivity. The cache will
+ * respect the SQL configuration valid at the time of initialization. All 
subsequent changes will
+ * have no effect.
+ *
+ * <p>Usage pattern involves fetching data from the cache using a unique 
combination of execution ID
+ * and key. If the data is not present in the cache, it is computed using the 
provided supplier and
+ * stored in the cache, subject to the defined size constraints.
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one 
cache exists per JVM.
+ *
+ * @see SparkUtil#executionId()
+ */
+public class SparkExecutorCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkExecutorCache.class);
+
+  private static final SparkConfParser CONF_PARSER = new SparkConfParser();
+  private static final boolean CACHE_ENABLED = parseCacheEnabledConf();
+  private static final Duration TIMEOUT = parseTimeoutConf();
+  private static final long MAX_ENTRY_SIZE = parseMaxEntrySizeConf();
+  private static final long MAX_TOTAL_SIZE = parseMaxTotalSizeConf();
+  private static final String EXECUTOR_DESC = SparkUtil.executorDesc();
+
+  private static volatile SparkExecutorCache instance = null;
+
+  private final Map<String, Collection<String>> keysByExecutionId;
+  private volatile Cache<String, CacheValue> cache = null;
+
+  private SparkExecutorCache() {
+    this.keysByExecutionId = Collections.synchronizedMap(Maps.newHashMap());
+  }
+
+  public static SparkExecutorCache getOrCreate() {
+    if (instance == null && CACHE_ENABLED) {
+      synchronized (SparkExecutorCache.class) {
+        if (instance == null) {
+          SparkExecutorCache.instance = new SparkExecutorCache();
+        }
+      }
+    }
+
+    return instance;
+  }
+
+  public static void cleanUp(String executionId) {
+    if (instance != null) {
+      instance.invalidate(executionId);
+    }
+  }
+
+  public long maxEntrySize() {
+    return MAX_ENTRY_SIZE;
+  }
+
+  public <V> V get(String executionId, String key, Supplier<V> valueSupplier, 
long valueSize) {
+    if (valueSize > MAX_ENTRY_SIZE) {
+      return valueSupplier.get();
+    }
+
+    storeMapping(executionId, key);
+
+    CacheValue value = cache().get(key, ignored -> new 
CacheValue(valueSupplier, valueSize));
+    Preconditions.checkNotNull(value, "Loaded value must not be null");
+    return value.get();
+  }
+
+  private void storeMapping(String executionId, String key) {
+    Collection<String> keys =

Review Comment:
   optional:  is there any need to define variable keys, can we just inline the 
whole statement:
   
   keysByExecutionId
      .computeIfAbsent(executionId, id -> Queues.newConcurrentLinkedQueue())
      .add(key)



##########
data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.deletes.PositionDeleteIndexUtil;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeleteLoader implements DeleteLoader {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseDeleteLoader.class);
+  private static final Schema POS_DELETE_SCHEMA = 
DeleteSchemaUtil.pathPosSchema();
+
+  private final Function<DeleteFile, InputFile> loadInputFile;
+  private final ExecutorService workerPool;
+
+  public BaseDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+    this(loadInputFile, ThreadPools.getDeleteWorkerPool());
+  }
+
+  public BaseDeleteLoader(
+      Function<DeleteFile, InputFile> loadInputFile, ExecutorService 
workerPool) {
+    this.loadInputFile = loadInputFile;
+    this.workerPool = workerPool;
+  }
+
+  /**
+   * Checks if the given number of bytes can be cached.
+   *
+   * <p>Implementations should override this method if they support caching. 
It is also recommended
+   * to use the provided size as a guideline to decide whether the value is 
eligible for caching.
+   * For instance, it may be beneficial to discard values that are too large 
to optimize the cache
+   * performance and utilization.
+   */
+  protected boolean canCache(long size) {
+    return false;
+  }
+
+  /**
+   * Gets the cached value for the key or populates the cache with a new 
mapping.
+   *
+   * <p>If the value for the specified key is in the cache, it should be 
returned. If the value is
+   * not in the cache, implementations should compute the value using the 
provided supplier, cache
+   * it, and then return it.
+   *
+   * <p>This method will be called only if {@link #canCache(long)} returned 
true.
+   */
+  protected <V> V get(String key, Supplier<V> valueSupplier, long valueSize) {
+    throw new UnsupportedOperationException(getClass().getName() + " does not 
support caching");
+  }
+
+  @Override
+  public StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, 
Schema projection) {
+    Iterable<Iterable<StructLike>> deletes =
+        execute(deleteFiles, deleteFile -> getOrLoadEqDeletes(deleteFile, 
projection));
+    StructLikeSet deleteSet = StructLikeSet.create(projection.asStruct());
+    Iterables.addAll(deleteSet, Iterables.concat(deletes));
+    return deleteSet;
+  }
+
+  private Iterable<StructLike> getOrLoadEqDeletes(DeleteFile deleteFile, 
Schema projection) {
+    long estimatedSize = estimateEqDeletesSize(deleteFile, projection);
+    if (canCache(estimatedSize)) {
+      String cacheKey = deleteFile.path().toString();
+      return get(cacheKey, () -> loadEqDeletes(deleteFile, projection), 
estimatedSize);
+    } else {
+      return loadEqDeletes(deleteFile, projection);
+    }
+  }
+
+  private Iterable<StructLike> loadEqDeletes(DeleteFile deleteFile, Schema 
projection) {

Review Comment:
   Nit: i think the two methods (loadEqualityDeletes and loadEqDeletes) are too 
similar, can we differentiate somehow?  (loadEqDeleteInternal?)



##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -452,6 +454,68 @@ private static void checkSchemaCompatibility(
     }
   }
 
+  /**
+   * Estimates the number of bytes a value for a given field may occupy in 
memory.
+   *
+   * <p>This method approximates the memory size based on the internal Java 
representation defined
+   * by {@link Type.TypeID}. It is important to note that the actual size 
might differ from this
+   * estimation. The method is designed to handle a variety of data types, 
including primitive
+   * types, strings, and nested types such as structs, maps, and lists.
+   *
+   * @param field a field for which to estimate the size
+   * @return the estimated size in bytes of the field's value in memory
+   */
+  public static long defaultSize(Types.NestedField field) {

Review Comment:
   Why defaultSize (what's the signficance of default).  Not something like 
estimateSize?



##########
data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.deletes.PositionDeleteIndexUtil;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeleteLoader implements DeleteLoader {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseDeleteLoader.class);
+  private static final Schema POS_DELETE_SCHEMA = 
DeleteSchemaUtil.pathPosSchema();
+
+  private final Function<DeleteFile, InputFile> loadInputFile;
+  private final ExecutorService workerPool;
+
+  public BaseDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+    this(loadInputFile, ThreadPools.getDeleteWorkerPool());
+  }
+
+  public BaseDeleteLoader(
+      Function<DeleteFile, InputFile> loadInputFile, ExecutorService 
workerPool) {
+    this.loadInputFile = loadInputFile;
+    this.workerPool = workerPool;
+  }
+
+  /**
+   * Checks if the given number of bytes can be cached.
+   *
+   * <p>Implementations should override this method if they support caching. 
It is also recommended
+   * to use the provided size as a guideline to decide whether the value is 
eligible for caching.
+   * For instance, it may be beneficial to discard values that are too large 
to optimize the cache
+   * performance and utilization.
+   */
+  protected boolean canCache(long size) {
+    return false;
+  }
+
+  /**
+   * Gets the cached value for the key or populates the cache with a new 
mapping.
+   *
+   * <p>If the value for the specified key is in the cache, it should be 
returned. If the value is
+   * not in the cache, implementations should compute the value using the 
provided supplier, cache
+   * it, and then return it.
+   *
+   * <p>This method will be called only if {@link #canCache(long)} returned 
true.
+   */
+  protected <V> V get(String key, Supplier<V> valueSupplier, long valueSize) {

Review Comment:
   get() is a bit confusing, can we call it getOrLoad (like on the calling 
method)?



##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -452,6 +454,68 @@ private static void checkSchemaCompatibility(
     }
   }
 
+  /**
+   * Estimates the number of bytes a value for a given field may occupy in 
memory.
+   *
+   * <p>This method approximates the memory size based on the internal Java 
representation defined
+   * by {@link Type.TypeID}. It is important to note that the actual size 
might differ from this
+   * estimation. The method is designed to handle a variety of data types, 
including primitive
+   * types, strings, and nested types such as structs, maps, and lists.
+   *
+   * @param field a field for which to estimate the size
+   * @return the estimated size in bytes of the field's value in memory
+   */
+  public static long defaultSize(Types.NestedField field) {
+    return defaultSize(field.type());
+  }
+
+  private static long defaultSize(Type type) {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        // the size of a boolean variable is virtual machine dependent
+        // it is common to believe booleans occupy 1 byte in most JVMs
+        return 1;
+      case INTEGER:
+      case FLOAT:
+      case DATE:
+        // ints and floats occupy 4 bytes
+        // dates are internally represented as ints
+        return 4;
+      case LONG:
+      case DOUBLE:
+      case TIME:
+      case TIMESTAMP:
+        // longs and doubles occupy 8 bytes
+        // times and timestamps are internally represented as longs
+        return 8;
+      case STRING:
+        // 12 (header) + 12 (fields) + 16 (array overhead) + 20 (10 chars, 2 
bytes each) = 60 bytes
+        return 60;

Review Comment:
   Was also reading on internet,  in Java8 it is less overhead?



##########
data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.deletes.PositionDeleteIndexUtil;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeleteLoader implements DeleteLoader {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseDeleteLoader.class);
+  private static final Schema POS_DELETE_SCHEMA = 
DeleteSchemaUtil.pathPosSchema();
+
+  private final Function<DeleteFile, InputFile> loadInputFile;
+  private final ExecutorService workerPool;
+
+  public BaseDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+    this(loadInputFile, ThreadPools.getDeleteWorkerPool());
+  }
+
+  public BaseDeleteLoader(
+      Function<DeleteFile, InputFile> loadInputFile, ExecutorService 
workerPool) {
+    this.loadInputFile = loadInputFile;
+    this.workerPool = workerPool;
+  }
+
+  /**
+   * Checks if the given number of bytes can be cached.
+   *
+   * <p>Implementations should override this method if they support caching. 
It is also recommended
+   * to use the provided size as a guideline to decide whether the value is 
eligible for caching.
+   * For instance, it may be beneficial to discard values that are too large 
to optimize the cache
+   * performance and utilization.
+   */
+  protected boolean canCache(long size) {
+    return false;
+  }
+
+  /**
+   * Gets the cached value for the key or populates the cache with a new 
mapping.
+   *
+   * <p>If the value for the specified key is in the cache, it should be 
returned. If the value is
+   * not in the cache, implementations should compute the value using the 
provided supplier, cache
+   * it, and then return it.
+   *
+   * <p>This method will be called only if {@link #canCache(long)} returned 
true.
+   */
+  protected <V> V get(String key, Supplier<V> valueSupplier, long valueSize) {
+    throw new UnsupportedOperationException(getClass().getName() + " does not 
support caching");
+  }
+
+  @Override
+  public StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, 
Schema projection) {
+    Iterable<Iterable<StructLike>> deletes =
+        execute(deleteFiles, deleteFile -> getOrLoadEqDeletes(deleteFile, 
projection));
+    StructLikeSet deleteSet = StructLikeSet.create(projection.asStruct());
+    Iterables.addAll(deleteSet, Iterables.concat(deletes));
+    return deleteSet;
+  }
+
+  private Iterable<StructLike> getOrLoadEqDeletes(DeleteFile deleteFile, 
Schema projection) {
+    long estimatedSize = estimateEqDeletesSize(deleteFile, projection);
+    if (canCache(estimatedSize)) {
+      String cacheKey = deleteFile.path().toString();
+      return get(cacheKey, () -> loadEqDeletes(deleteFile, projection), 
estimatedSize);
+    } else {
+      return loadEqDeletes(deleteFile, projection);
+    }
+  }
+
+  private Iterable<StructLike> loadEqDeletes(DeleteFile deleteFile, Schema 
projection) {
+    CloseableIterable<Record> deletes = openDeletes(deleteFile, projection);
+    CloseableIterable<Record> copiedDeletes = 
CloseableIterable.transform(deletes, Record::copy);
+    CloseableIterable<StructLike> copiedDeletesAsStructs = 
toStructs(copiedDeletes, projection);
+    return materialize(copiedDeletesAsStructs);
+  }
+
+  private CloseableIterable<StructLike> toStructs(
+      CloseableIterable<Record> records, Schema schema) {
+    InternalRecordWrapper wrapper = new 
InternalRecordWrapper(schema.asStruct());
+    return CloseableIterable.transform(records, wrapper::copyFor);
+  }
+
+  private <T> Iterable<T> materialize(CloseableIterable<T> iterable) {

Review Comment:
   Can we just call the method copy().  The name is a bit obscure now, so I had 
to read it to see what it does.
   
   Also for my knowledge, what is the need for this? 



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An executor cache for optimizing tasks by reducing the computation and IO 
overhead.
+ *
+ * <p>The cache is configurable and enabled through Spark SQL properties. Its 
key features include
+ * setting limits on the total cache size and maximum size for individual 
entries. Additionally, it
+ * implements automatic eviction of entries after a specified duration of 
inactivity. The cache will
+ * respect the SQL configuration valid at the time of initialization. All 
subsequent changes will
+ * have no effect.
+ *
+ * <p>Usage pattern involves fetching data from the cache using a unique 
combination of execution ID
+ * and key. If the data is not present in the cache, it is computed using the 
provided supplier and
+ * stored in the cache, subject to the defined size constraints.
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one 
cache exists per JVM.
+ *
+ * @see SparkUtil#executionId()
+ */
+public class SparkExecutorCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkExecutorCache.class);
+
+  private static final SparkConfParser CONF_PARSER = new SparkConfParser();
+  private static final boolean CACHE_ENABLED = parseCacheEnabledConf();
+  private static final Duration TIMEOUT = parseTimeoutConf();
+  private static final long MAX_ENTRY_SIZE = parseMaxEntrySizeConf();
+  private static final long MAX_TOTAL_SIZE = parseMaxTotalSizeConf();
+  private static final String EXECUTOR_DESC = SparkUtil.executorDesc();
+
+  private static volatile SparkExecutorCache instance = null;
+
+  private final Map<String, Collection<String>> keysByExecutionId;
+  private volatile Cache<String, CacheValue> cache = null;
+
+  private SparkExecutorCache() {
+    this.keysByExecutionId = Collections.synchronizedMap(Maps.newHashMap());
+  }
+
+  public static SparkExecutorCache getOrCreate() {
+    if (instance == null && CACHE_ENABLED) {
+      synchronized (SparkExecutorCache.class) {
+        if (instance == null) {
+          SparkExecutorCache.instance = new SparkExecutorCache();
+        }
+      }
+    }
+
+    return instance;
+  }
+
+  public static void cleanUp(String executionId) {
+    if (instance != null) {
+      instance.invalidate(executionId);
+    }
+  }
+
+  public long maxEntrySize() {
+    return MAX_ENTRY_SIZE;
+  }
+
+  public <V> V get(String executionId, String key, Supplier<V> valueSupplier, 
long valueSize) {
+    if (valueSize > MAX_ENTRY_SIZE) {
+      return valueSupplier.get();
+    }
+
+    storeMapping(executionId, key);

Review Comment:
   Sorry im confused here (will need to take another look with fresh eyes).  
How does this guarantee uniqueness of value for given executionId and key?  (we 
store key with executionId, but then store the key in cache() anyway, 
regardless of whether key was used in another executionId?)
   
   Why not just use executionId and key as cache key, or have a map of maps?



##########
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java:
##########
@@ -44,4 +44,14 @@ public interface PositionDeleteIndex {
 
   /** Returns true if this collection contains no element. */
   boolean isEmpty();
+
+  /** Returns true if this collection contains elements. */
+  default boolean isNotEmpty() {

Review Comment:
   is this really needed?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An executor cache for optimizing tasks by reducing the computation and IO 
overhead.
+ *
+ * <p>The cache is configurable and enabled through Spark SQL properties. Its 
key features include
+ * setting limits on the total cache size and maximum size for individual 
entries. Additionally, it
+ * implements automatic eviction of entries after a specified duration of 
inactivity. The cache will
+ * respect the SQL configuration valid at the time of initialization. All 
subsequent changes will
+ * have no effect.
+ *
+ * <p>Usage pattern involves fetching data from the cache using a unique 
combination of execution ID
+ * and key. If the data is not present in the cache, it is computed using the 
provided supplier and
+ * stored in the cache, subject to the defined size constraints.
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one 
cache exists per JVM.
+ *
+ * @see SparkUtil#executionId()
+ */
+public class SparkExecutorCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkExecutorCache.class);
+
+  private static final SparkConfParser CONF_PARSER = new SparkConfParser();
+  private static final boolean CACHE_ENABLED = parseCacheEnabledConf();
+  private static final Duration TIMEOUT = parseTimeoutConf();
+  private static final long MAX_ENTRY_SIZE = parseMaxEntrySizeConf();
+  private static final long MAX_TOTAL_SIZE = parseMaxTotalSizeConf();
+  private static final String EXECUTOR_DESC = SparkUtil.executorDesc();
+
+  private static volatile SparkExecutorCache instance = null;
+
+  private final Map<String, Collection<String>> keysByExecutionId;
+  private volatile Cache<String, CacheValue> cache = null;
+
+  private SparkExecutorCache() {
+    this.keysByExecutionId = Collections.synchronizedMap(Maps.newHashMap());
+  }
+
+  public static SparkExecutorCache getOrCreate() {
+    if (instance == null && CACHE_ENABLED) {
+      synchronized (SparkExecutorCache.class) {
+        if (instance == null) {
+          SparkExecutorCache.instance = new SparkExecutorCache();
+        }
+      }
+    }
+
+    return instance;
+  }
+
+  public static void cleanUp(String executionId) {
+    if (instance != null) {
+      instance.invalidate(executionId);
+    }
+  }
+
+  public long maxEntrySize() {
+    return MAX_ENTRY_SIZE;
+  }
+
+  public <V> V get(String executionId, String key, Supplier<V> valueSupplier, 
long valueSize) {

Review Comment:
   can we add javadocs to this class methods?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to