This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 51239f39259 branch-4.0: [fix](fe) Clear warmup jobs on available backends (#62931) (#63131)
51239f39259 is described below

commit 51239f39259fad92af5dfad3ec095a166b587cc0
Author: bobhan1 <[email protected]>
AuthorDate: Thu May 28 10:19:46 2026 +0800

    branch-4.0: [fix](fe) Clear warmup jobs on available backends (#62931) (#63131)
    
    pick https://github.com/apache/doris/pull/62931
---
 .../org/apache/doris/cloud/CloudWarmUpJob.java     | 57 ++++++++++----
 .../org/apache/doris/cloud/CloudWarmUpJobTest.java | 90 ++++++++++++++++++++++
 2 files changed, 131 insertions(+), 16 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
index 593e68ef8d8..ae12107c3dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
@@ -511,6 +511,20 @@ public class CloudWarmUpJob implements Writable {
     }
 
     public void initClients() throws Exception {
+        prepareClients();
+        if (beToClient.isEmpty()) {
+            try {
+                for (Map.Entry<Long, String> entry : 
beToThriftAddress.entrySet()) {
+                    initClient(entry.getKey(), entry.getValue());
+                }
+            } catch (Exception e) {
+                releaseClients();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void prepareClients() {
         if (beToThriftAddress == null || beToThriftAddress.isEmpty()) {
             fetchBeToThriftAddress();
         }
@@ -518,25 +532,36 @@ public class CloudWarmUpJob implements Writable {
             beToClient = new HashMap<>();
             beToAddr = new HashMap<>();
         }
+    }
+
+    private void initClient(long beId, String thriftAddress) throws Exception {
+        boolean ok = false;
+        TNetworkAddress address = null;
+        Client client = null;
+        try {
+            String[] ipPort = thriftAddress.split(":");
+            address = new TNetworkAddress(ipPort[0], 
Integer.parseInt(ipPort[1]));
+            beToAddr.put(beId, address);
+            client = ClientPool.backendPool.borrowObject(address);
+            beToClient.put(beId, client);
+            ok = true;
+        } finally {
+            if (!ok) {
+                ClientPool.backendPool.invalidateObject(address, client);
+                beToAddr.remove(beId);
+            }
+        }
+    }
+
+    private void initClientsForClearJob() {
+        prepareClients();
         if (beToClient.isEmpty()) {
             for (Map.Entry<Long, String> entry : beToThriftAddress.entrySet()) 
{
-                boolean ok = false;
-                TNetworkAddress address = null;
-                Client client = null;
                 try {
-                    String[] ipPort = entry.getValue().split(":");
-                    address = new TNetworkAddress(ipPort[0], 
Integer.parseInt(ipPort[1]));
-                    beToAddr.put(entry.getKey(), address);
-                    client = ClientPool.backendPool.borrowObject(address);
-                    beToClient.put(entry.getKey(), client);
-                    ok = true;
+                    initClient(entry.getKey(), entry.getValue());
                 } catch (Exception e) {
-                    throw new RuntimeException(e);
-                } finally {
-                    if (!ok) {
-                        ClientPool.backendPool.invalidateObject(address, 
client);
-                        releaseClients();
-                    }
+                    LOG.warn("init client for BE {} ({}) failed when clearing 
warm up job {}: {}",
+                            entry.getKey(), entry.getValue(), jobId, 
e.getMessage());
                 }
             }
         }
@@ -575,7 +600,7 @@ public class CloudWarmUpJob implements Writable {
 
     private final void clearJobOnBEs() {
         try {
-            initClients();
+            initClientsForClearJob();
             // Iterate with explicit iterator so we can remove invalidated 
clients during iteration.
             Iterator<Map.Entry<Long, Client>> iter = 
beToClient.entrySet().iterator();
             while (iter.hasNext()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
index add89a0c51c..3da09461138 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
@@ -18,23 +18,52 @@
 package org.apache.doris.cloud;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.CloudWarmUpJob.JobState;
 import org.apache.doris.cloud.CloudWarmUpJob.JobType;
 import org.apache.doris.cloud.CloudWarmUpJob.SyncEvent;
 import org.apache.doris.cloud.CloudWarmUpJob.SyncMode;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.GenericPool;
 import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TWarmUpTabletsRequest;
+import org.apache.doris.thrift.TWarmUpTabletsRequestType;
+import org.apache.doris.thrift.TWarmUpTabletsResponse;
 
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
+import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class CloudWarmUpJobTest {
+    private GenericPool<BackendService.Client> originalBackendPool;
+    private GenericPool<BackendService.Client> mockBackendPool;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() {
+        originalBackendPool = ClientPool.backendPool;
+        mockBackendPool = Mockito.mock(GenericPool.class);
+        ClientPool.backendPool = mockBackendPool;
+    }
+
+    @After
+    public void tearDown() {
+        ClientPool.backendPool = originalBackendPool;
+    }
+
     @Test
     public void testEventDrivenRefreshesSourceBackends() {
         CloudSystemInfoService cloudSystemInfoService = 
Mockito.mock(CloudSystemInfoService.class);
@@ -70,4 +99,65 @@ public class CloudWarmUpJobTest {
         Assert.assertEquals("host1:9060", 
warmUpJob.getBeToThriftAddress().get(1L));
         Assert.assertEquals("host2:9061", 
warmUpJob.getBeToThriftAddress().get(2L));
     }
+
+    @Test
+    public void testInitClientsKeepsFailFastForWarmUpRpc() throws Exception {
+        long jobId = 12345L;
+        TNetworkAddress firstAddress = new TNetworkAddress("127.0.0.1", 9050);
+        TNetworkAddress secondAddress = new TNetworkAddress("127.0.0.2", 9050);
+        BackendService.Client firstClient = 
Mockito.mock(BackendService.Client.class);
+        CloudWarmUpJob job = createRunningJob(jobId, firstAddress, 
secondAddress);
+
+        
Mockito.when(mockBackendPool.borrowObject(firstAddress)).thenReturn(firstClient);
+        
Mockito.when(mockBackendPool.borrowObject(secondAddress)).thenThrow(new 
RuntimeException("down"));
+
+        Assert.assertThrows(RuntimeException.class, job::initClients);
+        Mockito.verify(mockBackendPool).returnObject(firstAddress, 
firstClient);
+        Mockito.verify(mockBackendPool).invalidateObject(secondAddress, null);
+    }
+
+    @Test
+    public void testClearJobSkipsUnavailableBackendAndClearsAvailableBackend() 
throws Exception {
+        long jobId = 12346L;
+        TNetworkAddress unavailableAddress = new TNetworkAddress("127.0.0.1", 
9050);
+        TNetworkAddress availableAddress = new TNetworkAddress("127.0.0.2", 
9050);
+        BackendService.Client availableClient = 
Mockito.mock(BackendService.Client.class);
+        CloudWarmUpJob job = createRunningJob(jobId, unavailableAddress, 
availableAddress);
+
+        
Mockito.when(mockBackendPool.borrowObject(unavailableAddress)).thenThrow(new 
RuntimeException("down"));
+        
Mockito.when(mockBackendPool.borrowObject(availableAddress)).thenReturn(availableClient);
+        
Mockito.when(availableClient.warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class)))
+                .thenReturn(new TWarmUpTabletsResponse());
+
+        invokeClearJobOnBEs(job);
+
+        ArgumentCaptor<TWarmUpTabletsRequest> captor = 
ArgumentCaptor.forClass(TWarmUpTabletsRequest.class);
+        Mockito.verify(availableClient).warmUpTablets(captor.capture());
+        TWarmUpTabletsRequest request = captor.getValue();
+        Assert.assertEquals(TWarmUpTabletsRequestType.CLEAR_JOB, 
request.getType());
+        Assert.assertEquals(jobId, request.getJobId());
+        Mockito.verify(mockBackendPool).returnObject(availableAddress, 
availableClient);
+        Mockito.verify(mockBackendPool).invalidateObject(unavailableAddress, 
null);
+    }
+
+    private CloudWarmUpJob createRunningJob(long jobId, TNetworkAddress 
firstAddress,
+            TNetworkAddress secondAddress) {
+        CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+                .setJobId(jobId)
+                .setSrcClusterName("source_cluster")
+                .setDstClusterName("target_cluster")
+                .build();
+        job.setJobState(JobState.RUNNING);
+        Map<Long, String> beToThriftAddress = new LinkedHashMap<>();
+        beToThriftAddress.put(1L, firstAddress.getHostname() + ":" + 
firstAddress.getPort());
+        beToThriftAddress.put(2L, secondAddress.getHostname() + ":" + 
secondAddress.getPort());
+        job.setBeToThriftAddress(beToThriftAddress);
+        return job;
+    }
+
+    private void invokeClearJobOnBEs(CloudWarmUpJob job) throws Exception {
+        Method method = 
CloudWarmUpJob.class.getDeclaredMethod("clearJobOnBEs");
+        method.setAccessible(true);
+        method.invoke(job);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to