This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1ff07a391a5 [Opt](multi-catalog)Improve performance by introducing cache of list directory files when getting split for each query. (#43913) 1ff07a391a5 is described below commit 1ff07a391a540882b40f7a3f5ab693dbee69ffbf Author: Qi Chen <che...@selectdb.com> AuthorDate: Wed Feb 12 22:31:45 2025 +0800 [Opt](multi-catalog)Improve performance by introducing cache of list directory files when getting split for each query. (#43913) ### What problem does this PR solve? Referring to Trino, this implements a query-level cache mechanism across multiple Hive tables for obtaining the file split list of each partition. Because files within a single query should have the same visibility, the split lists of partitions of the same table should be consistent across the scope of the query. So this cache is sound and should be enabled by default. The mechanism in Trino is transaction-scoped — a transaction sees a consistent view of a table — so the class is named `TransactionScopeCachingDirectoryLister`. This name is retained in Doris so it can be extended to a transaction concept in the future. In addition, for this scenario, because the Caffeine cache currently used by Doris uses a windowed eviction strategy, existing cache entries in the window area may be evicted immediately after their weight is updated. Therefore, `EvictableCache`, which is based on Guava and evicts entries using segmented LRU, was introduced. 
--- .../main/java/org/apache/doris/common/Config.java | 4 + .../java/org/apache/doris/common/EmptyCache.java | 247 +++++++ .../org/apache/doris/common/EvictableCache.java | 466 ++++++++++++++ .../apache/doris/common/EvictableCacheBuilder.java | 286 +++++++++ .../doris/datasource/hive/HMSExternalTable.java | 4 +- .../doris/datasource/hive/HiveMetaStoreCache.java | 69 +- .../doris/datasource/hive/source/HiveScanNode.java | 15 +- .../doris/datasource/hudi/source/HudiScanNode.java | 5 +- .../java/org/apache/doris/fs/DirectoryLister.java | 29 + .../apache/doris/fs/FileSystemDirectoryLister.java | 37 ++ .../org/apache/doris/fs/FileSystemIOException.java | 65 ++ .../apache/doris/fs/RemoteFileRemoteIterator.java | 47 ++ .../java/org/apache/doris/fs/RemoteIterator.java | 27 + .../org/apache/doris/fs/SimpleRemoteIterator.java | 45 ++ .../fs/TransactionDirectoryListingCacheKey.java | 64 ++ .../fs/TransactionScopeCachingDirectoryLister.java | 216 +++++++ ...nsactionScopeCachingDirectoryListerFactory.java | 59 ++ .../glue/translator/PhysicalPlanTranslator.java | 19 +- .../apache/doris/planner/SingleNodePlanner.java | 14 +- .../apache/doris/common/TestEvictableCache.java | 708 +++++++++++++++++++++ ...TransactionScopeCachingDirectoryListerTest.java | 174 +++++ 21 files changed, 2563 insertions(+), 37 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 030833ef4df..60f59460a0a 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2214,6 +2214,10 @@ public class Config extends ConfigBase { "Max cache number of external table row count"}) public static long max_external_table_row_count_cache_num = 100000; + @ConfField(description = {"每个查询的外表文件元数据缓存的最大文件数量。", + "Max cache file number of external table split file meta cache at query level."}) + public static long 
max_external_table_split_file_meta_cache_num = 100000; + /** * Max cache loader thread-pool size. * Max thread pool size for loading external meta cache diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java new file mode 100644 index 00000000000..5942eb2b1f3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EmptyCache.java +// and modified by Doris + +package org.apache.doris.common; + +import com.google.common.cache.AbstractLoadingCache; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheLoader.InvalidCacheLoadException; +import com.google.common.cache.CacheStats; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; + +class EmptyCache<K, V> + extends AbstractLoadingCache<K, V> { + private final CacheLoader<? super K, V> loader; + private final StatsCounter statsCounter; + + EmptyCache(CacheLoader<? super K, V> loader, boolean recordStats) { + this.loader = Objects.requireNonNull(loader, "loader is null"); + this.statsCounter = recordStats ? new SimpleStatsCounter() : new NoopStatsCounter(); + } + + @Override + public V getIfPresent(Object key) { + statsCounter.recordMisses(1); + return null; + } + + @Override + public V get(K key) + throws ExecutionException { + return get(key, () -> loader.load(key)); + } + + @Override + public ImmutableMap<K, V> getAll(Iterable<? 
extends K> keys) + throws ExecutionException { + try { + Set<K> keySet = ImmutableSet.copyOf(keys); + statsCounter.recordMisses(keySet.size()); + @SuppressWarnings("unchecked") // safe since all keys extend K + ImmutableMap<K, V> result = (ImmutableMap<K, V>) loader.loadAll(keySet); + for (K key : keySet) { + if (!result.containsKey(key)) { + throw new InvalidCacheLoadException("loadAll failed to return a value for " + key); + } + } + statsCounter.recordLoadSuccess(1); + return result; + } catch (RuntimeException e) { + statsCounter.recordLoadException(1); + throw new UncheckedExecutionException(e); + } catch (Exception e) { + statsCounter.recordLoadException(1); + throw new ExecutionException(e); + } + } + + @Override + public V get(K key, Callable<? extends V> valueLoader) + throws ExecutionException { + statsCounter.recordMisses(1); + try { + V value = valueLoader.call(); + statsCounter.recordLoadSuccess(1); + return value; + } catch (RuntimeException e) { + statsCounter.recordLoadException(1); + throw new UncheckedExecutionException(e); + } catch (Exception e) { + statsCounter.recordLoadException(1); + throw new ExecutionException(e); + } + } + + @Override + public void put(K key, V value) { + // Cache, even if configured to evict everything immediately, should allow writes. + } + + @Override + public void refresh(K key) {} + + @Override + public void invalidate(Object key) {} + + @Override + public void invalidateAll(Iterable<?> keys) {} + + @Override + public void invalidateAll() { + + } + + @Override + public long size() { + return 0; + } + + @Override + public CacheStats stats() { + return statsCounter.snapshot(); + } + + @Override + public ConcurrentMap<K, V> asMap() { + return new ConcurrentMap<K, V>() { + @Override + public V putIfAbsent(K key, V value) { + // Cache, even if configured to evict everything immediately, should allow writes. 
+ // putIfAbsent returns the previous value + return null; + } + + @Override + public boolean remove(Object key, Object value) { + return false; + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + return false; + } + + @Override + public V replace(K key, V value) { + return null; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public boolean containsKey(Object key) { + return false; + } + + @Override + public boolean containsValue(Object value) { + return false; + } + + @Override + @Nullable + public V get(Object key) { + return null; + } + + @Override + @Nullable + public V put(K key, V value) { + // Cache, even if configured to evict everything immediately, should allow writes. + return null; + } + + @Override + @Nullable + public V remove(Object key) { + return null; + } + + @Override + public void putAll(Map<? extends K, ? extends V> m) { + // Cache, even if configured to evict everything immediately, should allow writes. 
+ } + + @Override + public void clear() { + + } + + @Override + public Set<K> keySet() { + return ImmutableSet.of(); + } + + @Override + public Collection<V> values() { + return ImmutableSet.of(); + } + + @Override + public Set<Entry<K, V>> entrySet() { + return ImmutableSet.of(); + } + }; + } + + private static class NoopStatsCounter + implements StatsCounter { + private static final CacheStats EMPTY_STATS = new SimpleStatsCounter().snapshot(); + + @Override + public void recordHits(int count) {} + + @Override + public void recordMisses(int count) {} + + @Override + public void recordLoadSuccess(long loadTime) {} + + @Override + public void recordLoadException(long loadTime) {} + + @Override + public void recordEviction() {} + + @Override + public CacheStats snapshot() { + return EMPTY_STATS; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java new file mode 100644 index 00000000000..a2cb05d82c5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCache.java @@ -0,0 +1,466 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCache.java +// and modified by Doris + +package org.apache.doris.common; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.base.Verify; +import com.google.common.cache.AbstractLoadingCache; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheStats; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalCause; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import javax.annotation.Nullable; + +/** + * A {@link Cache} and {@link LoadingCache} implementation similar to ones + * produced by {@link CacheBuilder#build()}, but one that does not + * exhibit <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>: + * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object, Callable)} + * is guaranteed to return fresh state after {@link #invalidate(Object)}, + * {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called. 
+ * + * @see EvictableCacheBuilder + */ +// @ElementTypesAreNonnullByDefault +class EvictableCache<K, V> + extends AbstractLoadingCache<K, V> + implements LoadingCache<K, V> { + // Invariant: for every (K, token) entry in the tokens map, there is a live + // cache entry (token, ?) in dataCache, that, upon eviction, will cause the tokens' + // entry to be removed. + private final ConcurrentHashMap<K, Token<K>> tokens = new ConcurrentHashMap<>(); + // The dataCache can have entries with no corresponding tokens in the tokens map. + // For example, this can happen when invalidation concurs with load. + // The dataCache must be bounded. + private final LoadingCache<Token<K>, V> dataCache; + + private final AtomicInteger invalidations = new AtomicInteger(); + + EvictableCache(CacheBuilder<? super Token<K>, ? super V> cacheBuilder, CacheLoader<? super K, V> cacheLoader) { + dataCache = buildUnsafeCache( + cacheBuilder + .<Token<K>, V>removalListener(removal -> { + Token<K> token = removal.getKey(); + Verify.verify(token != null, "token is null"); + if (removal.getCause() != RemovalCause.REPLACED) { + tokens.remove(token.getKey(), token); + } + }), + new TokenCacheLoader<>(cacheLoader)); + } + + // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden, + // advising to use this class as a safety-adding wrapper. + private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder, + CacheLoader<? super K, V> cacheLoader) { + return cacheBuilder.build(cacheLoader); + } + + @Override + public V getIfPresent(Object key) { + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token<K> token = tokens.get(key); + if (token == null) { + return null; + } + return dataCache.getIfPresent(token); + } + + @Override + public V get(K key, Callable<? 
extends V> valueLoader) + throws ExecutionException { + Token<K> newToken = new Token<>(key); + int invalidations = this.invalidations.get(); + Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken); + try { + V value = dataCache.get(token, valueLoader); + if (invalidations == this.invalidations.get()) { + // Revive token if it got expired before reloading + if (tokens.putIfAbsent(key, token) == null) { + // Revived + if (!dataCache.asMap().containsKey(token)) { + // We revived, but the token does not correspond to a live entry anymore. + // It would stay in tokens forever, so let's remove it. + tokens.remove(key, token); + } + } + } + return value; + } catch (Throwable e) { + if (newToken == token) { + // Failed to load and it was our new token persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(key, newToken); + } + throw e; + } + } + + @Override + public V get(K key) + throws ExecutionException { + Token<K> newToken = new Token<>(key); + int invalidations = this.invalidations.get(); + Token<K> token = tokens.computeIfAbsent(key, ignored -> newToken); + try { + V value = dataCache.get(token); + if (invalidations == this.invalidations.get()) { + // Revive token if it got expired before reloading + if (tokens.putIfAbsent(key, token) == null) { + // Revived + if (!dataCache.asMap().containsKey(token)) { + // We revived, but the token does not correspond to a live entry anymore. + // It would stay in tokens forever, so let's remove it. + tokens.remove(key, token); + } + } + } + return value; + } catch (Throwable e) { + if (newToken == token) { + // Failed to load and it was our new token persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(key, newToken); + } + throw e; + } + } + + @Override + public ImmutableMap<K, V> getAll(Iterable<? 
extends K> keys) + throws ExecutionException { + List<Token<K>> newTokens = new ArrayList<>(); + List<Token<K>> temporaryTokens = new ArrayList<>(); + try { + Map<K, V> result = new LinkedHashMap<>(); + for (K key : keys) { + if (result.containsKey(key)) { + continue; + } + // This is not bulk, but is fast local operation + Token<K> newToken = new Token<>(key); + Token<K> oldToken = tokens.putIfAbsent(key, newToken); + if (oldToken != null) { + // Token exists but a data may not exist (e.g. due to concurrent eviction) + V value = dataCache.getIfPresent(oldToken); + if (value != null) { + result.put(key, value); + continue; + } + // Old token exists but value wasn't found. This can happen when there is concurrent + // eviction/invalidation or when the value is still being loaded. + // The new token is not registered in tokens, so won't be used by subsequent invocations. + temporaryTokens.add(newToken); + } + newTokens.add(newToken); + } + + Map<Token<K>, V> values = dataCache.getAll(newTokens); + for (Map.Entry<Token<K>, V> entry : values.entrySet()) { + Token<K> newToken = entry.getKey(); + result.put(newToken.getKey(), entry.getValue()); + } + return ImmutableMap.copyOf(result); + } catch (Throwable e) { + for (Token<K> token : newTokens) { + // Failed to load and it was our new token (potentially) persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(token.getKey(), token); + } + throw e; + } finally { + dataCache.invalidateAll(temporaryTokens); + } + } + + @Override + public void refresh(K key) { + // The refresh loads a new entry, if it wasn't in the cache yet. Thus, we would create a new Token. + // However, dataCache.refresh is asynchronous and may fail, so no cache entry may be created. + // In such case we would leak the newly created token. 
+ throw new UnsupportedOperationException(); + } + + @Override + public long size() { + return dataCache.size(); + } + + @Override + public void cleanUp() { + dataCache.cleanUp(); + } + + @VisibleForTesting + int tokensCount() { + return tokens.size(); + } + + @Override + public void invalidate(Object key) { + invalidations.incrementAndGet(); + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token<K> token = tokens.remove(key); + if (token != null) { + dataCache.invalidate(token); + } + } + + @Override + public void invalidateAll() { + invalidations.incrementAndGet(); + dataCache.invalidateAll(); + tokens.clear(); + } + + // Not thread safe, test only. + @VisibleForTesting + void clearDataCacheOnly() { + Map<K, Token<K>> tokensCopy = new HashMap<>(tokens); + dataCache.asMap().clear(); + Verify.verify(tokens.isEmpty(), "Clearing dataCache should trigger tokens eviction"); + tokens.putAll(tokensCopy); + } + + @Override + public CacheStats stats() { + return dataCache.stats(); + } + + @Override + public ConcurrentMap<K, V> asMap() { + return new ConcurrentMap<K, V>() { + private final ConcurrentMap<Token<K>, V> dataCacheMap = dataCache.asMap(); + + @Override + public V putIfAbsent(K key, V value) { + throw new UnsupportedOperationException("The operation is not supported," + + " as in inherently races with cache invalidation"); + } + + @Override + public V compute(K key, BiFunction<? super K, ? super V, ? 
extends V> remappingFunction) { + // default implementation of ConcurrentMap#compute uses not supported putIfAbsent in some cases + throw new UnsupportedOperationException("The operation is not supported, as in inherently" + + " races with cache invalidation"); + } + + @Override + public boolean remove(Object key, Object value) { + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token<K> token = tokens.get(key); + if (token != null) { + return dataCacheMap.remove(token, value); + } + return false; + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + Token<K> token = tokens.get(key); + if (token != null) { + return dataCacheMap.replace(token, oldValue, newValue); + } + return false; + } + + @Override + public V replace(K key, V value) { + throw new UnsupportedOperationException("The operation is not supported, as in inherently races" + + " with cache invalidation"); + } + + @Override + public int size() { + return dataCache.asMap().size(); + } + + @Override + public boolean isEmpty() { + return dataCache.asMap().isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return tokens.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return values().contains(value); + } + + @Override + @Nullable + public V get(Object key) { + return getIfPresent(key); + } + + @Override + public V put(K key, V value) { + throw new UnsupportedOperationException("The operation is not supported, as in inherently" + + " races with cache invalidation. Use get(key, callable) instead."); + } + + @Override + @Nullable + public V remove(Object key) { + Token<K> token = tokens.remove(key); + if (token != null) { + return dataCacheMap.remove(token); + } + return null; + } + + @Override + public void putAll(Map<? extends K, ? extends V> m) { + throw new UnsupportedOperationException("The operation is not supported, as in inherently" + + " races with cache invalidation. 
Use get(key, callable) instead."); + } + + @Override + public void clear() { + dataCacheMap.clear(); + tokens.clear(); + } + + @Override + public Set<K> keySet() { + return tokens.keySet(); + } + + @Override + public Collection<V> values() { + return dataCacheMap.values(); + } + + @Override + public Set<Map.Entry<K, V>> entrySet() { + throw new UnsupportedOperationException(); + } + }; + } + + // instance-based equality + static final class Token<K> { + private final K key; + + Token(K key) { + this.key = Objects.requireNonNull(key, "key is null"); + } + + K getKey() { + return key; + } + + @Override + public String toString() { + return String.format("CacheToken(%s; %s)", Integer.toHexString(hashCode()), key); + } + } + + private static class TokenCacheLoader<K, V> + extends CacheLoader<Token<K>, V> { + private final CacheLoader<? super K, V> delegate; + + public TokenCacheLoader(CacheLoader<? super K, V> delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate is null"); + } + + @Override + public V load(Token<K> token) + throws Exception { + return delegate.load(token.getKey()); + } + + @Override + public ListenableFuture<V> reload(Token<K> token, V oldValue) + throws Exception { + return delegate.reload(token.getKey(), oldValue); + } + + @Override + public Map<Token<K>, V> loadAll(Iterable<? extends Token<K>> tokens) + throws Exception { + List<Token<K>> tokenList = ImmutableList.copyOf(tokens); + List<K> keys = new ArrayList<>(); + for (Token<K> token : tokenList) { + keys.add(token.getKey()); + } + Map<? super K, V> values; + try { + values = delegate.loadAll(keys); + } catch (UnsupportedLoadingOperationException e) { + // Guava uses UnsupportedLoadingOperationException in LoadingCache.loadAll + // to fall back from bulk loading (without load sharing) to loading individual + // values (with load sharing). 
EvictableCache implementation does not currently + // support the fallback mechanism, so the individual values would be loaded + // without load sharing. This would be an unintentional and non-obvious behavioral + // discrepancy between EvictableCache and Guava Caches, so the mechanism is disabled. + throw new UnsupportedOperationException("LoadingCache.getAll() is not supported by EvictableCache" + + " when CacheLoader.loadAll is not implemented", e); + } + + ImmutableMap.Builder<Token<K>, V> result = ImmutableMap.builder(); + for (int i = 0; i < tokenList.size(); i++) { + Token<K> token = tokenList.get(i); + K key = keys.get(i); + V value = values.get(key); + // CacheLoader.loadAll is not guaranteed to return values for all the keys + if (value != null) { + result.put(token, value); + } + } + return result.buildOrThrow(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .addValue(delegate) + .toString(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java new file mode 100644 index 00000000000..8da3f8c8d56 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/EvictableCacheBuilder.java @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EvictableCacheBuilder.java +// and modified by Doris + +package org.apache.doris.common; + +import org.apache.doris.common.EvictableCache.Token; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.Weigher; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import com.google.errorprone.annotations.CheckReturnValue; + +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Builder for {@link Cache} and {@link LoadingCache} instances, similar to {@link CacheBuilder}, + * but creating cache implementations that do not exhibit + * <a href="https://github.com/google/guava/issues/1881">Guava issue #1881</a>: + * a cache inspection with {@link Cache#getIfPresent(Object)} or {@link Cache#get(Object, Callable)} + * is guaranteed to return fresh state after {@link Cache#invalidate(Object)}, + * {@link Cache#invalidateAll(Iterable)} or {@link Cache#invalidateAll()} were called. 
+ */ +public final class EvictableCacheBuilder<K, V> { + @CheckReturnValue + public static EvictableCacheBuilder<Object, Object> newBuilder() { + return new EvictableCacheBuilder<>(); + } + + private Optional<Ticker> ticker = Optional.empty(); + private Optional<Duration> expireAfterWrite = Optional.empty(); + private Optional<Duration> refreshAfterWrite = Optional.empty(); + private Optional<Long> maximumSize = Optional.empty(); + private Optional<Long> maximumWeight = Optional.empty(); + private Optional<Integer> concurrencyLevel = Optional.empty(); + private Optional<Weigher<? super Token<K>, ? super V>> weigher = Optional.empty(); + private boolean recordStats; + private Optional<DisabledCacheImplementation> disabledCacheImplementation = Optional.empty(); + + private EvictableCacheBuilder() {} + + /** + * Pass-through for {@link CacheBuilder#ticker(Ticker)}. + */ + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> ticker(Ticker ticker) { + this.ticker = Optional.of(ticker); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) { + return expireAfterWrite(toDuration(duration, unit)); + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> expireAfterWrite(Duration duration) { + Preconditions.checkState(!this.expireAfterWrite.isPresent(), "expireAfterWrite already set"); + this.expireAfterWrite = Optional.of(duration); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) { + return refreshAfterWrite(toDuration(duration, unit)); + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> refreshAfterWrite(Duration duration) { + Preconditions.checkState(!this.refreshAfterWrite.isPresent(), "refreshAfterWrite already set"); + this.refreshAfterWrite = Optional.of(duration); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> maximumSize(long maximumSize) { + 
Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set"); + Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set"); + this.maximumSize = Optional.of(maximumSize); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> maximumWeight(long maximumWeight) { + Preconditions.checkState(!this.maximumWeight.isPresent(), "maximumWeight already set"); + Preconditions.checkState(!this.maximumSize.isPresent(), "maximumSize already set"); + this.maximumWeight = Optional.of(maximumWeight); + return this; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> concurrencyLevel(int concurrencyLevel) { + Preconditions.checkState(!this.concurrencyLevel.isPresent(), "concurrencyLevel already set"); + this.concurrencyLevel = Optional.of(concurrencyLevel); + return this; + } + + public <K1 extends K, V1 extends V> EvictableCacheBuilder<K1, V1> weigher(Weigher<? super K1, ? super V1> weigher) { + Preconditions.checkState(!this.weigher.isPresent(), "weigher already set"); + @SuppressWarnings("unchecked") // see com.google.common.cache.CacheBuilder.weigher + EvictableCacheBuilder<K1, V1> cast = (EvictableCacheBuilder<K1, V1>) this; + cast.weigher = Optional.of(new TokenWeigher<>(weigher)); + return cast; + } + + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> recordStats() { + recordStats = true; + return this; + } + + /** + * Choose a behavior for case when caching is disabled that may allow data and failure + * sharing between concurrent callers. + */ + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> shareResultsAndFailuresEvenIfDisabled() { + return disabledCacheImplementation(DisabledCacheImplementation.GUAVA); + } + + /** + * Choose a behavior for case when caching is disabled that prevents data and + * failure sharing between concurrent callers. + * Note: disabled cache won't report any statistics. 
+ */ + @CanIgnoreReturnValue + public EvictableCacheBuilder<K, V> shareNothingWhenDisabled() { + return disabledCacheImplementation(DisabledCacheImplementation.NOOP); + } + + @VisibleForTesting + EvictableCacheBuilder<K, V> disabledCacheImplementation(DisabledCacheImplementation cacheImplementation) { + Preconditions.checkState(!disabledCacheImplementation.isPresent(), "disabledCacheImplementation already set"); + disabledCacheImplementation = Optional.of(cacheImplementation); + return this; + } + + @CheckReturnValue + public <K1 extends K, V1 extends V> Cache<K1, V1> build() { + return build(unimplementedCacheLoader()); + } + + @CheckReturnValue + public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(CacheLoader<? super K1, V1> loader) { + if (cacheDisabled()) { + // Silently providing a behavior different from Guava's could be surprising, so require explicit choice. + DisabledCacheImplementation disabledCacheImplementation = this.disabledCacheImplementation.orElseThrow( + () -> new IllegalStateException( + "Even when cache is disabled, the loads are synchronized and both load results and failures" + + " are shared between threads. " + "This is rarely desired, thus builder caller is" + + " expected to either opt-in into this behavior with" + + " shareResultsAndFailuresEvenIfDisabled(), or choose not to share results (and failures)" + + " between concurrent invocations with shareNothingWhenDisabled().")); + + switch (disabledCacheImplementation) { + case NOOP: + return new EmptyCache<>(loader, recordStats); + case GUAVA: { + // Disabled cache is always empty, so doesn't exhibit invalidation problems. + // Avoid overhead of EvictableCache wrapper. 
+ CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder() + .maximumSize(0) + .expireAfterWrite(0, TimeUnit.SECONDS); + if (recordStats) { + cacheBuilder.recordStats(); + } + return buildUnsafeCache(cacheBuilder, loader); + } + default: + throw new IllegalStateException("Unexpected value: " + disabledCacheImplementation); + } + } + + if (!(maximumSize.isPresent() || maximumWeight.isPresent() || expireAfterWrite.isPresent())) { + // EvictableCache invalidation (e.g. invalidateAll) happening concurrently with a load may + // lead to an entry remaining in the cache, without associated token. This would lead to + // a memory leak in an unbounded cache. + throw new IllegalStateException("Unbounded cache is not supported"); + } + + // CacheBuilder is further modified in EvictableCache::new, so cannot be shared between build() calls. + CacheBuilder<Object, ? super V> cacheBuilder = CacheBuilder.newBuilder(); + ticker.ifPresent(cacheBuilder::ticker); + expireAfterWrite.ifPresent(cacheBuilder::expireAfterWrite); + refreshAfterWrite.ifPresent(cacheBuilder::refreshAfterWrite); + maximumSize.ifPresent(cacheBuilder::maximumSize); + maximumWeight.ifPresent(cacheBuilder::maximumWeight); + weigher.ifPresent(cacheBuilder::weigher); + concurrencyLevel.ifPresent(cacheBuilder::concurrencyLevel); + if (recordStats) { + cacheBuilder.recordStats(); + } + return new EvictableCache<>(cacheBuilder, loader); + } + + private boolean cacheDisabled() { + return (maximumSize.isPresent() && maximumSize.get() == 0) + || (expireAfterWrite.isPresent() && expireAfterWrite.get().isZero()); + } + + // @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden, + // advising to use this class as a safety-adding wrapper. + private static <K, V> LoadingCache<K, V> buildUnsafeCache(CacheBuilder<? super K, ? super V> cacheBuilder, + CacheLoader<? 
super K, V> cacheLoader) { + return cacheBuilder.build(cacheLoader); + } + + private static <K, V> CacheLoader<K, V> unimplementedCacheLoader() { + return CacheLoader.from(ignored -> { + throw new UnsupportedOperationException(); + }); + } + + private static final class TokenWeigher<K, V> + implements Weigher<Token<K>, V> { + private final Weigher<? super K, ? super V> delegate; + + private TokenWeigher(Weigher<? super K, ? super V> delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate is null"); + } + + @Override + public int weigh(Token<K> key, V value) { + return delegate.weigh(key.getKey(), value); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TokenWeigher<?, ?> that = (TokenWeigher<?, ?>) o; + return Objects.equals(delegate, that.delegate); + } + + @Override + public int hashCode() { + return Objects.hash(delegate); + } + + @Override + public String toString() { + return "TokenWeigher{" + "delegate=" + delegate + '}'; + } + } + + private static Duration toDuration(long duration, TimeUnit unit) { + // Saturated conversion, as in com.google.common.cache.CacheBuilder.toNanosSaturated + return Duration.ofNanos(unit.toNanos(duration)); + } + + @VisibleForTesting + enum DisabledCacheImplementation { + NOOP, + GUAVA, + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index dccefc8b743..5321039bedc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -39,6 +39,7 @@ import org.apache.doris.datasource.hudi.HudiSchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; import 
org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.fs.FileSystemDirectoryLister; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; @@ -1059,7 +1060,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI LOG.debug("Chosen partition for table {}. [{}]", name, partition.toString()); } } - return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName); + return cache.getFilesByPartitionsWithoutCache(hivePartitions, bindBrokerName, + new FileSystemDirectoryLister(), null); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index e3bbd681b9e..9bf63df6970 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -18,13 +18,13 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.PartitionValue; -import org.apache.doris.backup.Status; import org.apache.doris.backup.Status.ErrCode; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CacheFactory; @@ -39,7 +39,11 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.ExternalMetaCacheMgr; import org.apache.doris.datasource.property.PropertyConverter; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.fs.FileSystemCache; +import 
org.apache.doris.fs.FileSystemDirectoryLister; +import org.apache.doris.fs.FileSystemIOException; +import org.apache.doris.fs.RemoteIterator; import org.apache.doris.fs.remote.RemoteFile; import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.fs.remote.dfs.DFSFileSystem; @@ -185,7 +189,7 @@ public class HiveMetaStoreCache { @Override public FileCacheValue load(FileCacheKey key) { - return loadFiles(key); + return loadFiles(key, new FileSystemDirectoryLister(), null); } }; @@ -338,7 +342,9 @@ public class HiveMetaStoreCache { private FileCacheValue getFileCache(String location, String inputFormat, JobConf jobConf, List<String> partitionValues, - String bindBrokerName) throws UserException { + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) throws UserException { FileCacheValue result = new FileCacheValue(); RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity( @@ -353,34 +359,37 @@ public class HiveMetaStoreCache { // /user/hive/warehouse/region_tmp_union_all2/2 // So we need to recursively list data location. 
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 - List<RemoteFile> remoteFiles = new ArrayList<>(); boolean isRecursiveDirectories = Boolean.valueOf( catalog.getProperties().getOrDefault("hive.recursive_directories", "false")); - Status status = fs.listFiles(location, isRecursiveDirectories, remoteFiles); - if (status.ok()) { - for (RemoteFile remoteFile : remoteFiles) { + try { + RemoteIterator<RemoteFile> iterator = directoryLister.listFiles(fs, isRecursiveDirectories, + table, location); + while (iterator.hasNext()) { + RemoteFile remoteFile = iterator.next(); String srcPath = remoteFile.getPath().toString(); LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties()); result.addFile(remoteFile, locationPath); } - } else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) { - // User may manually remove partition under HDFS, in this case, - // Hive doesn't aware that the removed partition is missing. - // Here is to support this case without throw an exception. - LOG.warn(String.format("File %s not exist.", location)); - if (!Boolean.valueOf(catalog.getProperties() - .getOrDefault("hive.ignore_absent_partitions", "true"))) { - throw new UserException("Partition location does not exist: " + location); + } catch (FileSystemIOException e) { + if (e.getErrorCode().isPresent() && e.getErrorCode().get().equals(ErrCode.NOT_FOUND)) { + // User may manually remove partition under HDFS, in this case, + // Hive doesn't aware that the removed partition is missing. + // Here is to support this case without throw an exception. 
+ LOG.warn(String.format("File %s not exist.", location)); + if (!Boolean.valueOf(catalog.getProperties() + .getOrDefault("hive.ignore_absent_partitions", "true"))) { + throw new UserException("Partition location does not exist: " + location); + } + } else { + throw new RuntimeException(e); } - } else { - throw new RuntimeException(status.getErrMsg()); } // Must copy the partitionValues to avoid concurrent modification of key and value result.setPartitionValues(Lists.newArrayList(partitionValues)); return result; } - private FileCacheValue loadFiles(FileCacheKey key) { + private FileCacheValue loadFiles(FileCacheKey key, DirectoryLister directoryLister, TableIf table) { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); @@ -404,7 +413,7 @@ public class HiveMetaStoreCache { FileInputFormat.setInputPaths(jobConf, finalLocation.get()); try { FileCacheValue result = getFileCache(finalLocation.get(), key.inputFormat, jobConf, - key.getPartitionValues(), key.bindBrokerName); + key.getPartitionValues(), key.bindBrokerName, directoryLister, table); // Replace default hive partition with a null_string. 
for (int i = 0; i < result.getValuesSize(); i++) { if (HIVE_DEFAULT_PARTITION.equals(result.getPartitionValues().get(i))) { @@ -456,19 +465,25 @@ public class HiveMetaStoreCache { } public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions, - String bindBrokerName) { - return getFilesByPartitions(partitions, true, true, bindBrokerName); + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { + return getFilesByPartitions(partitions, true, true, bindBrokerName, directoryLister, table); } public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions, - String bindBrokerName) { - return getFilesByPartitions(partitions, false, true, bindBrokerName); + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { + return getFilesByPartitions(partitions, false, true, bindBrokerName, directoryLister, table); } public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions, boolean withCache, boolean concurrent, - String bindBrokerName) { + String bindBrokerName, + DirectoryLister directoryLister, + TableIf table) { long start = System.currentTimeMillis(); List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition() ? 
FileCacheKey.createDummyCacheKey( @@ -484,13 +499,15 @@ public class HiveMetaStoreCache { } else { if (concurrent) { List<Future<FileCacheValue>> pList = keys.stream().map( - key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList()); + key -> fileListingExecutor.submit(() -> loadFiles(key, directoryLister, table))) + .collect(Collectors.toList()); fileLists = Lists.newArrayListWithExpectedSize(keys.size()); for (Future<FileCacheValue> p : pList) { fileLists.add(p.get()); } } else { - fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList()); + fileLists = keys.stream().map((key) -> loadFiles(key, directoryLister, table)) + .collect(Collectors.toList()); } } } catch (ExecutionException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 08cf6582447..890f6147f33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -42,6 +42,7 @@ import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.HiveProperties; import org.apache.doris.datasource.hive.HiveTransaction; import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; @@ -86,6 +87,8 @@ public class HiveScanNode extends FileQueryScanNode { @Setter protected SelectedPartitions selectedPartitions = null; + private DirectoryLister directoryLister; + private boolean partitionInit = false; private final AtomicReference<UserException> batchException = new AtomicReference<>(null); private List<HivePartition> prunedPartitions; @@ -100,15 +103,18 
@@ public class HiveScanNode extends FileQueryScanNode { * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) { - this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv); + public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv, + DirectoryLister directoryLister) { + this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv, directoryLister); } public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, - StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv) { + StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv, + DirectoryLister directoryLister) { super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv); hmsTable = (HMSExternalTable) desc.getTable(); brokerName = hmsTable.getCatalog().bindBrokerName(); + this.directoryLister = directoryLister; } @Override @@ -276,7 +282,8 @@ public class HiveScanNode extends FileQueryScanNode { } } else { boolean withCache = Config.max_external_file_cache_num > 0; - fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName); + fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName, + directoryLister, hmsTable); } if (tableSample != null) { List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index a6156924e27..b8f3d7bd198 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -37,6 +37,7 @@ import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hudi.HudiSchemaCacheValue; +import org.apache.doris.fs.DirectoryLister; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; @@ -119,8 +120,8 @@ public class HudiScanNode extends HiveScanNode { */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation, - SessionVariable sv) { - super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, sv); + SessionVariable sv, DirectoryLister directoryLister) { + super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, sv, directoryLister); isCowTable = hmsTable.isHoodieCowTable(); if (LOG.isDebugEnabled()) { if (isCowTable) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java new file mode 100644 index 00000000000..e97bc2c684f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/DirectoryLister.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/DirectoryLister.java +// and modified by Doris + +package org.apache.doris.fs; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +public interface DirectoryLister { + RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location) + throws FileSystemIOException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java new file mode 100644 index 00000000000..dcd7eeb16b3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemDirectoryLister.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.backup.Status; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.fs.remote.RemoteFile; + +import java.util.ArrayList; +import java.util.List; + +public class FileSystemDirectoryLister implements DirectoryLister { + public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location) + throws FileSystemIOException { + List<RemoteFile> result = new ArrayList<>(); + Status status = fs.listFiles(location, recursive, result); + if (!status.ok()) { + throw new FileSystemIOException(status.getErrCode(), status.getErrMsg()); + } + return new RemoteFileRemoteIterator(result); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java new file mode 100644 index 00000000000..c9e45d0352b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemIOException.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.fs; + +import org.apache.doris.backup.Status.ErrCode; + +import java.io.IOException; +import java.util.Optional; +import javax.annotation.Nullable; + +public class FileSystemIOException extends IOException { + + @Nullable + private ErrCode errCode; + + public FileSystemIOException(ErrCode errCode, String message) { + super(message); + this.errCode = errCode; + } + + public FileSystemIOException(ErrCode errCode, String message, Throwable cause) { + super(message, cause); + this.errCode = errCode; + } + + public FileSystemIOException(String message) { + super(message); + this.errCode = null; + } + + public FileSystemIOException(String message, Throwable cause) { + super(message, cause); + this.errCode = null; + } + + public Optional<ErrCode> getErrorCode() { + return Optional.ofNullable(errCode); + } + + @Override + public String getMessage() { + if (errCode != null) { + return String.format("[%s]: %s", + errCode, + super.getMessage()); + } else { + return super.getMessage(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java new file mode 100644 index 00000000000..6ac8eb3b0c6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteFileRemoteIterator.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.fs.remote.RemoteFile; + +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; + +public class RemoteFileRemoteIterator + implements RemoteIterator<RemoteFile> { + private final List<RemoteFile> remoteFileList; + private int currentIndex = 0; + + public RemoteFileRemoteIterator(List<RemoteFile> remoteFileList) { + this.remoteFileList = Objects.requireNonNull(remoteFileList, "iterator is null"); + } + + @Override + public boolean hasNext() throws FileSystemIOException { + return currentIndex < remoteFileList.size(); + } + + @Override + public RemoteFile next() throws FileSystemIOException { + if (!hasNext()) { + throw new NoSuchElementException("No more elements in RemoteFileRemoteIterator"); + } + return remoteFileList.get(currentIndex++); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java new file mode 100644 index 00000000000..b9d212a15a5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/RemoteIterator.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/RemoteIterator.java +// and modified by Doris + +package org.apache.doris.fs; + +public interface RemoteIterator<T> { + boolean hasNext() throws FileSystemIOException; + + T next() throws FileSystemIOException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java new file mode 100644 index 00000000000..4332a5fed35 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/SimpleRemoteIterator.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +package org.apache.doris.fs; + +import org.apache.doris.fs.remote.RemoteFile; + +import java.util.Iterator; +import java.util.Objects; +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/SimpleRemoteIterator.java +// and modified by Doris + +class SimpleRemoteIterator implements RemoteIterator<RemoteFile> { + private final Iterator<RemoteFile> iterator; + + public SimpleRemoteIterator(Iterator<RemoteFile> iterator) { + this.iterator = Objects.requireNonNull(iterator, "iterator is null"); + } + + @Override + public boolean hasNext() throws FileSystemIOException { + return iterator.hasNext(); + } + + @Override + public RemoteFile next() throws FileSystemIOException { + return iterator.next(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java new file mode 100644 index 00000000000..6be3c03f824 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionDirectoryListingCacheKey.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionDirectoryListingCacheKey.java +// and modified by Doris + +package org.apache.doris.fs; + +import java.util.Objects; + +public class TransactionDirectoryListingCacheKey { + + private final long transactionId; + private final String path; + + public TransactionDirectoryListingCacheKey(long transactionId, String path) { + this.transactionId = transactionId; + this.path = Objects.requireNonNull(path, "path is null"); + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TransactionDirectoryListingCacheKey that = (TransactionDirectoryListingCacheKey) o; + return transactionId == that.transactionId && path.equals(that.path); + } + + @Override + public int hashCode() { + return Objects.hash(transactionId, path); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("TransactionDirectoryListingCacheKey{"); + sb.append("transactionId=").append(transactionId); + sb.append(", path='").append(path).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java new file mode 100644 index 00000000000..37acec6864f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/TransactionScopeCachingDirectoryLister.java @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
// The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryLister.java
// and modified by Doris

package org.apache.doris.fs;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.fs.remote.RemoteFile;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.commons.collections.ListUtils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;

/**
 * Caches directory content (including listings that were started concurrently).
 * {@link TransactionScopeCachingDirectoryLister} assumes that all listings
 * are performed by same user within single transaction, therefore any failure can
 * be shared between concurrent listings.
 */
public class TransactionScopeCachingDirectoryLister implements DirectoryLister {
    // Scope identifier; part of every cache key so listings cached for this lister
    // are never shared with listers created under a different transaction id.
    private final long transactionId;

    @VisibleForTesting
    public Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> getCache() {
        return cache;
    }

    //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
    // to deal more efficiently with cache invalidation scenarios for partitioned tables.
    private final Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache;
    // Underlying lister that performs the real (uncached) directory listing.
    private final DirectoryLister delegate;

    /**
     * @param delegate lister that performs the actual listing on cache miss; must not be null
     * @param transactionId scope under which all keys produced by this lister are created
     * @param cache shared cache of (transactionId, path) -> partially or fully fetched listings
     */
    public TransactionScopeCachingDirectoryLister(DirectoryLister delegate, long transactionId,
            Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder> cache) {
        this.delegate = Objects.requireNonNull(delegate, "delegate is null");
        this.transactionId = transactionId;
        this.cache = Objects.requireNonNull(cache, "cache is null");
    }

    /**
     * Lists files under {@code location}, serving from (and populating) the
     * transaction-scoped cache when possible.
     */
    @Override
    public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
            throws FileSystemIOException {
        return listInternal(fs, recursive, table, new TransactionDirectoryListingCacheKey(transactionId, location));
    }

    // Loads (or joins) the FetchingValueHolder for the key. If the listing is already
    // fully materialized, returns a simple iterator over the cached snapshot; otherwise
    // returns an iterator that fetches lazily and appends into the shared holder.
    private RemoteIterator<RemoteFile> listInternal(FileSystem fs, boolean recursive, TableIf table,
            TransactionDirectoryListingCacheKey cacheKey) throws FileSystemIOException {
        FetchingValueHolder cachedValueHolder;
        try {
            cachedValueHolder = cache.get(cacheKey,
                    () -> new FetchingValueHolder(createListingRemoteIterator(fs, recursive, table, cacheKey)));
        } catch (ExecutionException | UncheckedExecutionException e) {
            // Unwrap the loader failure: rethrow known types as-is, wrap anything else.
            Throwable throwable = e.getCause();
            Throwables.throwIfInstanceOf(throwable, FileSystemIOException.class);
            Throwables.throwIfUnchecked(throwable);
            throw new RuntimeException("Failed to list directory: " + cacheKey.getPath(), throwable);
        }

        if (cachedValueHolder.isFullyCached()) {
            return new SimpleRemoteIterator(cachedValueHolder.getCachedFiles());
        }

        return cachingRemoteIterator(cachedValueHolder, cacheKey);
    }

    // Starts the real listing through the delegate (invoked once per cache miss
    // via the loader passed to cache.get above).
    private RemoteIterator<RemoteFile> createListingRemoteIterator(FileSystem fs, boolean recursive,
            TableIf table, TransactionDirectoryListingCacheKey cacheKey)
            throws FileSystemIOException {
        return delegate.listFiles(fs, recursive, table, cacheKey.getPath());
    }

    // Iterator that reads files through the shared holder by index, so concurrent
    // iterators over the same key fetch each file from the delegate only once.
    private RemoteIterator<RemoteFile> cachingRemoteIterator(FetchingValueHolder cachedValueHolder,
            TransactionDirectoryListingCacheKey cacheKey) {
        return new RemoteIterator<RemoteFile>() {
            private int fileIndex;

            @Override
            public boolean hasNext()
                    throws FileSystemIOException {
                try {
                    boolean hasNext = cachedValueHolder.getCachedFile(fileIndex).isPresent();
                    // Update cache weight of cachedValueHolder for a given path.
                    // The cachedValueHolder acts as an invalidation guard.
                    // If a cache invalidation happens while this iterator goes over the files from the specified path,
                    // the eventually outdated file listing will not be added anymore to the cache.
                    cache.asMap().replace(cacheKey, cachedValueHolder, cachedValueHolder);
                    return hasNext;
                } catch (Exception exception) {
                    // invalidate cached value to force retry of directory listing
                    cache.invalidate(cacheKey);
                    throw exception;
                }
            }

            @Override
            public RemoteFile next()
                    throws FileSystemIOException {
                // force cache entry weight update in case next file is cached
                Preconditions.checkState(hasNext());
                return cachedValueHolder.getCachedFile(fileIndex++).orElseThrow(NoSuchElementException::new);
            }
        };
    }

    @VisibleForTesting
    boolean isCached(String location) {
        return isCached(new TransactionDirectoryListingCacheKey(transactionId, location));
    }

    // A key is considered cached only once its listing is fully materialized.
    @VisibleForTesting
    boolean isCached(TransactionDirectoryListingCacheKey cacheKey) {
        FetchingValueHolder cached = cache.getIfPresent(cacheKey);
        return cached != null && cached.isFullyCached();
    }

    /**
     * Holds the (possibly still growing) list of files for one directory.
     * Files already fetched are readable without locking; fetching the next file
     * from the delegate iterator is serialized on {@code this}.
     */
    static class FetchingValueHolder {

        // Synchronized list so index-based reads are safe while another thread appends.
        private final List<RemoteFile> cachedFiles = ListUtils.synchronizedList(new ArrayList<RemoteFile>());

        // Remaining source of files; null once exhausted or once a fetch failed.
        @GuardedBy("this")
        @Nullable
        private RemoteIterator<RemoteFile> fileIterator;
        // First fetch failure; replayed to every subsequent reader of this holder.
        @GuardedBy("this")
        @Nullable
        private Exception exception;

        public FetchingValueHolder(RemoteIterator<RemoteFile> fileIterator) {
            this.fileIterator = Objects.requireNonNull(fileIterator, "fileIterator is null");
        }

        // Fully cached = the source iterator was drained without error.
        public synchronized boolean isFullyCached() {
            return fileIterator == null && exception == null;
        }

        // Used as the cache weight (number of files fetched so far).
        public long getCacheFileCount() {
            return cachedFiles.size();
        }

        public Iterator<RemoteFile> getCachedFiles() {
            Preconditions.checkState(isFullyCached());
            return cachedFiles.iterator();
        }

        /**
         * Returns the file at {@code index}, fetching it from the delegate iterator
         * if it has not been materialized yet; empty when the listing is exhausted.
         * Only sequential access (index at most one past the fetched count) is valid.
         */
        public Optional<RemoteFile> getCachedFile(int index)
                throws FileSystemIOException {
            int filesSize = cachedFiles.size();
            Preconditions.checkArgument(index >= 0 && index <= filesSize,
                    "File index (%s) out of bounds [0, %s]", index, filesSize);

            // avoid fileIterator synchronization (and thus blocking) for already cached files
            if (index < filesSize) {
                return Optional.of(cachedFiles.get(index));
            }

            return fetchNextCachedFile(index);
        }

        private synchronized Optional<RemoteFile> fetchNextCachedFile(int index)
                throws FileSystemIOException {
            if (exception != null) {
                // A previous fetch failed; share that failure with this reader too.
                throw new FileSystemIOException("Exception while listing directory", exception);
            }

            if (index < cachedFiles.size()) {
                // file was fetched concurrently
                return Optional.of(cachedFiles.get(index));
            }

            try {
                if (fileIterator == null || !fileIterator.hasNext()) {
                    // no more files
                    fileIterator = null;
                    return Optional.empty();
                }

                RemoteFile fileStatus = fileIterator.next();
                cachedFiles.add(fileStatus);
                return Optional.of(fileStatus);
            } catch (Exception exception) {
                // Record the failure and drop the iterator so all readers see the same error.
                fileIterator = null;
                this.exception = exception;
                throw exception;
            }
        }
    }
}
// See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TransactionScopeCachingDirectoryListerFactory.java
// and modified by Doris

package org.apache.doris.fs;

import org.apache.doris.common.EvictableCacheBuilder;
import org.apache.doris.fs.TransactionScopeCachingDirectoryLister.FetchingValueHolder;

import com.google.common.cache.Cache;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hands out {@link TransactionScopeCachingDirectoryLister}s that share one
 * listing cache but each use a distinct transaction id, so cached listings
 * are never visible across calls to {@link #get(DirectoryLister)}.
 * When {@code maxSize <= 0} the cache is disabled and the delegate is
 * returned unwrapped.
 */
public class TransactionScopeCachingDirectoryListerFactory {
    //TODO use a cache key based on Path & SchemaTableName and iterate over the cache keys
    // to deal more efficiently with cache invalidation scenarios for partitioned tables.
    // Empty when caching is disabled (maxSize <= 0).
    private final Optional<Cache<TransactionDirectoryListingCacheKey, FetchingValueHolder>> cache;

    // Monotonically increasing id; each lister produced by get() receives its own
    // transaction scope within the shared cache.
    private final AtomicLong nextTransactionId = new AtomicLong();

    /**
     * @param maxSize maximum total number of cached files (cache weight); a
     *                non-positive value disables caching entirely
     */
    public TransactionScopeCachingDirectoryListerFactory(long maxSize) {
        if (maxSize > 0) {
            EvictableCacheBuilder<TransactionDirectoryListingCacheKey, FetchingValueHolder> cacheBuilder =
                    EvictableCacheBuilder.newBuilder()
                            .maximumWeight(maxSize)
                            // Each entry weighs as many units as it has fetched files.
                            .weigher((key, value) ->
                                    Math.toIntExact(value.getCacheFileCount()));
            this.cache = Optional.of(cacheBuilder.build());
        } else {
            this.cache = Optional.empty();
        }
    }

    /**
     * Wraps {@code delegate} with a transaction-scoped caching lister, or returns
     * it unchanged when caching is disabled.
     */
    public DirectoryLister get(DirectoryLister delegate) {
        // Note: lambda parameter renamed from "cache" to avoid shadowing the field.
        return cache
                .map(listingCache -> (DirectoryLister) new TransactionScopeCachingDirectoryLister(delegate,
                        nextTransactionId.getAndIncrement(), listingCache))
                .orElse(delegate);
    }
}
b439dbdc12d..5b19387289c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -46,6 +46,7 @@ import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; +import org.apache.doris.common.Config; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.es.EsExternalTable; @@ -68,6 +69,9 @@ import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.paimon.source.PaimonScanNode; import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable; import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode; +import org.apache.doris.fs.DirectoryLister; +import org.apache.doris.fs.FileSystemDirectoryLister; +import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.properties.DistributionSpec; import org.apache.doris.nereids.properties.DistributionSpecAny; @@ -244,6 +248,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla private final StatsErrorEstimator statsErrorEstimator; private final PlanTranslatorContext context; + private DirectoryLister directoryLister; + public PhysicalPlanTranslator() { this(null, null); } @@ -559,12 +565,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla // TODO(cmy): determine the needCheckColumnPriv param ScanNode scanNode; if (table instanceof HMSExternalTable) { + if (directoryLister == null) { + this.directoryLister = new TransactionScopeCachingDirectoryListerFactory( + Config.max_external_table_split_file_meta_cache_num).get(new 
FileSystemDirectoryLister()); + } switch (((HMSExternalTable) table).getDlaType()) { case ICEBERG: scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); break; case HIVE: - scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv); + scanNode = new HiveScanNode(context.nextPlanNodeId(), tupleDescriptor, false, sv, directoryLister); HiveScanNode hiveScanNode = (HiveScanNode) scanNode; hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions()); if (fileScan.getTableSample().isPresent()) { @@ -640,6 +650,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla @Override public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTranslatorContext context) { + if (directoryLister == null) { + this.directoryLister = new TransactionScopeCachingDirectoryListerFactory( + Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister()); + } List<Slot> slots = fileScan.getOutput(); ExternalTable table = fileScan.getTable(); TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); @@ -652,7 +666,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla + " for Hudi table"); PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan; ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false, - hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), ConnectContext.get().getSessionVariable()); + hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), ConnectContext.get().getSessionVariable(), + directoryLister); if (fileScan.getTableSnapshot().isPresent()) { ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 0a5de932243..552d6c4c45d 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -58,6 +58,7 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; @@ -74,6 +75,9 @@ import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode; import org.apache.doris.datasource.odbc.source.OdbcScanNode; import org.apache.doris.datasource.paimon.source.PaimonScanNode; import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode; +import org.apache.doris.fs.DirectoryLister; +import org.apache.doris.fs.FileSystemDirectoryLister; +import org.apache.doris.fs.TransactionScopeCachingDirectoryListerFactory; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; @@ -115,6 +119,8 @@ public class SingleNodePlanner { private final ArrayList<ScanNode> scanNodes = Lists.newArrayList(); private Map<Analyzer, List<ScanNode>> selectStmtToScanNodes = Maps.newHashMap(); + private DirectoryLister directoryLister; + public SingleNodePlanner(PlannerContext ctx) { this.ctx = ctx; } @@ -1959,6 +1965,10 @@ public class SingleNodePlanner { scanNode = ((TableValuedFunctionRef) tblRef).getScanNode(ctx.getNextNodeId(), sv); break; case HMS_EXTERNAL_TABLE: + if (directoryLister == null) { + this.directoryLister = new TransactionScopeCachingDirectoryListerFactory( + Config.max_external_table_split_file_meta_cache_num).get(new FileSystemDirectoryLister()); + } TableIf table = tblRef.getDesc().getTable(); switch (((HMSExternalTable) table).getDlaType()) { case HUDI: @@ -1969,13 +1979,13 @@ public class SingleNodePlanner { + 
"please set enable_nereids_planner = true to enable new optimizer"); } scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, - Optional.empty(), Optional.empty(), sv); + Optional.empty(), Optional.empty(), sv, directoryLister); break; case ICEBERG: scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv); break; case HIVE: - scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv); + scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, sv, directoryLister); ((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample()); break; default: diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java new file mode 100644 index 00000000000..3bfdc73b78e --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/TestEvictableCache.java @@ -0,0 +1,708 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/test/java/io/trino/cache/TestEvictableCache.java +// and modified by Doris + +package org.apache.doris.common; + +import org.apache.doris.common.EvictableCacheBuilder.DisabledCacheImplementation; + +import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheStats; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.gaul.modernizer_maven_annotations.SuppressModernizer; +import org.junit.Assert; +import org.junit.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Exchanger; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.stream.IntStream; + +public class TestEvictableCache { + private static class TestingTicker extends Ticker { + private volatile long time; + + public TestingTicker() { + } + + public long read() { + return this.time; + } + + public synchronized void increment(long delta, TimeUnit unit) { + Preconditions.checkArgument(delta >= 0L, "delta is negative"); + this.time += unit.toNanos(delta); + } + } + + private static final int TEST_TIMEOUT_SECONDS = 10; + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + 
public void testLoad() + throws Exception { + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); + Assert.assertEquals("abc", cache.get(42, () -> "abc")); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testEvictBySize() + throws Exception { + int maximumSize = 10; + Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(maximumSize) + .build(); + + for (int i = 0; i < 10_000; i++) { + int value = i * 10; + Assert.assertEquals(value, (Object) cache.get(i, () -> value)); + } + cache.cleanUp(); + Assert.assertEquals(maximumSize, cache.size()); + Assert.assertEquals(maximumSize, ((EvictableCache<?, ?>) cache).tokensCount()); + + // Ensure cache is effective, i.e. some entries preserved + int lastKey = 10_000 - 1; + Assert.assertEquals(lastKey * 10, (Object) cache.get(lastKey, () -> { + throw new UnsupportedOperationException(); + })); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testEvictByWeight() throws Exception { + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .maximumWeight(20) + .weigher((Integer key, String value) -> value.length()) + .build(); + + for (int i = 0; i < 10; i++) { + String value = String.join("", Collections.nCopies(i, "a")); + Assert.assertEquals(value, cache.get(i, () -> value)); + } + + cache.cleanUp(); + // It's not deterministic which entries get evicted + int cacheSize = Math.toIntExact(cache.size()); + + Assert.assertEquals(cacheSize, ((EvictableCache<?, ?>) cache).tokensCount()); + Assert.assertEquals(cacheSize, cache.asMap().keySet().size()); + + int keySum = cache.asMap().keySet().stream() + .mapToInt(i -> i) + .sum(); + Assert.assertTrue("key sum should be <= 20", keySum <= 20); + + Assert.assertEquals(cacheSize, cache.asMap().values().size()); + + int valuesLengthSum = cache.asMap().values().stream() + .mapToInt(String::length) + .sum(); + Assert.assertTrue("values length sum should be <= 20", valuesLengthSum 
<= 20); + + // Ensure cache is effective, i.e. some entries preserved + int lastKey = 9; // 10 - 1 + String expected = String.join("", Collections.nCopies(lastKey, "a")); // java8 替代 repeat + Assert.assertEquals(expected, cache.get(lastKey, () -> { + throw new UnsupportedOperationException(); + })); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testEvictByTime() throws Exception { + TestingTicker ticker = new TestingTicker(); + int ttl = 100; + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .ticker(ticker) + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .build(); + + Assert.assertEquals("1 ala ma kota", cache.get(1, () -> "1 ala ma kota")); + ticker.increment(ttl, TimeUnit.MILLISECONDS); + Assert.assertEquals("2 ala ma kota", cache.get(2, () -> "2 ala ma kota")); + cache.cleanUp(); + + // First entry should be expired and its token removed + int cacheSize = Math.toIntExact(cache.size()); + Assert.assertEquals(1, cacheSize); + Assert.assertEquals(cacheSize, ((EvictableCache<?, ?>) cache).tokensCount()); + Assert.assertEquals(cacheSize, cache.asMap().keySet().size()); + Assert.assertEquals(cacheSize, cache.asMap().values().size()); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testPreserveValueLoadedAfterTimeExpiration() throws Exception { + TestingTicker ticker = new TestingTicker(); + int ttl = 100; + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .ticker(ticker) + .expireAfterWrite(ttl, TimeUnit.MILLISECONDS) + .build(); + int key = 11; + + Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "11 ala ma kota")); + Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount()); + + Assert.assertEquals("11 ala ma kota", cache.get(key, () -> "something else")); + Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount()); + + ticker.increment(ttl, TimeUnit.MILLISECONDS); + Assert.assertEquals("new value", cache.get(key, () -> "new value")); + Assert.assertEquals(1, 
((EvictableCache<?, ?>) cache).tokensCount()); + + Assert.assertEquals("new value", cache.get(key, () -> "something yet different")); + Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount()); + + Assert.assertEquals(1, cache.size()); + Assert.assertEquals(1, ((EvictableCache<?, ?>) cache).tokensCount()); + Assert.assertEquals(1, cache.asMap().keySet().size()); + Assert.assertEquals(1, cache.asMap().values().size()); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testReplace() throws Exception { + Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10) + .build(); + + int key = 10; + int initialValue = 20; + int replacedValue = 21; + + cache.get(key, () -> initialValue); + + Assert.assertTrue("Should successfully replace value", cache.asMap().replace(key, initialValue, replacedValue)); + Assert.assertEquals("Cache should contain replaced value", replacedValue, cache.getIfPresent(key).intValue()); + + Assert.assertFalse("Should not replace when current value is different", cache.asMap().replace(key, initialValue, replacedValue)); + Assert.assertEquals("Cache should maintain replaced value", replacedValue, cache.getIfPresent(key).intValue()); + + Assert.assertFalse("Should not replace non-existent key", cache.asMap().replace(100000, replacedValue, 22)); + Assert.assertEquals("Cache should only contain original key", ImmutableSet.of(key), cache.asMap().keySet()); + Assert.assertEquals("Original key should maintain its value", replacedValue, cache.getIfPresent(key).intValue()); + + int anotherKey = 13; + int anotherInitialValue = 14; + cache.get(anotherKey, () -> anotherInitialValue); + cache.invalidate(anotherKey); + + Assert.assertFalse("Should not replace after invalidation", cache.asMap().replace(anotherKey, anotherInitialValue, 15)); + Assert.assertEquals("Cache should only contain original key after invalidation", ImmutableSet.of(key), cache.asMap().keySet()); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) 
+ public void testDisabledCache() throws Exception { + Exception exception = Assert.assertThrows(IllegalStateException.class, () -> + EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .build()); + + Assert.assertEquals("Even when cache is disabled, the loads are synchronized and both load results and failures are shared between threads. " + + "This is rarely desired, thus builder caller is expected to either opt-in into this behavior with shareResultsAndFailuresEvenIfDisabled(), " + + "or choose not to share results (and failures) between concurrent invocations with shareNothingWhenDisabled().", + exception.getMessage()); + + testDisabledCache( + EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .shareNothingWhenDisabled() + .build()); + + testDisabledCache( + EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .shareResultsAndFailuresEvenIfDisabled() + .build()); + } + + private void testDisabledCache(Cache<Integer, Integer> cache) throws Exception { + for (int i = 0; i < 10; i++) { + int value = i * 10; + Assert.assertEquals(value, cache.get(i, () -> value).intValue()); + } + + cache.cleanUp(); + Assert.assertEquals(0, cache.size()); + Assert.assertTrue(cache.asMap().keySet().isEmpty()); + Assert.assertTrue(cache.asMap().values().isEmpty()); + } + + private static class CacheStatsAssertions { + public static CacheStatsAssertions assertCacheStats(Cache<?, ?> cache) { + Objects.requireNonNull(cache, "cache is null"); + return assertCacheStats(cache::stats); + } + + public static CacheStatsAssertions assertCacheStats(Supplier<CacheStats> statsSupplier) { + return new CacheStatsAssertions(statsSupplier); + } + + private final Supplier<CacheStats> stats; + + private long loads; + private long hits; + private long misses; + + private CacheStatsAssertions(Supplier<CacheStats> stats) { + this.stats = Objects.requireNonNull(stats, "stats is null"); + } + + public CacheStatsAssertions loads(long value) { + this.loads = value; + return this; + } + + public 
CacheStatsAssertions hits(long value) { + this.hits = value; + return this; + } + + public CacheStatsAssertions misses(long value) { + this.misses = value; + return this; + } + + public void afterRunning(Runnable runnable) { + try { + calling(() -> { + runnable.run(); + return null; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public <T> T calling(Callable<T> callable) + throws Exception { + CacheStats beforeStats = stats.get(); + T value = callable.call(); + CacheStats afterStats = stats.get(); + + long loadDelta = afterStats.loadCount() - beforeStats.loadCount(); + long missesDelta = afterStats.missCount() - beforeStats.missCount(); + long hitsDelta = afterStats.hitCount() - beforeStats.hitCount(); + + Assert.assertEquals(loads, loadDelta); + Assert.assertEquals(hits, hitsDelta); + Assert.assertEquals(misses, missesDelta); + + return value; + } + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testLoadStats() + throws Exception { + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .recordStats() + .build(); + + Assert.assertEquals(new CacheStats(0, 0, 0, 0, 0, 0), cache.stats()); + + String value = CacheStatsAssertions.assertCacheStats(cache) + .misses(1) + .loads(1) + .calling(() -> cache.get(42, () -> "abc")); + Assert.assertEquals("abc", value); + + value = CacheStatsAssertions.assertCacheStats(cache) + .hits(1) + .calling(() -> cache.get(42, () -> "xyz")); + Assert.assertEquals("abc", value); + + // with equal, but not the same key + value = CacheStatsAssertions.assertCacheStats(cache) + .hits(1) + .calling(() -> cache.get(newInteger(42), () -> "xyz")); + Assert.assertEquals("abc", value); + } + + @Test + @Timeout(TEST_TIMEOUT_SECONDS) + public void testLoadFailure() + throws Exception { + Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .expireAfterWrite(0, TimeUnit.DAYS) + .shareResultsAndFailuresEvenIfDisabled() + .build(); + int key = 
10; + + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + Exchanger<Thread> exchanger = new Exchanger<>(); + CountDownLatch secondUnblocked = new CountDownLatch(1); + + List<Future<String>> futures = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + boolean first = i == 0; + futures.add(executor.submit(() -> { + if (!first) { + // Wait for the first one to start the call + exchanger.exchange(Thread.currentThread(), 10, TimeUnit.SECONDS); + // Prove that we are back in RUNNABLE state. + secondUnblocked.countDown(); + } + return cache.get(key, () -> { + if (first) { + Thread secondThread = exchanger.exchange(null, 10, TimeUnit.SECONDS); + Assert.assertTrue(secondUnblocked.await(10, TimeUnit.SECONDS)); + // Wait for the second one to hang inside the cache.get call. + long start = System.nanoTime(); + while (!Thread.currentThread().isInterrupted()) { + try { + Assert.assertNotEquals(Thread.State.RUNNABLE, secondThread.getState()); + break; + } catch (Exception | AssertionError e) { + if (System.nanoTime() - start > TimeUnit.SECONDS.toNanos(30)) { + throw e; + } + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + throw new RuntimeException("first attempt is poised to fail"); + } + return "success"; + }); + })); + } + + List<String> results = new ArrayList<>(); + for (Future<String> future : futures) { + try { + results.add(future.get()); + } catch (ExecutionException e) { + results.add(e.getCause().toString()); + } + } + + // Note: if this starts to fail, that suggests that Guava implementation changed and NoopCache may be redundant now. 
            String expectedError = "com.google.common.util.concurrent.UncheckedExecutionException: "
                    + "java.lang.RuntimeException: first attempt is poised to fail";
            Assert.assertEquals(2, results.size());
            Assert.assertEquals(expectedError, results.get(0));
            Assert.assertEquals(expectedError, results.get(1));
        } finally {
            executor.shutdownNow();
            Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
        }
    }

    // Deliberately produces a boxed Integer that is NOT the instance returned by autoboxing,
    // so tests can distinguish equal-but-not-same keys. The assertNotSame guards against a
    // JVM that would intern the explicitly constructed value.
    @SuppressModernizer
    private static Integer newInteger(int value) {
        Integer integer = value;
        @SuppressWarnings({"UnnecessaryBoxing", "BoxedPrimitiveConstructor", "CachedNumberConstructorCall", "removal"})
        Integer newInteger = new Integer(value);
        Assert.assertNotSame(integer, newInteger);
        return newInteger;
    }

    /**
     * Test that the loader is invoked only once for concurrent invocations of
     * {@link Cache#get(Object, Callable)} with equal keys.
     * This is a behavior of Guava Cache as well. While this is not necessarily desirable behavior (see
     * <a href="https://github.com/trinodb/trino/issues/11067">https://github.com/trinodb/trino/issues/11067</a>),
     * the test exists primarily to document current state and support discussion, should the current state change.
     */
    @Test
    @Timeout(TEST_TIMEOUT_SECONDS)
    public void testConcurrentGetWithCallableShareLoad()
            throws Exception {
        AtomicInteger loads = new AtomicInteger();
        AtomicInteger concurrentInvocations = new AtomicInteger();

        Cache<Integer, Integer> cache = EvictableCacheBuilder.newBuilder()
                .maximumSize(10_000)
                .build();

        int threads = 2;
        int invocationsPerThread = 100;
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            CyclicBarrier barrier = new CyclicBarrier(threads);
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                futures.add(executor.submit(() -> {
                    for (int invocation = 0; invocation < invocationsPerThread; invocation++) {
                        int key = invocation;
                        // All threads request the same key at (approximately) the same time.
                        barrier.await(10, TimeUnit.SECONDS);
                        int value = cache.get(key, () -> {
                            loads.incrementAndGet();
                            // If the cache shares loads, at most one loader runs per key at a time.
                            int invocations = concurrentInvocations.incrementAndGet();
                            Preconditions.checkState(invocations == 1, "There should be no concurrent invocations, cache should do load sharing when get() invoked for same key");
                            // Linger so that a concurrent (non-shared) load would trip the check above.
                            Thread.sleep(1);
                            concurrentInvocations.decrementAndGet();
                            return -key;
                        });
                        Assert.assertEquals(-invocation, value);
                    }
                    return null;
                }));
            }

            for (Future<?> future : futures) {
                future.get(10, TimeUnit.SECONDS);
            }
            // Loads need not be fully shared (threads can drift between rounds), but at least one
            // must be shared, hence the upper bound of threads * invocationsPerThread - 1.
            Assert.assertTrue(
                    String.format(
                            "loads (%d) should be between %d and %d",
                            loads.intValue(),
                            invocationsPerThread,
                            threads * invocationsPerThread - 1),
                    loads.intValue() >= invocationsPerThread && loads.intValue() <= threads * invocationsPerThread - 1);
        } finally {
            executor.shutdownNow();
            Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
        }
    }

    // The different ways a key can be invalidated; the invalidation tests below iterate all of them.
    enum Invalidation {
        INVALIDATE_KEY,
        INVALIDATE_PREDEFINED_KEYS,
        INVALIDATE_SELECTED_KEYS,
        INVALIDATE_ALL,
        /**/;
    }

    /**
     * Covers https://github.com/google/guava/issues/1881: a value whose load started before an
     * invalidation must not be served to readers that come after the invalidation.
     */
    @Test
    @Timeout(TEST_TIMEOUT_SECONDS)
    public void testInvalidateOngoingLoad()
            throws Exception {
        for (Invalidation invalidation : Invalidation.values()) {
            Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
                    .maximumSize(10_000)
                    .build();
            Integer key = 42;

            // Latches enforce the ordering annotated as // 1, // 2, // 3 below.
            CountDownLatch loadOngoing = new CountDownLatch(1);
            CountDownLatch invalidated = new CountDownLatch(1);
            CountDownLatch getReturned = new CountDownLatch(1);
            ExecutorService executor = Executors.newFixedThreadPool(2);
            try {
                // thread A: starts a load and keeps it in flight until thread B has invalidated the key
                Future<String> threadA = executor.submit(() -> {
                    String value = cache.get(key, () -> {
                        loadOngoing.countDown(); // 1
                        Assert.assertTrue(invalidated.await(10, TimeUnit.SECONDS)); // 2
                        return "stale value";
                    });
                    getReturned.countDown(); // 3
                    return value;
                });

                // thread B: invalidates the key while thread A's load is still running
                Future<String> threadB = executor.submit(() -> {
                    Assert.assertTrue(loadOngoing.await(10, TimeUnit.SECONDS)); // 1

                    switch (invalidation) {
                        case INVALIDATE_KEY:
                            cache.invalidate(key);
                            break;
                        case INVALIDATE_PREDEFINED_KEYS:
                            cache.invalidateAll(ImmutableList.of(key));
                            break;
                        case INVALIDATE_SELECTED_KEYS:
                            Set<Integer> keys = cache.asMap().keySet().stream()
                                    .filter(foundKey -> (int) foundKey == key)
                                    .collect(ImmutableSet.toImmutableSet());
                            cache.invalidateAll(keys);
                            break;
                        case INVALIDATE_ALL:
                            cache.invalidateAll();
                            break;
                        default:
                            throw new IllegalArgumentException();
                    }

                    invalidated.countDown(); // 2
                    // Cache may persist value after loader returned, but before `cache.get(...)` returned. Ensure the latter completed.
                    Assert.assertTrue(getReturned.await(10, TimeUnit.SECONDS)); // 3

                    return cache.get(key, () -> "fresh value");
                });

                // Thread A observes its own (now stale) load; thread B, having invalidated, must reload.
                Assert.assertEquals("stale value", threadA.get());
                Assert.assertEquals("fresh value", threadB.get());
            } finally {
                executor.shutdownNow();
                Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
            }
        }
    }

    /**
     * Covers https://github.com/google/guava/issues/1881: under concurrent update + invalidate +
     * read-through, each thread must observe a value reflecting at least its own update.
     */
    @Test
    @Timeout(TEST_TIMEOUT_SECONDS)
    public void testInvalidateAndLoadConcurrently()
            throws Exception {
        for (Invalidation invalidation : Invalidation.values()) {
            int[] primes = {2, 3, 5, 7};
            // Simulated remote state; each thread multiplies it by its own distinct prime, so
            // divisibility of a read value tells exactly which updates it reflects.
            AtomicLong remoteState = new AtomicLong(1);

            Cache<Integer, Long> cache = EvictableCacheBuilder.newBuilder()
                    .maximumSize(10_000)
                    .build();
            Integer key = 42;
            int threads = 4;

            CyclicBarrier barrier = new CyclicBarrier(threads);
            ExecutorService executor = Executors.newFixedThreadPool(threads);
            try {
                List<Future<Void>> futures = IntStream.range(0, threads)
                        .mapToObj(threadNumber -> executor.submit(() -> {
                            // prime the cache
                            Assert.assertEquals(1L, (long) cache.get(key, remoteState::get));
                            int prime = primes[threadNumber];

                            barrier.await(10, TimeUnit.SECONDS);

                            // modify underlying state
                            remoteState.updateAndGet(current -> current * prime);

                            // invalidate
                            switch (invalidation) {
                                case INVALIDATE_KEY:
                                    cache.invalidate(key);
                                    break;
                                case INVALIDATE_PREDEFINED_KEYS:
                                    cache.invalidateAll(ImmutableList.of(key));
                                    break;
                                case INVALIDATE_SELECTED_KEYS:
                                    Set<Integer> keys = cache.asMap().keySet().stream()
                                            .filter(foundKey -> (int) foundKey == key)
                                            .collect(ImmutableSet.toImmutableSet());
                                    cache.invalidateAll(keys);
                                    break;
                                case INVALIDATE_ALL:
                                    cache.invalidateAll();
                                    break;
                                default:
                                    throw new IllegalArgumentException();
                            }

                            // read through cache; the value must at least reflect this thread's own update
                            long current = cache.get(key, remoteState::get);
                            if (current % prime != 0) {
                                throw new AssertionError(String.format("The value read through cache (%s) in thread (%s) is not divisible by (%s)", current, threadNumber, prime));
                            }

                            return (Void) null;
                        }))
                        .collect(ImmutableList.toImmutableList());

                for (Future<?> future : futures) {
                    try {
                        future.get(10, TimeUnit.SECONDS);
                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
                        throw new RuntimeException("Failed to get future value", e);
                    }
                }

                // All four updates landed, and a final read-through sees the fully updated state.
                Assert.assertEquals(2 * 3 * 5 * 7, remoteState.get());
                Assert.assertEquals(remoteState.get(), (long) cache.get(key, remoteState::get));
            } finally {
                executor.shutdownNow();
                Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
            }
        }
    }

    // A size-0 ("disabled") cache must silently accept direct map puts, returning null,
    // for every disabled-cache implementation, since nothing is ever retained anyway.
    @Test
    public void testPutOnEmptyCacheImplementation() {
        for (DisabledCacheImplementation disabledCacheImplementation : DisabledCacheImplementation.values()) {
            Cache<Object, Object> cache = EvictableCacheBuilder.newBuilder()
                    .maximumSize(0)
                    .disabledCacheImplementation(disabledCacheImplementation)
                    .build();
            Map<Object, Object> cacheMap = cache.asMap();

            int key = 0;
            int value = 1;
            Assert.assertNull(cacheMap.put(key, value));
            Assert.assertNull(cacheMap.put(key, value));
            Assert.assertNull(cacheMap.putIfAbsent(key, value));
            Assert.assertNull(cacheMap.putIfAbsent(key, value));
        }
    }

    // A real (non-empty) cache must reject direct map puts with UnsupportedOperationException,
    // because they race with invalidation; the expected messages below mirror the production code
    // verbatim (including its "as in" wording) and must not be "corrected" here.
    @Test
    public void testPutOnNonEmptyCacheImplementation() {
        Cache<Object, Object> cache = EvictableCacheBuilder.newBuilder()
                .maximumSize(10)
                .build();
        Map<Object, Object> cacheMap = cache.asMap();

        int key = 0;
        int value = 1;

        Exception putException = Assert.assertThrows("put operation should throw UnsupportedOperationException",
                UnsupportedOperationException.class,
                () -> cacheMap.put(key, value));
        Assert.assertEquals(
                "The operation is not supported, as in inherently races with cache invalidation. Use get(key, callable) instead.",
                putException.getMessage());

        Exception putIfAbsentException = Assert.assertThrows("putIfAbsent operation should throw UnsupportedOperationException",
                UnsupportedOperationException.class,
                () -> cacheMap.putIfAbsent(key, value));
        Assert.assertEquals(
                "The operation is not supported, as in inherently races with cache invalidation",
                putIfAbsentException.getMessage());
    }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java
new file mode 100644
index 00000000000..d6c6ed4b93b
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/fs/TransactionScopeCachingDirectoryListerTest.java
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java
// and modified by Doris

package org.apache.doris.fs;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.fs.remote.RemoteFile;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// some tests may invalidate the whole cache affecting therefore other concurrent tests
@Execution(ExecutionMode.SAME_THREAD)
public class TransactionScopeCachingDirectoryListerTest {

    /**
     * Verifies transaction-scoped caching of directory listings:
     * a cached path is served without another delegate call, two concurrent listings of the
     * same path share a single delegate call, and entries are evicted once the cache's
     * capacity (2 files here) is exceeded. Delegate calls are counted via CountingDirectoryLister.
     */
    @Test
    public void testConcurrentDirectoryListing(@Mocked TableIf table)
            throws FileSystemIOException {
        RemoteFile firstFile = new RemoteFile("file:/x/x", true, 1, 1);
        RemoteFile secondFile = new RemoteFile("file:/x/y", true, 1, 1);
        RemoteFile thirdFile = new RemoteFile("file:/y/z", true, 1, 1);

        String path1 = "file:/x";
        String path2 = "file:/y";

        CountingDirectoryLister countingLister = new CountingDirectoryLister(
                ImmutableMap.of(
                        path1, ImmutableList.of(firstFile, secondFile),
                        path2, ImmutableList.of(thirdFile)));

        // Factory argument 2 bounds the cache, so caching both of path1's files
        // later forces path2's single-file entry out (asserted below).
        TransactionScopeCachingDirectoryLister cachingLister = (TransactionScopeCachingDirectoryLister)
                new TransactionScopeCachingDirectoryListerFactory(2).get(countingLister);

        assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));

        Assert.assertEquals(1, countingLister.getListCount());

        // listing path2 again shouldn't increase listing count
        Assert.assertTrue(cachingLister.isCached(path2));
        assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
        Assert.assertEquals(1, countingLister.getListCount());

        // start listing path1 concurrently
        RemoteIterator<RemoteFile> path1FilesA = cachingLister.listFiles(null, true, table, path1);
        RemoteIterator<RemoteFile> path1FilesB = cachingLister.listFiles(null, true, table, path1);
        Assert.assertEquals(2, countingLister.getListCount());

        // list path1 files using both iterators concurrently; interleaved order checks that
        // each iterator advances independently over the shared listing
        Assert.assertEquals(firstFile, path1FilesA.next());
        Assert.assertEquals(firstFile, path1FilesB.next());
        Assert.assertEquals(secondFile, path1FilesB.next());
        Assert.assertEquals(secondFile, path1FilesA.next());
        Assert.assertFalse(path1FilesA.hasNext());
        Assert.assertFalse(path1FilesB.hasNext());
        Assert.assertEquals(2, countingLister.getListCount());

        // path2 was evicted by path1's files, so listing it again hits the delegate once more
        Assert.assertFalse(cachingLister.isCached(path2));
        assertFiles(cachingLister.listFiles(null, true, table, path2), ImmutableList.of(thirdFile));
        Assert.assertEquals(3, countingLister.getListCount());
    }

    /**
     * Verifies error propagation for a shared listing: both concurrent iterators observe the
     * delegate failure (even after a later successful re-listing), and the failed listing is
     * not cached, so a retry reaches the delegate again.
     */
    @Test
    public void testConcurrentDirectoryListingException(@Mocked TableIf table)
            throws FileSystemIOException {
        RemoteFile file = new RemoteFile("file:/x/x", true, 1, 1);

        String path = "file:/x";

        CountingDirectoryLister countingLister = new CountingDirectoryLister(ImmutableMap.of(path, ImmutableList.of(file)));
        DirectoryLister cachingLister = new TransactionScopeCachingDirectoryListerFactory(1).get(countingLister);

        // start listing path concurrently
        countingLister.setThrowException(true);
        RemoteIterator<RemoteFile> filesA = cachingLister.listFiles(null, true, table, path);
        RemoteIterator<RemoteFile> filesB = cachingLister.listFiles(null, true, table, path);
        // one delegate call shared by both iterators
        Assert.assertEquals(1, countingLister.getListCount());

        // listing should throw an exception
        Assert.assertThrows(FileSystemIOException.class, () -> filesA.hasNext());

        // listing again should succeed
        countingLister.setThrowException(false);
        assertFiles(cachingLister.listFiles(null, true, table, path), ImmutableList.of(file));
        Assert.assertEquals(2, countingLister.getListCount());

        // listing using second concurrently initialized DirectoryLister should fail
        Assert.assertThrows(FileSystemIOException.class, () -> filesB.hasNext());
    }

    // Drains the iterator and compares the collected files against the expected list, in order.
    private void assertFiles(RemoteIterator<RemoteFile> iterator, List<RemoteFile> expectedFiles)
            throws FileSystemIOException {
        ImmutableList.Builder<RemoteFile> actualFiles = ImmutableList.builder();
        while (iterator.hasNext()) {
            actualFiles.add(iterator.next());
        }
        Assert.assertEquals(expectedFiles, actualFiles.build());
    }

    /**
     * Test double: serves canned per-location listings, counts how many times listFiles() is
     * invoked, and can be switched to hand out iterators that fail on hasNext().
     */
    private static class CountingDirectoryLister
            implements DirectoryLister {
        private final Map<String, List<RemoteFile>> fileStatuses;
        // number of listFiles() invocations observed so far
        private int listCount;
        // when true, returned iterators throw FileSystemIOException from hasNext()
        private boolean throwException;

        public CountingDirectoryLister(Map<String, List<RemoteFile>> fileStatuses) {
            this.fileStatuses = Objects.requireNonNull(fileStatuses, "fileStatuses is null");
        }

        @Override
        public RemoteIterator<RemoteFile> listFiles(FileSystem fs, boolean recursive, TableIf table, String location)
                throws FileSystemIOException {
            // No specific recursive files-only listing implementation
            listCount++;
            return throwingRemoteIterator(Objects.requireNonNull(fileStatuses.get(location)), throwException);
        }

        public void setThrowException(boolean throwException) {
            this.throwException = throwException;
        }

        public int getListCount() {
            return listCount;
        }
    }

    // Wraps the given files in a RemoteIterator; when throwException is set (captured at creation
    // time), every hasNext() call fails instead of iterating.
    static RemoteIterator<RemoteFile> throwingRemoteIterator(List<RemoteFile> files, boolean throwException) {
        return new RemoteIterator<RemoteFile>() {
            private final Iterator<RemoteFile> iterator = ImmutableList.copyOf(files).iterator();

            @Override
            public boolean hasNext()
                    throws FileSystemIOException {
                if (throwException) {
                    throw new FileSystemIOException("File system io exception.");
                }
                return iterator.hasNext();
            }

            @Override
            public RemoteFile next() {
                return iterator.next();
            }
        };
    }
}
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org