This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e302882e529 [branch-2.1](pick) Pick 2 PRs to branch-2.1 (#39604) e302882e529 is described below commit e302882e5295d2ae25a05c23a2ae598c50346284 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Aug 20 17:10:30 2024 +0800 [branch-2.1](pick) Pick 2 PRs to branch-2.1 (#39604) ## Proposed changes pick #39480 #39589 <!--Describe your changes.--> --- .../main/java/org/apache/doris/qe/Coordinator.java | 20 +++++++++++--------- .../java/org/apache/doris/qe/CoordinatorTest.java | 20 ++++++++++---------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index dac9be06b9f..19ca1050469 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2107,10 +2107,11 @@ public class Coordinator implements CoordInterface { if ((isColocateFragment(fragment, fragment.getPlanRoot()) && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId()) && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0)) { - computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params); + computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params, + fragment.hasNullAwareLeftAntiJoin()); } else if (bucketShuffleJoinController.isBucketShuffleJoin(fragment.getFragmentId().asInt())) { bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(), - parallelExecInstanceNum, params); + parallelExecInstanceNum, params, fragment.hasNullAwareLeftAntiJoin()); } else { // case A for (Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : fragmentExecParamsMap.get( @@ -2135,7 +2136,8 @@ public class Coordinator implements CoordInterface { int expectedInstanceNum = Math.min(parallelExecInstanceNum, 
leftMostNode.getNumInstances()); boolean forceToLocalShuffle = context != null - && context.getSessionVariable().isForceToLocalShuffle(); + && context.getSessionVariable().isForceToLocalShuffle() + && !fragment.hasNullAwareLeftAntiJoin() && useNereids; boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context, addressToBackendID.size())) && useNereids); @@ -2304,9 +2306,9 @@ public class Coordinator implements CoordInterface { } private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, - int parallelExecInstanceNum, FragmentExecParams params) { + int parallelExecInstanceNum, FragmentExecParams params, boolean hasNullAwareLeftAntiJoin) { assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdTobucketSeqToScanRangeMap, - fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds); + fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, hasNullAwareLeftAntiJoin); } private Map<TNetworkAddress, Long> getReplicaNumPerHostForOlapTable() { @@ -3049,16 +3051,16 @@ public class Coordinator implements CoordInterface { } private void computeInstanceParam(PlanFragmentId fragmentId, - int parallelExecInstanceNum, FragmentExecParams params) { + int parallelExecInstanceNum, FragmentExecParams params, boolean hasNullAwareLeftAntiJoin) { assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdBucketSeqToScanRangeMap, - fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds); + fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, hasNullAwareLeftAntiJoin); } } private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params, Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap, Map<PlanFragmentId, Map<Integer, TNetworkAddress>> curFragmentIdToSeqToAddressMap, - Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) { + Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds, 
boolean hasNullAwareLeftAntiJoin) { Map<Integer, TNetworkAddress> bucketSeqToAddress = curFragmentIdToSeqToAddressMap.get(fragmentId); BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId); Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId); @@ -3092,7 +3094,7 @@ public class Coordinator implements CoordInterface { * 2. Use Nereids planner. */ boolean forceToLocalShuffle = context != null - && context.getSessionVariable().isForceToLocalShuffle(); + && context.getSessionVariable().isForceToLocalShuffle() && !hasNullAwareLeftAntiJoin && useNereids; boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream() .allMatch(node -> node.ignoreStorageDataDistribution(context, addressToBackendID.size())) diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index 4c38ddd2749..9b3ed7d3119 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -122,7 +122,7 @@ public class CoordinatorTest extends Coordinator { Deencapsulation.setField(coordinator, "fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); FragmentExecParams params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params, false); Assert.assertEquals(1, params.instanceExecParams.size()); // check whether one instance have 3 tablet to scan @@ -133,15 +133,15 @@ public class CoordinatorTest extends Coordinator { } params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 2, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 2, params, false); 
Assert.assertEquals(2, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 3, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 3, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); } @@ -323,7 +323,7 @@ public class CoordinatorTest extends Coordinator { PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode, new DataPartition(TPartitionType.UNPARTITIONED)); FragmentExecParams params = new FragmentExecParams(fragment); - Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params, false); StringBuilder sb = new StringBuilder(); params.appendTo(sb); Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]")); @@ -451,19 +451,19 @@ public class CoordinatorTest extends Coordinator { Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap); FragmentExecParams params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params, false); Assert.assertEquals(1, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params); + 
Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params, false); Assert.assertEquals(2, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 3, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 3, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); params = new FragmentExecParams(null); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params, false); Assert.assertEquals(3, params.instanceExecParams.size()); } @@ -505,7 +505,7 @@ public class CoordinatorTest extends Coordinator { new DataPartition(TPartitionType.UNPARTITIONED)); FragmentExecParams params = new FragmentExecParams(fragment); - Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params); + Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params, false); Assert.assertEquals(1, params.instanceExecParams.size()); StringBuilder sb = new StringBuilder(); params.appendTo(sb); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org