This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 140ab60a74 [Enhancement](multi-catalog) add a BE selection strategy for hdfs short-circuit-read. (#22697)
140ab60a74 is described below

commit 140ab60a749614c739851ad23391c25d3c277378
Author: Xiangyu Wang <dut.xian...@gmail.com>
AuthorDate: Tue Aug 15 15:34:39 2023 +0800

    [Enhancement](multi-catalog) add a BE selection strategy for hdfs short-circuit-read. (#22697)
    
    BEs are sometimes deployed on the same nodes as HDFS DataNodes. In that
    case a locality-aware BE selection policy, which prefers a BE on a host
    that stores the split's data, lets scans use HDFS short-circuit reads as
    much as possible.
---
 .../org/apache/doris/catalog/HdfsResource.java     |   7 +-
 .../planner/external/FederationBackendPolicy.java  |  28 +++++-
 .../doris/planner/external/FileQueryScanNode.java  |  18 +++-
 .../doris/planner/FederationBackendPolicyTest.java | 104 +++++++++++++++++++++
 4 files changed, 151 insertions(+), 6 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index 9735f2f059..1f87f93e9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -75,7 +75,7 @@ public class HdfsResource extends Resource {
     protected void setProperties(Map<String, String> properties) throws 
DdlException {
         // `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` should 
be both set to enable short circuit read.
         // We should disable short circuit read if they are not both set 
because it will cause performance down.
-        if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) || 
!properties.containsKey(HADOOP_SOCKET_PATH)) {
+        if (!(enableShortCircuitRead(properties))) {
             properties.put(HADOOP_SHORT_CIRCUIT, "false");
         }
         this.properties = properties;
@@ -94,6 +94,11 @@ public class HdfsResource extends Resource {
         }
     }
 
+    public static boolean enableShortCircuitRead(Map<String, String> 
properties) {
+        return 
"true".equalsIgnoreCase(properties.getOrDefault(HADOOP_SHORT_CIRCUIT, "false"))
+                    && properties.containsKey(HADOOP_SOCKET_PATH);
+    }
+
     // Will be removed after BE unified storage params
     public static THdfsParams generateHdfsParam(Map<String, String> 
properties) {
         THdfsParams tHdfsParams = new THdfsParams();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index daa1b151ea..eb2545affd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -31,21 +31,29 @@ import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.PrimitiveSink;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class FederationBackendPolicy {
     private static final Logger LOG = 
LogManager.getLogger(FederationBackendPolicy.class);
     private final List<Backend> backends = Lists.newArrayList();
+    private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
+    private final Random random = new Random(System.currentTimeMillis());
     private ConsistentHash<TScanRangeLocations, Backend> consistentHash;
 
     private int nextBe = 0;
@@ -87,6 +95,7 @@ public class FederationBackendPolicy {
         if (backends.isEmpty()) {
             throw new UserException("No available backends");
         }
+        
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
         int virtualNumber = Math.max(Math.min(512 / backends.size(), 32), 2);
         consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new 
ScanRangeHash(),
                 new BackendHash(), backends, virtualNumber);
@@ -102,12 +111,27 @@ public class FederationBackendPolicy {
         return consistentHash.getNode(scanRangeLocations);
     }
 
+    // Try to find a local BE, if not exists, use `getNextBe` instead
+    public Backend getNextLocalBe(List<String> hosts) {
+        List<Backend> candidateBackends = 
Lists.newArrayListWithCapacity(hosts.size());
+        for (String host : hosts) {
+            List<Backend> backends = backendMap.get(host);
+            if (CollectionUtils.isNotEmpty(backends)) {
+                
candidateBackends.add(backends.get(random.nextInt(backends.size())));
+            }
+        }
+
+        return CollectionUtils.isEmpty(candidateBackends)
+                    ? getNextBe()
+                    : 
candidateBackends.get(random.nextInt(candidateBackends.size()));
+    }
+
     public int numBackends() {
         return backends.size();
     }
 
-    public List<Backend> getBackends() {
-        return backends;
+    public Collection<Backend> getBackends() {
+        return CollectionUtils.unmodifiableCollection(backends);
     }
 
     private static class BackendHash implements Funnel<Backend> {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index db8f835abb..adbbbfaeaf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -74,6 +74,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -271,6 +272,8 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             params.setProperties(locationProperties);
         }
 
+        boolean enableSqlCache = 
ConnectContext.get().getSessionVariable().enableFileCache;
+        boolean enableShortCircuitRead = 
HdfsResource.enableShortCircuitRead(locationProperties);
         List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
@@ -312,6 +315,7 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
                 
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
                 rangeDesc.setTableFormatParams(tableFormatFileDesc);
             }
+
             // external data lake table
             if (fileSplit instanceof IcebergSplit) {
                 // TODO: extract all data lake split to factory
@@ -322,11 +326,19 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
                 HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit);
             }
 
+            Backend selectedBackend;
+            if (enableSqlCache) {
+                // Use consistent hash to assign the same scan range into the 
same backend among different queries
+                selectedBackend = 
backendPolicy.getNextConsistentBe(curLocations);
+            } else if (enableShortCircuitRead) {
+                // Try to find a local BE if enable hdfs short circuit read
+                selectedBackend = 
backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()));
+            } else {
+                selectedBackend = backendPolicy.getNextBe();
+            }
+
             
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
             TScanRangeLocation location = new TScanRangeLocation();
-            // Use consistent hash to assign the same scan range into the same 
backend among different queries
-            Backend selectedBackend = 
ConnectContext.get().getSessionVariable().enableFileCache
-                    ? backendPolicy.getNextConsistentBe(curLocations) : 
backendPolicy.getNextBe();
             location.setBackendId(selectedBackend.getId());
             location.setServer(new TNetworkAddress(selectedBackend.getHost(), 
selectedBackend.getBePort()));
             curLocations.addToLocations(location);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
new file mode 100644
index 0000000000..05a7f6985d
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.external.FederationBackendPolicy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.base.Stopwatch;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class FederationBackendPolicyTest {
+    @Mocked
+    private Env env;
+
+    @Before
+    public void setUp() {
+
+        SystemInfoService service = new SystemInfoService();
+
+        for (int i = 0; i < 190; i++) {
+            Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 
9050);
+            backend.setAlive(true);
+            service.addBackend(backend);
+        }
+        for (int i = 0; i < 10; i++) {
+            Backend backend = new Backend(Long.valueOf(190 + i), "192.168.1." 
+ i, 9051);
+            backend.setAlive(true);
+            service.addBackend(backend);
+        }
+        for (int i = 0; i < 10; i++) {
+            Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." 
+ i, 9050);
+            backend.setAlive(false);
+            service.addBackend(backend);
+        }
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+    }
+
+    @Test
+    public void testGetNextBe() throws UserException {
+        FederationBackendPolicy policy = new FederationBackendPolicy();
+        policy.init();
+        int backendNum = 200;
+        int invokeTimes = 1000000;
+        Assertions.assertEquals(policy.numBackends(), backendNum);
+        Stopwatch sw = Stopwatch.createStarted();
+        for (int i = 0; i < invokeTimes; i++) {
+            
Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2."));
+        }
+        sw.stop();
+        System.out.println("Invoke getNextBe() " + invokeTimes
+                    + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] 
ms");
+    }
+
+    @Test
+    public void testGetNextLocalBe() throws UserException {
+        FederationBackendPolicy policy = new FederationBackendPolicy();
+        policy.init();
+        int backendNum = 200;
+        int invokeTimes = 1000000;
+        Assertions.assertEquals(policy.numBackends(), backendNum);
+        List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", 
"192.168.1.2");
+        Stopwatch sw = Stopwatch.createStarted();
+        for (int i = 0; i < invokeTimes; i++) {
+            
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
+        }
+        sw.stop();
+        System.out.println("Invoke getNextLocalBe() " + invokeTimes
+                    + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] 
ms");
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to