This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 51239f39259 branch-4.0: [fix](fe) Clear warmup jobs on available backends (#62931) (#63131)
51239f39259 is described below
commit 51239f39259fad92af5dfad3ec095a166b587cc0
Author: bobhan1 <[email protected]>
AuthorDate: Thu May 28 10:19:46 2026 +0800
branch-4.0: [fix](fe) Clear warmup jobs on available backends (#62931) (#63131)
pick https://github.com/apache/doris/pull/62931
---
.../org/apache/doris/cloud/CloudWarmUpJob.java | 57 ++++++++++----
.../org/apache/doris/cloud/CloudWarmUpJobTest.java | 90 ++++++++++++++++++++++
2 files changed, 131 insertions(+), 16 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
index 593e68ef8d8..ae12107c3dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
@@ -511,6 +511,20 @@ public class CloudWarmUpJob implements Writable {
}
public void initClients() throws Exception {
+ prepareClients();
+ if (beToClient.isEmpty()) {
+ try {
+ for (Map.Entry<Long, String> entry :
beToThriftAddress.entrySet()) {
+ initClient(entry.getKey(), entry.getValue());
+ }
+ } catch (Exception e) {
+ releaseClients();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void prepareClients() {
if (beToThriftAddress == null || beToThriftAddress.isEmpty()) {
fetchBeToThriftAddress();
}
@@ -518,25 +532,36 @@ public class CloudWarmUpJob implements Writable {
beToClient = new HashMap<>();
beToAddr = new HashMap<>();
}
+ }
+
+ private void initClient(long beId, String thriftAddress) throws Exception {
+ boolean ok = false;
+ TNetworkAddress address = null;
+ Client client = null;
+ try {
+ String[] ipPort = thriftAddress.split(":");
+ address = new TNetworkAddress(ipPort[0],
Integer.parseInt(ipPort[1]));
+ beToAddr.put(beId, address);
+ client = ClientPool.backendPool.borrowObject(address);
+ beToClient.put(beId, client);
+ ok = true;
+ } finally {
+ if (!ok) {
+ ClientPool.backendPool.invalidateObject(address, client);
+ beToAddr.remove(beId);
+ }
+ }
+ }
+
+ private void initClientsForClearJob() {
+ prepareClients();
if (beToClient.isEmpty()) {
for (Map.Entry<Long, String> entry : beToThriftAddress.entrySet())
{
- boolean ok = false;
- TNetworkAddress address = null;
- Client client = null;
try {
- String[] ipPort = entry.getValue().split(":");
- address = new TNetworkAddress(ipPort[0],
Integer.parseInt(ipPort[1]));
- beToAddr.put(entry.getKey(), address);
- client = ClientPool.backendPool.borrowObject(address);
- beToClient.put(entry.getKey(), client);
- ok = true;
+ initClient(entry.getKey(), entry.getValue());
} catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- if (!ok) {
- ClientPool.backendPool.invalidateObject(address,
client);
- releaseClients();
- }
+ LOG.warn("init client for BE {} ({}) failed when clearing
warm up job {}: {}",
+ entry.getKey(), entry.getValue(), jobId,
e.getMessage());
}
}
}
@@ -575,7 +600,7 @@ public class CloudWarmUpJob implements Writable {
private final void clearJobOnBEs() {
try {
- initClients();
+ initClientsForClearJob();
// Iterate with explicit iterator so we can remove invalidated
clients during iteration.
Iterator<Map.Entry<Long, Client>> iter =
beToClient.entrySet().iterator();
while (iter.hasNext()) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
index add89a0c51c..3da09461138 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
@@ -18,23 +18,52 @@
package org.apache.doris.cloud;
import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.CloudWarmUpJob.JobState;
import org.apache.doris.cloud.CloudWarmUpJob.JobType;
import org.apache.doris.cloud.CloudWarmUpJob.SyncEvent;
import org.apache.doris.cloud.CloudWarmUpJob.SyncMode;
import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.GenericPool;
import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TWarmUpTabletsRequest;
+import org.apache.doris.thrift.TWarmUpTabletsRequestType;
+import org.apache.doris.thrift.TWarmUpTabletsResponse;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
public class CloudWarmUpJobTest {
+ private GenericPool<BackendService.Client> originalBackendPool;
+ private GenericPool<BackendService.Client> mockBackendPool;
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setUp() {
+ originalBackendPool = ClientPool.backendPool;
+ mockBackendPool = Mockito.mock(GenericPool.class);
+ ClientPool.backendPool = mockBackendPool;
+ }
+
+ @After
+ public void tearDown() {
+ ClientPool.backendPool = originalBackendPool;
+ }
+
@Test
public void testEventDrivenRefreshesSourceBackends() {
CloudSystemInfoService cloudSystemInfoService =
Mockito.mock(CloudSystemInfoService.class);
@@ -70,4 +99,65 @@ public class CloudWarmUpJobTest {
Assert.assertEquals("host1:9060",
warmUpJob.getBeToThriftAddress().get(1L));
Assert.assertEquals("host2:9061",
warmUpJob.getBeToThriftAddress().get(2L));
}
+
+ @Test
+ public void testInitClientsKeepsFailFastForWarmUpRpc() throws Exception {
+ long jobId = 12345L;
+ TNetworkAddress firstAddress = new TNetworkAddress("127.0.0.1", 9050);
+ TNetworkAddress secondAddress = new TNetworkAddress("127.0.0.2", 9050);
+ BackendService.Client firstClient =
Mockito.mock(BackendService.Client.class);
+ CloudWarmUpJob job = createRunningJob(jobId, firstAddress,
secondAddress);
+
+
Mockito.when(mockBackendPool.borrowObject(firstAddress)).thenReturn(firstClient);
+
Mockito.when(mockBackendPool.borrowObject(secondAddress)).thenThrow(new
RuntimeException("down"));
+
+ Assert.assertThrows(RuntimeException.class, job::initClients);
+ Mockito.verify(mockBackendPool).returnObject(firstAddress,
firstClient);
+ Mockito.verify(mockBackendPool).invalidateObject(secondAddress, null);
+ }
+
+ @Test
+ public void testClearJobSkipsUnavailableBackendAndClearsAvailableBackend()
throws Exception {
+ long jobId = 12346L;
+ TNetworkAddress unavailableAddress = new TNetworkAddress("127.0.0.1",
9050);
+ TNetworkAddress availableAddress = new TNetworkAddress("127.0.0.2",
9050);
+ BackendService.Client availableClient =
Mockito.mock(BackendService.Client.class);
+ CloudWarmUpJob job = createRunningJob(jobId, unavailableAddress,
availableAddress);
+
+
Mockito.when(mockBackendPool.borrowObject(unavailableAddress)).thenThrow(new
RuntimeException("down"));
+
Mockito.when(mockBackendPool.borrowObject(availableAddress)).thenReturn(availableClient);
+
Mockito.when(availableClient.warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class)))
+ .thenReturn(new TWarmUpTabletsResponse());
+
+ invokeClearJobOnBEs(job);
+
+ ArgumentCaptor<TWarmUpTabletsRequest> captor =
ArgumentCaptor.forClass(TWarmUpTabletsRequest.class);
+ Mockito.verify(availableClient).warmUpTablets(captor.capture());
+ TWarmUpTabletsRequest request = captor.getValue();
+ Assert.assertEquals(TWarmUpTabletsRequestType.CLEAR_JOB,
request.getType());
+ Assert.assertEquals(jobId, request.getJobId());
+ Mockito.verify(mockBackendPool).returnObject(availableAddress,
availableClient);
+ Mockito.verify(mockBackendPool).invalidateObject(unavailableAddress,
null);
+ }
+
+ private CloudWarmUpJob createRunningJob(long jobId, TNetworkAddress
firstAddress,
+ TNetworkAddress secondAddress) {
+ CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+ .setJobId(jobId)
+ .setSrcClusterName("source_cluster")
+ .setDstClusterName("target_cluster")
+ .build();
+ job.setJobState(JobState.RUNNING);
+ Map<Long, String> beToThriftAddress = new LinkedHashMap<>();
+ beToThriftAddress.put(1L, firstAddress.getHostname() + ":" +
firstAddress.getPort());
+ beToThriftAddress.put(2L, secondAddress.getHostname() + ":" +
secondAddress.getPort());
+ job.setBeToThriftAddress(beToThriftAddress);
+ return job;
+ }
+
+ private void invokeClearJobOnBEs(CloudWarmUpJob job) throws Exception {
+ Method method =
CloudWarmUpJob.class.getDeclaredMethod("clearJobOnBEs");
+ method.setAccessible(true);
+ method.invoke(job);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]