This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fb3b0afb85d [opt](task-assignment) use consistent hash as default task 
assigner and cache the consistent hash ring (#28522)
fb3b0afb85d is described below

commit fb3b0afb85dccc9ee7964d16ecba9b01d09586b8
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Tue Dec 19 22:29:35 2023 +0800

    [opt](task-assignment) use consistent hash as default task assigner and 
cache the consistent hash ring (#28522)
    
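    1. Use the consistent hash algorithm as the default assigner for file query scan nodes.
        A consistent assignment makes better use of the BE nodes' page cache. This also
        replaces the round-robin fallback in getNextLocalBe(), and consistent hashing no
        longer depends on the enableFileCache session variable.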
    
    2. Cache the consistent hash ring.
        Initializing a consistent hash ring is time-consuming because thousands of
        virtual nodes need to be added, so cache it for better performance.
---
 .../planner/external/FederationBackendPolicy.java  | 63 ++++++++++++++++++++--
 .../doris/planner/external/FileQueryScanNode.java  | 11 ++--
 .../doris/planner/FederationBackendPolicyTest.java | 45 +++++++++++++++-
 3 files changed, 107 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index 9e23463235f..d1d7e90d35a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -30,6 +30,9 @@ import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -46,7 +49,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 public class FederationBackendPolicy {
@@ -59,6 +64,53 @@ public class FederationBackendPolicy {
     private int nextBe = 0;
     private boolean initialized = false;
 
+    // Building a ConsistentHash ring can be time-consuming, so rings are cached by backend-ID set.
+    private static LoadingCache<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache;
+
+    static {
+        consistentHashCache = CacheBuilder.newBuilder().maximumSize(5) // at most 5 backend sets
+                .build(new CacheLoader<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>>() {
+                    @Override
+                    public ConsistentHash<TScanRangeLocations, Backend> load(HashCacheKey key) { // on cache miss
+                        return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
+                                new BackendHash(), key.bes, Config.virtual_node_number); // read only at build time
+                    }
+                });
+    }
+
+    private static class HashCacheKey { // cache key for consistentHashCache
+        // backend IDs in ascending order; the only field used by equals/hashCode
+        private List<Long> beIds;
+        // not part of the key: equal-ID keys reuse the ring built from the first key's backends
+        private List<Backend> bes; // stored by reference, not copied
+
+        HashCacheKey(List<Backend> backends) {
+            this.bes = backends;
+            this.beIds = backends.stream().map(b -> b.getId()).sorted().collect(Collectors.toList());
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof HashCacheKey)) {
+                return false;
+            }
+            return Objects.equals(beIds, ((HashCacheKey) obj).beIds);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(beIds);
+        }
+
+        @Override
+        public String toString() {
+            return "HashCache{" + "beIds=" + beIds + '}';
+        }
+    }
+
     public void init() throws UserException {
         if (!initialized) {
             init(Collections.emptyList());
@@ -96,8 +148,11 @@ public class FederationBackendPolicy {
             throw new UserException("No available backends");
         }
         
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
-        consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new 
ScanRangeHash(),
-                new BackendHash(), backends, Config.virtual_node_number);
+        try {
+            consistentHash = consistentHashCache.get(new 
HashCacheKey(backends));
+        } catch (ExecutionException e) {
+            throw new UserException("failed to get consistent hash", e);
+        }
     }
 
     public Backend getNextBe() {
@@ -111,7 +166,7 @@ public class FederationBackendPolicy {
     }
 
     // Try to find a local BE, if not exists, use `getNextBe` instead
-    public Backend getNextLocalBe(List<String> hosts) {
+    public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations 
scanRangeLocations) {
         List<Backend> candidateBackends = 
Lists.newArrayListWithCapacity(hosts.size());
         for (String host : hosts) {
             List<Backend> backends = backendMap.get(host);
@@ -121,7 +176,7 @@ public class FederationBackendPolicy {
         }
 
         return CollectionUtils.isEmpty(candidateBackends)
-                    ? getNextBe()
+                    ? getNextConsistentBe(scanRangeLocations)
                     : 
candidateBackends.get(random.nextInt(candidateBackends.size()));
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index ada9f1fda61..658ac49659b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -311,7 +311,6 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             params.setProperties(locationProperties);
         }
 
-        boolean enableSqlCache = 
ConnectContext.get().getSessionVariable().enableFileCache;
         boolean enableShortCircuitRead = 
HdfsResource.enableShortCircuitRead(locationProperties);
         List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
@@ -369,14 +368,12 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
             TScanRangeLocation location = new TScanRangeLocation();
             Backend selectedBackend;
-            if (enableSqlCache) {
-                // Use consistent hash to assign the same scan range into the 
same backend among different queries
-                selectedBackend = 
backendPolicy.getNextConsistentBe(curLocations);
-            } else if (enableShortCircuitRead) {
+            if (enableShortCircuitRead) {
                 // Try to find a local BE if enable hdfs short circuit read
-                selectedBackend = 
backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()));
+                selectedBackend = 
backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations);
             } else {
-                selectedBackend = backendPolicy.getNextBe();
+                // Use consistent hash to assign the same scan range into the 
same backend among different queries
+                selectedBackend = 
backendPolicy.getNextConsistentBe(curLocations);
             }
             setLocationPropertiesIfNecessary(selectedBackend, locationType, 
locationProperties);
             location.setBackendId(selectedBackend.getId());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index 05a7f6985df..ef65d1b6165 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -22,6 +22,11 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.planner.external.FederationBackendPolicy;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TExternalScanRange;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileScanRange;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.Stopwatch;
 import mockit.Mock;
@@ -93,12 +98,50 @@ public class FederationBackendPolicyTest {
         int invokeTimes = 1000000;
         Assertions.assertEquals(policy.numBackends(), backendNum);
         List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", 
"192.168.1.2");
+        TScanRangeLocations scanRangeLocations = 
getScanRangeLocations("path1", 0, 100);
         Stopwatch sw = Stopwatch.createStarted();
         for (int i = 0; i < invokeTimes; i++) {
-            
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
+            
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, 
scanRangeLocations).getHost()));
         }
         sw.stop();
         System.out.println("Invoke getNextLocalBe() " + invokeTimes
                     + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] 
ms");
     }
+
+    @Test
+    public void testConsistentHash() throws UserException { // scan ranges map to fixed backend IDs
+        FederationBackendPolicy policy = new FederationBackendPolicy();
+        policy.init(); // expected IDs below depend on the ring's hash functions and virtual node count
+        int backendNum = 200; // NOTE(review): presumably the mocked backend count — confirm
+        Assertions.assertEquals(policy.numBackends(), backendNum); // note: (actual, expected) order
+
+        TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
+        Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId());
+
+        scanRangeLocations = getScanRangeLocations("path2", 0, 100);
+        Assertions.assertEquals(78, policy.getNextConsistentBe(scanRangeLocations).getId());
+    }
+
+    private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) {
+        // Build a TScanRangeLocations holding exactly one file range (path, startOffset, size)
+        TFileScanRange fileScanRange = new TFileScanRange();
+        // Wrap the file scan range in an external scan range, then a generic scan range
+        TExternalScanRange externalScanRange = new TExternalScanRange();
+        externalScanRange.setFileScanRange(fileScanRange);
+        TScanRange scanRange = new TScanRange();
+        scanRange.setExtScanRange(externalScanRange);
+        scanRange.getExtScanRange().getFileScanRange().addToRanges(createRangeDesc(path, startOffset, size));
+        // Wrap the scan range in locations (no backend locations are added here)
+        TScanRangeLocations locations = new TScanRangeLocations();
+        locations.setScanRange(scanRange);
+        return locations;
+    }
+
+    private TFileRangeDesc createRangeDesc(String path, long startOffset, long size) {
+        TFileRangeDesc rangeDesc = new TFileRangeDesc(); // only path/offset/size are set
+        rangeDesc.setPath(path);
+        rangeDesc.setStartOffset(startOffset);
+        rangeDesc.setSize(size);
+        return rangeDesc;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to