github-actions[bot] commented on code in PR #61111:
URL: https://github.com/apache/doris/pull/61111#discussion_r2896202477


##########
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java:
##########
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.VersionHelper;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.FrontendOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A request-coalescing version cache for point queries in cloud mode.
+ *
+ * <p>When {@code enable_snapshot_point_query=true}, every point query needs 
to fetch
+ * the partition's visible version from MetaService. Under high concurrency, 
this causes
+ * N RPCs for N concurrent point queries on the same partition.</p>
+ *
+ * <p>This cache optimizes the version fetching by:
+ * <ul>
+ *   <li><b>Short TTL caching</b>: Partition versions are cached for a 
configurable duration
+ *       ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the 
TTL window,
+ *       concurrent queries reuse the cached version.</li>
+ *   <li><b>Request coalescing</b>: When the cache expires, only the first 
request issues
+ *       the MetaService RPC. Concurrent requests for the same partition wait 
on the inflight
+ *       result via a {@link CompletableFuture}.</li>
+ * </ul>
+ * </p>
+ */
+public class PointQueryVersionCache {
+    private static final Logger LOG = 
LogManager.getLogger(PointQueryVersionCache.class);
+
+    private static volatile PointQueryVersionCache instance;
+
+    /**
+     * Cache entry holding the version and the timestamp when it was cached.
+     */
+    static class VersionEntry {
+        final long version;
+        final long cachedTimeMs;
+
+        VersionEntry(long version, long cachedTimeMs) {
+            this.version = version;
+            this.cachedTimeMs = cachedTimeMs;
+        }
+
+        boolean isExpired(long ttlMs) {
+            if (ttlMs <= 0) {
+                return true;
+            }
+            return System.currentTimeMillis() - cachedTimeMs > ttlMs;
+        }
+    }
+
+    // partitionId -> cached VersionEntry
+    private final ConcurrentHashMap<Long, VersionEntry> cache = new 
ConcurrentHashMap<>();
+

Review Comment:
   **[High] Unbounded cache — potential memory leak**
   
   This `ConcurrentHashMap` has no size limit and no eviction mechanism. 
Expired entries are never removed — they stay in the map forever (only 
`isExpired()` prevents serving them). Every partition ever queried adds a 
permanent entry, and dropped partitions are never cleaned up.
   
   Every other FE cache in the codebase uses Caffeine or Guava `CacheBuilder` 
with `maximumSize` and/or `expireAfterWrite`. The `EvictableCacheBuilder` even 
throws an exception for unbounded caches (`"Unbounded cache is not supported"`).
   
   In a cluster with thousands of tables and many partitions, this will grow 
monotonically without bound.
   
   **Suggestion:** Replace with 
`Caffeine.newBuilder().maximumSize(N).expireAfterWrite(ttl, 
TimeUnit.MILLISECONDS).build()`. This also eliminates the need for the manual 
`VersionEntry.isExpired()` check. The request coalescing via `inflightRequests` 
ConcurrentHashMap is fine since entries are self-cleaning (removed in `finally` 
block).



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java:
##########
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.VersionHelper;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.FrontendOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A request-coalescing version cache for point queries in cloud mode.
+ *
+ * <p>When {@code enable_snapshot_point_query=true}, every point query needs 
to fetch
+ * the partition's visible version from MetaService. Under high concurrency, 
this causes
+ * N RPCs for N concurrent point queries on the same partition.</p>
+ *
+ * <p>This cache optimizes the version fetching by:
+ * <ul>
+ *   <li><b>Short TTL caching</b>: Partition versions are cached for a 
configurable duration
+ *       ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the 
TTL window,
+ *       concurrent queries reuse the cached version.</li>

Review Comment:
   **[Low] Javadoc inconsistency:** The comment says `default 500ms` but the 
actual default value of `pointQueryVersionCacheTtlMs` is `0` (disabled). Please 
update the Javadoc to match.



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java:
##########
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.VersionHelper;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.FrontendOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A request-coalescing version cache for point queries in cloud mode.
+ *
+ * <p>When {@code enable_snapshot_point_query=true}, every point query needs 
to fetch
+ * the partition's visible version from MetaService. Under high concurrency, 
this causes
+ * N RPCs for N concurrent point queries on the same partition.</p>
+ *
+ * <p>This cache optimizes the version fetching by:
+ * <ul>
+ *   <li><b>Short TTL caching</b>: Partition versions are cached for a 
configurable duration
+ *       ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the 
TTL window,
+ *       concurrent queries reuse the cached version.</li>
+ *   <li><b>Request coalescing</b>: When the cache expires, only the first 
request issues
+ *       the MetaService RPC. Concurrent requests for the same partition wait 
on the inflight
+ *       result via a {@link CompletableFuture}.</li>
+ * </ul>
+ * </p>
+ */
+public class PointQueryVersionCache {
+    private static final Logger LOG = 
LogManager.getLogger(PointQueryVersionCache.class);
+
+    private static volatile PointQueryVersionCache instance;
+
+    /**
+     * Cache entry holding the version and the timestamp when it was cached.
+     */
+    static class VersionEntry {
+        final long version;
+        final long cachedTimeMs;
+
+        VersionEntry(long version, long cachedTimeMs) {
+            this.version = version;
+            this.cachedTimeMs = cachedTimeMs;
+        }
+
+        boolean isExpired(long ttlMs) {
+            if (ttlMs <= 0) {
+                return true;
+            }
+            return System.currentTimeMillis() - cachedTimeMs > ttlMs;
+        }
+    }
+
+    // partitionId -> cached VersionEntry
+    private final ConcurrentHashMap<Long, VersionEntry> cache = new 
ConcurrentHashMap<>();
+
+    // partitionId -> inflight RPC future (for request coalescing)
+    private final ConcurrentHashMap<Long, CompletableFuture<Long>> 
inflightRequests = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    public PointQueryVersionCache() {
+    }
+
+    public static PointQueryVersionCache getInstance() {
+        if (instance == null) {
+            synchronized (PointQueryVersionCache.class) {
+                if (instance == null) {
+                    instance = new PointQueryVersionCache();
+                }
+            }
+        }
+        return instance;
+    }
+
+    @VisibleForTesting
+    public static void setInstance(PointQueryVersionCache cache) {
+        instance = cache;
+    }
+
+    /**
+     * Get the visible version for a partition, using TTL-based caching and 
request coalescing.
+     *
+     * @param partition  the cloud partition to get version for
+     * @param ttlMs      TTL in milliseconds; 0 or negative disables caching
+     * @return the visible version
+     * @throws RpcException if the MetaService RPC fails
+     */
+    public long getVersion(CloudPartition partition, long ttlMs) throws 
RpcException {
+        long partitionId = partition.getId();
+
+        // If cache is disabled, fetch directly
+        if (ttlMs <= 0) {
+            return fetchVersionFromMs(partition);
+        }
+
+        // Check cache first
+        VersionEntry entry = cache.get(partitionId);
+        if (entry != null && !entry.isExpired(ttlMs)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("point query version cache hit, partition={}, 
version={}", partitionId, entry.version);
+            }
+            return entry.version;
+        }
+
+        // Cache miss or expired: use request coalescing
+        return getVersionWithCoalescing(partition, partitionId, ttlMs);
+    }
+
+    private long getVersionWithCoalescing(CloudPartition partition, long 
partitionId, long ttlMs)
+            throws RpcException {
+        // Try to become the leader request for this partition
+        CompletableFuture<Long> myFuture = new CompletableFuture<>();
+        CompletableFuture<Long> existingFuture = 
inflightRequests.putIfAbsent(partitionId, myFuture);
+
+        if (existingFuture != null) {
+            // Another request is already in flight — wait for its result
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("point query version coalescing, waiting for 
inflight request, partition={}",
+                        partitionId);
+            }
+            try {
+                return existingFuture.get();
+            } catch (InterruptedException e) {

Review Comment:
   **[Medium] No timeout on `CompletableFuture.get()` — risk of indefinite 
blocking**
   
   `existingFuture.get()` blocks without timeout. If the leader thread is 
unexpectedly killed (e.g., OOM, `Thread.stop()`, or other abnormal termination 
before completing the future), all follower threads will block forever.
   
   While `fetchVersionFromMs()` has its own retry/timeout logic inside 
`VersionHelper`, the overall wait here could be very long (retry_times × 
timeout_per_attempt), and in pathological cases, infinite.
   
   **Suggestion:** Use `existingFuture.get(timeout, TimeUnit.MILLISECONDS)` 
with a reasonable timeout (e.g., 30s or configurable), and throw `RpcException` 
on `TimeoutException`.



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java:
##########
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.VersionHelper;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.FrontendOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A request-coalescing version cache for point queries in cloud mode.
+ *
+ * <p>When {@code enable_snapshot_point_query=true}, every point query needs 
to fetch
+ * the partition's visible version from MetaService. Under high concurrency, 
this causes
+ * N RPCs for N concurrent point queries on the same partition.</p>
+ *
+ * <p>This cache optimizes the version fetching by:
+ * <ul>
+ *   <li><b>Short TTL caching</b>: Partition versions are cached for a 
configurable duration
+ *       ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the 
TTL window,
+ *       concurrent queries reuse the cached version.</li>
+ *   <li><b>Request coalescing</b>: When the cache expires, only the first 
request issues
+ *       the MetaService RPC. Concurrent requests for the same partition wait 
on the inflight
+ *       result via a {@link CompletableFuture}.</li>
+ * </ul>
+ * </p>
+ */
+public class PointQueryVersionCache {
+    private static final Logger LOG = 
LogManager.getLogger(PointQueryVersionCache.class);
+
+    private static volatile PointQueryVersionCache instance;
+
+    /**
+     * Cache entry holding the version and the timestamp when it was cached.
+     */
+    static class VersionEntry {
+        final long version;
+        final long cachedTimeMs;
+
+        VersionEntry(long version, long cachedTimeMs) {
+            this.version = version;
+            this.cachedTimeMs = cachedTimeMs;
+        }
+
+        boolean isExpired(long ttlMs) {
+            if (ttlMs <= 0) {
+                return true;
+            }
+            return System.currentTimeMillis() - cachedTimeMs > ttlMs;
+        }
+    }
+
+    // partitionId -> cached VersionEntry
+    private final ConcurrentHashMap<Long, VersionEntry> cache = new 
ConcurrentHashMap<>();
+
+    // partitionId -> inflight RPC future (for request coalescing)
+    private final ConcurrentHashMap<Long, CompletableFuture<Long>> 
inflightRequests = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    public PointQueryVersionCache() {
+    }
+
+    public static PointQueryVersionCache getInstance() {
+        if (instance == null) {
+            synchronized (PointQueryVersionCache.class) {
+                if (instance == null) {
+                    instance = new PointQueryVersionCache();
+                }
+            }
+        }
+        return instance;
+    }
+
+    @VisibleForTesting
+    public static void setInstance(PointQueryVersionCache cache) {
+        instance = cache;
+    }
+
+    /**
+     * Get the visible version for a partition, using TTL-based caching and 
request coalescing.
+     *
+     * @param partition  the cloud partition to get version for
+     * @param ttlMs      TTL in milliseconds; 0 or negative disables caching
+     * @return the visible version
+     * @throws RpcException if the MetaService RPC fails
+     */
+    public long getVersion(CloudPartition partition, long ttlMs) throws 
RpcException {
+        long partitionId = partition.getId();
+
+        // If cache is disabled, fetch directly
+        if (ttlMs <= 0) {
+            return fetchVersionFromMs(partition);
+        }
+
+        // Check cache first
+        VersionEntry entry = cache.get(partitionId);
+        if (entry != null && !entry.isExpired(ttlMs)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("point query version cache hit, partition={}, 
version={}", partitionId, entry.version);
+            }
+            return entry.version;
+        }
+
+        // Cache miss or expired: use request coalescing
+        return getVersionWithCoalescing(partition, partitionId, ttlMs);
+    }
+
+    private long getVersionWithCoalescing(CloudPartition partition, long 
partitionId, long ttlMs)
+            throws RpcException {
+        // Try to become the leader request for this partition
+        CompletableFuture<Long> myFuture = new CompletableFuture<>();
+        CompletableFuture<Long> existingFuture = 
inflightRequests.putIfAbsent(partitionId, myFuture);
+
+        if (existingFuture != null) {
+            // Another request is already in flight — wait for its result
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("point query version coalescing, waiting for 
inflight request, partition={}",
+                        partitionId);
+            }
+            try {
+                return existingFuture.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RpcException("get version", "interrupted while 
waiting for coalesced request");
+            } catch (ExecutionException e) {
+                Throwable cause = e.getCause();
+                if (cause instanceof RpcException) {
+                    throw (RpcException) cause;
+                }
+                throw new RpcException("get version", cause != null ? 
cause.getMessage() : e.getMessage());
+            }
+        }
+
+        // We are the leader — fetch version from MetaService
+        try {
+            long version = fetchVersionFromMs(partition);
+            // Update cache
+            cache.put(partitionId, new VersionEntry(version, 
System.currentTimeMillis()));
+            // Also update the partition's cached version

Review Comment:
   **[Low] No version monotonicity enforcement in cache**
   
   The cache blindly stores whatever version is returned by MetaService, even 
if it's lower than a previously cached version. While 
`CloudPartition.setCachedVisibleVersion()` (line 164) enforces monotonicity for 
the partition object itself (via lock + version comparison), the 
`PointQueryVersionCache` can serve a stale/lower version if MetaService 
transiently returns a lower value.
   
   This is unlikely in practice but could happen during MetaService failover or 
network partitions.
   
   **Suggestion:** Before `cache.put()`, check `if (existingEntry == null || 
version >= existingEntry.version)` to ensure the cache never serves a version 
regression. Alternatively, use `cache.compute()` with a version comparison.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to