This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1-lakehouse in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4a721e445b97347fd3c0d934434dc8b6d6a0c84c Author: Qi Chen <che...@selectdb.com> AuthorDate: Wed Feb 12 22:31:45 2025 +0800 [Opt](multi-catalog)Improve performance by introducing cache of list directory files when getting split for each query. (#43913) --- .../main/java/org/apache/doris/common/Config.java | 4 + fe/fe-core/pom.xml | 5 + .../java/org/apache/doris/common/EmptyCache.java | 247 +++++++ .../org/apache/doris/common/EvictableCache.java | 466 ++++++++++++++ .../apache/doris/common/EvictableCacheBuilder.java | 286 +++++++++ .../doris/datasource/hive/HMSExternalTable.java | 4 +- .../doris/datasource/hive/HiveMetaStoreCache.java | 68 +- .../doris/datasource/hive/source/HiveScanNode.java | 15 +- .../doris/datasource/hudi/source/HudiScanNode.java | 5 +- .../java/org/apache/doris/fs/DirectoryLister.java | 29 + .../apache/doris/fs/FileSystemDirectoryLister.java | 37 ++ .../org/apache/doris/fs/FileSystemIOException.java | 65 ++ .../apache/doris/fs/RemoteFileRemoteIterator.java | 47 ++ .../java/org/apache/doris/fs/RemoteIterator.java | 27 + .../org/apache/doris/fs/SimpleRemoteIterator.java | 45 ++ .../fs/TransactionDirectoryListingCacheKey.java | 64 ++ .../fs/TransactionScopeCachingDirectoryLister.java | 216 +++++++ ...nsactionScopeCachingDirectoryListerFactory.java | 59 ++ .../glue/translator/PhysicalPlanTranslator.java | 19 +- .../apache/doris/planner/SingleNodePlanner.java | 14 +- .../apache/doris/common/TestEvictableCache.java | 708 +++++++++++++++++++++ ...TransactionScopeCachingDirectoryListerTest.java | 174 +++++ 22 files changed, 2568 insertions(+), 36 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index a367c13c75b..d4a636a7a98 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2098,6 +2098,10 @@ public class Config extends ConfigBase { "Max cache 
number of external table row count"}) public static long max_external_table_row_count_cache_num = 100000; + @ConfField(description = {"每个查询的外表文件元数据缓存的最大文件数量。", + "Max cache file number of external table split file meta cache at query level."}) + public static long max_external_table_split_file_meta_cache_num = 100000; + /** * Max cache loader thread-pool size. * Max thread pool size for loading external meta cache diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 049044f62d7..6195a1e32cd 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -783,6 +783,11 @@ under the License. <artifactId>ap-loader-all</artifactId> <version>3.0-8</version> </dependency> + <dependency> + <groupId>org.gaul</groupId> + <artifactId>modernizer-maven-annotations</artifactId> + <version>2.4.0</version> + </dependency> </dependencies> <repositories> <!-- for huawei obs sdk --> diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java new file mode 100644 index 00000000000..5942eb2b1f3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EmptyCache.java +// and modified by Doris + +package org.apache.doris.common; + +import com.google.common.cache.AbstractLoadingCache; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheLoader.InvalidCacheLoadException; +import com.google.common.cache.CacheStats; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; + +class EmptyCache<K, V> + extends AbstractLoadingCache<K, V> { + private final CacheLoader<? super K, V> loader; + private final StatsCounter statsCounter; + + EmptyCache(CacheLoader<? super K, V> loader, boolean recordStats) { + this.loader = Objects.requireNonNull(loader, "loader is null"); + this.statsCounter = recordStats ? new SimpleStatsCounter() : new NoopStatsCounter(); + } + + @Override + public V getIfPresent(Object key) { + statsCounter.recordMisses(1); + return null; + } + + @Override + public V get(K key) + throws ExecutionException { + return get(key, () -> loader.load(key)); + } + + @Override + public ImmutableMap<K, V> getAll(Iterable<? 
extends K> keys) + throws ExecutionException { + try { + Set<K> keySet = ImmutableSet.copyOf(keys); + statsCounter.recordMisses(keySet.size()); + @SuppressWarnings("unchecked") // safe since all keys extend K + ImmutableMap<K, V> result = (ImmutableMap<K, V>) loader.loadAll(keySet); + for (K key : keySet) { + if (!result.containsKey(key)) { + throw new InvalidCacheLoadException("loadAll failed to return a value for " + key); + } + } + statsCounter.recordLoadSuccess(1); + return result; + } catch (RuntimeException e) { + statsCounter.recordLoadException(1); + throw new UncheckedExecutionException(e); + } catch (Exception e) { + statsCounter.recordLoadException(1); + throw new ExecutionException(e); + } + } + + @Override + public V get(K key, Callable<? extends V> valueLoader) + throws ExecutionException { + statsCounter.recordMisses(1); + try { + V value = valueLoader.call(); + statsCounter.recordLoadSuccess(1); + return value; + } catch (RuntimeException e) { + statsCounter.recordLoadException(1); + throw new UncheckedExecutionException(e); + } catch (Exception e) { + statsCounter.recordLoadException(1); + throw new ExecutionException(e); + } + } + + @Override + public void put(K key, V value) { + // Cache, even if configured to evict everything immediately, should allow writes. + } + + @Override + public void refresh(K key) {} + + @Override + public void invalidate(Object key) {} + + @Override + public void invalidateAll(Iterable<?> keys) {} + + @Override + public void invalidateAll() { + + } + + @Override + public long size() { + return 0; + } + + @Override + public CacheStats stats() { + return statsCounter.snapshot(); + } + + @Override + public ConcurrentMap<K, V> asMap() { + return new ConcurrentMap<K, V>() { + @Override + public V putIfAbsent(K key, V value) { + // Cache, even if configured to evict everything immediately, should allow writes. 
+ // putIfAbsent returns the previous value + return null; + } + + @Override + public boolean remove(Object key, Object value) { + return false; + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + return false; + } + + @Override + public V replace(K key, V value) { + return null; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public boolean containsKey(Object key) { + return false; + } + + @Override + public boolean containsValue(Object value) { + return false; + } + + @Override + @Nullable + public V get(Object key) { + return null; + } + + @Override + @Nullable + public V put(K key, V value) { + // Cache, even if configured to evict everything immediately, should allow writes. + return null; + } + + @Override + @Nullable + public V remove(Object key) { + return null; + } + + @Override + public void putAll(Map<? extends K, ? extends V> m) { + // Cache, even if configured to evict everything immediately, should allow writes. 
+ } + + @Override + public void clear() { + + } + + @Override + public Set<K> keySet() { + return ImmutableSet.of(); + } + + @Override + public Collection<V> values() { + return ImmutableSet.of(); + } + + @Override + public Set<Entry<K, V>> entrySet() { + return ImmutableSet.of(); + } + }; + } + + private static class NoopStatsCounter + implements StatsCounter { + private static final CacheStats EMPTY_STATS = new SimpleStatsCounter().snapshot(); + + @Override + public void recordHits(int count) {} + + @Override + public void recordMisses(int count) {} + + @Override + public void recordLoadSuccess(long loadTime) {} + + @Override + public void recordLoadException(long loadTime) {} + + @Override + public void recordEviction() {} + + @Override + public CacheStats snapshot() { + return EMPTY_STATS; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java new file mode 100644 index 00000000000..a2cb05d82c5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java @@ -0,0 +1,466 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCache.java +// and modified by Doris + +package org.apache.doris.common; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.base.Verify; +import com.google.common.cache.AbstractLoadingCache; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheStats; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalCause; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import javax.annotation.Nullable; + +/** + * A {@link Cache} and {@link LoadingCache} implementation similar to ones + * produced by {@link CacheBuilder#build()}, but one that does not + * exhibit <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>: + * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object, Callable)} + * is guaranteed to return fresh state after {@link #invalidate(Object)}, + * {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called. 
+ * + * @see EvictableCacheBuilder + */ +// @ElementTypesAreNonnullByDefault +class EvictableCache<K, V> + extends AbstractLoadingCache<K, V> + implements LoadingCache<K, V> { + // Invariant: for every (K, token) entry in the tokens map, there is a live + // cache entry (token, ?) in dataCache, that, upon eviction, will cause the tokens' + // entry to be removed. + private final ConcurrentHashMap<K, Token<K>> tokens = new ConcurrentHashMap<>(); + // The dataCache can have entries with no corresponding tokens in the tokens map. + // For example, this can happen when invalidation concurs with load. + // The dataCache must be bounded. + private final LoadingCache<Token<K>, V> dataCache; + + private final AtomicInteger invalidations = new AtomicInteger(); + + EvictableCache(CacheBuilder<? super Token<K>, ? super V> cacheBuilder, CacheLoader<? super K, V> cacheLoader) { + dataCache = buildUnsafeCache( + cacheBuilder + .<Token<K>, V>removalListener(removal -> { + Token<K> token = removal.getKey(); + Verify.verify(token != null, "token is null"); + if (removal.getCause() != RemovalCause.REPLACED) { + tokens.remove(token.getKey(), token); + } + }), + new TokenCacheLoader<>(cacheLoader)); + } + + // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden, + // advising to use this class as a safety-adding wrapper. + private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder, + CacheLoader<? super K, V> cacheLoader) { + return cacheBuilder.build(cacheLoader); + } + + @Override + public V getIfPresent(Object key) { + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token<K> token = tokens.get(key); + if (token == null) { + return null; + } + return dataCache.getIfPresent(token); + } + + @Override + public V get(K key, Callable<? 
extends V> valueLoader) + throws ExecutionException { + Token<K> newToken = new Token<>(key); + int invalidations = this.invalidations.get(); + Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken); + try { + V value = dataCache.get(token, valueLoader); + if (invalidations == this.invalidations.get()) { + // Revive token if it got expired before reloading + if (tokens.putIfAbsent(key, token) == null) { + // Revived + if (!dataCache.asMap().containsKey(token)) { + // We revived, but the token does not correspond to a live entry anymore. + // It would stay in tokens forever, so let's remove it. + tokens.remove(key, token); + } + } + } + return value; + } catch (Throwable e) { + if (newToken == token) { + // Failed to load and it was our new token persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(key, newToken); + } + throw e; + } + } + + @Override + public V get(K key) + throws ExecutionException { + Token<K> newToken = new Token<>(key); + int invalidations = this.invalidations.get(); + Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken); + try { + V value = dataCache.get(token); + if (invalidations == this.invalidations.get()) { + // Revive token if it got expired before reloading + if (tokens.putIfAbsent(key, token) == null) { + // Revived + if (!dataCache.asMap().containsKey(token)) { + // We revived, but the token does not correspond to a live entry anymore. + // It would stay in tokens forever, so let's remove it. + tokens.remove(key, token); + } + } + } + return value; + } catch (Throwable e) { + if (newToken == token) { + // Failed to load and it was our new token persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(key, newToken); + } + throw e; + } + } + + @Override + public ImmutableMap<K, V> getAll(Iterable<? 
extends K> keys) + throws ExecutionException { + List<Token<K>> newTokens = new ArrayList<>(); + List<Token<K>> temporaryTokens = new ArrayList<>(); + try { + Map<K, V> result = new LinkedHashMap<>(); + for (K key : keys) { + if (result.containsKey(key)) { + continue; + } + // This is not bulk, but is a fast local operation + Token<K> newToken = new Token<>(key); + Token<K> oldToken = tokens.putIfAbsent(key, newToken); + if (oldToken != null) { + // Token exists but the data may not exist (e.g. due to concurrent eviction) + V value = dataCache.getIfPresent(oldToken); + if (value != null) { + result.put(key, value); + continue; + } + // Old token exists but value wasn't found. This can happen when there is concurrent + // eviction/invalidation or when the value is still being loaded. + // The new token is not registered in tokens, so won't be used by subsequent invocations. + temporaryTokens.add(newToken); + } + newTokens.add(newToken); + } + + Map<Token<K>, V> values = dataCache.getAll(newTokens); + for (Map.Entry<Token<K>, V> entry : values.entrySet()) { + Token<K> newToken = entry.getKey(); + result.put(newToken.getKey(), entry.getValue()); + } + return ImmutableMap.copyOf(result); + } catch (Throwable e) { + for (Token<K> token : newTokens) { + // Failed to load and it was our new token (potentially) persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(token.getKey(), token); + } + throw e; + } finally { + dataCache.invalidateAll(temporaryTokens); + } + } + + @Override + public void refresh(K key) { + // The refresh loads a new entry, if it wasn't in the cache yet. Thus, we would create a new Token. + // However, dataCache.refresh is asynchronous and may fail, so no cache entry may be created. + // In such a case we would leak the newly created token. 
+ throw new UnsupportedOperationException(); + } + + @Override + public long size() { + return dataCache.size(); + } + + @Override + public void cleanUp() { + dataCache.cleanUp(); + } + + @VisibleForTesting + int tokensCount() { + return tokens.size(); + } + + @Override + public void invalidate(Object key) { + invalidations.incrementAndGet(); + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token<K> token = tokens.remove(key); + if (token != null) { + dataCache.invalidate(token); + } + } + + @Override + public void invalidateAll() { + invalidations.incrementAndGet(); + dataCache.invalidateAll(); + tokens.clear(); + } + + // Not thread safe, test only. + @VisibleForTesting + void clearDataCacheOnly() { + Map<K, Token<K>> tokensCopy = new HashMap<>(tokens); + dataCache.asMap().clear(); + Verify.verify(tokens.isEmpty(), "Clearing dataCache should trigger tokens eviction"); + tokens.putAll(tokensCopy); + } + + @Override + public CacheStats stats() { + return dataCache.stats(); + } + + @Override + public ConcurrentMap<K, V> asMap() { + return new ConcurrentMap<K, V>() { + private final ConcurrentMap<Token<K>, V> dataCacheMap = dataCache.asMap(); + + @Override + public V putIfAbsent(K key, V value) { + throw new UnsupportedOperationException("The operation is not supported," + + " as in inherently races with cache invalidation"); + } + + @Override + public V compute(K key, BiFunction<? super K, ? super V, ? 
extends V> remappingFunction) { + // The default implementation of ConcurrentMap#compute uses the unsupported putIfAbsent in some cases + throw new UnsupportedOperationException("The operation is not supported, as in inherently" + + " races with cache invalidation"); + } + + @Override + public boolean remove(Object key, Object value) { + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token<K> token = tokens.get(key); + if (token != null) { + return dataCacheMap.remove(token, value); + } + return false; + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + Token<K> token = tokens.get(key); + if (token != null) { + return dataCacheMap.replace(token, oldValue, newValue); + } + return false; + } + + @Override + public V replace(K key, V value) { + throw new UnsupportedOperationException("The operation is not supported, as in inherently races" + + " with cache invalidation"); + } + + @Override + public int size() { + return dataCache.asMap().size(); + } + + @Override + public boolean isEmpty() { + return dataCache.asMap().isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return tokens.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return values().contains(value); + } + + @Override + @Nullable + public V get(Object key) { + return getIfPresent(key); + } + + @Override + public V put(K key, V value) { + throw new UnsupportedOperationException("The operation is not supported, as in inherently" + + " races with cache invalidation. Use get(key, callable) instead."); + } + + @Override + @Nullable + public V remove(Object key) { + Token<K> token = tokens.remove(key); + if (token != null) { + return dataCacheMap.remove(token); + } + return null; + } + + @Override + public void putAll(Map<? extends K, ? extends V> m) { + throw new UnsupportedOperationException("The operation is not supported, as in inherently" + + " races with cache invalidation. 
Use get(key, callable) instead."); + } + + @Override + public void clear() { + dataCacheMap.clear(); + tokens.clear(); + } + + @Override + public Set<K> keySet() { + return tokens.keySet(); + } + + @Override + public Collection<V> values() { + return dataCacheMap.values(); + } + + @Override + public Set<Map.Entry<K, V>> entrySet() { + throw new UnsupportedOperationException(); + } + }; + } + + // instance-based equality + static final class Token<K> { + private final K key; + + Token(K key) { + this.key = Objects.requireNonNull(key, "key is null"); + } + + K getKey() { + return key; + } + + @Override + public String toString() { + return String.format("CacheToken(%s; %s)", Integer.toHexString(hashCode()), key); + } + } + + private static class TokenCacheLoader<K, V> + extends CacheLoader<Token<K>, V> { + private final CacheLoader<? super K, V> delegate; + + public TokenCacheLoader(CacheLoader<? super K, V> delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate is null"); + } + + @Override + public V load(Token<K> token) + throws Exception { + return delegate.load(token.getKey()); + } + + @Override + public ListenableFuture<V> reload(Token<K> token, V oldValue) + throws Exception { + return delegate.reload(token.getKey(), oldValue); + } + + @Override + public Map<Token<K>, V> loadAll(Iterable<? extends Token<K>> tokens) + throws Exception { + List<Token<K>> tokenList = ImmutableList.copyOf(tokens); + List<K> keys = new ArrayList<>(); + for (Token<K> token : tokenList) { + keys.add(token.getKey()); + } + Map<? super K, V> values; + try { + values = delegate.loadAll(keys); + } catch (UnsupportedLoadingOperationException e) { + // Guava uses UnsupportedLoadingOperationException in LoadingCache.loadAll + // to fall back from bulk loading (without load sharing) to loading individual + // values (with load sharing). 
EvictableCache implementation does not currently + // support the fallback mechanism, so the individual values would be loaded + // without load sharing. This would be an unintentional and non-obvious behavioral + // discrepancy between EvictableCache and Guava Caches, so the mechanism is disabled. + throw new UnsupportedOperationException("LoadingCache.getAll() is not supported by EvictableCache" + + " when CacheLoader.loadAll is not implemented", e); + } + + ImmutableMap.Builder<Token<K>, V> result = ImmutableMap.builder(); + for (int i = 0; i < tokenList.size(); i++) { + Token<K> token = tokenList.get(i); + K key = keys.get(i); + V value = values.get(key); + // CacheLoader.loadAll is not guaranteed to return values for all the keys + if (value != null) { + result.put(token, value); + } + } + return result.buildOrThrow(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .addValue(delegate) + .toString(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java new file mode 100644 index 00000000000..8da3f8c8d56 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCacheBuilder.java +// and modified by Doris + +package org.apache.doris.common; + +import org.apache.doris.common.EvictableCache.Token; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.Weigher; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import com.google.errorprone.annotations.CheckReturnValue; + +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Builder for {@link Cache} and {@link LoadingCache} instances, similar to {@link CacheBuilder}, + * but creating cache implementations that do not exhibit + * <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>: + * a cache inspection with {@link Cache#getIfPresent(Object)} or {@link Cache#get(Object, Callable)} + * is guaranteed to return fresh state after {@link Cache#invalidate(Object)}, + * {@link Cache#invalidateAll(Iterable)} or {@link Cache#invalidateAll()} were called. 
+ */ +public final class EvictableCacheBuilder<K, V> { + @CheckReturnValue + public static EvictableCacheBuilder<Object, Object> newBuilder() { + return new EvictableCacheBuilder<>(); + } + + private Optional<Ticker> ticker = Optional.empty(); + private Optional<Duration> expireAfterWrite = Optional.empty(); + private Optional<Duration> refreshAfterWrite = Optional.empty(); + private Optional<Long> maximumSize = Optional.empty(); + private Optional<Long> maximumWeight = Optional.empty(); + private Optional<Integer> concurrencyLevel = Optional.empty(); + private Optional<Weigher<? super Token<K>, ? super V>> weigher = Optional.empty(); + private boolean recordStats; + private Optional<DisabledCacheImplementation> disabledCacheImplementation = Optional.empty(); + + private EvictableCacheBuilder() {} + + /** + * Pass-through for {@link CacheBuilder#ticker(Ticker)}. + */ + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> ticker(Ticker ticker) { + this.ticker = Optional.of(ticker); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) { + return expireAfterWrite(toDuration(duration, unit)); + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> expireAfterWrite(Duration duration) { + Preconditions.checkState(!this.expireAfterWrite.isPresent(), "expireAfterWrite already set"); + this.expireAfterWrite = Optional.of(duration); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) { + return refreshAfterWrite(toDuration(duration, unit)); + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> refreshAfterWrite(Duration duration) { + Preconditions.checkState(!this.refreshAfterWrite.isPresent(), "refreshAfterWrite already set"); + this.refreshAfterWrite = Optional.of(duration); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> maximumSize(long maximumSize) { + 
Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set"); + Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set"); + this.maximumSize = Optional.of(maximumSize); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> maximumWeight(long maximumWeight) { + Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set"); + Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set"); + this.maximumWeight = Optional.of(maximumWeight); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> concurrencyLevel(int concurrencyLevel) { + Preconditions.checkState(!this.concurrencyLevel.isPresent(), "concurrencyLevel already set"); + this.concurrencyLevel = Optional.of(concurrencyLevel); + return this; + } + + public <K1 extends K, V1 extends V> EvictableCacheBuilder<K1, V1> weigher(Weigher<? super K1, ? super V1> weigher) { + Preconditions.checkState(!this.weigher.isPresent(), "weigher already set"); + @SuppressWarnings("unchecked") // see com.google.common.cache.CacheBuilder.weigher + EvictableCacheBuilder<K1, V1> cast = (EvictableCacheBuilder<K1, V1>) this; + cast.weigher = Optional.of(new TokenWeigher<>(weigher)); + return cast; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> recordStats() { + recordStats = true; + return this; + } + + /** + * Choose the behavior for the case when caching is disabled that may allow data and failure + * sharing between concurrent callers. + */ + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> shareResultsAndFailuresEvenIfDisabled() { + return disabledCacheImplementation(DisabledCacheImplementation.GUAVA); + } + + /** + * Choose the behavior for the case when caching is disabled that prevents data and + * failure sharing between concurrent callers. + * Note: the disabled cache reports statistics only if {@link #recordStats()} was called. 
+ */ + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> shareNothingWhenDisabled() { + return disabledCacheImplementation(DisabledCacheImplementation.NOOP); + } + + @VisibleForTesting + EvictableCacheBuilder<K, V> disabledCacheImplementation(DisabledCacheImplementation cacheImplementation) { + Preconditions.checkState(!disabledCacheImplementation.isPresent(), "disabledCacheImplementation already set"); + disabledCacheImplementation = Optional.of(cacheImplementation); + return this; + } + + @CheckReturnValue + public <K1 extends K, V1 extends V> Cache<K1, V1> build() { + return build(unimplementedCacheLoader()); + } + + @CheckReturnValue + public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(CacheLoader<? super K1, V1> loader) { + if (cacheDisabled()) { + // Silently providing a behavior different from Guava's could be surprising, so require explicit choice. + DisabledCacheImplementation disabledCacheImplementation = this.disabledCacheImplementation.orElseThrow( + () -> new IllegalStateException( + "Even when cache is disabled, the loads are synchronized and both load results and failures" + + " are shared between threads. " + "This is rarely desired, thus builder caller is" + + " expected to either opt-in into this behavior with" + + " shareResultsAndFailuresEvenIfDisabled(), or choose not to share results (and failures)" + + " between concurrent invocations with shareNothingWhenDisabled().")); + + switch (disabledCacheImplementation) { + case NOOP: + return new EmptyCache<>(loader, recordStats); + case GUAVA: { + // Disabled cache is always empty, so doesn't exhibit invalidation problems. + // Avoid overhead of EvictableCache wrapper. 
+ CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder() + .maximumSize(0) + .expireAfterWrite(0, TimeUnit.SECONDS); + if (recordStats) { + cacheBuilder.recordStats(); + } + return buildUnsafeCache(cacheBuilder, loader); + } + default: + throw new IllegalStateException("Unexpected value: " + disabledCacheImplementation); + } + } + + if (!(maximumSize.isPresent() || maximumWeight.isPresent() || expireAfterWrite.isPresent())) { + // EvictableCache invalidation (e.g. invalidateAll) happening concurrently with a load may + // lead to an entry remaining in the cache, without associated token. This would lead to + // a memory leak in an unbounded cache. + throw new IllegalStateException("Unbounded cache is not supported"); + } + + // CacheBuilder is further modified in EvictableCache::new, so cannot be shared between build() calls. + CacheBuilder<Object, ? super V> cacheBuilder = CacheBuilder.newBuilder(); + ticker.ifPresent(cacheBuilder::ticker); + expireAfterWrite.ifPresent(cacheBuilder::expireAfterWrite); + refreshAfterWrite.ifPresent(cacheBuilder::refreshAfterWrite); + maximumSize.ifPresent(cacheBuilder::maximumSize); + maximumWeight.ifPresent(cacheBuilder::maximumWeight); + weigher.ifPresent(cacheBuilder::weigher); + concurrencyLevel.ifPresent(cacheBuilder::concurrencyLevel); + if (recordStats) { + cacheBuilder.recordStats(); + } + return new EvictableCache<>(cacheBuilder, loader); + } + + private boolean cacheDisabled() { + return (maximumSize.isPresent() && maximumSize.get() == 0) + || (expireAfterWrite.isPresent() && expireAfterWrite.get().isZero()); + } + + // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden, + // advising to use this class as a safety-adding wrapper. + private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder, + CacheLoader<? 
super K, V> cacheLoader) { + return cacheBuilder.build(cacheLoader); + } + + private static <K, V> CacheLoader<K, V> unimplementedCacheLoader() { + return CacheLoader.from(ignored -> { + throw new UnsupportedOperationException(); + }); + } + + private static final class TokenWeigher<K, V> + implements Weigher<Token<K>, V> { + private final Weigher<? super K, ? super V> delegate; + + private TokenWeigher(Weigher<? super K, ? super V> delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate is null"); + } + + @Override + public int weigh(Token<K> key, V value) { + return delegate.weigh(key.getKey(), value); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TokenWeigher<?, ?> that = (TokenWeigher<?, ?>) o; + return Objects.equals(delegate, that.delegate); + } + + @Override + public int hashCode() { + return Objects.hash(delegate); + } + + @Override + public String toString() { + return "TokenWeigher{" + "delegate=" + delegate + '}'; + } + } + + private static Duration toDuration(long duration, TimeUnit unit) { + // Saturated conversion, as in com.google.common.cache.CacheBuilder.toNanosSaturated + return Duration.ofNanos(unit.toNanos(duration)); + } + + @VisibleForTesting + enum DisabledCacheImplementation { + NOOP, + GUAVA, + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index d07f0502d10..b056af3ebd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -39,6 +39,7 @@ import org.apache.doris.datasource.hudi.HudiSchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; import 
org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.fs.FileSystemDirectoryLister; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; @@ -1016,7 +1017,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI LOG.debug("Chosen partition for table {}. [{}]", name, partition.toString()); } } - return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName); + return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName, + new FileSystemDirectoryLister(), null); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 838005b47a9..3815a356585 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CacheFactory; @@ -43,7 +44,11 @@ import org.apache.doris.datasource.hive.HiveUtil.ACIDFileFilter; import org.apache.doris.datasource.hive.HiveUtil.FullAcidFileFilter; import org.apache.doris.datasource.hive.HiveUtil.InsertOnlyACIDFileFilter; import org.apache.doris.datasource.property.PropertyConverter; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.fs.FileSystemCache; +import org.apache.doris.fs.FileSystemDirectoryLister; +import org.apache.doris.fs.FileSystemIOException; +import org.apache.doris.fs.RemoteIterator; import org.apache.doris.fs.remote.RemoteFile; 
import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.fs.remote.dfs.DFSFileSystem; @@ -194,7 +199,7 @@ public class HiveMetaStoreCache { @Override public FileCacheValue load(FileCacheKey key) { - return loadFiles(key); + return loadFiles(key, new FileSystemDirectoryLister(), null); } }; @@ -347,7 +352,9 @@ public class HiveMetaStoreCache { private FileCacheValue getFileCache(String location, String inputFormat, JobConf jobConf, List<String> partitionValues, - String bindBrokerName) throws UserException { + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) throws UserException { FileCacheValue result = new FileCacheValue(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity( @@ -362,34 +369,37 @@ public class HiveMetaStoreCache { // /user/hive/warehouse/region_tmp_union_all2/2 // So we need to recursively list data location. // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 - List<RemoteFile> remoteFiles = new ArrayList<>(); boolean isRecursiveDirectories = Boolean.valueOf( catalog.getProperties().getOrDefault("hive.recursive_directories", "false")); - Status status = fs.listFiles(location, isRecursiveDirectories, remoteFiles); - if (status.ok()) { - for (RemoteFile remoteFile : remoteFiles) { + try { + RemoteIterator<RemoteFile> iterator = directoryLister.listFiles(fs, isRecursiveDirectories, + table, location); + while (iterator.hasNext()) { + RemoteFile remoteFile = iterator.next(); String srcPath = remoteFile.getPath().toString(); LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties()); result.addFile(remoteFile, locationPath); } - } else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) { - // User may manually remove partition under HDFS, in this case, - // Hive doesn't aware that the removed partition is missing. 
- // Here is to support this case without throw an exception. - LOG.warn(String.format("File %s not exist.", location)); - if (!Boolean.valueOf(catalog.getProperties() - .getOrDefault("hive.ignore_absent_partitions", "true"))) { - throw new UserException("Partition location does not exist: " + location); + } catch (FileSystemIOException e) { + if (e.getErrorCode().isPresent() && e.getErrorCode().get().equals(ErrCode.NOT_FOUND)) { + // User may manually remove partition under HDFS, in this case, + // Hive doesn't aware that the removed partition is missing. + // Here is to support this case without throw an exception. + LOG.warn(String.format("File %s not exist.", location)); + if (!Boolean.valueOf(catalog.getProperties() + .getOrDefault("hive.ignore_absent_partitions", "true"))) { + throw new UserException("Partition location does not exist: " + location); + } + } else { + throw new RuntimeException(e); } - } else { - throw new RuntimeException(status.getErrMsg()); } // Must copy the partitionValues to avoid concurrent modification of key and value result.setPartitionValues(Lists.newArrayList(partitionValues)); return result; } - private FileCacheValue loadFiles(FileCacheKey key) { + private FileCacheValue loadFiles(FileCacheKey key, DirectoryLister directoryLister, TableIf table) { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); @@ -413,7 +423,7 @@ public class HiveMetaStoreCache { FileInputFormat.setInputPaths(jobConf, finalLocation.get()); try { FileCacheValue result = getFileCache(finalLocation.get(), key.inputFormat, jobConf, - key.getPartitionValues(), key.bindBrokerName); + key.getPartitionValues(), key.bindBrokerName, directoryLister, table); // Replace default hive partition with a null_string. 
for (int i = 0; i < result.getValuesSize(); i++) { if (HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) { @@ -465,19 +475,25 @@ public class HiveMetaStoreCache { } public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions, - String bindBrokerName) { - return getFilesByPartitions(partitions, true, true, bindBrokerName); + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { + return getFilesByPartitions(partitions, true, true, bindBrokerName, directoryLister, table); } public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions, - String bindBrokerName) { - return getFilesByPartitions(partitions, false, true, bindBrokerName); + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { + return getFilesByPartitions(partitions, false, true, bindBrokerName, directoryLister, table); } public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean withCache, boolean concurrent, - String bindBrokerName) { + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { long start = System.currentTimeMillis(); List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition() ? 
FileCacheKey.createDummyCacheKey( @@ -493,13 +509,15 @@ public class HiveMetaStoreCache { } else { if (concurrent) { List<Future<FileCacheValue>> pList = keys.stream().map( - key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList()); + key -> fileListingExecutor.submit(() -> loadFiles(key, directoryLister, table))) + .collect(Collectors.toList()); fileLists = Lists.newArrayListWithExpectedSize(keys.size()); for (Future<FileCacheValue> p : pList) { fileLists.add(p.get()); } } else { - fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList()); + fileLists = keys.stream().map((key) -> loadFiles(key, directoryLister, table)) + .collect(Collectors.toList()); } } } catch (ExecutionException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index b14dfbf02f4..66d5382c151 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -42,6 +42,7 @@ import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.HiveProperties; import org.apache.doris.datasource.hive.HiveTransaction; import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; @@ -87,6 +88,8 @@ public class HiveScanNode extends FileQueryScanNode { @Setter protected SelectedPartitions selectedPartitions = null; + private DirectoryLister directoryLister; + private boolean partitionInit = false; private final AtomicReference<UserException> batchException = new AtomicReference<>(null); private List<HivePartition> prunedPartitions; @@ -101,15 +104,18 
@@ public class HiveScanNode extends FileQueryScanNode { * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) { - this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv); + public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv, + DirectoryLister directoryLister) { + this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv, directoryLister); } public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, - StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv) { + StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv, + DirectoryLister directoryLister) { super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv); hmsTable = (HMSExternalTable) desc.getTable(); brokerName = hmsTable.getCatalog().bindBrokerName(); + this.directoryLister = directoryLister; } @Override @@ -279,7 +285,8 @@ public class HiveScanNode extends FileQueryScanNode { } } else { boolean withCache = Config.max_external_file_cache_num > 0; - fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName); + fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName, + directoryLister, hmsTable); } if (tableSample != null) { List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index a6156924e27..b8f3d7bd198 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -37,6 +37,7 @@ import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hudi.HudiSchemaCacheValue; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; @@ -119,8 +120,8 @@ public class HudiScanNode extends HiveScanNode { */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation, - SessionVariable sv) { - super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, sv); + SessionVariable sv, DirectoryLister directoryLister) { + super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, sv, directoryLister); isCowTable = hmsTable.isHoodieCowTable(); if (LOG.isDebugEnabled()) { if (isCowTable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java new file mode 100644 index 00000000000..e97bc2c684f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java +// and modified by Doris + +package org.apache.doris.fs; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +/** + * Lists the remote files found under a location of a {@link FileSystem}. + */ +public interface DirectoryLister { + /** + * Returns an iterator over the files under {@code location}. + * + * @param fs file system the listing is performed on + * @param recursive recursion flag handed to the listing (presumably: descend into sub-directories; TODO confirm) + * @param table table the listing is done for; callers in this patch may pass {@code null} + * @param location path to list + * @throws FileSystemIOException if the listing fails + */ + RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location) + throws FileSystemIOException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java new file mode 100644 index 00000000000..dcd7eeb16b3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.backup.Status; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +import java.util.ArrayList; +import java.util.List; + +/** + * {@link DirectoryLister} that lists directly through the given {@link FileSystem}, without any caching. + */ +public class FileSystemDirectoryLister implements DirectoryLister { + /** + * Lists {@code location} with {@code fs.listFiles} and wraps the collected files + * in a {@link RemoteFileRemoteIterator}. The {@code table} argument is not used here. + * + * @throws FileSystemIOException carrying the status error code and message when the listing status is not ok + */ + @Override + public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location) + throws FileSystemIOException { + List<RemoteFile> result = new ArrayList<>(); + Status status = fs.listFiles(location, recursive, result); + if (!status.ok()) { + throw new FileSystemIOException(status.getErrCode(), status.getErrMsg()); + } + return new RemoteFileRemoteIterator(result); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java new file mode 100644 index 00000000000..c9e45d0352b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.doris.fs; + +import org.apache.doris.backup.Status.ErrCode; + +import java.io.IOException; +import java.util.Optional; +import javax.annotation.Nullable; + +/** + * {@link IOException} for file system operations that may carry an {@link ErrCode}. + */ +public class FileSystemIOException extends IOException { + + // Set once by every constructor; null when no error code was supplied. + @Nullable + private final ErrCode errCode; + + public FileSystemIOException(ErrCode errCode, String message) { + super(message); + this.errCode = errCode; + } + + public FileSystemIOException(ErrCode errCode, String message, Throwable cause) { + super(message, cause); + this.errCode = errCode; + } + + public FileSystemIOException(String message) { + super(message); + this.errCode = null; + } + + public FileSystemIOException(String message, Throwable cause) { + super(message, cause); + this.errCode = null; + } + + /** + * Returns the error code, or an empty Optional when none was supplied. + */ + public Optional<ErrCode> getErrorCode() { + return Optional.ofNullable(errCode); + } + + /** + * Returns the message prefixed with {@code [errCode]: } when an error code is present, + * otherwise the plain message. + */ + @Override + public String getMessage() { + if (errCode != null) { + return String.format("[%s]: %s", + errCode, + super.getMessage()); + } else { + return super.getMessage(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java new file mode 100644 index 00000000000..6ac8eb3b0c6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.fs.remote.RemoteFile; + +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; + +/** + * {@link RemoteIterator} over an already materialized list of {@link RemoteFile}s. + */ +public class RemoteFileRemoteIterator + implements RemoteIterator<RemoteFile> { + private final List<RemoteFile> remoteFileList; + private int currentIndex = 0; + + /** + * @param remoteFileList files to iterate over; must not be null + */ + public RemoteFileRemoteIterator(List<RemoteFile> remoteFileList) { + this.remoteFileList = Objects.requireNonNull(remoteFileList, "remoteFileList is null"); + } + + @Override + public boolean hasNext() throws FileSystemIOException { + return currentIndex < remoteFileList.size(); + } + + /** + * Returns the next file and advances the position. + * + * @throws NoSuchElementException when all files have already been returned + */ + @Override + public RemoteFile next() throws FileSystemIOException { + if (!hasNext()) { + throw new NoSuchElementException("No more elements in RemoteFileRemoteIterator"); + } + return remoteFileList.get(currentIndex++); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java new file mode 100644 index 00000000000..b9d212a15a5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java +// and modified by Doris + +package org.apache.doris.fs; + +/** + * Iterator whose {@code hasNext()} and {@code next()} are declared to throw {@link FileSystemIOException}. + * + * @param <T> element type + */ +public interface RemoteIterator<T> { + /** Returns whether another element is available. */ + boolean hasNext() throws FileSystemIOException; + + /** Returns the next element. */ + T next() throws FileSystemIOException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java new file mode 100644 index 00000000000..4332a5fed35 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ + +package org.apache.doris.fs; + +import org.apache.doris.fs.remote.RemoteFile; + +import java.util.Iterator; +import java.util.Objects; +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java +// and modified by Doris + +/** + * {@link RemoteIterator} view over a plain {@link Iterator}; both methods delegate to it directly. + */ +class SimpleRemoteIterator implements RemoteIterator<RemoteFile> { + private final Iterator<RemoteFile> iterator; + + /** + * @param iterator iterator to delegate to; must not be null + */ + public SimpleRemoteIterator(Iterator<RemoteFile> iterator) { + this.iterator = Objects.requireNonNull(iterator, "iterator is null"); + } + + @Override + public boolean hasNext() throws FileSystemIOException { + return iterator.hasNext(); + } + + @Override + public RemoteFile next() throws FileSystemIOException { + return iterator.next(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java new file mode 100644 index 00000000000..6be3c03f824 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java +// and modified by Doris + +package org.apache.doris.fs; + +import java.util.Objects; + +/** + * Cache key made of a transaction id and a path; two keys are equal only when both parts match. + */ +public class TransactionDirectoryListingCacheKey { + + private final long transactionId; + private final String path; + + /** + * @param transactionId id stored as the first part of the key + * @param path path stored as the second part of the key; must not be null + */ + public TransactionDirectoryListingCacheKey(long transactionId, String path) { + this.transactionId = transactionId; + this.path = Objects.requireNonNull(path, "path is null"); + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TransactionDirectoryListingCacheKey that = (TransactionDirectoryListingCacheKey) o; + return transactionId == that.transactionId && path.equals(that.path); + } + + @Override + public int hashCode() { + return Objects.hash(transactionId, path); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("TransactionDirectoryListingCacheKey{"); + sb.append("transactionId=").append(transactionId); + sb.append(", path='").append(path).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java new file mode 100644 index 00000000000..37acec6864f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java +// and modified by Doris + +package org.apache.doris.fs; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.cache.Cache; +import com.google.common.util.concurrent.UncheckedExecutionException; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import org.apache.commons.collections.ListUtils; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; + +/** + * Caches directory content (including listings that were started concurrently). + * {@link TransactionScopeCachingDirectoryLister} assumes that all listings + * are performed by same user within single transaction, therefore any failure can + * be shared between concurrent listings. 
+ */ +public class TransactionScopeCachingDirectoryLister implements DirectoryLister { + private final long transactionId; + + @VisibleForTesting + public Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> getCache() { + return cache; + } + + //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys + // to deal more efficiently with cache invalidation scenarios for partitioned tables. + private final Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache; + private final DirectoryLister delegate; + + public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long transactionId, + Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache) { + this.delegate = Objects.requireNonNull(delegate, "delegate is null"); + this.transactionId = transactionId; + this.cache = Objects.requireNonNull(cache, "cache is null"); + } + + @Override + public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location) + throws FileSystemIOException { + return listInternal(fs, recursive, table, new TransactionDirectoryListingCacheKey(transactionId, location)); + } + + private RemoteIterator<RemoteFile> listInternal(FileSystem fs, boolean recursive, TableIf table, + TransactionDirectoryListingCacheKey cacheKey) throws FileSystemIOException { + FetchingValueHolder cachedValueHolder; + try { + cachedValueHolder = cache.get(cacheKey, + () -> new FetchingValueHolder(createListingRemoteIterator(fs, recursive, table, cacheKey))); + } catch (ExecutionException | UncheckedExecutionException e) { + Throwable throwable = e.getCause(); + Throwables.throwIfInstanceOf(throwable, FileSystemIOException.class); + Throwables.throwIfUnchecked(throwable); + throw new RuntimeException("Failed to list directory: " + cacheKey.getPath(), throwable); + } + + if (cachedValueHolder.isFullyCached()) { + return new SimpleRemoteIterator(cachedValueHolder.getCachedFiles()); + } + + return 
cachingRemoteIterator(cachedValueHolder, cacheKey); + } + + private RemoteIterator<RemoteFile> createListingRemoteIterator(FileSystem fs, boolean recursive, + TableIf table, TransactionDirectoryListingCacheKey cacheKey) + throws FileSystemIOException { + return delegate.listFiles(fs, recursive, table, cacheKey.getPath()); + } + + + private RemoteIterator<RemoteFile> cachingRemoteIterator(FetchingValueHolder cachedValueHolder, + TransactionDirectoryListingCacheKey cacheKey) { + return new RemoteIterator<RemoteFile>() { + private int fileIndex; + + @Override + public boolean hasNext() + throws FileSystemIOException { + try { + boolean hasNext = cachedValueHolder.getCachedFile(fileIndex).isPresent(); + // Update cache weight of cachedValueHolder for a given path. + // The cachedValueHolder acts as an invalidation guard. + // If a cache invalidation happens while this iterator goes over the files from the specified path, + // the eventually outdated file listing will not be added anymore to the cache. 
+ cache.asMap().replace(cacheKey, cachedValueHolder, cachedValueHolder); + return hasNext; + } catch (Exception exception) { + // invalidate cached value to force retry of directory listing + cache.invalidate(cacheKey); + throw exception; + } + } + + @Override + public RemoteFile next() + throws FileSystemIOException { + // force cache entry weight update in case next file is cached + Preconditions.checkState(hasNext()); + return cachedValueHolder.getCachedFile(fileIndex++).orElseThrow(NoSuchElementException::new); + } + }; + } + + @VisibleForTesting + boolean isCached(String location) { + return isCached(new TransactionDirectoryListingCacheKey(transactionId, location)); + } + + @VisibleForTesting + boolean isCached(TransactionDirectoryListingCacheKey cacheKey) { + FetchingValueHolder cached = cache.getIfPresent(cacheKey); + return cached != null && cached.isFullyCached(); + } + + static class FetchingValueHolder { + + private final List<RemoteFile> cachedFiles = ListUtils.synchronizedList(new ArrayList<RemoteFile>()); + + @GuardedBy("this") + @Nullable + private RemoteIterator<RemoteFile> fileIterator; + @GuardedBy("this") + @Nullable + private Exception exception; + + public FetchingValueHolder(RemoteIterator<RemoteFile> fileIterator) { + this.fileIterator = Objects.requireNonNull(fileIterator, "fileIterator is null"); + } + + public synchronized boolean isFullyCached() { + return fileIterator == null && exception == null; + } + + public long getCacheFileCount() { + return cachedFiles.size(); + } + + public Iterator<RemoteFile> getCachedFiles() { + Preconditions.checkState(isFullyCached()); + return cachedFiles.iterator(); + } + + public Optional<RemoteFile> getCachedFile(int index) + throws FileSystemIOException { + int filesSize = cachedFiles.size(); + Preconditions.checkArgument(index >= 0 && index <= filesSize, + "File index (%s) out of bounds [0, %s]", index, filesSize); + + // avoid fileIterator synchronization (and thus blocking) for already cached files + 
if (index < filesSize) { + return Optional.of(cachedFiles.get(index)); + } + + return fetchNextCachedFile(index); + } + + private synchronized Optional<RemoteFile> fetchNextCachedFile(int index) + throws FileSystemIOException { + if (exception != null) { + throw new FileSystemIOException("Exception while listing directory", exception); + } + + if (index < cachedFiles.size()) { + // file was fetched concurrently + return Optional.of(cachedFiles.get(index)); + } + + try { + if (fileIterator == null || !fileIterator.hasNext()) { + // no more files + fileIterator = null; + return Optional.empty(); + } + + RemoteFile fileStatus = fileIterator.next(); + cachedFiles.add(fileStatus); + return Optional.of(fileStatus); + } catch (Exception exception) { + fileIterator = null; + this.exception = exception; + throw exception; + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java new file mode 100644 index 00000000000..c3c9c347c3d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerFactory.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryListerFactory.java +// and modified by Doris + +package org.apache.doris.fs; + +import org.apache.doris.common.EvictableCacheBuilder; +import org.apache.doris.fs.TransactionScopeCachingDirectoryLister.FetchingValueHolder; + +import com.google.common.cache.Cache; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +public class TransactionScopeCachingDirectoryListerFactory { + //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys + // to deal more efficiently with cache invalidation scenarios for partitioned tables. + // private final Optional<Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder>> cache; + + private final Optional<Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder>> cache; + + private final AtomicLong nextTransactionId = new AtomicLong(); + + public TransactionScopeCachingDirectoryListerFactory(long maxSize) { + if (maxSize > 0) { + EvictableCacheBuilder<TransactionDirectoryListingCacheKey, FetchingValueHolder> cacheBuilder = + EvictableCacheBuilder.newBuilder() + .maximumWeight(maxSize) + .weigher((key, value) -> + Math.toIntExact(value.getCacheFileCount())); + this.cache = Optional.of(cacheBuilder.build()); + } else { + cache = Optional.empty(); + } + } + + public DirectoryLister get(DirectoryLister delegate) { + return cache + .map(cache -> (DirectoryLister) new TransactionScopeCachingDirectoryLister(delegate, + nextTransactionId.getAndIncrement(), cache)) + .orElse(delegate); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 
92bc509955d..a942c4e75b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -46,6 +46,7 @@ import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; +import org.apache.doris.common.Config; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.es.EsExternalTable; @@ -64,6 +65,9 @@ import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode; import org.apache.doris.datasource.odbc.source.OdbcScanNode; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.paimon.source.PaimonScanNode; +import org.apache.doris.fs.DirectoryLister; +import org.apache.doris.fs.FileSystemDirectoryLister; +import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.properties.DistributionSpec; import org.apache.doris.nereids.properties.DistributionSpecAny; @@ -238,6 +242,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla private final StatsErrorEstimator statsErrorEstimator; private final PlanTranslatorContext context; + private DirectoryLister directoryLister; + public PhysicalPlanTranslator() { this(null, null); } @@ -589,12 +595,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla // TODO(cmy): determine the needCheckColumnPriv param ScanNode scanNode; if (table instanceof HMSExternalTable) { + if (directoryLister == null) { + this.directoryLister = new TransactionScopeCachingDirectoryListerFactory( + Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister()); + } switch 
(((HMSExternalTable) table).getDlaType()) { case ICEBERG: scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); break; case HIVE: - scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); + scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv, directoryLister); HiveScanNode hiveScanNode = (HiveScanNode) scanNode; hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions()); if (fileScan.getTableSample().isPresent()) { @@ -665,6 +675,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla @Override public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTranslatorContext context) { + if (directoryLister == null) { + this.directoryLister = new TransactionScopeCachingDirectoryListerFactory( + Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister()); + } List<Slot> slots = fileScan.getOutput(); ExternalTable table = fileScan.getTable(); TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); @@ -677,7 +691,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla + " for Hudi table"); PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan; ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false, - hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), ConnectContext.get().getSessionVariable()); + hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), ConnectContext.get().getSessionVariable(), + directoryLister); if (fileScan.getTableSnapshot().isPresent()) { ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index d37f6c729f7..b4558347d30 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -58,6 +58,7 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; @@ -72,6 +73,9 @@ import org.apache.doris.datasource.jdbc.source.JdbcScanNode; import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode; import org.apache.doris.datasource.odbc.source.OdbcScanNode; import org.apache.doris.datasource.paimon.source.PaimonScanNode; +import org.apache.doris.fs.DirectoryLister; +import org.apache.doris.fs.FileSystemDirectoryLister; +import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; @@ -113,6 +117,8 @@ public class SingleNodePlanner { private final ArrayList<ScanNode> scanNodes = Lists.newArrayList(); private Map<Analyzer, List<ScanNode>> selectStmtToScanNodes = Maps.newHashMap(); + private DirectoryLister directoryLister; + public SingleNodePlanner(PlannerContext ctx) { this.ctx = ctx; } @@ -1960,6 +1966,10 @@ public class SingleNodePlanner { scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId(), sv); break; case HMS_EXTERNAL_TABLE: + if (directoryLister == null) { + this.directoryLister = new TransactionScopeCachingDirectoryListerFactory( + Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister()); + } TableIf table = tblRef.getDesc().getTable(); switch (((HMSExternalTable) table).getDlaType()) { case HUDI: @@ -1970,13 +1980,13 @@ public class SingleNodePlanner { + "please set 
enable_nereids_planner = true to enable new optimizer"); } scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, - Optional.empty(), Optional.empty(), sv); + Optional.empty(), Optional.empty(), sv, directoryLister); break; case ICEBERG: scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv); break; case HIVE: - scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv); + scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv, directoryLister); ((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample()); break; default: diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java new file mode 100644 index 00000000000..3bfdc73b78e --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java @@ -0,0 +1,708 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/test/java/io/trino/cache/TestEvictableCache.java +// and modified by Doris + +package org.apache.doris.common; + +import org.apache.doris.common.EvictableCacheBuilder.DisabledCacheImplementation; + +import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheStats; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.gaul.modernizer_maven_annotations.SuppressModernizer; +import org.junit.Assert; +import org.junit.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Exchanger; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +public class TestEvictableCache { + private static class TestingTicker extends Ticker { + private volatile long time; + + public TestingTicker() { + } + + public long read() { + return this.time; + } + + public synchronized void increment(long delta, TimeUnit unit) { + Preconditions.checkArgument(delta >= 0L, "delta is negative"); + this.time += unit.toNanos(delta); + } + } + + private static final int TEST_TIMEOUT_SECONDS = 10; + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + 
public void testLoad() + throws Exception { + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); + Assert.assertEquals("abc", cache.get(42, () -> "abc")); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testEvictBySize() + throws Exception { + int maximumSize = 10; + Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(maximumSize) + .build(); + + for (int i = 0; i < 10_000; i++) { + int value = i * 10; + Assert.assertEquals(value, (Object) cache.get(i, () -> value)); + } + cache.cleanUp(); + Assert.assertEquals(maximumSize, cache.size()); + Assert.assertEquals(maximumSize, ((EvictableCache<?, ?>) cache).tokensCount()); + + // Ensure cache is effective, i.e. some entries preserved + int lastKey = 10_000 - 1; + Assert.assertEquals(lastKey * 10, (Object) cache.get(lastKey, () -> { + throw new UnsupportedOperationException(); + })); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testEvictByWeight() throws Exception { + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .maximumWeight(20) + .weigher((Integer key, String value) -> value.length()) + .build(); + + for (int i = 0; i < 10; i++) { + String value = String.join("", Collections.nCopies(i, "a")); + Assert.assertEquals(value, cache.get(i, () -> value)); + } + + cache.cleanUp(); + // It's not deterministic which entries get evicted + int cacheSize = Math.toIntExact(cache.size()); + + Assert.assertEquals(cacheSize, ((EvictableCache<?, ?>) cache).tokensCount()); + Assert.assertEquals(cacheSize, cache.asMap().keySet().size()); + + int keySum = cache.asMap().keySet().stream() + .mapToInt(i -> i) + .sum(); + Assert.assertTrue("key sum should be <= 20", keySum <= 20); + + Assert.assertEquals(cacheSize, cache.asMap().values().size()); + + int valuesLengthSum = cache.asMap().values().stream() + .mapToInt(String::length) + .sum(); + Assert.assertTrue("values length sum should be <= 20", valuesLengthSum 
<= 20); + + // Ensure cache is effective, i.e. some entries preserved + int lastKey = 9; // 10 - 1 + String expected = String.join("", Collections.nCopies(lastKey, "a")); // Java 8 substitute for String.repeat + Assert.assertEquals(expected, cache.get(lastKey, () -> { + throw new UnsupportedOperationException(); + })); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testEvictByTime() throws Exception { + TestingTicker ticker = new TestingTicker(); + int ttl = 100; + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .ticker(ticker) + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .build(); + + Assert.assertEquals("1 ala ma kota", cache.get(1, () -> "1 ala ma kota")); + ticker.increment(ttl, TimeUnit.MILLISECONDS); + Assert.assertEquals("2 ala ma kota", cache.get(2, () -> "2 ala ma kota")); + cache.cleanUp(); + + // First entry should be expired and its token removed + int cacheSize = Math.toIntExact(cache.size()); + Assert.assertEquals(1, cacheSize); + Assert.assertEquals(cacheSize, ((EvictableCache<?, ?>) cache).tokensCount()); + Assert.assertEquals(cacheSize, cache.asMap().keySet().size()); + Assert.assertEquals(cacheSize, cache.asMap().values().size()); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testPreserveValueLoadedAfterTimeExpiration() throws Exception { + TestingTicker ticker = new TestingTicker(); + int ttl = 100; + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .ticker(ticker) + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .build(); + int key = 11; + + Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "11 ala ma kota")); + Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount()); + + Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "something else")); + Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount()); + + ticker.increment(ttl, TimeUnit.MILLISECONDS); + Assert.assertEquals("new value", cache.get(key, () -> "new value")); + Assert.assertEquals(1, 
((EvictableCache<?, ?>) cache).tokensCount()); + + Assert.assertEquals("new value", cache.get(key, () -> "something yet different")); + Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount()); + + Assert.assertEquals(1, cache.size()); + Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount()); + Assert.assertEquals(1, cache.asMap().keySet().size()); + Assert.assertEquals(1, cache.asMap().values().size()); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testReplace() throws Exception { + Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10) + .build(); + + int key = 10; + int initialValue = 20; + int replacedValue = 21; + + cache.get(key, () -> initialValue); + + Assert.assertTrue("Should successfully replace value", cache.asMap().replace(key, initialValue, replacedValue)); + Assert.assertEquals("Cache should contain replaced value", replacedValue, cache.getIfPresent(key).intValue()); + + Assert.assertFalse("Should not replace when current value is different", cache.asMap().replace(key, initialValue, replacedValue)); + Assert.assertEquals("Cache should maintain replaced value", replacedValue, cache.getIfPresent(key).intValue()); + + Assert.assertFalse("Should not replace non-existent key", cache.asMap().replace(100000, replacedValue, 22)); + Assert.assertEquals("Cache should only contain original key", ImmutableSet.of(key), cache.asMap().keySet()); + Assert.assertEquals("Original key should maintain its value", replacedValue, cache.getIfPresent(key).intValue()); + + int anotherKey = 13; + int anotherInitialValue = 14; + cache.get(anotherKey, () -> anotherInitialValue); + cache.invalidate(anotherKey); + + Assert.assertFalse("Should not replace after invalidation", cache.asMap().replace(anotherKey, anotherInitialValue, 15)); + Assert.assertEquals("Cache should only contain original key after invalidation", ImmutableSet.of(key), cache.asMap().keySet()); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) 
+ public void testDisabledCache() throws Exception { + Exception exception = Assert.assertThrows(IllegalStateException.class, () -> + EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .build()); + + Assert.assertEquals("Even when cache is disabled, the loads are synchronized and both load results and failures are shared between threads. " + + "This is rarely desired, thus builder caller is expected to either opt-in into this behavior with shareResultsAndFailuresEvenIfDisabled(), " + + "or choose not to share results (and failures) between concurrent invocations with shareNothingWhenDisabled().", + exception.getMessage()); + + testDisabledCache( + EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .shareNothingWhenDisabled() + .build()); + + testDisabledCache( + EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .shareResultsAndFailuresEvenIfDisabled() + .build()); + } + + private void testDisabledCache(Cache<Integer, Integer> cache) throws Exception { + for (int i = 0; i < 10; i++) { + int value = i * 10; + Assert.assertEquals(value, cache.get(i, () -> value).intValue()); + } + + cache.cleanUp(); + Assert.assertEquals(0, cache.size()); + Assert.assertTrue(cache.asMap().keySet().isEmpty()); + Assert.assertTrue(cache.asMap().values().isEmpty()); + } + + private static class CacheStatsAssertions { + public static CacheStatsAssertions assertCacheStats(Cache<?, ?> cache) { + Objects.requireNonNull(cache, "cache is null"); + return assertCacheStats(cache::stats); + } + + public static CacheStatsAssertions assertCacheStats(Supplier<CacheStats> statsSupplier) { + return new CacheStatsAssertions(statsSupplier); + } + + private final Supplier<CacheStats> stats; + + private long loads; + private long hits; + private long misses; + + private CacheStatsAssertions(Supplier<CacheStats> stats) { + this.stats = Objects.requireNonNull(stats, "stats is null"); + } + + public CacheStatsAssertions loads(long value) { + this.loads = value; + return this; + } + + public 
CacheStatsAssertions hits(long value) { + this.hits = value; + return this; + } + + public CacheStatsAssertions misses(long value) { + this.misses = value; + return this; + } + + public void afterRunning(Runnable runnable) { + try { + calling(() -> { + runnable.run(); + return null; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public <T> T calling(Callable<T> callable) + throws Exception { + CacheStats beforeStats = stats.get(); + T value = callable.call(); + CacheStats afterStats = stats.get(); + + long loadDelta = afterStats.loadCount() - beforeStats.loadCount(); + long missesDelta = afterStats.missCount() - beforeStats.missCount(); + long hitsDelta = afterStats.hitCount() - beforeStats.hitCount(); + + Assert.assertEquals(loads, loadDelta); + Assert.assertEquals(hits, hitsDelta); + Assert.assertEquals(misses, missesDelta); + + return value; + } + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testLoadStats() + throws Exception { + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .recordStats() + .build(); + + Assert.assertEquals(new CacheStats(0, 0, 0, 0, 0, 0), cache.stats()); + + String value = CacheStatsAssertions.assertCacheStats(cache) + .misses(1) + .loads(1) + .calling(() -> cache.get(42, () -> "abc")); + Assert.assertEquals("abc", value); + + value = CacheStatsAssertions.assertCacheStats(cache) + .hits(1) + .calling(() -> cache.get(42, () -> "xyz")); + Assert.assertEquals("abc", value); + + // with equal, but not the same key + value = CacheStatsAssertions.assertCacheStats(cache) + .hits(1) + .calling(() -> cache.get(newInteger(42), () -> "xyz")); + Assert.assertEquals("abc", value); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testLoadFailure() + throws Exception { + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .expireAfterWrite(0, TimeUnit.DAYS) + .shareResultsAndFailuresEvenIfDisabled() + .build(); + int key = 
10; + + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + Exchanger<Thread> exchanger = new Exchanger<>(); + CountDownLatch secondUnblocked = new CountDownLatch(1); + + List<Future<String>> futures = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + boolean first = i == 0; + futures.add(executor.submit(() -> { + if (!first) { + // Wait for the first one to start the call + exchanger.exchange(Thread.currentThread(), 10, TimeUnit.SECONDS); + // Prove that we are back in RUNNABLE state. + secondUnblocked.countDown(); + } + return cache.get(key, () -> { + if (first) { + Thread secondThread = exchanger.exchange(null, 10, TimeUnit.SECONDS); + Assert.assertTrue(secondUnblocked.await(10, TimeUnit.SECONDS)); + // Wait for the second one to hang inside the cache.get call. + long start = System.nanoTime(); + while (!Thread.currentThread().isInterrupted()) { + try { + Assert.assertNotEquals(Thread.State.RUNNABLE, secondThread.getState()); + break; + } catch (Exception | AssertionError e) { + if (System.nanoTime() - start > TimeUnit.SECONDS.toNanos(30)) { + throw e; + } + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + throw new RuntimeException("first attempt is poised to fail"); + } + return "success"; + }); + })); + } + + List<String> results = new ArrayList<>(); + for (Future<String> future : futures) { + try { + results.add(future.get()); + } catch (ExecutionException e) { + results.add(e.getCause().toString()); + } + } + + // Note: if this starts to fail, that suggests that Guava implementation changed and NoopCache may be redundant now. 
+ String expectedError = "com.google.common.util.concurrent.UncheckedExecutionException: " + + "java.lang.RuntimeException: first attempt is poised to fail"; + Assert.assertEquals(2, results.size()); + Assert.assertEquals(expectedError, results.get(0)); + Assert.assertEquals(expectedError, results.get(1)); + } finally { + executor.shutdownNow(); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + } + + @SuppressModernizer + private static Integer newInteger(int value) { + Integer integer = value; + @SuppressWarnings({"UnnecessaryBoxing", "BoxedPrimitiveConstructor", "CachedNumberConstructorCall", "removal"}) + Integer newInteger = new Integer(value); + Assert.assertNotSame(integer, newInteger); + return newInteger; + } + + /** + * Test that the loader is invoked only once for concurrent invocations of {@link LoadingCache#get(Object, Callable)} with equal keys. + * This is a behavior of Guava Cache as well. While this is not necessarily desirable behavior (see + * <a href="https://github.com/trinodb/trino/issues/11067">https://github.com/trinodb/trino/issues/11067</a>), + * the test exists primarily to document current state and support discussion, should the current state change. 
+ */ + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testConcurrentGetWithCallableShareLoad() + throws Exception { + AtomicInteger loads = new AtomicInteger(); + AtomicInteger concurrentInvocations = new AtomicInteger(); + + Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); + + int threads = 2; + int invocationsPerThread = 100; + ExecutorService executor = Executors.newFixedThreadPool(threads); + try { + CyclicBarrier barrier = new CyclicBarrier(threads); + List<Future<?>> futures = new ArrayList<>(); + for (int i = 0; i < threads; i++) { + futures.add(executor.submit(() -> { + for (int invocation = 0; invocation < invocationsPerThread; invocation++) { + int key = invocation; + barrier.await(10, TimeUnit.SECONDS); + int value = cache.get(key, () -> { + loads.incrementAndGet(); + int invocations = concurrentInvocations.incrementAndGet(); + Preconditions.checkState(invocations == 1, "There should be no concurrent invocations, cache should do load sharing when get() invoked for same key"); + Thread.sleep(1); + concurrentInvocations.decrementAndGet(); + return -key; + }); + Assert.assertEquals(-invocation, value); + } + return null; + })); + } + + for (Future<?> future : futures) { + future.get(10, TimeUnit.SECONDS); + } + Assert.assertTrue( + String.format( + "loads (%d) should be between %d and %d", + loads.intValue(), + invocationsPerThread, + threads * invocationsPerThread - 1), + loads.intValue() >= invocationsPerThread && loads.intValue() <= threads * invocationsPerThread - 1); + } finally { + executor.shutdownNow(); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + } + + enum Invalidation { + INVALIDATE_KEY, + INVALIDATE_PREDEFINED_KEYS, + INVALIDATE_SELECTED_KEYS, + INVALIDATE_ALL, + /**/; + } + + /** + * Covers https://github.com/google/guava/issues/1881 + */ + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testInvalidateOngoingLoad() + throws Exception { + for 
(Invalidation invalidation : Invalidation.values()) { + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); + Integer key = 42; + + CountDownLatch loadOngoing = new CountDownLatch(1); + CountDownLatch invalidated = new CountDownLatch(1); + CountDownLatch getReturned = new CountDownLatch(1); + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + // thread A + Future<String> threadA = executor.submit(() -> { + String value = cache.get(key, () -> { + loadOngoing.countDown(); // 1 + Assert.assertTrue(invalidated.await(10, TimeUnit.SECONDS)); // 2 + return "stale value"; + }); + getReturned.countDown(); // 3 + return value; + }); + + // thread B + Future<String> threadB = executor.submit(() -> { + Assert.assertTrue(loadOngoing.await(10, TimeUnit.SECONDS)); // 1 + + switch (invalidation) { + case INVALIDATE_KEY: + cache.invalidate(key); + break; + case INVALIDATE_PREDEFINED_KEYS: + cache.invalidateAll(ImmutableList.of(key)); + break; + case INVALIDATE_SELECTED_KEYS: + Set<Integer> keys = cache.asMap().keySet().stream() + .filter(foundKey -> (int) foundKey == key) + .collect(ImmutableSet.toImmutableSet()); + cache.invalidateAll(keys); + break; + case INVALIDATE_ALL: + cache.invalidateAll(); + break; + default: + throw new IllegalArgumentException(); + } + + invalidated.countDown(); // 2 + // Cache may persist value after loader returned, but before `cache.get(...)` returned. Ensure the latter completed. 
+ Assert.assertTrue(getReturned.await(10, TimeUnit.SECONDS)); // 3 + + return cache.get(key, () -> "fresh value"); + }); + + Assert.assertEquals("stale value", threadA.get()); + Assert.assertEquals("fresh value", threadB.get()); + } finally { + executor.shutdownNow(); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + } + } + + /** + * Covers https://github.com/google/guava/issues/1881 + */ + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testInvalidateAndLoadConcurrently() + throws Exception { + for (Invalidation invalidation : Invalidation.values()) { + int[] primes = {2, 3, 5, 7}; + AtomicLong remoteState = new AtomicLong(1); + + Cache<Integer, Long> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); + Integer key = 42; + int threads = 4; + + CyclicBarrier barrier = new CyclicBarrier(threads); + ExecutorService executor = Executors.newFixedThreadPool(threads); + try { + List<Future<Void>> futures = IntStream.range(0, threads) + .mapToObj(threadNumber -> executor.submit(() -> { + // prime the cache + Assert.assertEquals(1L, (long) cache.get(key, remoteState::get)); + int prime = primes[threadNumber]; + + barrier.await(10, TimeUnit.SECONDS); + + // modify underlying state + remoteState.updateAndGet(current -> current * prime); + + // invalidate + switch (invalidation) { + case INVALIDATE_KEY: + cache.invalidate(key); + break; + case INVALIDATE_PREDEFINED_KEYS: + cache.invalidateAll(ImmutableList.of(key)); + break; + case INVALIDATE_SELECTED_KEYS: + Set<Integer> keys = cache.asMap().keySet().stream() + .filter(foundKey -> (int) foundKey == key) + .collect(ImmutableSet.toImmutableSet()); + cache.invalidateAll(keys); + break; + case INVALIDATE_ALL: + cache.invalidateAll(); + break; + default: + throw new IllegalArgumentException(); + } + + // read through cache + long current = cache.get(key, remoteState::get); + if (current % prime != 0) { + throw new AssertionError(String.format("The value read through cache 
(%s) in thread (%s) is not divisible by (%s)", current, threadNumber, prime)); + } + + return (Void) null; + })) + .collect(ImmutableList.toImmutableList()); + + for (Future<?> future : futures) { + try { + future.get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException("Failed to get future value", e); + } + } + + Assert.assertEquals(2 * 3 * 5 * 7, remoteState.get()); + Assert.assertEquals(remoteState.get(), (long) cache.get(key, remoteState::get)); + } finally { + executor.shutdownNow(); + Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + } + } + + @Test + public void testPutOnEmptyCacheImplementation() { + for (DisabledCacheImplementation disabledCacheImplementation : DisabledCacheImplementation.values()) { + Cache<Object, Object> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .disabledCacheImplementation(disabledCacheImplementation) + .build(); + Map<Object, Object> cacheMap = cache.asMap(); + + int key = 0; + int value = 1; + Assert.assertNull(cacheMap.put(key, value)); + Assert.assertNull(cacheMap.put(key, value)); + Assert.assertNull(cacheMap.putIfAbsent(key, value)); + Assert.assertNull(cacheMap.putIfAbsent(key, value)); + } + } + + @Test + public void testPutOnNonEmptyCacheImplementation() { + Cache<Object, Object> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10) + .build(); + Map<Object, Object> cacheMap = cache.asMap(); + + int key = 0; + int value = 1; + + Exception putException = Assert.assertThrows("put operation should throw UnsupportedOperationException", + UnsupportedOperationException.class, + () -> cacheMap.put(key, value)); + Assert.assertEquals( + "The operation is not supported, as in inherently races with cache invalidation. 
Use get(key, callable) instead.", + putException.getMessage()); + + Exception putIfAbsentException = Assert.assertThrows("putIfAbsent operation should throw UnsupportedOperationException", + UnsupportedOperationException.class, + () -> cacheMap.putIfAbsent(key, value)); + Assert.assertEquals( + "The operation is not supported, as in inherently races with cache invalidation", + putIfAbsentException.getMessage()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java new file mode 100644 index 00000000000..d6c6ed4b93b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java
// and modified by Doris

package org.apache.doris.fs;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.fs.remote.RemoteFile;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Tests for the directory lister produced by {@link TransactionScopeCachingDirectoryListerFactory},
 * driven through a counting in-memory {@link DirectoryLister} so that each test can assert how many
 * listings actually reached the delegate.
 */
// Kept on a single thread: some tests may invalidate the whole cache and thereby affect other
// tests running concurrently.
// NOTE(review): @Execution is a JUnit 5 annotation while @Test below is JUnit 4 - confirm it is honored.
@Execution(ExecutionMode.SAME_THREAD)
public class TransactionScopeCachingDirectoryListerTest {
    /**
     * Lists two directories through a lister created with factory argument {@code 2}
     * (presumably the maximum cache size - confirm against the factory) and checks which
     * listings reach the delegate and when a previously listed path stops being cached.
     */
    @Test
    public void testConcurrentDirectoryListing(@Mocked TableIf table)
            throws FileSystemIOException {
        String dirX = "file:/x";
        String dirY = "file:/y";

        RemoteFile xFirst = new RemoteFile("file:/x/x", true, 1, 1);
        RemoteFile xSecond = new RemoteFile("file:/x/y", true, 1, 1);
        RemoteFile yOnly = new RemoteFile("file:/y/z", true, 1, 1);

        CountingDirectoryLister delegate = new CountingDirectoryLister(
                ImmutableMap.of(
                        dirX, ImmutableList.of(xFirst, xSecond),
                        dirY, ImmutableList.of(yOnly)));
        DirectoryLister created = new TransactionScopeCachingDirectoryListerFactory(2).get(delegate);
        TransactionScopeCachingDirectoryLister lister = (TransactionScopeCachingDirectoryLister) created;

        // The first listing of dirY reaches the delegate.
        assertFiles(lister.listFiles(null, true, table, dirY), ImmutableList.of(yOnly));
        Assert.assertEquals(1, delegate.getListCount());

        // A second listing of dirY is answered from the cache: the delegate count stays at one.
        Assert.assertTrue(lister.isCached(dirY));
        assertFiles(lister.listFiles(null, true, table, dirY), ImmutableList.of(yOnly));
        Assert.assertEquals(1, delegate.getListCount());

        // Open two iterators over dirX before consuming either of them.
        RemoteIterator<RemoteFile> xIteratorA = lister.listFiles(null, true, table, dirX);
        RemoteIterator<RemoteFile> xIteratorB = lister.listFiles(null, true, table, dirX);
        Assert.assertEquals(2, delegate.getListCount());

        // Consume both iterators in an interleaved order.
        Assert.assertEquals(xFirst, xIteratorA.next());
        Assert.assertEquals(xFirst, xIteratorB.next());
        Assert.assertEquals(xSecond, xIteratorB.next());
        Assert.assertEquals(xSecond, xIteratorA.next());
        Assert.assertFalse(xIteratorA.hasNext());
        Assert.assertFalse(xIteratorB.hasNext());
        Assert.assertEquals(2, delegate.getListCount());

        // dirY is no longer cached at this point, so listing it again hits the delegate.
        Assert.assertFalse(lister.isCached(dirY));
        assertFiles(lister.listFiles(null, true, table, dirY), ImmutableList.of(yOnly));
        Assert.assertEquals(3, delegate.getListCount());
    }

    /**
     * Opens two iterators over the same path while the delegate is configured to fail, then checks
     * that both of them fail, while a listing started after the failure flag is cleared succeeds.
     */
    @Test
    public void testConcurrentDirectoryListingException(@Mocked TableIf table)
            throws FileSystemIOException {
        String dir = "file:/x";
        RemoteFile onlyFile = new RemoteFile("file:/x/x", true, 1, 1);

        CountingDirectoryLister delegate = new CountingDirectoryLister(
                ImmutableMap.of(dir, ImmutableList.of(onlyFile)));
        DirectoryLister lister = new TransactionScopeCachingDirectoryListerFactory(1).get(delegate);

        // Both iterators are created while the delegate is set to throw; only one delegate
        // listing is performed for the two of them.
        delegate.setThrowException(true);
        RemoteIterator<RemoteFile> failingA = lister.listFiles(null, true, table, dir);
        RemoteIterator<RemoteFile> failingB = lister.listFiles(null, true, table, dir);
        Assert.assertEquals(1, delegate.getListCount());

        // The first iterator surfaces the failure.
        Assert.assertThrows(FileSystemIOException.class, failingA::hasNext);

        // A listing started after the failure flag is cleared goes to the delegate again and succeeds.
        delegate.setThrowException(false);
        assertFiles(lister.listFiles(null, true, table, dir), ImmutableList.of(onlyFile));
        Assert.assertEquals(2, delegate.getListCount());

        // The second iterator, created alongside the first one, still fails.
        Assert.assertThrows(FileSystemIOException.class, failingB::hasNext);
    }

    /**
     * Drains {@code iterator} and asserts that it produced exactly {@code expectedFiles}, in order.
     */
    private void assertFiles(RemoteIterator<RemoteFile> iterator, List<RemoteFile> expectedFiles)
            throws FileSystemIOException {
        ImmutableList.Builder<RemoteFile> drained = ImmutableList.builder();
        while (iterator.hasNext()) {
            RemoteFile current = iterator.next();
            drained.add(current);
        }
        List<RemoteFile> actualFiles = drained.build();
        Assert.assertEquals(expectedFiles, actualFiles);
    }

    /**
     * In-memory {@link DirectoryLister} that serves a fixed location-to-files map and counts
     * how many times {@code listFiles} was invoked.
     */
    private static class CountingDirectoryLister
            implements DirectoryLister {
        private final Map<String, List<RemoteFile>> filesByLocation;
        // Number of listFiles calls received so far.
        private int listings;
        // When set, iterators handed out afterwards throw from hasNext().
        private boolean failOnIteration;

        public CountingDirectoryLister(Map<String, List<RemoteFile>> fileStatuses) {
            this.filesByLocation = Objects.requireNonNull(fileStatuses, "fileStatuses is null");
        }

        public int getListCount() {
            return listings;
        }

        public void setThrowException(boolean throwException) {
            this.failOnIteration = throwException;
        }

        @Override
        public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
                throws FileSystemIOException {
            // No specific recursive files-only listing implementation
            listings++;
            List<RemoteFile> known = Objects.requireNonNull(filesByLocation.get(location));
            return throwingRemoteIterator(known, failOnIteration);
        }
    }

    /**
     * Returns an iterator over a copy of {@code files} whose {@code hasNext()} throws
     * {@link FileSystemIOException} when {@code throwException} is {@code true}.
     */
    static RemoteIterator<RemoteFile> throwingRemoteIterator(List<RemoteFile> files, boolean throwException) {
        Iterator<RemoteFile> snapshot = ImmutableList.copyOf(files).iterator();
        return new RemoteIterator<RemoteFile>() {
            @Override
            public boolean hasNext()
                    throws FileSystemIOException {
                if (!throwException) {
                    return snapshot.hasNext();
                }
                throw new FileSystemIOException("File system io exception.");
            }

            @Override
            public RemoteFile next() {
                return snapshot.next();
            }
        };
    }
}
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org