github-actions[bot] commented on code in PR #60855:
URL: https://github.com/apache/doris/pull/60855#discussion_r2883213442


##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java:
##########
@@ -283,18 +302,58 @@ private <Response> Response executeWithMetrics(String methodName, Function<MetaS
         }
     }
 
-    public Future<Cloud.GetVersionResponse> getVisibleVersionAsync(Cloud.GetVersionRequest request)
-            throws RpcException {
+    public Cloud.GetVersionResponse getVisibleVersion(Cloud.GetVersionRequest request, int timeoutMs) {
         long startTime = System.currentTimeMillis();
-        String methodName = request.hasIsTableVersion() && request.getIsTableVersion() ? "getTableVersion"
-                : "getPartitionVersion";
-        MetaServiceClient client = null;
-
+        String methodName = request.hasIsTableVersion() && request.getIsTableVersion()
+                ? MetaServiceRpcLimiterManager.GET_TABLE_VERSION_METHOD :
+                MetaServiceRpcLimiterManager.GET_PARTITION_VERSION_METHOD;
+        int cost = 1;
+        if (request.hasBatchMode() && request.getBatchMode()) {
+            cost = MetaServiceRpcLimiterManager.getInstance().getClampedCost(methodName, request.getDbIdsCount());
+        }
+        boolean acquired = false;
         if (MetricRepo.isInit && Config.isCloudMode()) {
             CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.increase(1L);
             CloudMetrics.META_SERVICE_RPC_TOTAL.getOrAdd(methodName).increase(1L);
         }
 
+        long deadline = System.currentTimeMillis() + timeoutMs;
+        Cloud.GetVersionResponse resp = null;
+        try {
+            acquired = MetaServiceRpcLimiterManager.getInstance().acquire(methodName, cost);
+            Future<Cloud.GetVersionResponse> future = getVisibleVersionAsync(request);
+            while (resp == null) {
+                try {

Review Comment:
   **Medium**: This `while (resp == null)` loop calls `future.get()` again whenever it catches `InterruptedException`, and exits only when `resp` is set or a different exception type is thrown. If `InterruptedException` keeps firing before the `TimeoutException` triggers (e.g., the thread is repeatedly interrupted by another mechanism), the loop could spin indefinitely.
   
   Suggestions:
   1. Add a retry/iteration limit, or check `deadline` in the `while` condition: `while (resp == null && System.currentTimeMillis() < deadline)`
   2. Restore the interrupt flag with `Thread.currentThread().interrupt()` after logging; throwing `InterruptedException` clears it, so without this the caller cannot tell the thread was interrupted
   3. Consider breaking out of the loop on `InterruptedException` and leaving `resp` null (the caller already handles null responses)
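
A minimal sketch of suggestions 2 and 3 combined, assuming (as the comment states) that the caller treats `null` as "no response". `BoundedFutureWait` and `getUntilDeadline` are hypothetical names, and `CompletableFuture` is used only so the example runs on its own. It makes one bounded `Future.get` call sized from the remaining budget instead of re-entering the wait after an interrupt:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Doris: waits for a future until an absolute
// deadline and returns null on timeout, interruption, or failure.
final class BoundedFutureWait {
    private BoundedFutureWait() {
    }

    static <T> T getUntilDeadline(Future<T> future, long deadlineMs) {
        // Size the single wait from the remaining budget, never a fixed per-call timeout.
        long remainingMs = Math.max(0L, deadlineMs - System.currentTimeMillis());
        try {
            return future.get(remainingMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Throwing InterruptedException cleared the flag; restore it and stop waiting.
            Thread.currentThread().interrupt();
            future.cancel(true);
            return null;
        } catch (TimeoutException e) {
            // Deadline reached: give up on the in-flight call.
            future.cancel(true);
            return null;
        } catch (ExecutionException e) {
            // The call itself failed; a real implementation would log e.getCause().
            return null;
        }
    }
}
```

Returning on interruption removes the retry path entirely, and sizing the wait from `deadline` keeps the total wait within `timeoutMs`.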



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceOverloadThrottle.java:
##########
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.rpc;
+
+import org.apache.doris.common.Config;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+
+/**
+ * Overload throttle controller for meta-service RPCs.
+ *
+ * <p>Monitors RPC outcomes (success, timeout, backpressure) within a rolling time window
+ * and adjusts a global factor (0.1–1.0) that scales the configured QPS limits in
+ * {@link MetaServiceRpcLimiterManager}. Uses a state machine:
+ * NORMAL → FAST_DECREASE → COOLDOWN → SLOW_RECOVERY → NORMAL.
+ */
+public class MetaServiceOverloadThrottle {
+    private static final Logger LOG = LogManager.getLogger(MetaServiceOverloadThrottle.class);
+
+    public enum State {
+        NORMAL,
+        FAST_DECREASE,
+        COOLDOWN,
+        SLOW_RECOVERY
+    }
+
+    public enum Signal {
+        SUCCESS,
+        TIMEOUT,
+        BACKPRESSURE
+    }
+
+    private volatile State state = State.NORMAL;
+    private volatile double factor = 1.0;
+
+    private final LongAdder windowTotal = new LongAdder();
+    private final LongAdder windowBad = new LongAdder();
+    private final AtomicLong windowStartMs = new AtomicLong(System.currentTimeMillis());
+
+    private volatile long cooldownStartMs = 0;
+    private volatile long lastRecoveryMs = 0;
+
+    private static volatile MetaServiceOverloadThrottle instance;
+    private volatile Consumer<Double> factorChangeListener = null;
+
+    private MetaServiceOverloadThrottle() {
+    }
+
+    public static MetaServiceOverloadThrottle getInstance() {
+        if (instance == null) {
+            synchronized (MetaServiceOverloadThrottle.class) {
+                if (instance == null) {
+                    instance = new MetaServiceOverloadThrottle();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public void recordSignal(Signal signal) {
+        if (!Config.meta_service_rpc_overload_throttle_enabled) {
+            return;
+        }
+
+        long now = System.currentTimeMillis();
+        maybeResetWindow(now);
+
+        windowTotal.increment();
+        if (signal == Signal.TIMEOUT || signal == Signal.BACKPRESSURE) {
+            windowBad.increment();
+        }
+
+        switch (state) {
+            case NORMAL:
+                handleNormal(now);
+                break;
+            case FAST_DECREASE:
+                handleFastDecrease(now);
+                break;
+            case COOLDOWN:
+                handleCooldown(now, signal);
+                break;
+            case SLOW_RECOVERY:
+                handleSlowRecovery(now, signal);
+                break;
+            default:
+                break;
+        }
+    }
+
+    private void handleNormal(long now) {
+        if (isOverloaded()) {
+            transitionTo(State.FAST_DECREASE, now);
+            decreaseFactor();
+        }
+    }
+
+    private void handleFastDecrease(long now) {
+        if (isOverloaded()) {
+            decreaseFactor();
+            // If factor has hit the floor, stop hammering FAST_DECREASE and enter COOLDOWN
+            /*double minFactor = Config.meta_service_rpc_overload_throttle_min_factor;
+            if (Math.abs(factor - minFactor) < 1e-9) {

Review Comment:
   **Low**: This commented-out code block appears to be leftover from 
development. If this is an alternative strategy for handling the floor-factor 
case, it should be tracked in a follow-up issue or removed to keep the code 
clean.
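
If the floor check is meant to stay, one option is to make it live code rather than a comment. The hypothetical `ThrottleSketch` below models the NORMAL → FAST_DECREASE → COOLDOWN → SLOW_RECOVERY → NORMAL cycle from the class Javadoc. The overload decision is passed in as a boolean standing in for `isOverloaded()`, and the decrease ratio, recovery step, and cooldown length are made-up values; only the 0.1 floor comes from the Javadoc:

```java
// Hypothetical sketch, not the PR's code: the Javadoc's state cycle with the floor
// check made live. DECREASE_RATIO, RECOVERY_STEP and COOLDOWN_MS are assumed values.
final class ThrottleSketch {
    enum State { NORMAL, FAST_DECREASE, COOLDOWN, SLOW_RECOVERY }

    static final double MIN_FACTOR = 0.1;     // floor, matching the 0.1-1.0 range in the Javadoc
    static final double DECREASE_RATIO = 0.5; // assumed multiplicative decrease per overloaded window
    static final double RECOVERY_STEP = 0.1;  // assumed additive increase per healthy window
    static final long COOLDOWN_MS = 1_000L;   // assumed cooldown length

    State state = State.NORMAL;
    double factor = 1.0;
    long cooldownStartMs;

    /** Called once per evaluated window; 'overloaded' stands in for isOverloaded(). */
    synchronized void onWindow(boolean overloaded, long nowMs) {
        switch (state) {
            case NORMAL:
                if (overloaded) {
                    state = State.FAST_DECREASE;
                    decrease(nowMs);
                }
                break;
            case FAST_DECREASE:
                if (overloaded) {
                    decrease(nowMs);
                } else {
                    enterCooldown(nowMs);
                }
                break;
            case COOLDOWN:
                if (overloaded) {
                    cooldownStartMs = nowMs; // still overloaded: restart the cooldown timer
                } else if (nowMs - cooldownStartMs >= COOLDOWN_MS) {
                    state = State.SLOW_RECOVERY;
                }
                break;
            case SLOW_RECOVERY:
                if (overloaded) {
                    state = State.FAST_DECREASE;
                    decrease(nowMs);
                } else {
                    factor = Math.min(1.0, factor + RECOVERY_STEP);
                    if (factor >= 1.0 - 1e-9) { // tolerate drift from repeated double additions
                        factor = 1.0;
                        state = State.NORMAL;
                    }
                }
                break;
            default:
                break;
        }
    }

    private void decrease(long nowMs) {
        factor = Math.max(MIN_FACTOR, factor * DECREASE_RATIO);
        // At the floor further decreases are no-ops, so leave FAST_DECREASE instead of looping in it.
        if (factor <= MIN_FACTOR) {
            enterCooldown(nowMs);
        }
    }

    private void enterCooldown(long nowMs) {
        state = State.COOLDOWN;
        cooldownStartMs = nowMs;
    }
}
```

Clamping with `Math.max` and then comparing against the same constant avoids the `Math.abs(factor - minFactor) < 1e-9` epsilon check in the commented-out block.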



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java:
##########
@@ -194,19 +206,23 @@ public MetaServiceClientWrapper(MetaServiceProxy proxy) {
             this.proxy = proxy;
         }
 
-        public <Response> Response executeRequest(String methodName, Function<MetaServiceClient, Response> function)
-                throws RpcException {
+        public <Response> Response executeRequest(String methodName, Function<MetaServiceClient, Response> function,
+                int cost, Function<Response, Cloud.MetaServiceResponseStatus> statusExtractor) throws RpcException {
             long maxRetries = Config.meta_service_rpc_retry_cnt;
             for (long tried = 1; tried <= maxRetries; tried++) {
                 MetaServiceClient client = null;
                 boolean requestFailed = false;
+                boolean acquired = false;
                 try {
+                    acquired = MetaServiceRpcLimiterManager.getInstance().acquire(methodName, cost);
                     client = proxy.getProxy();

Review Comment:
   **Observation**: The rate limiter `acquire`/`release` is called per retry 
iteration. This means a single logical RPC that is retried N times (up to 
`meta_service_rpc_retry_cnt`) will consume N permits from the rate limiter. 
This may be the intended behavior to accurately account for load on the 
meta-service, but it could also cause cascading failures where retries are 
themselves rate-limited, reducing the effective retry budget.
   
   Worth a comment clarifying whether this is intentional; otherwise, consider acquiring once outside the retry loop and releasing in a `finally` block that wraps the whole loop.
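
A minimal sketch of the second option, assuming a permit-style limiter with matching acquire/release calls (the diff shows only `acquire`, so whether the Doris limiter works this way is an assumption). A `java.util.concurrent.Semaphore` stands in for it, and `SinglePermitRetry` is a hypothetical name:

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: a Semaphore stands in for the per-method limiter. The cost is
// charged once per logical RPC rather than once per retry attempt.
final class SinglePermitRetry {
    private final Semaphore permits;

    SinglePermitRetry(int capacity) {
        this.permits = new Semaphore(capacity);
    }

    <T> T execute(int cost, int maxRetries, Supplier<T> attempt) {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be >= 1");
        }
        // Acquired once for the whole logical call (uninterruptibly, for brevity).
        permits.acquireUninterruptibly(cost);
        try {
            RuntimeException last = null;
            for (int tried = 1; tried <= maxRetries; tried++) {
                try {
                    return attempt.get();
                } catch (RuntimeException e) {
                    last = e; // failed attempt: retry without taking another permit
                }
            }
            throw last; // non-null here because maxRetries >= 1 and every attempt failed
        } finally {
            // Released exactly once, whether the loop returned a value or gave up.
            permits.release(cost);
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

The trade-off is that retries become invisible to the limiter, so a burst of failing calls can put more load on the meta-service than the permit count suggests.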



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java:
##########
@@ -515,98 +614,144 @@ public Cloud.RemoveDeleteBitmapUpdateLockResponse removeDeleteBitmapUpdateLock(
     @Deprecated
     public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request)
             throws RpcException {
-        return executeWithMetrics("alterObjStoreInfo", (client) -> client.alterObjStoreInfo(request));
+        String methodName = "alterObjStoreInfo";
+        return executeWithMetrics(methodName, (client) -> client.alterObjStoreInfo(request), 1,
+                Cloud.AlterObjStoreInfoResponse::getStatus);
     }
 
     public Cloud.AlterObjStoreInfoResponse alterStorageVault(Cloud.AlterObjStoreInfoRequest request)
             throws RpcException {
-        return executeWithMetrics("alterStorageVault", (client) -> client.alterStorageVault(request));
+        String methodName = "alterStorageVault";
+        return executeWithMetrics(methodName, (client) -> client.alterStorageVault(request), 1,
+                Cloud.AlterObjStoreInfoResponse::getStatus);
     }
 
     public Cloud.FinishTabletJobResponse finishTabletJob(Cloud.FinishTabletJobRequest request)
             throws RpcException {
-        return executeWithMetrics("finishTabletJob", (client) -> client.finishTabletJob(request));
+        String methodName = "finishTabletJob";
+        return executeWithMetrics(methodName, (client) -> client.finishTabletJob(request), 1,
+                Cloud.FinishTabletJobResponse::getStatus);
     }
 
-    public Cloud.GetRLTaskCommitAttachResponse
-            getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request)
+    public Cloud.GetRLTaskCommitAttachResponse getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request)
             throws RpcException {
-        return executeWithMetrics("getRLTaskCommitAttach",
-                (client) -> client.getRLTaskCommitAttach(request));
+        String methodName = "getRLTaskCommitAttach";
+        return executeWithMetrics(methodName, (client) -> client.getRLTaskCommitAttach(request), 1,
+                Cloud.GetRLTaskCommitAttachResponse::getStatus);
     }
 
     public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressRequest request)
             throws RpcException {
-        return executeWithMetrics("resetRLProgress", (client) -> client.resetRLProgress(request));
+        String methodName = "resetRLProgress";
+        return executeWithMetrics(methodName, (client) -> client.resetRLProgress(request), 1,
+                Cloud.ResetRLProgressResponse::getStatus);
     }
 
-    public Cloud.ResetStreamingJobOffsetResponse resetStreamingJobOffset(Cloud.ResetStreamingJobOffsetRequest request)
-            throws RpcException {
-        return executeWithMetrics("resetStreamingJobOffset",
-                (client) -> client.resetStreamingJobOffset(request));
+    public Cloud.ResetStreamingJobOffsetResponse resetStreamingJobOffset(
+            Cloud.ResetStreamingJobOffsetRequest request) throws RpcException {
+        String methodName = "resetStreamingJobOffset";
+        return executeWithMetrics(methodName, (client) -> client.resetStreamingJobOffset(request), 1,
+                Cloud.ResetStreamingJobOffsetResponse::getStatus);
     }
 
-    public Cloud.GetObjStoreInfoResponse
-            getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws RpcException {
-        return executeWithMetrics("getObjStoreInfo", (client) -> client.getObjStoreInfo(request));
+    public Cloud.GetObjStoreInfoResponse getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws RpcException {
+        String methodName = "getObjStoreInfo";
+        return executeWithMetrics(methodName, (client) -> client.getObjStoreInfo(request), 1,
+                Cloud.GetObjStoreInfoResponse::getStatus);
     }
 
-    public Cloud.AbortTxnWithCoordinatorResponse
-            abortTxnWithCoordinator(Cloud.AbortTxnWithCoordinatorRequest request) throws RpcException {
-        return executeWithMetrics("abortTxnWithCoordinator",
-                (client) -> client.abortTxnWithCoordinator(request));
+    public Cloud.AbortTxnWithCoordinatorResponse abortTxnWithCoordinator(
+            Cloud.AbortTxnWithCoordinatorRequest request) throws RpcException {
+        String methodName = "abortTxnWithCoordinator";
+        return executeWithMetrics(methodName, (client) -> client.abortTxnWithCoordinator(request), 1,
+                Cloud.AbortTxnWithCoordinatorResponse::getStatus);
     }
 
-    public Cloud.GetPrepareTxnByCoordinatorResponse
-            getPrepareTxnByCoordinator(Cloud.GetPrepareTxnByCoordinatorRequest request) throws RpcException {
-        return executeWithMetrics("getPrepareTxnByCoordinator",
-                (client) -> client.getPrepareTxnByCoordinator(request));
+    public Cloud.GetPrepareTxnByCoordinatorResponse getPrepareTxnByCoordinator(
+            Cloud.GetPrepareTxnByCoordinatorRequest request) throws RpcException {
+        String methodName = "getPrepareTxnByCoordinator";
+        return executeWithMetrics(methodName, (client) -> client.getPrepareTxnByCoordinator(request), 1,
+                Cloud.GetPrepareTxnByCoordinatorResponse::getStatus);
     }
 
     public Cloud.CreateInstanceResponse createInstance(Cloud.CreateInstanceRequest request) throws RpcException {
-        return executeWithMetrics("createInstance", (client) -> client.createInstance(request));
+        String methodName = "createInstance";
+        return executeWithMetrics(methodName, (client) -> client.createInstance(request), 1,
+                Cloud.CreateInstanceResponse::getStatus);
     }
 
     public Cloud.GetStreamingTaskCommitAttachResponse getStreamingTaskCommitAttach(
             Cloud.GetStreamingTaskCommitAttachRequest request) throws RpcException {
-        return executeWithMetrics("getStreamingTaskCommitAttach",
-                (client) -> client.getStreamingTaskCommitAttach(request));
+        String methodName = "getStreamingTaskCommitAttach";
+        return executeWithMetrics(methodName, (client) -> client.getStreamingTaskCommitAttach(request), 1,
+                Cloud.GetStreamingTaskCommitAttachResponse::getStatus);
     }
 
     public Cloud.DeleteStreamingJobResponse deleteStreamingJob(Cloud.DeleteStreamingJobRequest request)
             throws RpcException {
-        return executeWithMetrics("deleteStreamingJob", (client) -> client.deleteStreamingJob(request));
+        String methodName = "deleteStreamingJob";
+        return executeWithMetrics(methodName, (client) -> client.deleteStreamingJob(request), 1,
+                Cloud.DeleteStreamingJobResponse::getStatus);
     }
 
     public Cloud.AlterInstanceResponse alterInstance(Cloud.AlterInstanceRequest request) throws RpcException {
-        return executeWithMetrics("alterInstance", (client) -> client.alterInstance(request));
+        String methodName = "alterInstance";
+        return executeWithMetrics(methodName, (client) -> client.alterInstance(request), 1,
+                Cloud.AlterInstanceResponse::getStatus);
     }
 
     public Cloud.BeginSnapshotResponse beginSnapshot(Cloud.BeginSnapshotRequest request) throws RpcException {
-        return executeWithMetrics("beginSnapshot", (client) -> client.beginSnapshot(request));
+        String methodName = "beginSnapshot";
+        return executeWithMetrics(methodName, (client) -> client.beginSnapshot(request), 1,
+                Cloud.BeginSnapshotResponse::getStatus);
     }
 
     public Cloud.UpdateSnapshotResponse updateSnapshot(Cloud.UpdateSnapshotRequest request) throws RpcException {
-        return executeWithMetrics("updateSnapshot", (client) -> client.updateSnapshot(request));
+        String methodName = "updateSnapshot";
+        return executeWithMetrics(methodName, (client) -> client.updateSnapshot(request), 1,
+                Cloud.UpdateSnapshotResponse::getStatus);
     }
 
     public Cloud.CommitSnapshotResponse commitSnapshot(Cloud.CommitSnapshotRequest request) throws RpcException {
-        return executeWithMetrics("commitSnapshot", (client) -> client.commitSnapshot(request));
+        String methodName = "commitSnapshot";
+        return executeWithMetrics(methodName, (client) -> client.commitSnapshot(request), 1,
+                Cloud.CommitSnapshotResponse::getStatus);
     }
 
     public Cloud.AbortSnapshotResponse abortSnapshot(Cloud.AbortSnapshotRequest request) throws RpcException {
-        return executeWithMetrics("abortSnapshot", (client) -> client.abortSnapshot(request));
+        String methodName = "abortSnapshot";
+        return executeWithMetrics(methodName, (client) -> client.abortSnapshot(request), 1,
+                Cloud.AbortSnapshotResponse::getStatus);
     }
 
     public Cloud.ListSnapshotResponse listSnapshot(Cloud.ListSnapshotRequest request) throws RpcException {
-        return executeWithMetrics("listSnapshot", (client) -> client.listSnapshot(request));
+        String methodName = "listSnapshot";
+        return executeWithMetrics(methodName, (client) -> client.listSnapshot(request), 1,
+                Cloud.ListSnapshotResponse::getStatus);
     }
 
     public Cloud.DropSnapshotResponse dropSnapshot(Cloud.DropSnapshotRequest request) throws RpcException {
-        return executeWithMetrics("dropSnapshot", (client) -> client.dropSnapshot(request));
+        String methodName = "dropSnapshot";
+        return executeWithMetrics(methodName, (client) -> client.dropSnapshot(request), 1,
+                Cloud.DropSnapshotResponse::getStatus);
     }
 
     public Cloud.CloneInstanceResponse cloneInstance(Cloud.CloneInstanceRequest request) throws RpcException {
-        return executeWithMetrics("cloneInstance", (client) -> client.cloneInstance(request));
+        String methodName = "cloneInstance";
+        return executeWithMetrics(methodName, (client) -> client.cloneInstance(request), 1,
+                Cloud.CloneInstanceResponse::getStatus);
+    }
+
+    private static void handleResponseStatus(Cloud.MetaServiceResponseStatus status) {
+        if (Config.meta_service_rpc_overload_throttle_enabled) {

Review Comment:
   **Low-Medium**: `handleResponseStatus` only ever emits `Signal.SUCCESS` or `Signal.BACKPRESSURE`. The `Signal.TIMEOUT` enum variant is defined in `MetaServiceOverloadThrottle` but is never recorded in production code: when RPCs time out (throwing exceptions), execution jumps to catch blocks that don't call `handleResponseStatus`.
   
   This means the overload throttle's timeout detection path is unreachable. If 
timeouts should contribute to overload detection (which the state machine logic 
clearly supports), consider recording `Signal.TIMEOUT` in the catch blocks for 
`TimeoutException`/`RpcException` in `getVisibleVersion()` and 
`executeWithMetrics()`.
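
A minimal sketch of the suggested fix, with hypothetical names (`SignalingCall`, `Rpc`, `isBackpressure`) and only `TimeoutException` handled; an `RpcException` catch could map to `Signal.TIMEOUT` the same way if that is the desired classification. The `Signal` enum mirrors the one in `MetaServiceOverloadThrottle`:

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch: Signal mirrors MetaServiceOverloadThrottle.Signal; Rpc, SignalingCall
// and the isBackpressure predicate are illustrative names, not Doris APIs.
final class SignalingCall {
    enum Signal { SUCCESS, TIMEOUT, BACKPRESSURE }

    interface Rpc<R> {
        R invoke() throws TimeoutException;
    }

    static <R> R call(Rpc<R> rpc, Predicate<R> isBackpressure, Consumer<Signal> recorder)
            throws TimeoutException {
        R resp;
        try {
            resp = rpc.invoke();
        } catch (TimeoutException e) {
            // Exception path: record TIMEOUT before propagating, so timeouts reach the throttle.
            recorder.accept(Signal.TIMEOUT);
            throw e;
        }
        // Response path: classify the status into SUCCESS or BACKPRESSURE.
        recorder.accept(isBackpressure.test(resp) ? Signal.BACKPRESSURE : Signal.SUCCESS);
        return resp;
    }
}
```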



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
