szehon-ho commented on code in PR #8755:
URL: https://github.com/apache/iceberg/pull/8755#discussion_r1440382646


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java:
##########
@@ -70,4 +72,18 @@ private SparkSQLProperties() {}
 
   // Controls whether to report locality information to Spark while allocating input partitions
   public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";
+
+  public static final String EXECUTOR_CACHE_ENABLED = "spark.sql.iceberg.executor-cache.enabled";
+  public static final boolean EXECUTOR_CACHE_ENABLED_DEFAULT = true;
+
+  public static final String EXECUTOR_CACHE_TIMEOUT = "spark.sql.iceberg.executor-cache.timeout";
+  public static final Duration EXECUTOR_CACHE_TIMEOUT_DEFAULT = Duration.ofMinutes(10);
+
+  public static final String EXECUTOR_CACHE_MAX_ENTRY_SIZE =
+      "spark.sql.iceberg.executor-cache.max-entry-size";
+  public static final long EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT = 64 * 1024 * 1024; // 64 MB
+
+  public static final String EXECUTOR_CACHE_MAX_TOTAL_SIZE =

Review Comment:
   We should document these confs
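   For example, short inline notes like the one on LOCALITY above, plus an entry in the Spark configuration docs, would probably be enough. Rough wording only, to be adjusted to what these confs actually control:

       // Controls whether the executor cache is enabled
       public static final String EXECUTOR_CACHE_ENABLED = "spark.sql.iceberg.executor-cache.enabled";

       // Controls how long an entry may stay idle in the executor cache before it is evicted
       public static final String EXECUTOR_CACHE_TIMEOUT = "spark.sql.iceberg.executor-cache.timeout";

       // Controls the maximum size of a single entry the executor cache will accept
       public static final String EXECUTOR_CACHE_MAX_ENTRY_SIZE =
           "spark.sql.iceberg.executor-cache.max-entry-size";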



##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -452,6 +454,68 @@ private static void checkSchemaCompatibility(
     }
   }
 
+  /**
+   * Estimates the number of bytes a value for a given field may occupy in memory.
+   *
+   * <p>This method approximates the memory size based on the internal Java representation defined
+   * by {@link Type.TypeID}. It is important to note that the actual size might differ from this
+   * estimation. The method is designed to handle a variety of data types, including primitive
+   * types, strings, and nested types such as structs, maps, and lists.
+   *
+   * @param field a field for which to estimate the size
+   * @return the estimated size in bytes of the field's value in memory
+   */
+  public static long defaultSize(Types.NestedField field) {
+    return defaultSize(field.type());
+  }
+
+  private static long defaultSize(Type type) {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        // the size of a boolean variable is virtual machine dependent
+        // it is common to believe booleans occupy 1 byte in most JVMs
+        return 1;
+      case INTEGER:
+      case FLOAT:
+      case DATE:
+        // ints and floats occupy 4 bytes
+        // dates are internally represented as ints
+        return 4;
+      case LONG:
+      case DOUBLE:
+      case TIME:
+      case TIMESTAMP:
+        // longs and doubles occupy 8 bytes
+        // times and timestamps are internally represented as longs
+        return 8;
+      case STRING:
+        // 12 (header) + 12 (fields) + 16 (array overhead) + 20 (10 chars, 2 bytes each) = 60 bytes
+        return 60;
+      case UUID:
+        // 12 (header) + 16 (two long variables) = 28 bytes
+        return 28;

Review Comment:
   should we express these as OBJECT_HEADER
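   For example, something along these lines, assuming OBJECT_HEADER is the same 12-byte header constant already used for the nested types below (just a sketch):

       case STRING:
         // OBJECT_HEADER + 12 (fields) + 16 (array overhead) + 20 (10 chars, 2 bytes each)
         return OBJECT_HEADER + 12 + 16 + 20;
       case UUID:
         // OBJECT_HEADER + 16 (two long fields)
         return OBJECT_HEADER + 2 * 8;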



##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -125,6 +126,25 @@ public static StructLikeSet toEqualitySet(
     }
   }
 
+  public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPositionIndexes(

Review Comment:
   It would be useful to describe this behavior in a javadoc.
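   Something along these lines, for example (rough wording; the parameter name is a placeholder, and the description assumes the map is keyed by the location of the referenced data file):

       /**
        * Builds position delete indexes for the given position delete records.
        *
        * @param posDeletes position delete records to index
        * @return a map from data file location to an index of positions deleted in that file
        */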



##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -452,6 +454,68 @@ private static void checkSchemaCompatibility(
     }
   }
 
+  /**
+   * Estimates the number of bytes a value for a given field may occupy in memory.
+   *
+   * <p>This method approximates the memory size based on the internal Java representation defined
+   * by {@link Type.TypeID}. It is important to note that the actual size might differ from this
+   * estimation. The method is designed to handle a variety of data types, including primitive
+   * types, strings, and nested types such as structs, maps, and lists.
+   *
+   * @param field a field for which to estimate the size
+   * @return the estimated size in bytes of the field's value in memory
+   */
+  public static long defaultSize(Types.NestedField field) {
+    return defaultSize(field.type());
+  }
+
+  private static long defaultSize(Type type) {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        // the size of a boolean variable is virtual machine dependent
+        // it is common to believe booleans occupy 1 byte in most JVMs
+        return 1;
+      case INTEGER:
+      case FLOAT:
+      case DATE:
+        // ints and floats occupy 4 bytes
+        // dates are internally represented as ints
+        return 4;
+      case LONG:
+      case DOUBLE:
+      case TIME:
+      case TIMESTAMP:
+        // longs and doubles occupy 8 bytes
+        // times and timestamps are internally represented as longs
+        return 8;
+      case STRING:
+        // 12 (header) + 12 (fields) + 16 (array overhead) + 20 (10 chars, 2 bytes each) = 60 bytes
+        return 60;
+      case UUID:
+        // 12 (header) + 16 (two long variables) = 28 bytes
+        return 28;
+      case FIXED:
+        return ((Types.FixedType) type).length();
+      case BINARY:
+        return 100;
+      case DECIMAL:
+        // 12 (header) + (12 + 12 + 4) (BigInteger) + 4 (scale) = 44 bytes
+        return 44;
+      case STRUCT:
+        Types.StructType struct = (Types.StructType) type;
+        return OBJECT_HEADER + struct.fields().stream().mapToLong(TypeUtil::defaultSize).sum();
+      case LIST:
+        Types.ListType list = (Types.ListType) type;
+        return OBJECT_HEADER + 5 * defaultSize(list.elementType());
+      case MAP:
+        Types.MapType map = (Types.MapType) type;
+        long entrySize = OBJECT_HEADER + defaultSize(map.keyType()) + defaultSize(map.valueType());
+        return OBJECT_HEADER + 5 * entrySize;
+      default:
+        return 16;

Review Comment:
   why 16?



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An executor cache for optimizing tasks by reducing the computation and IO overhead.
+ *
+ * <p>The cache is configurable and enabled through Spark SQL properties. Its key features include
+ * setting limits on the total cache size and maximum size for individual entries. Additionally, it
+ * implements automatic eviction of entries after a specified duration of inactivity. The cache will
+ * respect the SQL configuration valid at the time of initialization. All subsequent changes will
+ * have no effect.
+ *
+ * <p>Usage pattern involves fetching data from the cache using a unique combination of execution ID
+ * and key. If the data is not present in the cache, it is computed using the provided supplier and
+ * stored in the cache, subject to the defined size constraints.
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one cache exists per JVM.
+ *
+ * @see SparkUtil#executionId()
+ */
+public class SparkExecutorCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkExecutorCache.class);
+
+  private static final SparkConfParser CONF_PARSER = new SparkConfParser();
+  private static final boolean CACHE_ENABLED = parseCacheEnabledConf();
+  private static final Duration TIMEOUT = parseTimeoutConf();
+  private static final long MAX_ENTRY_SIZE = parseMaxEntrySizeConf();
+  private static final long MAX_TOTAL_SIZE = parseMaxTotalSizeConf();
+  private static final String EXECUTOR_DESC = SparkUtil.executorDesc();
+
+  private static volatile SparkExecutorCache instance = null;
+
+  private final Map<String, Collection<String>> keysByExecutionId;
+  private volatile Cache<String, CacheValue> cache = null;
+
+  private SparkExecutorCache() {
+    this.keysByExecutionId = Collections.synchronizedMap(Maps.newHashMap());
+  }
+
+  public static SparkExecutorCache getOrCreate() {
+    if (instance == null && CACHE_ENABLED) {
+      synchronized (SparkExecutorCache.class) {
+        if (instance == null) {
+          SparkExecutorCache.instance = new SparkExecutorCache();
+        }
+      }
+    }
+
+    return instance;
+  }
+
+  public static void cleanUp(String executionId) {

Review Comment:
   It seems "instance" itself will not be cleaned up (nullified); is that an issue?
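   For reference, a rough sketch of what dropping the singleton on the last cleanup could look like (purely illustrative; invalidate(executionId) stands in for whatever per-execution invalidation this method already does):

       public static void cleanUp(String executionId) {
         SparkExecutorCache current = instance;
         if (current != null) {
           current.invalidate(executionId); // hypothetical per-execution invalidation
           synchronized (SparkExecutorCache.class) {
             // drop the singleton once no executions reference it so the JVM can reclaim the cache
             if (instance != null && instance.keysByExecutionId.isEmpty()) {
               SparkExecutorCache.instance = null;
             }
           }
         }
       }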



##########
api/src/main/java/org/apache/iceberg/types/TypeUtil.java:
##########
@@ -452,6 +454,59 @@ private static void checkSchemaCompatibility(
     }
   }
 
+  public static long defaultSize(Types.NestedField field) {

Review Comment:
   So to understand: equality deletes are sized based on whole row sizes, but position deletes based on 2 * record count, against a single configured cache size? Would it be easier to consider having two configured cache sizes (one for position deletes, one for equality deletes)?
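   For illustration, that could be two confs along these lines (names are hypothetical):

       public static final String EXECUTOR_CACHE_MAX_POSITION_DELETE_SIZE =
           "spark.sql.iceberg.executor-cache.max-position-delete-size";
       public static final String EXECUTOR_CACHE_MAX_EQUALITY_DELETE_SIZE =
           "spark.sql.iceberg.executor-cache.max-equality-delete-size";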



##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java:
##########
@@ -70,4 +72,18 @@ private SparkSQLProperties() {}
 
   // Controls whether to report locality information to Spark while allocating input partitions
   public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";
+
+  public static final String EXECUTOR_CACHE_ENABLED = "spark.sql.iceberg.executor-cache.enabled";
+  public static final boolean EXECUTOR_CACHE_ENABLED_DEFAULT = true;
+
+  public static final String EXECUTOR_CACHE_TIMEOUT = "spark.sql.iceberg.executor-cache.timeout";
+  public static final Duration EXECUTOR_CACHE_TIMEOUT_DEFAULT = Duration.ofMinutes(10);
+
+  public static final String EXECUTOR_CACHE_MAX_ENTRY_SIZE =

Review Comment:
   What is the guidance for max entry size? Are we trying to prevent one huge entry from crowding smaller entries out of the cache, on the assumption that we may cache fewer total deletes that way?
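   For example, if the guidance ends up being "raise this when individual delete files are known to be large", a user could set it per session before running the job (sketch only; per the class javadoc, the cache reads the SQL conf once at initialization):

       // hypothetical: raise the per-entry limit to 128 MB for this session
       spark.conf().set(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE, String.valueOf(128L * 1024 * 1024));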



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
