This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 9d7eaddfc36 branch-4.0: [fix](filecache) fix warm up cancel failure when BE is down #58035 (#59059)
9d7eaddfc36 is described below

commit 9d7eaddfc36307849cb7044340f5597c2acfe8d3
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 17 18:05:15 2025 +0800

    branch-4.0: [fix](filecache) fix warm up cancel failure when BE is down #58035 (#59059)
    
    Cherry-picked from #58035
    
    Signed-off-by: zhengyu <[email protected]>
    Co-authored-by: zhengyu <[email protected]>
---
 .../org/apache/doris/cloud/CloudWarmUpJob.java     | 75 ++++++++++++++++++----
 1 file changed, 62 insertions(+), 13 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
index d8d7c95085d..90f61fbfb75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
@@ -56,6 +56,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -536,23 +537,68 @@ public class CloudWarmUpJob implements Writable {
         beToAddr = null;
     }
 
+    private String getBackendEndpoint(long beId) {
+        if (beToAddr != null) {
+            TNetworkAddress addr = beToAddr.get(beId);
+            if (addr != null) {
+                String host = addr.getHostname();
+                if (host == null) {
+                    host = "unknown";
+                }
+                return host + ":" + addr.getPort();
+            }
+        }
+        if (beToThriftAddress != null) {
+            String addr = beToThriftAddress.get(beId);
+            if (addr != null) {
+                return addr;
+            }
+        }
+        return "unknown";
+    }
+
     private final void clearJobOnBEs() {
         try {
             initClients();
-            for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
+            // Iterate with explicit iterator so we can remove invalidated clients during iteration.
+            Iterator<Map.Entry<Long, Client>> iter = beToClient.entrySet().iterator();
+            while (iter.hasNext()) {
+                Map.Entry<Long, Client> entry = iter.next();
+                long beId = entry.getKey();
+                Client client = entry.getValue();
                 TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
                 request.setType(TWarmUpTabletsRequestType.CLEAR_JOB);
                 request.setJobId(jobId);
                 if (this.isEventDriven()) {
                     TWarmUpEventType event = getTWarmUpEventType();
                     if (event == null) {
-                        throw new IllegalArgumentException("Unknown SyncEvent " + syncEvent);
+                        // If event type is unknown, skip this BE but continue others.
+                        LOG.warn("Unknown SyncEvent {}, skip CLEAR_JOB for BE {} ({})",
+                                syncEvent, beId, getBackendEndpoint(beId));
+                        continue;
                     }
                     request.setEvent(event);
                 }
-                LOG.info("send warm up request to BE {}. job_id={}, request_type=CLEAR_JOB",
-                        entry.getKey(), jobId);
-                entry.getValue().warmUpTablets(request);
+                LOG.info("send warm up request to BE {} ({}). job_id={}, request_type=CLEAR_JOB",
+                        beId, getBackendEndpoint(beId), jobId);
+                try {
+                    client.warmUpTablets(request);
+                } catch (Exception e) {
+                    // If RPC to this BE fails, invalidate this client and remove it from map,
+                    // then continue to next BE so that one bad BE won't block others.
+                    LOG.warn("send warm up request to BE {} ({}) failed: {}",
+                            beId, getBackendEndpoint(beId), e.getMessage());
+                    try {
+                        TNetworkAddress addr = beToAddr == null ? null : beToAddr.get(beId);
+                        if (addr != null) {
+                            ClientPool.backendPool.invalidateObject(addr, client);
+                        }
+                    } catch (Exception ie) {
+                        LOG.warn("invalidate client for BE {} failed: {}", beId, ie.getMessage());
+                    }
+                    // remove from local map so releaseClients won't try to return an invalidated client
+                    iter.remove();
+                }
             }
         } catch (Exception e) {
             LOG.warn("send warm up request failed. job_id={}, request_type=CLEAR_JOB, exception={}",
@@ -653,8 +699,8 @@ public class CloudWarmUpJob implements Writable {
                     throw new IllegalArgumentException("Unknown SyncEvent " + syncEvent);
                 }
                 request.setEvent(event);
-                LOG.debug("send warm up request to BE {}. job_id={}, event={}, request_type=SET_JOB(EVENT)",
-                        entry.getKey(), jobId, syncEvent);
+                LOG.debug("send warm up request to BE {} ({}). job_id={}, event={}, request_type=SET_JOB(EVENT)",
+                        entry.getKey(), getBackendEndpoint(entry.getKey()), jobId, syncEvent);
                 TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
                 if (response.getStatus().getStatusCode() != TStatusCode.OK) {
                     if (!response.getStatus().getErrorMsgs().isEmpty()) {
@@ -698,9 +744,10 @@ public class CloudWarmUpJob implements Writable {
                     request.setJobId(jobId);
                     request.setBatchId(lastBatchId + 1);
                     request.setJobMetas(buildJobMetas(entry.getKey(), request.batch_id));
-                    LOG.info("send warm up request to BE {}. job_id={}, batch_id={}"
+                    LOG.info("send warm up request to BE {} ({}). job_id={}, batch_id={}"
                             + ", job_size={}, request_type=SET_JOB",
-                            entry.getKey(), jobId, request.batch_id, request.job_metas.size());
+                            entry.getKey(), getBackendEndpoint(entry.getKey()),
+                            jobId, request.batch_id, request.job_metas.size());
                     TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
                     if (response.getStatus().getStatusCode() != TStatusCode.OK) {
                         if (!response.getStatus().getErrorMsgs().isEmpty()) {
@@ -715,8 +762,9 @@ public class CloudWarmUpJob implements Writable {
                 for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
                     TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
                     request.setType(TWarmUpTabletsRequestType.GET_CURRENT_JOB_STATE_AND_LEASE);
-                    LOG.info("send warm up request to BE {}. job_id={}, request_type=GET_CURRENT_JOB_STATE_AND_LEASE",
-                            entry.getKey(), jobId);
+                    LOG.info("send warm up request to BE {} ({}). job_id={}"
+                            + ", request_type=GET_CURRENT_JOB_STATE_AND_LEASE",
+                            entry.getKey(), getBackendEndpoint(entry.getKey()), jobId);
                     TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
                     if (response.getStatus().getStatusCode() != TStatusCode.OK) {
                         if (!response.getStatus().getErrorMsgs().isEmpty()) {
@@ -754,9 +802,10 @@ public class CloudWarmUpJob implements Writable {
                         if (!request.job_metas.isEmpty()) {
                             // check all batches is done or not
                             allBatchesDone = false;
-                            LOG.info("send warm up request to BE {}. job_id={}, batch_id={}"
+                            LOG.info("send warm up request to BE {} ({}). job_id={}, batch_id={}"
                                     + ", job_size={}, request_type=SET_BATCH",
-                                    entry.getKey(), jobId, request.batch_id, request.job_metas.size());
+                                    entry.getKey(), getBackendEndpoint(entry.getKey()),
+                                    jobId, request.batch_id, request.job_metas.size());
                             TWarmUpTabletsResponse response = entry.getValue().warmUpTablets(request);
                             if (response.getStatus().getStatusCode() != TStatusCode.OK) {
                                 if (!response.getStatus().getErrorMsgs().isEmpty()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to