github-actions[bot] commented on code in PR #60855:
URL: https://github.com/apache/doris/pull/60855#discussion_r2883213442
##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java:
##########
@@ -283,18 +302,58 @@ private <Response> Response executeWithMetrics(String methodName, Function<MetaS
}
}
- public Future<Cloud.GetVersionResponse> getVisibleVersionAsync(Cloud.GetVersionRequest request)
- throws RpcException {
+ public Cloud.GetVersionResponse getVisibleVersion(Cloud.GetVersionRequest request, int timeoutMs) {
long startTime = System.currentTimeMillis();
- String methodName = request.hasIsTableVersion() && request.getIsTableVersion() ? "getTableVersion"
- : "getPartitionVersion";
- MetaServiceClient client = null;
-
+ String methodName = request.hasIsTableVersion() && request.getIsTableVersion()
+ ? MetaServiceRpcLimiterManager.GET_TABLE_VERSION_METHOD :
+ MetaServiceRpcLimiterManager.GET_PARTITION_VERSION_METHOD;
+ int cost = 1;
+ if (request.hasBatchMode() && request.getBatchMode()) {
+ cost = MetaServiceRpcLimiterManager.getInstance().getClampedCost(methodName, request.getDbIdsCount());
+ }
+ boolean acquired = false;
if (MetricRepo.isInit && Config.isCloudMode()) {
CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.increase(1L);
CloudMetrics.META_SERVICE_RPC_TOTAL.getOrAdd(methodName).increase(1L);
}
+ long deadline = System.currentTimeMillis() + timeoutMs;
+ Cloud.GetVersionResponse resp = null;
+ try {
+ acquired = MetaServiceRpcLimiterManager.getInstance().acquire(methodName, cost);
+ Future<Cloud.GetVersionResponse> future = getVisibleVersionAsync(request);
+ while (resp == null) {
+ try {
Review Comment:
**Medium**: This `while (resp == null)` loop retries `future.get()` whenever
`InterruptedException` is caught, but only exits when `resp` is set or a
different exception type is thrown. If `InterruptedException` keeps firing
before the `TimeoutException` triggers (e.g., the thread is repeatedly
interrupted by another mechanism), this loop could spin indefinitely.
Suggestions:
1. Add a retry/iteration limit or check `deadline` within the `while`
condition: `while (resp == null && System.currentTimeMillis() < deadline)`
2. Preserve the interrupt flag with `Thread.currentThread().interrupt()`
after logging, so the caller knows the thread was interrupted
3. Consider breaking out of the loop on `InterruptedException` and letting
`resp` remain null (the caller already handles null responses)
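A minimal sketch of suggestions 1 and 2 combined, offered as an illustration rather than as the PR's implementation. `BoundedFutureWait` and `awaitUntil` are hypothetical names, and returning `null` on timeout or failure is an assumption based on the note above that the caller already handles null responses:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of the PR.
final class BoundedFutureWait {
    private BoundedFutureWait() {
    }

    /**
     * Waits for the future until deadlineMs (wall-clock millis).
     * Returns null on timeout or execution failure (assumed caller contract).
     */
    static <T> T awaitUntil(Future<T> future, long deadlineMs) {
        boolean interrupted = false;
        try {
            while (true) {
                long remainingMs = deadlineMs - System.currentTimeMillis();
                if (remainingMs <= 0) {
                    return null; // deadline passed: the loop cannot spin forever
                }
                try {
                    return future.get(remainingMs, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    interrupted = true; // remember it, then retry with the time that is left
                } catch (TimeoutException | ExecutionException e) {
                    return null;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the flag for the caller
            }
        }
    }
}
```

Because the remaining time is recomputed on every iteration, repeated interrupts can only shorten each wait and never extend the total beyond the deadline.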
##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceOverloadThrottle.java:
##########
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.rpc;
+
+import org.apache.doris.common.Config;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+
+/**
+ * Overload throttle controller for meta-service RPCs.
+ *
+ * <p>Monitors RPC outcomes (success, timeout, backpressure) within a rolling time window
+ * and adjusts a global factor (0.1–1.0) that scales the configured QPS limits in
+ * {@link MetaServiceRpcLimiterManager}. Uses a state machine:
+ * NORMAL → FAST_DECREASE → COOLDOWN → SLOW_RECOVERY → NORMAL.
+ */
+public class MetaServiceOverloadThrottle {
+ private static final Logger LOG = LogManager.getLogger(MetaServiceOverloadThrottle.class);
+
+ public enum State {
+ NORMAL,
+ FAST_DECREASE,
+ COOLDOWN,
+ SLOW_RECOVERY
+ }
+
+ public enum Signal {
+ SUCCESS,
+ TIMEOUT,
+ BACKPRESSURE
+ }
+
+ private volatile State state = State.NORMAL;
+ private volatile double factor = 1.0;
+
+ private final LongAdder windowTotal = new LongAdder();
+ private final LongAdder windowBad = new LongAdder();
+ private final AtomicLong windowStartMs = new AtomicLong(System.currentTimeMillis());
+
+ private volatile long cooldownStartMs = 0;
+ private volatile long lastRecoveryMs = 0;
+
+ private static volatile MetaServiceOverloadThrottle instance;
+ private volatile Consumer<Double> factorChangeListener = null;
+
+ private MetaServiceOverloadThrottle() {
+ }
+
+ public static MetaServiceOverloadThrottle getInstance() {
+ if (instance == null) {
+ synchronized (MetaServiceOverloadThrottle.class) {
+ if (instance == null) {
+ instance = new MetaServiceOverloadThrottle();
+ }
+ }
+ }
+ return instance;
+ }
+
+ public void recordSignal(Signal signal) {
+ if (!Config.meta_service_rpc_overload_throttle_enabled) {
+ return;
+ }
+
+ long now = System.currentTimeMillis();
+ maybeResetWindow(now);
+
+ windowTotal.increment();
+ if (signal == Signal.TIMEOUT || signal == Signal.BACKPRESSURE) {
+ windowBad.increment();
+ }
+
+ switch (state) {
+ case NORMAL:
+ handleNormal(now);
+ break;
+ case FAST_DECREASE:
+ handleFastDecrease(now);
+ break;
+ case COOLDOWN:
+ handleCooldown(now, signal);
+ break;
+ case SLOW_RECOVERY:
+ handleSlowRecovery(now, signal);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void handleNormal(long now) {
+ if (isOverloaded()) {
+ transitionTo(State.FAST_DECREASE, now);
+ decreaseFactor();
+ }
+ }
+
+ private void handleFastDecrease(long now) {
+ if (isOverloaded()) {
+ decreaseFactor();
+ // If factor has hit the floor, stop hammering FAST_DECREASE and enter COOLDOWN
+ /*double minFactor = Config.meta_service_rpc_overload_throttle_min_factor;
+ if (Math.abs(factor - minFactor) < 1e-9) {
Review Comment:
**Low**: This commented-out code block appears to be leftover from
development. If this is an alternative strategy for handling the floor-factor
case, it should be tracked in a follow-up issue or removed to keep the code
clean.
##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java:
##########
@@ -194,19 +206,23 @@ public MetaServiceClientWrapper(MetaServiceProxy proxy) {
this.proxy = proxy;
}
- public <Response> Response executeRequest(String methodName, Function<MetaServiceClient, Response> function)
- throws RpcException {
+ public <Response> Response executeRequest(String methodName, Function<MetaServiceClient, Response> function,
+ int cost, Function<Response, Cloud.MetaServiceResponseStatus> statusExtractor) throws RpcException {
long maxRetries = Config.meta_service_rpc_retry_cnt;
for (long tried = 1; tried <= maxRetries; tried++) {
MetaServiceClient client = null;
boolean requestFailed = false;
+ boolean acquired = false;
try {
+ acquired = MetaServiceRpcLimiterManager.getInstance().acquire(methodName, cost);
client = proxy.getProxy();
Review Comment:
**Observation**: The rate limiter `acquire`/`release` is called per retry
iteration. This means a single logical RPC that is retried N times (up to
`meta_service_rpc_retry_cnt`) will consume N permits from the rate limiter.
This may be the intended behavior to accurately account for load on the
meta-service, but it could also cause cascading failures where retries are
themselves rate-limited, reducing the effective retry budget.
It is worth adding a comment clarifying whether this is intentional.
Otherwise, consider acquiring once outside the retry loop and releasing in a
`finally` block that wraps the loop.
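A minimal sketch of the acquire-once alternative, assuming a permit-style limiter. `java.util.concurrent.Semaphore` stands in for `MetaServiceRpcLimiterManager` here, and `AcquireOnceRetry` and `callWithRetries` are hypothetical names. A semaphore models only concurrency, not QPS, so this shows the structure rather than the PR's actual limiter semantics:

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper for illustration; Semaphore is a stand-in for the real limiter.
final class AcquireOnceRetry {
    private AcquireOnceRetry() {
    }

    static <R> R callWithRetries(Semaphore permits, int cost, int maxRetries, Supplier<R> attempt) {
        permits.acquireUninterruptibly(cost); // one acquisition per logical RPC
        try {
            RuntimeException last = null;
            for (int tried = 1; tried <= maxRetries; tried++) {
                try {
                    return attempt.get();
                } catch (RuntimeException e) {
                    last = e; // retry without touching the limiter again
                }
            }
            if (last == null) {
                throw new IllegalArgumentException("maxRetries must be >= 1");
            }
            throw last;
        } finally {
            permits.release(cost); // released exactly once, on success or failure
        }
    }
}
```

The trade-off is that retries no longer count against the limiter, so the meta-service may see more load than the permits suggest. Per-attempt acquisition, as in the PR, accounts for that load more accurately.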
##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java:
##########
@@ -515,98 +614,144 @@ public Cloud.RemoveDeleteBitmapUpdateLockResponse removeDeleteBitmapUpdateLock(
@Deprecated
public Cloud.AlterObjStoreInfoResponse alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request)
throws RpcException {
- return executeWithMetrics("alterObjStoreInfo", (client) -> client.alterObjStoreInfo(request));
+ String methodName = "alterObjStoreInfo";
+ return executeWithMetrics(methodName, (client) -> client.alterObjStoreInfo(request), 1,
+ Cloud.AlterObjStoreInfoResponse::getStatus);
}
public Cloud.AlterObjStoreInfoResponse alterStorageVault(Cloud.AlterObjStoreInfoRequest request)
throws RpcException {
- return executeWithMetrics("alterStorageVault", (client) -> client.alterStorageVault(request));
+ String methodName = "alterStorageVault";
+ return executeWithMetrics(methodName, (client) -> client.alterStorageVault(request), 1,
+ Cloud.AlterObjStoreInfoResponse::getStatus);
}
public Cloud.FinishTabletJobResponse finishTabletJob(Cloud.FinishTabletJobRequest request)
throws RpcException {
- return executeWithMetrics("finishTabletJob", (client) -> client.finishTabletJob(request));
+ String methodName = "finishTabletJob";
+ return executeWithMetrics(methodName, (client) -> client.finishTabletJob(request), 1,
+ Cloud.FinishTabletJobResponse::getStatus);
}
- public Cloud.GetRLTaskCommitAttachResponse
- getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request)
+ public Cloud.GetRLTaskCommitAttachResponse getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request)
throws RpcException {
- return executeWithMetrics("getRLTaskCommitAttach",
- (client) -> client.getRLTaskCommitAttach(request));
+ String methodName = "getRLTaskCommitAttach";
+ return executeWithMetrics(methodName, (client) -> client.getRLTaskCommitAttach(request), 1,
+ Cloud.GetRLTaskCommitAttachResponse::getStatus);
}
public Cloud.ResetRLProgressResponse resetRLProgress(Cloud.ResetRLProgressRequest request)
throws RpcException {
- return executeWithMetrics("resetRLProgress", (client) -> client.resetRLProgress(request));
+ String methodName = "resetRLProgress";
+ return executeWithMetrics(methodName, (client) -> client.resetRLProgress(request), 1,
+ Cloud.ResetRLProgressResponse::getStatus);
}
- public Cloud.ResetStreamingJobOffsetResponse resetStreamingJobOffset(Cloud.ResetStreamingJobOffsetRequest request)
- throws RpcException {
- return executeWithMetrics("resetStreamingJobOffset",
- (client) -> client.resetStreamingJobOffset(request));
+ public Cloud.ResetStreamingJobOffsetResponse resetStreamingJobOffset(
+ Cloud.ResetStreamingJobOffsetRequest request) throws RpcException {
+ String methodName = "resetStreamingJobOffset";
+ return executeWithMetrics(methodName, (client) -> client.resetStreamingJobOffset(request), 1,
+ Cloud.ResetStreamingJobOffsetResponse::getStatus);
}
- public Cloud.GetObjStoreInfoResponse
- getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws RpcException {
- return executeWithMetrics("getObjStoreInfo", (client) -> client.getObjStoreInfo(request));
+ public Cloud.GetObjStoreInfoResponse getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws RpcException {
+ String methodName = "getObjStoreInfo";
+ return executeWithMetrics(methodName, (client) -> client.getObjStoreInfo(request), 1,
+ Cloud.GetObjStoreInfoResponse::getStatus);
}
- public Cloud.AbortTxnWithCoordinatorResponse
- abortTxnWithCoordinator(Cloud.AbortTxnWithCoordinatorRequest request) throws RpcException {
- return executeWithMetrics("abortTxnWithCoordinator",
- (client) -> client.abortTxnWithCoordinator(request));
+ public Cloud.AbortTxnWithCoordinatorResponse abortTxnWithCoordinator(
+ Cloud.AbortTxnWithCoordinatorRequest request) throws RpcException {
+ String methodName = "abortTxnWithCoordinator";
+ return executeWithMetrics(methodName, (client) -> client.abortTxnWithCoordinator(request), 1,
+ Cloud.AbortTxnWithCoordinatorResponse::getStatus);
}
- public Cloud.GetPrepareTxnByCoordinatorResponse
- getPrepareTxnByCoordinator(Cloud.GetPrepareTxnByCoordinatorRequest request) throws RpcException {
- return executeWithMetrics("getPrepareTxnByCoordinator",
- (client) -> client.getPrepareTxnByCoordinator(request));
+ public Cloud.GetPrepareTxnByCoordinatorResponse getPrepareTxnByCoordinator(
+ Cloud.GetPrepareTxnByCoordinatorRequest request) throws RpcException {
+ String methodName = "getPrepareTxnByCoordinator";
+ return executeWithMetrics(methodName, (client) -> client.getPrepareTxnByCoordinator(request), 1,
+ Cloud.GetPrepareTxnByCoordinatorResponse::getStatus);
}
public Cloud.CreateInstanceResponse createInstance(Cloud.CreateInstanceRequest request) throws RpcException {
- return executeWithMetrics("createInstance", (client) -> client.createInstance(request));
+ String methodName = "createInstance";
+ return executeWithMetrics(methodName, (client) -> client.createInstance(request), 1,
+ Cloud.CreateInstanceResponse::getStatus);
}
public Cloud.GetStreamingTaskCommitAttachResponse getStreamingTaskCommitAttach(
Cloud.GetStreamingTaskCommitAttachRequest request) throws RpcException {
- return executeWithMetrics("getStreamingTaskCommitAttach",
- (client) -> client.getStreamingTaskCommitAttach(request));
+ String methodName = "getStreamingTaskCommitAttach";
+ return executeWithMetrics(methodName, (client) -> client.getStreamingTaskCommitAttach(request), 1,
+ Cloud.GetStreamingTaskCommitAttachResponse::getStatus);
}
public Cloud.DeleteStreamingJobResponse deleteStreamingJob(Cloud.DeleteStreamingJobRequest request)
throws RpcException {
- return executeWithMetrics("deleteStreamingJob", (client) -> client.deleteStreamingJob(request));
+ String methodName = "deleteStreamingJob";
+ return executeWithMetrics(methodName, (client) -> client.deleteStreamingJob(request), 1,
+ Cloud.DeleteStreamingJobResponse::getStatus);
}
public Cloud.AlterInstanceResponse alterInstance(Cloud.AlterInstanceRequest request) throws RpcException {
- return executeWithMetrics("alterInstance", (client) -> client.alterInstance(request));
+ String methodName = "alterInstance";
+ return executeWithMetrics(methodName, (client) -> client.alterInstance(request), 1,
+ Cloud.AlterInstanceResponse::getStatus);
}
public Cloud.BeginSnapshotResponse beginSnapshot(Cloud.BeginSnapshotRequest request) throws RpcException {
- return executeWithMetrics("beginSnapshot", (client) -> client.beginSnapshot(request));
+ String methodName = "beginSnapshot";
+ return executeWithMetrics(methodName, (client) -> client.beginSnapshot(request), 1,
+ Cloud.BeginSnapshotResponse::getStatus);
}
public Cloud.UpdateSnapshotResponse updateSnapshot(Cloud.UpdateSnapshotRequest request) throws RpcException {
- return executeWithMetrics("updateSnapshot", (client) -> client.updateSnapshot(request));
+ String methodName = "updateSnapshot";
+ return executeWithMetrics(methodName, (client) -> client.updateSnapshot(request), 1,
+ Cloud.UpdateSnapshotResponse::getStatus);
}
public Cloud.CommitSnapshotResponse commitSnapshot(Cloud.CommitSnapshotRequest request) throws RpcException {
- return executeWithMetrics("commitSnapshot", (client) -> client.commitSnapshot(request));
+ String methodName = "commitSnapshot";
+ return executeWithMetrics(methodName, (client) -> client.commitSnapshot(request), 1,
+ Cloud.CommitSnapshotResponse::getStatus);
}
public Cloud.AbortSnapshotResponse abortSnapshot(Cloud.AbortSnapshotRequest request) throws RpcException {
- return executeWithMetrics("abortSnapshot", (client) -> client.abortSnapshot(request));
+ String methodName = "abortSnapshot";
+ return executeWithMetrics(methodName, (client) -> client.abortSnapshot(request), 1,
+ Cloud.AbortSnapshotResponse::getStatus);
}
public Cloud.ListSnapshotResponse listSnapshot(Cloud.ListSnapshotRequest request) throws RpcException {
- return executeWithMetrics("listSnapshot", (client) -> client.listSnapshot(request));
+ String methodName = "listSnapshot";
+ return executeWithMetrics(methodName, (client) -> client.listSnapshot(request), 1,
+ Cloud.ListSnapshotResponse::getStatus);
}
public Cloud.DropSnapshotResponse dropSnapshot(Cloud.DropSnapshotRequest request) throws RpcException {
- return executeWithMetrics("dropSnapshot", (client) -> client.dropSnapshot(request));
+ String methodName = "dropSnapshot";
+ return executeWithMetrics(methodName, (client) -> client.dropSnapshot(request), 1,
+ Cloud.DropSnapshotResponse::getStatus);
}
public Cloud.CloneInstanceResponse cloneInstance(Cloud.CloneInstanceRequest request) throws RpcException {
- return executeWithMetrics("cloneInstance", (client) -> client.cloneInstance(request));
+ String methodName = "cloneInstance";
+ return executeWithMetrics(methodName, (client) -> client.cloneInstance(request), 1,
+ Cloud.CloneInstanceResponse::getStatus);
+ }
+
+ private static void handleResponseStatus(Cloud.MetaServiceResponseStatus status) {
+ if (Config.meta_service_rpc_overload_throttle_enabled) {
Review Comment:
**Low-Medium**: `handleResponseStatus` only ever emits `Signal.SUCCESS` or
`Signal.BACKPRESSURE`. The `Signal.TIMEOUT` enum variant is defined in
`MetaServiceOverloadThrottle` but is never recorded in production code: when
an RPC times out (throwing an exception), execution jumps to catch blocks that
don't call `handleResponseStatus`.
This means the overload throttle's timeout detection path is unreachable. If
timeouts should contribute to overload detection (which the state machine logic
clearly supports), consider recording `Signal.TIMEOUT` in the catch blocks for
`TimeoutException`/`RpcException` in `getVisibleVersion()` and
`executeWithMetrics()`.
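A minimal sketch of that suggestion, under stated assumptions. `TimeoutSignalSketch`, `callAndRecord`, and the local `Signal` enum are hypothetical stand-ins, and the `recorder` callback takes the place of `MetaServiceOverloadThrottle.recordSignal`. How the real code distinguishes a timeout inside `RpcException` is not shown in this diff, so only `TimeoutException` is handled here:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical helper for illustration; not part of the PR.
final class TimeoutSignalSketch {
    enum Signal { SUCCESS, TIMEOUT, BACKPRESSURE }

    private TimeoutSignalSketch() {
    }

    static <R> R callAndRecord(Callable<R> rpc, Predicate<R> isBackpressure,
            Consumer<Signal> recorder) throws Exception {
        try {
            R resp = rpc.call();
            // Normal path: classify the response status as before.
            recorder.accept(isBackpressure.test(resp) ? Signal.BACKPRESSURE : Signal.SUCCESS);
            return resp;
        } catch (TimeoutException e) {
            // Exceptional path: record the timeout before propagating it.
            recorder.accept(Signal.TIMEOUT);
            throw e;
        }
    }
}
```

Recording in the catch block and then rethrowing keeps the caller-visible behavior unchanged while making the timeout path observable to the throttle.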
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]