This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7ae83b60fdfaa633bba6e75732e437075773462d
Author: xzj7019 <131111794+xzj7...@users.noreply.github.com>
AuthorDate: Tue May 21 14:20:48 2024 +0800

    [opt](Nereids) opt locality under multi-replica (#34927)
    
    Make tablet locality fixed (deterministic) in multi-replica cases.
    Session variable: set enable_ordered_scan_range_locations = true to enable
    this behavior (default: false).
    3-replica TPC-DS 100 GB: 7% improvement.
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 72 +++++++++++++++++-----
 .../java/org/apache/doris/qe/SessionVariable.java  |  5 ++
 .../java/org/apache/doris/qe/CoordinatorTest.java  |  8 +--
 3 files changed, 65 insertions(+), 20 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 1c272b1dbb1..ba7ea54b098 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -144,6 +144,7 @@ import java.security.SecureRandom;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -2264,7 +2265,13 @@ public class Coordinator implements CoordInterface {
     private void computeScanRangeAssignment() throws Exception {
         Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
         Map<TNetworkAddress, Long> replicaNumPerHost = 
getReplicaNumPerHostForOlapTable();
-        Collections.shuffle(scanNodes);
+        boolean isAllOlapTables = scanNodes.stream().allMatch(e -> e 
instanceof OlapScanNode);
+        boolean isEnableOrderedLocations = ConnectContext.get() != null
+                && 
ConnectContext.get().getSessionVariable().enableOrderedScanRangeLocations
+                && isAllOlapTables;
+        if (isEnableOrderedLocations) {
+            sortScanNodes();
+        }
         // set scan ranges/locations for scan nodes
         for (ScanNode scanNode : scanNodes) {
             if (!(scanNode instanceof ExternalScanNode)) {
@@ -2277,6 +2284,9 @@ public class Coordinator implements CoordInterface {
                 // only analysis olap scan node
                 continue;
             }
+            if (isEnableOrderedLocations) {
+                sortScanRangeLocations(locations);
+            }
             Set<Integer> scanNodeIds = 
fragmentIdToScanNodeIds.computeIfAbsent(scanNode.getFragmentId(),
                     k -> Sets.newHashSet());
             scanNodeIds.add(scanNode.getId().asInt());
@@ -2296,7 +2306,8 @@ public class Coordinator implements CoordInterface {
             // A fragment may contain both colocate join and bucket shuffle 
join
             // on need both compute scanRange to init basic data for query 
coordinator
             if (fragmentContainsColocateJoin) {
-                computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, 
assignedBytesPerHost, replicaNumPerHost);
+                computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, 
assignedBytesPerHost,
+                        replicaNumPerHost, isEnableOrderedLocations);
             }
             if (fragmentContainsBucketShuffleJoin) {
                 
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) 
scanNode,
@@ -2304,7 +2315,7 @@ public class Coordinator implements CoordInterface {
             }
             if (!(fragmentContainsColocateJoin || 
fragmentContainsBucketShuffleJoin)) {
                 computeScanRangeAssignmentByScheduler(scanNode, locations, 
assignment, assignedBytesPerHost,
-                        replicaNumPerHost);
+                        replicaNumPerHost, isEnableOrderedLocations);
             }
         }
     }
@@ -2312,7 +2323,7 @@ public class Coordinator implements CoordInterface {
     // To ensure the same bucketSeq tablet to the same execHostPort
     private void computeScanRangeAssignmentByColocate(
             final OlapScanNode scanNode, Map<TNetworkAddress, Long> 
assignedBytesPerHost,
-            Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
+            Map<TNetworkAddress, Long> replicaNumPerHost, boolean 
isEnableOrderedLocations) throws Exception {
         if 
(!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
             fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new 
HashMap<>());
             fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), 
new BucketSeqToScanRange());
@@ -2333,7 +2344,8 @@ public class Coordinator implements CoordInterface {
             List<TScanRangeLocations> locations = 
scanNode.bucketSeq2locations.get(bucketSeq);
             if (!bucketSeqToAddress.containsKey(bucketSeq)) {
                 getExecHostPortForFragmentIDAndBucketSeq(locations.get(0),
-                        scanNode.getFragmentId(), bucketSeq, 
assignedBytesPerHost, replicaNumPerHost);
+                        scanNode.getFragmentId(), bucketSeq, 
assignedBytesPerHost,
+                        replicaNumPerHost, isEnableOrderedLocations);
             }
 
             for (TScanRangeLocations location : locations) {
@@ -2355,29 +2367,55 @@ public class Coordinator implements CoordInterface {
     //ensure bucket sequence distribued to every host evenly
     private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations 
seqLocation,
             PlanFragmentId fragmentId, Integer bucketSeq, Map<TNetworkAddress, 
Long> assignedBytesPerHost,
-            Map<TNetworkAddress, Long> replicaNumPerHost)
+            Map<TNetworkAddress, Long> replicaNumPerHost, boolean 
isEnableOrderedLocations)
             throws Exception {
         Reference<Long> backendIdRef = new Reference<Long>();
-        selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, 
replicaNumPerHost, backendIdRef);
+        selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, 
replicaNumPerHost,
+                backendIdRef, isEnableOrderedLocations);
         Backend backend = this.idToBackend.get(backendIdRef.getRef());
         TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), 
