This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 140ab60a74 [Enhancement](multi-catalog) add a BE selection strategy for hdfs short-circuit-read. (#22697) 140ab60a74 is described below commit 140ab60a749614c739851ad23391c25d3c277378 Author: Xiangyu Wang <dut.xian...@gmail.com> AuthorDate: Tue Aug 15 15:34:39 2023 +0800 [Enhancement](multi-catalog) add a BE selection strategy for hdfs short-circuit-read. (#22697) Sometimes the BEs will be deployed on the same node with DataNode, so we can use a more reasonable BE selection policy to use the hdfs short-circuit-read as much as possible. --- .../org/apache/doris/catalog/HdfsResource.java | 7 +- .../planner/external/FederationBackendPolicy.java | 28 +++++- .../doris/planner/external/FileQueryScanNode.java | 18 +++- .../doris/planner/FederationBackendPolicyTest.java | 104 +++++++++++++++++++++ 4 files changed, 151 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java index 9735f2f059..1f87f93e9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java @@ -75,7 +75,7 @@ public class HdfsResource extends Resource { protected void setProperties(Map<String, String> properties) throws DdlException { // `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` should be both set to enable short circuit read. // We should disable short circuit read if they are not both set because it will cause performance down. - if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) || !properties.containsKey(HADOOP_SOCKET_PATH)) { + if (!(enableShortCircuitRead(properties))) { properties.put(HADOOP_SHORT_CIRCUIT, "false"); } this.properties = properties; @@ -94,6 +94,11 @@ public class HdfsResource extends Resource { } } + public static boolean enableShortCircuitRead(Map<String, String> properties) { + return "true".equalsIgnoreCase(properties.getOrDefault(HADOOP_SHORT_CIRCUIT, "false")) + && properties.containsKey(HADOOP_SOCKET_PATH); + } + // Will be removed after BE unified storage params public static THdfsParams generateHdfsParam(Map<String, String> properties) { THdfsParams tHdfsParams = new THdfsParams(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java index daa1b151ea..eb2545affd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java @@ -31,21 +31,29 @@ import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.hash.Funnel; import com.google.common.hash.Hashing; import com.google.common.hash.PrimitiveSink; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.nio.charset.StandardCharsets; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; public class FederationBackendPolicy { private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class); private final List<Backend> backends = Lists.newArrayList(); + private final Map<String, List<Backend>> backendMap = Maps.newHashMap(); + private final Random random = new Random(System.currentTimeMillis()); private ConsistentHash<TScanRangeLocations, Backend> consistentHash; private int nextBe = 0; @@ -87,6 +95,7 @@ public class FederationBackendPolicy { if (backends.isEmpty()) { throw new UserException("No available backends"); } + backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost))); int virtualNumber = Math.max(Math.min(512 / backends.size(), 32), 2); consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(), new BackendHash(), backends, virtualNumber); @@ -102,12 +111,27 @@ public class FederationBackendPolicy { return consistentHash.getNode(scanRangeLocations); } + // Try to find a local BE, if not exists, use `getNextBe` instead + public Backend getNextLocalBe(List<String> hosts) { + List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size()); + for (String host : hosts) { + List<Backend> backends = backendMap.get(host); + if (CollectionUtils.isNotEmpty(backends)) { + candidateBackends.add(backends.get(random.nextInt(backends.size()))); + } + } + + return CollectionUtils.isEmpty(candidateBackends) + ? getNextBe() + : candidateBackends.get(random.nextInt(candidateBackends.size())); + } + public int numBackends() { return backends.size(); } - public List<Backend> getBackends() { - return backends; + public Collection<Backend> getBackends() { + return CollectionUtils.unmodifiableCollection(backends); } private static class BackendHash implements Funnel<Backend> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index db8f835abb..adbbbfaeaf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -74,6 +74,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -271,6 +272,8 @@ public abstract class FileQueryScanNode extends FileScanNode { params.setProperties(locationProperties); } + boolean enableSqlCache = ConnectContext.get().getSessionVariable().enableFileCache; + boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties); List<String> pathPartitionKeys = getPathPartitionKeys(); for (Split split : inputSplits) { FileSplit fileSplit = (FileSplit) split; @@ -312,6 +315,7 @@ public abstract class FileQueryScanNode extends FileScanNode { tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); rangeDesc.setTableFormatParams(tableFormatFileDesc); } + // external data lake table if (fileSplit instanceof IcebergSplit) { // TODO: extract all data lake split to factory @@ -322,11 +326,19 @@ public abstract class FileQueryScanNode extends FileScanNode { HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit); } + Backend selectedBackend; + if (enableSqlCache) { + // Use consistent hash to assign the same scan range into the same backend among different queries + selectedBackend = backendPolicy.getNextConsistentBe(curLocations); + } else if (enableShortCircuitRead) { + // Try to find a local BE if enable hdfs short circuit read + selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts())); + } else { + selectedBackend = backendPolicy.getNextBe(); + } + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); TScanRangeLocation location = new TScanRangeLocation(); - // Use consistent hash to assign the same scan range into the same backend among different queries - Backend selectedBackend = ConnectContext.get().getSessionVariable().enableFileCache - ? backendPolicy.getNextConsistentBe(curLocations) : backendPolicy.getNextBe(); location.setBackendId(selectedBackend.getId()); location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); curLocations.addToLocations(location); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java new file mode 100644 index 0000000000..05a7f6985d --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.external.FederationBackendPolicy; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; + +import com.google.common.base.Stopwatch; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class FederationBackendPolicyTest { + @Mocked + private Env env; + + @Before + public void setUp() { + + SystemInfoService service = new SystemInfoService(); + + for (int i = 0; i < 190; i++) { + Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 9050); + backend.setAlive(true); + service.addBackend(backend); + } + for (int i = 0; i < 10; i++) { + Backend backend = new Backend(Long.valueOf(190 + i), "192.168.1." + i, 9051); + backend.setAlive(true); + service.addBackend(backend); + } + for (int i = 0; i < 10; i++) { + Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." + i, 9050); + backend.setAlive(false); + service.addBackend(backend); + } + + new MockUp<Env>() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + } + + @Test + public void testGetNextBe() throws UserException { + FederationBackendPolicy policy = new FederationBackendPolicy(); + policy.init(); + int backendNum = 200; + int invokeTimes = 1000000; + Assertions.assertEquals(policy.numBackends(), backendNum); + Stopwatch sw = Stopwatch.createStarted(); + for (int i = 0; i < invokeTimes; i++) { + Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2.")); + } + sw.stop(); + System.out.println("Invoke getNextBe() " + invokeTimes + + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms"); + } + + @Test + public void testGetNextLocalBe() throws UserException { + FederationBackendPolicy policy = new FederationBackendPolicy(); + policy.init(); + int backendNum = 200; + int invokeTimes = 1000000; + Assertions.assertEquals(policy.numBackends(), backendNum); + List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2"); + Stopwatch sw = Stopwatch.createStarted(); + for (int i = 0; i < invokeTimes; i++) { + Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost())); + } + sw.stop(); + System.out.println("Invoke getNextLocalBe() " + invokeTimes + + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms"); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org