This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 9d7eaddfc36 branch-4.0: [fix](filecache) fix warm up cancel failure when BE is down #58035 (#59059)
9d7eaddfc36 is described below
commit 9d7eaddfc36307849cb7044340f5597c2acfe8d3
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 17 18:05:15 2025 +0800
branch-4.0: [fix](filecache) fix warm up cancel failure when BE is down #58035 (#59059)
Cherry-picked from #58035
Signed-off-by: zhengyu <[email protected]>
Co-authored-by: zhengyu <[email protected]>
---
.../org/apache/doris/cloud/CloudWarmUpJob.java | 75 ++++++++++++++++++----
1 file changed, 62 insertions(+), 13 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
index d8d7c95085d..90f61fbfb75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
@@ -56,6 +56,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -536,23 +537,68 @@ public class CloudWarmUpJob implements Writable {
beToAddr = null;
}
+ private String getBackendEndpoint(long beId) {
+ if (beToAddr != null) {
+ TNetworkAddress addr = beToAddr.get(beId);
+ if (addr != null) {
+ String host = addr.getHostname();
+ if (host == null) {
+ host = "unknown";
+ }
+ return host + ":" + addr.getPort();
+ }
+ }
+ if (beToThriftAddress != null) {
+ String addr = beToThriftAddress.get(beId);
+ if (addr != null) {
+ return addr;
+ }
+ }
+ return "unknown";
+ }
+
private final void clearJobOnBEs() {
try {
initClients();
- for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
+ // Iterate with explicit iterator so we can remove invalidated
clients during iteration.
+ Iterator<Map.Entry<Long, Client>> iter =
beToClient.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<Long, Client> entry = iter.next();
+ long beId = entry.getKey();
+ Client client = entry.getValue();
TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
request.setType(TWarmUpTabletsRequestType.CLEAR_JOB);
request.setJobId(jobId);
if (this.isEventDriven()) {
TWarmUpEventType event = getTWarmUpEventType();
if (event == null) {
- throw new IllegalArgumentException("Unknown SyncEvent
" + syncEvent);
+ // If event type is unknown, skip this BE but continue
others.
+ LOG.warn("Unknown SyncEvent {}, skip CLEAR_JOB for BE
{} ({})",
+ syncEvent, beId, getBackendEndpoint(beId));
+ continue;
}
request.setEvent(event);
}
- LOG.info("send warm up request to BE {}. job_id={},
request_type=CLEAR_JOB",
- entry.getKey(), jobId);
- entry.getValue().warmUpTablets(request);
+ LOG.info("send warm up request to BE {} ({}). job_id={},
request_type=CLEAR_JOB",
+ beId, getBackendEndpoint(beId), jobId);
+ try {
+ client.warmUpTablets(request);
+ } catch (Exception e) {
+ // If RPC to this BE fails, invalidate this client and
remove it from map,
+ // then continue to next BE so that one bad BE won't block
others.
+ LOG.warn("send warm up request to BE {} ({}) failed: {}",
+ beId, getBackendEndpoint(beId), e.getMessage());
+ try {
+ TNetworkAddress addr = beToAddr == null ? null :
beToAddr.get(beId);
+ if (addr != null) {
+ ClientPool.backendPool.invalidateObject(addr,
client);
+ }
+ } catch (Exception ie) {
+ LOG.warn("invalidate client for BE {} failed: {}",
beId, ie.getMessage());
+ }
+ // remove from local map so releaseClients won't try to
return an invalidated client
+ iter.remove();
+ }
}
} catch (Exception e) {
LOG.warn("send warm up request failed. job_id={},
request_type=CLEAR_JOB, exception={}",
@@ -653,8 +699,8 @@ public class CloudWarmUpJob implements Writable {
throw new IllegalArgumentException("Unknown SyncEvent " +
syncEvent);
}
request.setEvent(event);
- LOG.debug("send warm up request to BE {}. job_id={}, event={},
request_type=SET_JOB(EVENT)",
- entry.getKey(), jobId, syncEvent);
+ LOG.debug("send warm up request to BE {} ({}). job_id={},
event={}, request_type=SET_JOB(EVENT)",
+ entry.getKey(), getBackendEndpoint(entry.getKey()),
jobId, syncEvent);
TWarmUpTabletsResponse response =
entry.getValue().warmUpTablets(request);
if (response.getStatus().getStatusCode() != TStatusCode.OK) {
if (!response.getStatus().getErrorMsgs().isEmpty()) {
@@ -698,9 +744,10 @@ public class CloudWarmUpJob implements Writable {
request.setJobId(jobId);
request.setBatchId(lastBatchId + 1);
request.setJobMetas(buildJobMetas(entry.getKey(),
request.batch_id));
- LOG.info("send warm up request to BE {}. job_id={},
batch_id={}"
+ LOG.info("send warm up request to BE {} ({}). job_id={},
batch_id={}"
+ ", job_size={}, request_type=SET_JOB",
- entry.getKey(), jobId, request.batch_id,
request.job_metas.size());
+ entry.getKey(), getBackendEndpoint(entry.getKey()),
+ jobId, request.batch_id, request.job_metas.size());
TWarmUpTabletsResponse response =
entry.getValue().warmUpTablets(request);
if (response.getStatus().getStatusCode() !=
TStatusCode.OK) {
if (!response.getStatus().getErrorMsgs().isEmpty()) {
@@ -715,8 +762,9 @@ public class CloudWarmUpJob implements Writable {
for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
TWarmUpTabletsRequest request = new
TWarmUpTabletsRequest();
request.setType(TWarmUpTabletsRequestType.GET_CURRENT_JOB_STATE_AND_LEASE);
- LOG.info("send warm up request to BE {}. job_id={},
request_type=GET_CURRENT_JOB_STATE_AND_LEASE",
- entry.getKey(), jobId);
+ LOG.info("send warm up request to BE {} ({}). job_id={}"
+ + ", request_type=GET_CURRENT_JOB_STATE_AND_LEASE",
+ entry.getKey(),
getBackendEndpoint(entry.getKey()), jobId);
TWarmUpTabletsResponse response =
entry.getValue().warmUpTablets(request);
if (response.getStatus().getStatusCode() !=
TStatusCode.OK) {
if (!response.getStatus().getErrorMsgs().isEmpty()) {
@@ -754,9 +802,10 @@ public class CloudWarmUpJob implements Writable {
if (!request.job_metas.isEmpty()) {
// check all batches is done or not
allBatchesDone = false;
- LOG.info("send warm up request to BE {}.
job_id={}, batch_id={}"
+ LOG.info("send warm up request to BE {} ({}).
job_id={}, batch_id={}"
+ ", job_size={}, request_type=SET_BATCH",
- entry.getKey(), jobId, request.batch_id,
request.job_metas.size());
+ entry.getKey(),
getBackendEndpoint(entry.getKey()),
+ jobId, request.batch_id,
request.job_metas.size());
TWarmUpTabletsResponse response =
entry.getValue().warmUpTablets(request);
if (response.getStatus().getStatusCode() !=
TStatusCode.OK) {
if
(!response.getStatus().getErrorMsgs().isEmpty()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]