backend.getBePort());
         this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
         this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, 
execHostPort);
     }
 
+    private void sortScanNodes() {
+        Collections.sort(scanNodes, new Comparator<ScanNode>() {
+            @Override
+            public int compare(ScanNode s1, ScanNode s2) {
+                return java.lang.Integer.compare(s1.getId().asInt(), 
s2.getId().asInt());
+            }
+        });
+    }
+
+    private void sortScanRangeLocations(List<TScanRangeLocations> locations) {
+        Collections.sort(locations, new Comparator<TScanRangeLocations>() {
+            @Override
+            public int compare(TScanRangeLocations o1, TScanRangeLocations o2) 
{
+                return org.apache.thrift.TBaseHelper.compareTo(
+                        o1.getScanRange().getPaloScanRange().tablet_id,
+                        o2.getScanRange().getPaloScanRange().tablet_id);
+            }
+        });
+    }
+
     public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations 
seqLocation,
                                                          Map<TNetworkAddress, 
Long> assignedBytesPerHost,
                                                          Map<TNetworkAddress, 
Long> replicaNumPerHost,
-                                                         Reference<Long> 
backendIdRef) throws UserException {
+                                                         Reference<Long> 
backendIdRef,
+                                                         boolean 
isEnableOrderedLocations) throws UserException {
+        List<TScanRangeLocation> locations = seqLocation.getLocations();
+        if (isEnableOrderedLocations) {
+            Collections.sort(locations);
+        }
         if (!Config.enable_local_replica_selection) {
-            return selectBackendsByRoundRobin(seqLocation.getLocations(), 
assignedBytesPerHost, replicaNumPerHost,
+            return selectBackendsByRoundRobin(locations, assignedBytesPerHost, 
replicaNumPerHost,
                     backendIdRef);
         }
 
         List<TScanRangeLocation> localLocations = new ArrayList<>();
         List<TScanRangeLocation> nonlocalLocations = new ArrayList<>();
         long localBeId = 
Env.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress());
-        for (final TScanRangeLocation location : seqLocation.getLocations()) {
+        for (final TScanRangeLocation location : locations) {
             if (location.backend_id == localBeId) {
                 localLocations.add(location);
             } else {
@@ -2395,14 +2433,15 @@ public class Coordinator implements CoordInterface {
         }
     }
 
-    public TScanRangeLocation 
selectBackendsByRoundRobin(List<TScanRangeLocation> locations,
+    public TScanRangeLocation 
selectBackendsByRoundRobin(List<TScanRangeLocation> sortedLocations,
             Map<TNetworkAddress, Long> assignedBytesPerHost, 
Map<TNetworkAddress, Long> replicaNumPerHost,
             Reference<Long> backendIdRef) throws UserException {
         Long minAssignedBytes = Long.MAX_VALUE;
         Long minReplicaNum = Long.MAX_VALUE;
         TScanRangeLocation minLocation = null;
         Long step = 1L;
-        for (final TScanRangeLocation location : locations) {
+
+        for (final TScanRangeLocation location : sortedLocations) {
             Long assignedBytes = findOrInsert(assignedBytesPerHost, 
location.server, 0L);
             if (assignedBytes < minAssignedBytes || 
(assignedBytes.equals(minAssignedBytes)
                     && replicaNumPerHost.get(location.server) < 
minReplicaNum)) {
@@ -2411,10 +2450,10 @@ public class Coordinator implements CoordInterface {
                 minLocation = location;
             }
         }
-        for (TScanRangeLocation location : locations) {
+        for (TScanRangeLocation location : sortedLocations) {
             replicaNumPerHost.put(location.server, 
replicaNumPerHost.get(location.server) - 1);
         }
-        TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, 
locations,
+        TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, 
sortedLocations,
                 this.idToBackend, backendIdRef);
         assignedBytesPerHost.put(location.server, 
assignedBytesPerHost.get(location.server) + step);
 
@@ -2426,7 +2465,8 @@ public class Coordinator implements CoordInterface {
             final List<TScanRangeLocations> locations,
             FragmentScanRangeAssignment assignment,
             Map<TNetworkAddress, Long> assignedBytesPerHost,
-            Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
+            Map<TNetworkAddress, Long> replicaNumPerHost,
+            boolean isEnableOrderedLocations) throws Exception {
         // Type of locations is List, it could have elements that have same 
"location"
         // and we do have this situation for some scan node.
         // The duplicate "location" will NOT be filtered by 
FragmentScanRangeAssignment,
@@ -2435,7 +2475,7 @@ public class Coordinator implements CoordInterface {
         for (TScanRangeLocations scanRangeLocations : locations) {
             Reference<Long> backendIdRef = new Reference<Long>();
             TScanRangeLocation minLocation = 
selectBackendsByRoundRobin(scanRangeLocations,
-                    assignedBytesPerHost, replicaNumPerHost, backendIdRef);
+                    assignedBytesPerHost, replicaNumPerHost, backendIdRef, 
isEnableOrderedLocations);
             Backend backend = this.idToBackend.get(backendIdRef.getRef());
             TNetworkAddress execHostPort = new 
TNetworkAddress(backend.getHost(), backend.getBePort());
             this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2af7725e029..ac653b82ec9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -420,6 +420,8 @@ public class SessionVariable implements Serializable, 
Writable {
      */
     public static final String ENABLE_UNIFIED_LOAD = "enable_unified_load";
 
+    public static final String ENABLE_ORDERED_SCAN_RANGE_LOCATIONS = 
"enable_ordered_scan_range_locations";
+
     public static final String ENABLE_PARQUET_LAZY_MAT = 
"enable_parquet_lazy_materialization";
 
     public static final String ENABLE_ORC_LAZY_MAT = 
"enable_orc_lazy_materialization";
@@ -1468,6 +1470,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_CTE_MATERIALIZE)
     public boolean enableCTEMaterialize = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_ORDERED_SCAN_RANGE_LOCATIONS)
+    public boolean enableOrderedScanRangeLocations = false;
+
     @VariableMgr.VarAttr(name = ENABLE_ANALYZE_COMPLEX_TYPE_COLUMN)
     public boolean enableAnalyzeComplexTypeColumn = false;
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index ca8109e40e3..4c38ddd2749 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -593,7 +593,7 @@ public class CoordinatorTest extends Coordinator {
         locations.add(tScanRangeLocations);
         locations.add(tScanRangeLocations1);
         Deencapsulation.invoke(coordinator, 
"computeScanRangeAssignmentByScheduler",
-                olapScanNode, locations, assignment, assignedBytesPerHost, 
replicaNumPerHost);
+                olapScanNode, locations, assignment, assignedBytesPerHost, 
replicaNumPerHost, false);
         for (Map.Entry entry : assignment.entrySet()) {
             Map<Integer, List<TScanRangeParams>> addr = (HashMap<Integer, 
List<TScanRangeParams>>) entry.getValue();
             for (Map.Entry item : addr.entrySet()) {
@@ -653,11 +653,11 @@ public class CoordinatorTest extends Coordinator {
         replicaNumPerHost.put(tScanRangeLocation2.server, 1L);
 
         Deencapsulation.invoke(coordinator, 
"getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
-                planFragmentId, 1, assignedBytesPerHost, replicaNumPerHost);
+                planFragmentId, 1, assignedBytesPerHost, replicaNumPerHost, 
false);
         Deencapsulation.invoke(coordinator, 
"getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
-                planFragmentId, 2, assignedBytesPerHost, replicaNumPerHost);
+                planFragmentId, 2, assignedBytesPerHost, replicaNumPerHost, 
false);
         Deencapsulation.invoke(coordinator, 
"getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
-                planFragmentId, 3, assignedBytesPerHost, replicaNumPerHost);
+                planFragmentId, 3, assignedBytesPerHost, replicaNumPerHost, 
false);
         List<String> hosts = new ArrayList<>();
         for (Map.Entry item : assignedBytesPerHost.entrySet()) {
             Assert.assertTrue((Long) item.getValue() == 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to