This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e302882e529 [branch-2.1](pick) Pick 2 PRs to branch-2.1 (#39604)
e302882e529 is described below

commit e302882e5295d2ae25a05c23a2ae598c50346284
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Tue Aug 20 17:10:30 2024 +0800

    [branch-2.1](pick) Pick 2 PRs to branch-2.1 (#39604)
    
    ## Proposed changes
    
    pick #39480 #39589
    
    <!--Describe your changes.-->
---
 .../main/java/org/apache/doris/qe/Coordinator.java   | 20 +++++++++++---------
 .../java/org/apache/doris/qe/CoordinatorTest.java    | 20 ++++++++++----------
 2 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index dac9be06b9f..19ca1050469 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2107,10 +2107,11 @@ public class Coordinator implements CoordInterface {
             if ((isColocateFragment(fragment, fragment.getPlanRoot())
                     && 
fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
                     && 
fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0)) {
-                computeColocateJoinInstanceParam(fragment.getFragmentId(), 
parallelExecInstanceNum, params);
+                computeColocateJoinInstanceParam(fragment.getFragmentId(), 
parallelExecInstanceNum, params,
+                        fragment.hasNullAwareLeftAntiJoin());
             } else if 
(bucketShuffleJoinController.isBucketShuffleJoin(fragment.getFragmentId().asInt()))
 {
                 
bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(),
-                        parallelExecInstanceNum, params);
+                        parallelExecInstanceNum, params, 
fragment.hasNullAwareLeftAntiJoin());
             } else {
                 // case A
                 for (Entry<TNetworkAddress, Map<Integer, 
List<TScanRangeParams>>> entry : fragmentExecParamsMap.get(
@@ -2135,7 +2136,8 @@ public class Coordinator implements CoordInterface {
                         int expectedInstanceNum = 
Math.min(parallelExecInstanceNum,
                                 leftMostNode.getNumInstances());
                         boolean forceToLocalShuffle = context != null
-                                && 
context.getSessionVariable().isForceToLocalShuffle();
+                                && 
context.getSessionVariable().isForceToLocalShuffle()
+                                && !fragment.hasNullAwareLeftAntiJoin() && 
useNereids;
                         boolean ignoreStorageDataDistribution = 
forceToLocalShuffle || (scanNodes.stream()
                                 .allMatch(scanNode -> 
scanNode.ignoreStorageDataDistribution(context,
                                         addressToBackendID.size())) && 
useNereids);
@@ -2304,9 +2306,9 @@ public class Coordinator implements CoordInterface {
     }
 
     private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId,
-            int parallelExecInstanceNum, FragmentExecParams params) {
+            int parallelExecInstanceNum, FragmentExecParams params, boolean 
hasNullAwareLeftAntiJoin) {
         assignScanRanges(fragmentId, parallelExecInstanceNum, params, 
fragmentIdTobucketSeqToScanRangeMap,
-                fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
+                fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, 
hasNullAwareLeftAntiJoin);
     }
 
     private Map<TNetworkAddress, Long> getReplicaNumPerHostForOlapTable() {
@@ -3049,16 +3051,16 @@ public class Coordinator implements CoordInterface {
         }
 
         private void computeInstanceParam(PlanFragmentId fragmentId,
-                int parallelExecInstanceNum, FragmentExecParams params) {
+                int parallelExecInstanceNum, FragmentExecParams params, 
boolean hasNullAwareLeftAntiJoin) {
             assignScanRanges(fragmentId, parallelExecInstanceNum, params, 
fragmentIdBucketSeqToScanRangeMap,
-                    fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
+                    fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, 
hasNullAwareLeftAntiJoin);
         }
     }
 
     private void assignScanRanges(PlanFragmentId fragmentId, int 
parallelExecInstanceNum, FragmentExecParams params,
             Map<PlanFragmentId, BucketSeqToScanRange> 
fragmentIdBucketSeqToScanRangeMap,
             Map<PlanFragmentId, Map<Integer, TNetworkAddress>> 
curFragmentIdToSeqToAddressMap,
-            Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) {
+            Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds, boolean 
hasNullAwareLeftAntiJoin) {
         Map<Integer, TNetworkAddress> bucketSeqToAddress = 
curFragmentIdToSeqToAddressMap.get(fragmentId);
         BucketSeqToScanRange bucketSeqToScanRange = 
fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
         Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
@@ -3092,7 +3094,7 @@ public class Coordinator implements CoordInterface {
          * 2. Use Nereids planner.
          */
         boolean forceToLocalShuffle = context != null
-                && context.getSessionVariable().isForceToLocalShuffle();
+                && context.getSessionVariable().isForceToLocalShuffle() && 
!hasNullAwareLeftAntiJoin && useNereids;
         boolean ignoreStorageDataDistribution = forceToLocalShuffle || 
(scanNodes.stream()
                 .allMatch(node -> node.ignoreStorageDataDistribution(context,
                         addressToBackendID.size()))
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 4c38ddd2749..9b3ed7d3119 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -122,7 +122,7 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(coordinator, 
"fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
 
         FragmentExecParams params = new FragmentExecParams(null);
-        Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 1, params);
+        Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 1, params, false);
         Assert.assertEquals(1, params.instanceExecParams.size());
 
         // check whether one instance have 3 tablet to scan
@@ -133,15 +133,15 @@ public class CoordinatorTest extends Coordinator {
         }
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 2, params);
+        Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 2, params, false);
         Assert.assertEquals(2, params.instanceExecParams.size());
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 3, params);
+        Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 3, params, false);
         Assert.assertEquals(3, params.instanceExecParams.size());
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 5, params);
+        Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 5, params, false);
         Assert.assertEquals(3, params.instanceExecParams.size());
     }
 
@@ -323,7 +323,7 @@ public class CoordinatorTest extends Coordinator {
         PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode,
                 new DataPartition(TPartitionType.UNPARTITIONED));
         FragmentExecParams params = new FragmentExecParams(fragment);
-        Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 1, params);
+        Deencapsulation.invoke(coordinator, 
"computeColocateJoinInstanceParam", planFragmentId, 1, params, false);
         StringBuilder sb = new StringBuilder();
         params.appendTo(sb);
         Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]"));
@@ -451,19 +451,19 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(bucketShuffleJoinController, 
"fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
 
         FragmentExecParams params = new FragmentExecParams(null);
-        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 1, params);
+        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 1, params, false);
         Assert.assertEquals(1, params.instanceExecParams.size());
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 2, params);
+        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 2, params, false);
         Assert.assertEquals(2, params.instanceExecParams.size());
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 3, params);
+        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 3, params, false);
         Assert.assertEquals(3, params.instanceExecParams.size());
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 5, params);
+        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 5, params, false);
         Assert.assertEquals(3, params.instanceExecParams.size());
     }
 
@@ -505,7 +505,7 @@ public class CoordinatorTest extends Coordinator {
                 new DataPartition(TPartitionType.UNPARTITIONED));
 
         FragmentExecParams params = new FragmentExecParams(fragment);
-        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 1, params);
+        Deencapsulation.invoke(bucketShuffleJoinController, 
"computeInstanceParam", planFragmentId, 1, params, false);
         Assert.assertEquals(1, params.instanceExecParams.size());
         StringBuilder sb = new StringBuilder();
         params.appendTo(sb);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to