aokolnychyi commented on code in PR #8755: URL: https://github.com/apache/iceberg/pull/8755#discussion_r1440372097
########## spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Queues; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An executor cache for optimizing tasks by reducing the computation and IO overhead. + * + * <p>The cache is configurable and enabled through Spark SQL properties. Its key features include + * setting limits on the total cache size and maximum size for individual entries. Additionally, it + * implements automatic eviction of entries after a specified duration of inactivity. The cache will + * respect the SQL configuration valid at the time of initialization. All subsequent changes will + * have no effect. + * + * <p>Usage pattern involves fetching data from the cache using a unique combination of execution ID + * and key. If the data is not present in the cache, it is computed using the provided supplier and + * stored in the cache, subject to the defined size constraints. + * + * <p>Note that this class employs the singleton pattern to ensure only one cache exists per JVM. + * + * @see SparkUtil#executionId() + */ +public class SparkExecutorCache { + + private static final Logger LOG = LoggerFactory.getLogger(SparkExecutorCache.class); + + private static final SparkConfParser CONF_PARSER = new SparkConfParser(); + private static final boolean CACHE_ENABLED = parseCacheEnabledConf(); + private static final Duration TIMEOUT = parseTimeoutConf(); + private static final long MAX_ENTRY_SIZE = parseMaxEntrySizeConf(); + private static final long MAX_TOTAL_SIZE = parseMaxTotalSizeConf(); + private static final String EXECUTOR_DESC = SparkUtil.executorDesc(); + + private static volatile SparkExecutorCache instance = null; + + private final Map<String, Collection<String>> keysByExecutionId; + private volatile Cache<String, CacheValue> cache = null; + + private SparkExecutorCache() { + this.keysByExecutionId = Collections.synchronizedMap(Maps.newHashMap()); + } + + public static SparkExecutorCache getOrCreate() { + if (instance == null && CACHE_ENABLED) { + synchronized (SparkExecutorCache.class) { + if (instance == null) { + SparkExecutorCache.instance = new SparkExecutorCache(); + } + } + } + + return instance; + } + + public static void cleanUp(String executionId) { + if (instance != null) { + instance.invalidate(executionId); + } + } + + public long maxEntrySize() { + return MAX_ENTRY_SIZE; + } + + public <V> V get(String executionId, String key, Supplier<V> valueSupplier, long valueSize) { + if (valueSize > MAX_ENTRY_SIZE) { + return valueSupplier.get(); + } + + storeMapping(executionId, key); Review Comment: I cannot use a map of maps because we won't be able to enforce the total size of the cache across all executions. The goal of this mapping is to be able to remove all cached entries by ID. I will explore the idea of concatenating the execution ID and cache key and then iterating over all keys to find what belongs to a given execution ID instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org