This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7ae83b60fdfaa633bba6e75732e437075773462d Author: xzj7019 <131111794+xzj7...@users.noreply.github.com> AuthorDate: Tue May 21 14:20:48 2024 +0800 [opt](Nereids) opt locality under multi-replica (#34927) Make tablet locality fixed under multi-replica cases. Session variable: set enable_ordered_scan_range_locations = true, default false; 3 replica tpcds 100g: 7% improvement --- .../main/java/org/apache/doris/qe/Coordinator.java | 72 +++++++++++++++++----- .../java/org/apache/doris/qe/SessionVariable.java | 5 ++ .../java/org/apache/doris/qe/CoordinatorTest.java | 8 +-- 3 files changed, 65 insertions(+), 20 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 1c272b1dbb1..ba7ea54b098 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -144,6 +144,7 @@ import java.security.SecureRandom; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -2264,7 +2265,13 @@ public class Coordinator implements CoordInterface { private void computeScanRangeAssignment() throws Exception { Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap(); Map<TNetworkAddress, Long> replicaNumPerHost = getReplicaNumPerHostForOlapTable(); - Collections.shuffle(scanNodes); + boolean isAllOlapTables = scanNodes.stream().allMatch(e -> e instanceof OlapScanNode); + boolean isEnableOrderedLocations = ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().enableOrderedScanRangeLocations + && isAllOlapTables; + if (isEnableOrderedLocations) { + sortScanNodes(); + } // set scan ranges/locations for scan nodes for (ScanNode scanNode : scanNodes) { if (!(scanNode instanceof ExternalScanNode)) { @@ -2277,6 +2284,9 @@ public class Coordinator implements CoordInterface { // only analysis olap scan node continue; } + if (isEnableOrderedLocations) { + sortScanRangeLocations(locations); + } Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.computeIfAbsent(scanNode.getFragmentId(), k -> Sets.newHashSet()); scanNodeIds.add(scanNode.getId().asInt()); @@ -2296,7 +2306,8 @@ public class Coordinator implements CoordInterface { // A fragment may contain both colocate join and bucket shuffle join // on need both compute scanRange to init basic data for query coordinator if (fragmentContainsColocateJoin) { - computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost); + computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, + replicaNumPerHost, isEnableOrderedLocations); } if (fragmentContainsBucketShuffleJoin) { bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, @@ -2304,7 +2315,7 @@ public class Coordinator implements CoordInterface { } if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) { computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost, - replicaNumPerHost); + replicaNumPerHost, isEnableOrderedLocations); } } } @@ -2312,7 +2323,7 @@ public class Coordinator implements CoordInterface { // To ensure the same bucketSeq tablet to the same execHostPort private void computeScanRangeAssignmentByColocate( final OlapScanNode scanNode, Map<TNetworkAddress, Long> assignedBytesPerHost, - Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception { + Map<TNetworkAddress, Long> replicaNumPerHost, boolean isEnableOrderedLocations) throws Exception { if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>()); fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); @@ -2333,7 +2344,8 @@ public class Coordinator implements CoordInterface { List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq); if (!bucketSeqToAddress.containsKey(bucketSeq)) { getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), - scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost, replicaNumPerHost); + scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost, + replicaNumPerHost, isEnableOrderedLocations); } for (TScanRangeLocations location : locations) { @@ -2355,29 +2367,55 @@ public class Coordinator implements CoordInterface { //ensure bucket sequence distribued to every host evenly private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq, Map<TNetworkAddress, Long> assignedBytesPerHost, - Map<TNetworkAddress, Long> replicaNumPerHost) + Map<TNetworkAddress, Long> replicaNumPerHost, boolean isEnableOrderedLocations) throws Exception { Reference<Long> backendIdRef = new Reference<Long>(); - selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, replicaNumPerHost, backendIdRef); + selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, replicaNumPerHost, + backendIdRef, isEnableOrderedLocations); Backend backend = this.idToBackend.get(backendIdRef.getRef()); TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort()); this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort); } + private void sortScanNodes() { + Collections.sort(scanNodes, new Comparator<ScanNode>() { + @Override + public int compare(ScanNode s1, ScanNode s2) { + return java.lang.Integer.compare(s1.getId().asInt(), s2.getId().asInt()); + } + }); + } + + private void sortScanRangeLocations(List<TScanRangeLocations> locations) { + Collections.sort(locations, new Comparator<TScanRangeLocations>() { + @Override + public int compare(TScanRangeLocations o1, TScanRangeLocations o2) { + return org.apache.thrift.TBaseHelper.compareTo( + o1.getScanRange().getPaloScanRange().tablet_id, + o2.getScanRange().getPaloScanRange().tablet_id); + } + }); + } + public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation, Map<TNetworkAddress, Long> assignedBytesPerHost, Map<TNetworkAddress, Long> replicaNumPerHost, - Reference<Long> backendIdRef) throws UserException { + Reference<Long> backendIdRef, + boolean isEnableOrderedLocations) throws UserException { + List<TScanRangeLocation> locations = seqLocation.getLocations(); + if (isEnableOrderedLocations) { + Collections.sort(locations); + } if (!Config.enable_local_replica_selection) { - return selectBackendsByRoundRobin(seqLocation.getLocations(), assignedBytesPerHost, replicaNumPerHost, + return selectBackendsByRoundRobin(locations, assignedBytesPerHost, replicaNumPerHost, backendIdRef); } List<TScanRangeLocation> localLocations = new ArrayList<>(); List<TScanRangeLocation> nonlocalLocations = new ArrayList<>(); long localBeId = Env.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress()); - for (final TScanRangeLocation location : seqLocation.getLocations()) { + for (final TScanRangeLocation location : locations) { if (location.backend_id == localBeId) { localLocations.add(location); } else { @@ -2395,14 +2433,15 @@ public class Coordinator implements CoordInterface { } } - public TScanRangeLocation selectBackendsByRoundRobin(List<TScanRangeLocation> locations, + public TScanRangeLocation selectBackendsByRoundRobin(List<TScanRangeLocation> sortedLocations, Map<TNetworkAddress, Long> assignedBytesPerHost, Map<TNetworkAddress, Long> replicaNumPerHost, Reference<Long> backendIdRef) throws UserException { Long minAssignedBytes = Long.MAX_VALUE; Long minReplicaNum = Long.MAX_VALUE; TScanRangeLocation minLocation = null; Long step = 1L; - for (final TScanRangeLocation location : locations) { + + for (final TScanRangeLocation location : sortedLocations) { Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L); if (assignedBytes < minAssignedBytes || (assignedBytes.equals(minAssignedBytes) && replicaNumPerHost.get(location.server) < minReplicaNum)) { @@ -2411,10 +2450,10 @@ public class Coordinator implements CoordInterface { minLocation = location; } } - for (TScanRangeLocation location : locations) { + for (TScanRangeLocation location : sortedLocations) { replicaNumPerHost.put(location.server, replicaNumPerHost.get(location.server) - 1); } - TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, locations, + TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, sortedLocations, this.idToBackend, backendIdRef); assignedBytesPerHost.put(location.server, assignedBytesPerHost.get(location.server) + step); @@ -2426,7 +2465,8 @@ public class Coordinator implements CoordInterface { final List<TScanRangeLocations> locations, FragmentScanRangeAssignment assignment, Map<TNetworkAddress, Long> assignedBytesPerHost, - Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception { + Map<TNetworkAddress, Long> replicaNumPerHost, + boolean isEnableOrderedLocations) throws Exception { // Type of locations is List, it could have elements that have same "location" // and we do have this situation for some scan node. // The duplicate "location" will NOT be filtered by FragmentScanRangeAssignment, @@ -2435,7 +2475,7 @@ public class Coordinator implements CoordInterface { for (TScanRangeLocations scanRangeLocations : locations) { Reference<Long> backendIdRef = new Reference<Long>(); TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations, - assignedBytesPerHost, replicaNumPerHost, backendIdRef); + assignedBytesPerHost, replicaNumPerHost, backendIdRef, isEnableOrderedLocations); Backend backend = this.idToBackend.get(backendIdRef.getRef()); TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort()); this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 2af7725e029..ac653b82ec9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -420,6 +420,8 @@ public class SessionVariable implements Serializable, Writable { */ public static final String ENABLE_UNIFIED_LOAD = "enable_unified_load"; + public static final String ENABLE_ORDERED_SCAN_RANGE_LOCATIONS = "enable_ordered_scan_range_locations"; + public static final String ENABLE_PARQUET_LAZY_MAT = "enable_parquet_lazy_materialization"; public static final String ENABLE_ORC_LAZY_MAT = "enable_orc_lazy_materialization"; @@ -1468,6 +1470,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_CTE_MATERIALIZE) public boolean enableCTEMaterialize = true; + @VariableMgr.VarAttr(name = ENABLE_ORDERED_SCAN_RANGE_LOCATIONS) + public boolean enableOrderedScanRangeLocations = false; + @VariableMgr.VarAttr(name = ENABLE_ANALYZE_COMPLEX_TYPE_COLUMN) public boolean enableAnalyzeComplexTypeColumn = false; diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index ca8109e40e3..4c38ddd2749 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -593,7 +593,7 @@ public class CoordinatorTest extends Coordinator { locations.add(tScanRangeLocations); locations.add(tScanRangeLocations1); Deencapsulation.invoke(coordinator, "computeScanRangeAssignmentByScheduler", - olapScanNode, locations, assignment, assignedBytesPerHost, replicaNumPerHost); + olapScanNode, locations, assignment, assignedBytesPerHost, replicaNumPerHost, false); for (Map.Entry entry : assignment.entrySet()) { Map<Integer, List<TScanRangeParams>> addr = (HashMap<Integer, List<TScanRangeParams>>) entry.getValue(); for (Map.Entry item : addr.entrySet()) { @@ -653,11 +653,11 @@ public class CoordinatorTest extends Coordinator { replicaNumPerHost.put(tScanRangeLocation2.server, 1L); Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations, - planFragmentId, 1, assignedBytesPerHost, replicaNumPerHost); + planFragmentId, 1, assignedBytesPerHost, replicaNumPerHost, false); Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations, - planFragmentId, 2, assignedBytesPerHost, replicaNumPerHost); + planFragmentId, 2, assignedBytesPerHost, replicaNumPerHost, false); Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations, - planFragmentId, 3, assignedBytesPerHost, replicaNumPerHost); + planFragmentId, 3, assignedBytesPerHost, replicaNumPerHost, false); List<String> hosts = new ArrayList<>(); for (Map.Entry item : assignedBytesPerHost.entrySet()) { Assert.assertTrue((Long) item.getValue() == 1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org