Copilot commented on code in PR #61111: URL: https://github.com/apache/doris/pull/61111#discussion_r2894644963
########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java: ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.catalog; + +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.VersionHelper; +import org.apache.doris.catalog.Partition; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.FrontendOptions; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +/** + * A request-coalescing version cache for point queries in cloud mode. + * + * <p>When {@code enable_snapshot_point_query=true}, every point query needs to fetch + * the partition's visible version from MetaService. 
Under high concurrency, this causes + * N RPCs for N concurrent point queries on the same partition.</p> + * + * <p>This cache optimizes the version fetching by: + * <ul> + * <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration + * ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window, + * concurrent queries reuse the cached version.</li> + * <li><b>Request coalescing</b>: When the cache expires, only the first request issues + * the MetaService RPC. Concurrent requests for the same partition wait on the inflight + * result via a {@link CompletableFuture}.</li> + * </ul> + * </p> + */ +public class PointQueryVersionCache { + private static final Logger LOG = LogManager.getLogger(PointQueryVersionCache.class); + + private static volatile PointQueryVersionCache instance; + + /** + * Cache entry holding the version and the timestamp when it was cached. + */ + static class VersionEntry { + final long version; + final long cachedTimeMs; + + VersionEntry(long version, long cachedTimeMs) { + this.version = version; + this.cachedTimeMs = cachedTimeMs; + } + + boolean isExpired(long ttlMs) { + if (ttlMs <= 0) { + return true; + } + return System.currentTimeMillis() - cachedTimeMs > ttlMs; + } + } + + // partitionId -> cached VersionEntry + private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>(); + + // partitionId -> inflight RPC future (for request coalescing) + private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflightRequests = new ConcurrentHashMap<>(); + + @VisibleForTesting + public PointQueryVersionCache() { + } + + public static PointQueryVersionCache getInstance() { + if (instance == null) { + synchronized (PointQueryVersionCache.class) { + if (instance == null) { + instance = new PointQueryVersionCache(); + } + } + } + return instance; + } + + @VisibleForTesting + public static void setInstance(PointQueryVersionCache cache) { + instance = cache; + } + + /** + * 
Get the visible version for a partition, using TTL-based caching and request coalescing. + * + * @param partition the cloud partition to get version for + * @param ttlMs TTL in milliseconds; 0 or negative disables caching + * @return the visible version + * @throws RpcException if the MetaService RPC fails + */ + public long getVersion(CloudPartition partition, long ttlMs) throws RpcException { + long partitionId = partition.getId(); + + // If cache is disabled, fetch directly + if (ttlMs <= 0) { + return fetchVersionFromMs(partition); + } + + // Check cache first + VersionEntry entry = cache.get(partitionId); + if (entry != null && !entry.isExpired(ttlMs)) { + if (LOG.isDebugEnabled()) { + LOG.debug("point query version cache hit, partition={}, version={}", partitionId, entry.version); + } + return entry.version; + } + + // Cache miss or expired: use request coalescing + return getVersionWithCoalescing(partition, partitionId, ttlMs); + } + + private long getVersionWithCoalescing(CloudPartition partition, long partitionId, long ttlMs) + throws RpcException { + // Try to become the leader request for this partition + CompletableFuture<Long> myFuture = new CompletableFuture<>(); + CompletableFuture<Long> existingFuture = inflightRequests.putIfAbsent(partitionId, myFuture); + + if (existingFuture != null) { + // Another request is already in flight — wait for its result + if (LOG.isDebugEnabled()) { + LOG.debug("point query version coalescing, waiting for inflight request, partition={}", + partitionId); + } + try { + return existingFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RpcException("get version", "interrupted while waiting for coalesced request"); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RpcException) { + throw (RpcException) cause; + } + throw new RpcException("get version", cause != null ? 
cause.getMessage() : e.getMessage()); Review Comment: When converting ExecutionException into RpcException, the original cause is not preserved, which makes debugging harder. Prefer using RpcException(host, message, Exception) (or otherwise attaching the cause) so the stack trace isn’t lost. ########## fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/PointQueryExecutorPerfTest.java: ########## @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.catalog; + +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.VersionHelper; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.PointQueryExecutor; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.ShortCircuitQueryContext; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import mockit.Mock; +import mockit.MockUp; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * End-to-end performance test for PointQueryExecutor.updateCloudPartitionVersions(). + * + * Mocks the MetaService RPC layer (VersionHelper.getVersionFromMeta) with configurable + * latency, then exercises the actual updateCloudPartitionVersions() code path under + * high concurrency. Compares RPC counts with cache disabled vs enabled. 
+ */ +public class PointQueryExecutorPerfTest { + private static final long PARTITION_ID = 10001; + private static final long TABLE_ID = 1001; + private static final long DB_ID = 100; + + // Counter for MetaService RPC calls + private static final AtomicInteger msRpcCount = new AtomicInteger(0); + private static volatile long msRpcLatencyMs = 5; + private static volatile long msVersionToReturn = 42L; + + private CloudPartition partition; + + @Before + public void setUp() throws Exception { + msRpcCount.set(0); + partition = CloudPartitionTest.createPartition(PARTITION_ID, DB_ID, TABLE_ID); + + // -- Mock VersionHelper.getVersionFromMeta (the MetaService RPC) -- + new MockUp<VersionHelper>(VersionHelper.class) { + @Mock + public Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req) { + msRpcCount.incrementAndGet(); + if (msRpcLatencyMs > 0) { + try { + Thread.sleep(msRpcLatencyMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return Cloud.GetVersionResponse.newBuilder() + .setStatus(Cloud.MetaServiceResponseStatus.newBuilder() + .setCode(Cloud.MetaServiceCode.OK).build()) + .setVersion(msVersionToReturn) + .build(); + } + }; + + // -- Create Objenesis OlapTable and mock its getPartition -- + final OlapTable mockTable = new org.objenesis.ObjenesisStd().newInstance(OlapTable.class); + new MockUp<OlapTable>(OlapTable.class) { + @Mock + public Partition getPartition(long partitionId) { + return partition; + } + }; + + // -- Mock OlapScanNode -- + new MockUp<OlapScanNode>(OlapScanNode.class) { + @Mock + public Collection<Long> getSelectedPartitionIds() { + return ImmutableList.of(PARTITION_ID); + } + @Mock + public OlapTable getOlapTable() { + return mockTable; + } + }; + + // -- Mock ShortCircuitQueryContext.sanitize -- + new MockUp<ShortCircuitQueryContext>(ShortCircuitQueryContext.class) { + @Mock + public void sanitize() { + // no-op + } + }; + + PointQueryVersionCache.getInstance().clear(); + } + + @After + 
public void tearDown() { + PointQueryVersionCache.getInstance().clear(); + } + + /** + * Create a PointQueryExecutor and directly invoke updateCloudPartitionVersions(). + * This is the exact code path that was optimized. + */ + private void invokeUpdateCloudPartitionVersions(long cacheTtlMs) throws Exception { + // Set up ConnectContext for this thread + SessionVariable sv = new SessionVariable(); + sv.enableSnapshotPointQuery = true; + sv.pointQueryVersionCacheTtlMs = cacheTtlMs; + ConnectContext ctx = new ConnectContext(); + ctx.setSessionVariable(sv); + ctx.setThreadLocalInfo(); + + // Build minimal PointQueryExecutor + org.objenesis.ObjenesisStd objenesis = new org.objenesis.ObjenesisStd(); + ShortCircuitQueryContext queryCtx = objenesis.newInstance(ShortCircuitQueryContext.class); + OlapScanNode scanNode = objenesis.newInstance(OlapScanNode.class); + OlapTable table = objenesis.newInstance(OlapTable.class); + + setField(queryCtx, "scanNode", scanNode); + setField(queryCtx, "tbl", table); + setField(queryCtx, "serializedDescTable", ByteString.copyFrom(new byte[]{0})); + setField(queryCtx, "serializedOutputExpr", ByteString.copyFrom(new byte[]{0})); + setField(queryCtx, "serializedQueryOptions", ByteString.copyFrom(new byte[]{0})); + setField(queryCtx, "cacheID", java.util.UUID.randomUUID()); + setField(queryCtx, "schemaVersion", 0); + + PointQueryExecutor executor = new PointQueryExecutor(queryCtx, 1024 * 1024); + + // Directly invoke updateCloudPartitionVersions() via reflection + Method method = PointQueryExecutor.class.getDeclaredMethod("updateCloudPartitionVersions"); + method.setAccessible(true); + method.invoke(executor); + } + + private static void setField(Object obj, String fieldName, Object value) throws Exception { + Class<?> clz = obj.getClass(); + while (clz != null) { + try { + Field f = clz.getDeclaredField(fieldName); + f.setAccessible(true); + f.set(obj, value); + return; + } catch (NoSuchFieldException e) { + clz = clz.getSuperclass(); + } + } 
+ throw new NoSuchFieldException(fieldName); + } + + // ======================== Main Test ======================== + + @Test + public void testHighConcurrencyCacheEffectiveness() throws Exception { + int numThreads = 100; + msRpcLatencyMs = 5; // 5ms per MetaService RPC + + // ========== WITHOUT CACHE (ttl=0) ========== + msRpcCount.set(0); + PointQueryVersionCache.getInstance().clear(); + long noCacheWallMs = runConcurrent(numThreads, 0); + int noCacheRpcs = msRpcCount.get(); + + // ========== WITH CACHE (ttl=500ms) ========== + msRpcCount.set(0); + PointQueryVersionCache.getInstance().clear(); + long cachedWallMs = runConcurrent(numThreads, 500); + int cachedRpcs = msRpcCount.get(); + + // ========== Print Results ========== + double reductionPct = (1.0 - (double) cachedRpcs / Math.max(1, noCacheRpcs)) * 100; + double reductionFactor = (double) noCacheRpcs / Math.max(1, cachedRpcs); + + System.out.println(); + System.out.println("╔══════════════════════════════════════════════════════════════╗"); + System.out.println("║ PointQueryExecutor.updateCloudPartitionVersions() Perf Test ║"); + System.out.println("╠══════════════════════════════════════════════════════════════╣"); + System.out.printf("║ Concurrent threads: %-34d║%n", numThreads); + System.out.printf("║ MetaService RPC latency: %-34s║%n", msRpcLatencyMs + "ms"); + System.out.println("╠══════════════════════════════════════════════════════════════╣"); + System.out.printf("║ %-28s %-14s %-14s ║%n", "Metric", "No Cache", "With Cache"); + System.out.println("║ ──────────────────────── ────────────── ────────────── ║"); + System.out.printf("║ %-28s %-14d %-14d ║%n", "MetaService RPCs", noCacheRpcs, cachedRpcs); + System.out.printf("║ %-28s %-14s %-14s ║%n", "Wall time", + noCacheWallMs + "ms", cachedWallMs + "ms"); + System.out.printf("║ %-28s %-14s %-14s ║%n", "RPC reduction", "-", + String.format("%.1f%% (%.0fx)", reductionPct, reductionFactor)); + 
System.out.println("╚══════════════════════════════════════════════════════════════╝"); + System.out.println(); + + // ========== Assertions ========== + Assertions.assertTrue(noCacheRpcs >= numThreads, + "Without cache, expected >= " + numThreads + " MS RPCs, got " + noCacheRpcs); + Assertions.assertTrue(cachedRpcs <= 5, + "With cache, expected <= 5 MS RPCs, got " + cachedRpcs); + Assertions.assertTrue(reductionFactor >= 20, Review Comment: The assertions here use strict absolute thresholds (e.g., cachedRpcs <= 5 and >= 20x reduction) under heavy concurrency, which can be flaky on contended/slow CI hosts. Consider asserting a looser relative improvement (e.g., cachedRpcs is significantly less than noCacheRpcs) and/or adding timeouts to avoid hangs. ########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java: ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.catalog; + +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.VersionHelper; +import org.apache.doris.catalog.Partition; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.FrontendOptions; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +/** + * A request-coalescing version cache for point queries in cloud mode. + * + * <p>When {@code enable_snapshot_point_query=true}, every point query needs to fetch + * the partition's visible version from MetaService. Under high concurrency, this causes + * N RPCs for N concurrent point queries on the same partition.</p> + * + * <p>This cache optimizes the version fetching by: + * <ul> + * <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration + * ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window, + * concurrent queries reuse the cached version.</li> + * <li><b>Request coalescing</b>: When the cache expires, only the first request issues + * the MetaService RPC. Concurrent requests for the same partition wait on the inflight + * result via a {@link CompletableFuture}.</li> + * </ul> + * </p> + */ +public class PointQueryVersionCache { + private static final Logger LOG = LogManager.getLogger(PointQueryVersionCache.class); + + private static volatile PointQueryVersionCache instance; + + /** + * Cache entry holding the version and the timestamp when it was cached. 
+ */ + static class VersionEntry { + final long version; + final long cachedTimeMs; + + VersionEntry(long version, long cachedTimeMs) { + this.version = version; + this.cachedTimeMs = cachedTimeMs; + } + + boolean isExpired(long ttlMs) { + if (ttlMs <= 0) { + return true; + } + return System.currentTimeMillis() - cachedTimeMs > ttlMs; + } + } + + // partitionId -> cached VersionEntry + private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>(); + + // partitionId -> inflight RPC future (for request coalescing) + private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflightRequests = new ConcurrentHashMap<>(); + Review Comment: Nothing evicts individual entries from this cache: VersionEntry expiration is only checked on read, and an expired entry is merely overwritten when the same partition is queried again. Entries for partitions that are never point-queried again (for example, partitions that are later dropped) remain in the map indefinitely, so memory usage can grow with the number of distinct partitions ever point-queried. Consider a bounded/expiring cache (e.g., Caffeine with maximumSize + expireAfterWrite) or explicit cleanup/removal. ########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java: ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.doris.cloud.catalog; + +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.VersionHelper; +import org.apache.doris.catalog.Partition; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.FrontendOptions; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +/** + * A request-coalescing version cache for point queries in cloud mode. + * + * <p>When {@code enable_snapshot_point_query=true}, every point query needs to fetch + * the partition's visible version from MetaService. Under high concurrency, this causes + * N RPCs for N concurrent point queries on the same partition.</p> + * + * <p>This cache optimizes the version fetching by: + * <ul> + * <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration + * ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window, + * concurrent queries reuse the cached version.</li> + * <li><b>Request coalescing</b>: When the cache expires, only the first request issues + * the MetaService RPC. Concurrent requests for the same partition wait on the inflight + * result via a {@link CompletableFuture}.</li> + * </ul> + * </p> + */ +public class PointQueryVersionCache { + private static final Logger LOG = LogManager.getLogger(PointQueryVersionCache.class); + + private static volatile PointQueryVersionCache instance; + + /** + * Cache entry holding the version and the timestamp when it was cached. 
+ */ + static class VersionEntry { + final long version; + final long cachedTimeMs; + + VersionEntry(long version, long cachedTimeMs) { + this.version = version; + this.cachedTimeMs = cachedTimeMs; + } + + boolean isExpired(long ttlMs) { + if (ttlMs <= 0) { + return true; + } + return System.currentTimeMillis() - cachedTimeMs > ttlMs; + } + } + + // partitionId -> cached VersionEntry + private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>(); + + // partitionId -> inflight RPC future (for request coalescing) + private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflightRequests = new ConcurrentHashMap<>(); + + @VisibleForTesting + public PointQueryVersionCache() { + } + + public static PointQueryVersionCache getInstance() { + if (instance == null) { + synchronized (PointQueryVersionCache.class) { + if (instance == null) { + instance = new PointQueryVersionCache(); + } + } + } + return instance; + } + + @VisibleForTesting + public static void setInstance(PointQueryVersionCache cache) { + instance = cache; + } + + /** + * Get the visible version for a partition, using TTL-based caching and request coalescing. 
+ * + * @param partition the cloud partition to get version for + * @param ttlMs TTL in milliseconds; 0 or negative disables caching + * @return the visible version + * @throws RpcException if the MetaService RPC fails + */ + public long getVersion(CloudPartition partition, long ttlMs) throws RpcException { + long partitionId = partition.getId(); + + // If cache is disabled, fetch directly + if (ttlMs <= 0) { + return fetchVersionFromMs(partition); + } + + // Check cache first + VersionEntry entry = cache.get(partitionId); + if (entry != null && !entry.isExpired(ttlMs)) { + if (LOG.isDebugEnabled()) { + LOG.debug("point query version cache hit, partition={}, version={}", partitionId, entry.version); + } + return entry.version; + } + + // Cache miss or expired: use request coalescing + return getVersionWithCoalescing(partition, partitionId, ttlMs); + } + + private long getVersionWithCoalescing(CloudPartition partition, long partitionId, long ttlMs) + throws RpcException { + // Try to become the leader request for this partition + CompletableFuture<Long> myFuture = new CompletableFuture<>(); + CompletableFuture<Long> existingFuture = inflightRequests.putIfAbsent(partitionId, myFuture); + + if (existingFuture != null) { + // Another request is already in flight — wait for its result + if (LOG.isDebugEnabled()) { + LOG.debug("point query version coalescing, waiting for inflight request, partition={}", + partitionId); + } + try { + return existingFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RpcException("get version", "interrupted while waiting for coalesced request"); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RpcException) { + throw (RpcException) cause; + } + throw new RpcException("get version", cause != null ? 
cause.getMessage() : e.getMessage()); + } + } + + // We are the leader — fetch version from MetaService + try { + long version = fetchVersionFromMs(partition); + // Update cache + cache.put(partitionId, new VersionEntry(version, System.currentTimeMillis())); + // Also update the partition's cached version + partition.setCachedVisibleVersion(version, System.currentTimeMillis()); + // Complete the future so waiting requests get the result Review Comment: Passing System.currentTimeMillis() as the partition visibleVersionTime changes the meaning of Partition.visibleVersionTime (CloudPartition normally uses MetaService’s versionUpdateTimeMs). This can make visibleVersionTime inaccurate. Either extract and propagate versionUpdateTimeMs from the MetaService response, or avoid calling setCachedVisibleVersion here. ########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java: ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.catalog; + +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.VersionHelper; +import org.apache.doris.catalog.Partition; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.FrontendOptions; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +/** + * A request-coalescing version cache for point queries in cloud mode. + * + * <p>When {@code enable_snapshot_point_query=true}, every point query needs to fetch + * the partition's visible version from MetaService. Under high concurrency, this causes + * N RPCs for N concurrent point queries on the same partition.</p> + * + * <p>This cache optimizes the version fetching by: + * <ul> + * <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration + * ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window, + * concurrent queries reuse the cached version.</li> Review Comment: The class Javadoc claims the default TTL is 500ms, but the new session variable defaults to 0 (disabled). Please update the Javadoc to reflect the actual default/behavior to avoid misleading operators. ```suggestion * <li><b>Short TTL caching</b>: When enabled, partition versions are cached for a * configurable duration ({@code point_query_version_cache_ttl_ms}). 
The default * value is {@code 0}, which disables the cache; setting a positive value (for * example, {@code 500} milliseconds) allows concurrent queries within the TTL * window to reuse the cached version.</li> ``` ########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java: ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.catalog; + +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.VersionHelper; +import org.apache.doris.catalog.Partition; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.FrontendOptions; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +/** + * A request-coalescing version cache for point queries in cloud mode. + * + * <p>When {@code enable_snapshot_point_query=true}, every point query needs to fetch + * the partition's visible version from MetaService. 
Under high concurrency, this causes + * N RPCs for N concurrent point queries on the same partition.</p> + * + * <p>This cache optimizes the version fetching by: + * <ul> + * <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration + * ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window, + * concurrent queries reuse the cached version.</li> + * <li><b>Request coalescing</b>: When the cache expires, only the first request issues + * the MetaService RPC. Concurrent requests for the same partition wait on the inflight + * result via a {@link CompletableFuture}.</li> + * </ul> + * </p> + */ +public class PointQueryVersionCache { + private static final Logger LOG = LogManager.getLogger(PointQueryVersionCache.class); + + private static volatile PointQueryVersionCache instance; + + /** + * Cache entry holding the version and the timestamp when it was cached. + */ + static class VersionEntry { + final long version; + final long cachedTimeMs; + + VersionEntry(long version, long cachedTimeMs) { + this.version = version; + this.cachedTimeMs = cachedTimeMs; + } + + boolean isExpired(long ttlMs) { + if (ttlMs <= 0) { + return true; + } + return System.currentTimeMillis() - cachedTimeMs > ttlMs; + } + } + + // partitionId -> cached VersionEntry + private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>(); + + // partitionId -> inflight RPC future (for request coalescing) + private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflightRequests = new ConcurrentHashMap<>(); + + @VisibleForTesting + public PointQueryVersionCache() { + } + + public static PointQueryVersionCache getInstance() { + if (instance == null) { + synchronized (PointQueryVersionCache.class) { + if (instance == null) { + instance = new PointQueryVersionCache(); + } + } + } + return instance; + } + + @VisibleForTesting + public static void setInstance(PointQueryVersionCache cache) { + instance = cache; + } + + /** + * 
Get the visible version for a partition, using TTL-based caching and request coalescing. + * + * @param partition the cloud partition to get version for + * @param ttlMs TTL in milliseconds; 0 or negative disables caching + * @return the visible version + * @throws RpcException if the MetaService RPC fails + */ + public long getVersion(CloudPartition partition, long ttlMs) throws RpcException { + long partitionId = partition.getId(); + + // If cache is disabled, fetch directly + if (ttlMs <= 0) { + return fetchVersionFromMs(partition); + } + + // Check cache first + VersionEntry entry = cache.get(partitionId); + if (entry != null && !entry.isExpired(ttlMs)) { + if (LOG.isDebugEnabled()) { + LOG.debug("point query version cache hit, partition={}, version={}", partitionId, entry.version); + } + return entry.version; + } + + // Cache miss or expired: use request coalescing + return getVersionWithCoalescing(partition, partitionId, ttlMs); + } + + private long getVersionWithCoalescing(CloudPartition partition, long partitionId, long ttlMs) + throws RpcException { + // Try to become the leader request for this partition + CompletableFuture<Long> myFuture = new CompletableFuture<>(); + CompletableFuture<Long> existingFuture = inflightRequests.putIfAbsent(partitionId, myFuture); + + if (existingFuture != null) { + // Another request is already in flight — wait for its result + if (LOG.isDebugEnabled()) { + LOG.debug("point query version coalescing, waiting for inflight request, partition={}", + partitionId); + } + try { + return existingFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RpcException("get version", "interrupted while waiting for coalesced request"); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RpcException) { + throw (RpcException) cause; + } + throw new RpcException("get version", cause != null ? 
cause.getMessage() : e.getMessage()); + } + } + + // We are the leader — fetch version from MetaService + try { + long version = fetchVersionFromMs(partition); + // Update cache + cache.put(partitionId, new VersionEntry(version, System.currentTimeMillis())); + // Also update the partition's cached version + partition.setCachedVisibleVersion(version, System.currentTimeMillis()); + // Complete the future so waiting requests get the result + myFuture.complete(version); + if (LOG.isDebugEnabled()) { + LOG.debug("point query version fetched from MS, partition={}, version={}", + partitionId, version); + } + return version; + } catch (Exception e) { + // Complete exceptionally so waiting requests also get the error + myFuture.completeExceptionally(e); + if (e instanceof RpcException) { + throw (RpcException) e; + } + throw new RpcException("get version", e.getMessage()); Review Comment: Wrapping non-RpcException failures into a new RpcException drops the original exception as the cause, which makes debugging harder. Consider using the RpcException(host, message, Exception) constructor (or otherwise attaching the cause) so stack traces aren’t lost. ########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java: ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.catalog; + +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.VersionHelper; +import org.apache.doris.catalog.Partition; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.FrontendOptions; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +/** + * A request-coalescing version cache for point queries in cloud mode. + * + * <p>When {@code enable_snapshot_point_query=true}, every point query needs to fetch + * the partition's visible version from MetaService. Under high concurrency, this causes + * N RPCs for N concurrent point queries on the same partition.</p> + * + * <p>This cache optimizes the version fetching by: + * <ul> + * <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration + * ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window, + * concurrent queries reuse the cached version.</li> + * <li><b>Request coalescing</b>: When the cache expires, only the first request issues + * the MetaService RPC. Concurrent requests for the same partition wait on the inflight + * result via a {@link CompletableFuture}.</li> + * </ul> + * </p> + */ +public class PointQueryVersionCache { + private static final Logger LOG = LogManager.getLogger(PointQueryVersionCache.class); + + private static volatile PointQueryVersionCache instance; + + /** + * Cache entry holding the version and the timestamp when it was cached. 
+ */ + static class VersionEntry { + final long version; + final long cachedTimeMs; + + VersionEntry(long version, long cachedTimeMs) { + this.version = version; + this.cachedTimeMs = cachedTimeMs; + } + + boolean isExpired(long ttlMs) { + if (ttlMs <= 0) { + return true; + } + return System.currentTimeMillis() - cachedTimeMs > ttlMs; + } + } + + // partitionId -> cached VersionEntry + private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>(); + + // partitionId -> inflight RPC future (for request coalescing) + private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflightRequests = new ConcurrentHashMap<>(); + + @VisibleForTesting + public PointQueryVersionCache() { + } + + public static PointQueryVersionCache getInstance() { + if (instance == null) { + synchronized (PointQueryVersionCache.class) { + if (instance == null) { + instance = new PointQueryVersionCache(); + } + } + } + return instance; + } + + @VisibleForTesting + public static void setInstance(PointQueryVersionCache cache) { + instance = cache; + } + + /** + * Get the visible version for a partition, using TTL-based caching and request coalescing. 
+ * + * @param partition the cloud partition to get version for + * @param ttlMs TTL in milliseconds; 0 or negative disables caching + * @return the visible version + * @throws RpcException if the MetaService RPC fails + */ + public long getVersion(CloudPartition partition, long ttlMs) throws RpcException { + long partitionId = partition.getId(); + + // If cache is disabled, fetch directly + if (ttlMs <= 0) { + return fetchVersionFromMs(partition); + } + + // Check cache first + VersionEntry entry = cache.get(partitionId); + if (entry != null && !entry.isExpired(ttlMs)) { + if (LOG.isDebugEnabled()) { + LOG.debug("point query version cache hit, partition={}, version={}", partitionId, entry.version); + } + return entry.version; + } + + // Cache miss or expired: use request coalescing + return getVersionWithCoalescing(partition, partitionId, ttlMs); + } + + private long getVersionWithCoalescing(CloudPartition partition, long partitionId, long ttlMs) + throws RpcException { + // Try to become the leader request for this partition + CompletableFuture<Long> myFuture = new CompletableFuture<>(); + CompletableFuture<Long> existingFuture = inflightRequests.putIfAbsent(partitionId, myFuture); + + if (existingFuture != null) { Review Comment: A late-arriving thread can observe a cache miss and then, after the previous leader has populated the cache and removed its inflight future, find no inflight future. It then becomes a new leader without re-checking the now-populated cache, causing an unnecessary extra MetaService RPC. Consider re-checking the cache after winning leadership (before calling fetchVersionFromMs), or using an atomic computeIfAbsent-based future, to avoid this race. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
