This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ede4d7de19 [fix](join) Disable scan sharing for NAAJ (#39480)
2ede4d7de19 is described below

commit 2ede4d7de19235a727a73d8ade7b8478e38264bc
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Aug 19 11:15:01 2024 +0800

    [fix](join) Disable scan sharing for NAAJ (#39480)
---
 .../main/java/org/apache/doris/qe/Coordinator.java   | 20 +++++++++++---------
 .../java/org/apache/doris/qe/CoordinatorTest.java    | 20 ++++++++++----------
 2 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 4ee46d3bec3..881cb7d096a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1876,10 +1876,11 @@ public class Coordinator implements CoordInterface {
             if ((isColocateFragment(fragment, fragment.getPlanRoot())
                     && fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
                     && fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0)) {
-                computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params);
+                computeColocateJoinInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params,
+                        fragment.hasNullAwareLeftAntiJoin());
             } else if (bucketShuffleJoinController.isBucketShuffleJoin(fragment.getFragmentId().asInt())) {
                 bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(),
-                        parallelExecInstanceNum, params);
+                        parallelExecInstanceNum, params, fragment.hasNullAwareLeftAntiJoin());
             } else {
                 // case A
                 for (Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : fragmentExecParamsMap.get(
@@ -1904,7 +1905,8 @@ public class Coordinator implements CoordInterface {
                         int expectedInstanceNum = Math.min(parallelExecInstanceNum,
                                 leftMostNode.getNumInstances());
                         boolean forceToLocalShuffle = context != null
-                                && context.getSessionVariable().isForceToLocalShuffle();
+                                && context.getSessionVariable().isForceToLocalShuffle()
+                                && !fragment.hasNullAwareLeftAntiJoin();
                         boolean ignoreStorageDataDistribution = forceToLocalShuffle || (node.isPresent()
                                 && node.get().ignoreStorageDataDistribution(context, addressToBackendID.size())
                                 && useNereids);
@@ -2072,9 +2074,9 @@ public class Coordinator implements CoordInterface {
     }
 
     private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId,
-            int parallelExecInstanceNum, FragmentExecParams params) {
+            int parallelExecInstanceNum, FragmentExecParams params, boolean hasNullAwareLeftAntiJoin) {
         assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdTobucketSeqToScanRangeMap,
-                fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
+                fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, hasNullAwareLeftAntiJoin);
     }
 
     private Map<TNetworkAddress, Long> getReplicaNumPerHostForOlapTable() {
@@ -2689,16 +2691,16 @@ public class Coordinator implements CoordInterface {
         }
 
         private void computeInstanceParam(PlanFragmentId fragmentId,
-                int parallelExecInstanceNum, FragmentExecParams params) {
+                int parallelExecInstanceNum, FragmentExecParams params, boolean hasNullAwareLeftAntiJoin) {
             assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdBucketSeqToScanRangeMap,
-                    fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
+                    fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds, hasNullAwareLeftAntiJoin);
         }
     }
 
     private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params,
             Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap,
             Map<PlanFragmentId, Map<Integer, TNetworkAddress>> curFragmentIdToSeqToAddressMap,
-            Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) {
+            Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds, boolean hasNullAwareLeftAntiJoin) {
         Map<Integer, TNetworkAddress> bucketSeqToAddress = curFragmentIdToSeqToAddressMap.get(fragmentId);
         BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
         Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
@@ -2732,7 +2734,7 @@ public class Coordinator implements CoordInterface {
          * 2. Use Nereids planner.
          */
         boolean forceToLocalShuffle = context != null
-                && context.getSessionVariable().isForceToLocalShuffle();
+                && context.getSessionVariable().isForceToLocalShuffle() && !hasNullAwareLeftAntiJoin;
         boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream()
                 .allMatch(node -> node.ignoreStorageDataDistribution(context,
                         addressToBackendID.size()))
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index e249b3d87d2..b6632a39db7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -123,7 +123,7 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(coordinator, "fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
 
         FragmentExecParams params = new FragmentExecParams(null);
-        Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params);
+        Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params, false);
         Assert.assertEquals(1, params.instanceExecParams.size());
 
         // check whether one instance have 3 tablet to scan
@@ -134,15 +134,15 @@ public class CoordinatorTest extends Coordinator {
         }
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 2, params);
+        Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 2, params, false);
         Assert.assertEquals(2, params.instanceExecParams.size());
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 3, params);
+        Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 3, params, false);
         Assert.assertEquals(3, params.instanceExecParams.size());
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params);
+        Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 5, params, false);
         Assert.assertEquals(3, params.instanceExecParams.size());
     }
 
@@ -324,7 +324,7 @@ public class CoordinatorTest extends Coordinator {
         PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode,
                 new DataPartition(TPartitionType.UNPARTITIONED));
         FragmentExecParams params = new FragmentExecParams(fragment);
-        Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params);
+        Deencapsulation.invoke(coordinator, "computeColocateJoinInstanceParam", planFragmentId, 1, params, false);
         StringBuilder sb = new StringBuilder();
         params.appendTo(sb);
         Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]"));
@@ -452,19 +452,19 @@ public class CoordinatorTest extends Coordinator {
         Deencapsulation.setField(bucketShuffleJoinController, "fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
 
         FragmentExecParams params = new FragmentExecParams(null);
-        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params);
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params, false);
         Assert.assertEquals(1, params.instanceExecParams.size());
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params);
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 2, params, false);
         Assert.assertEquals(2, params.instanceExecParams.size());
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 3, params);
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 3, params, false);
         Assert.assertEquals(3, params.instanceExecParams.size());
 
         params = new FragmentExecParams(null);
-        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params);
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 5, params, false);
         Assert.assertEquals(3, params.instanceExecParams.size());
     }
 
@@ -506,7 +506,7 @@ public class CoordinatorTest extends Coordinator {
                 new DataPartition(TPartitionType.UNPARTITIONED));
 
         FragmentExecParams params = new FragmentExecParams(fragment);
-        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params);
+        Deencapsulation.invoke(bucketShuffleJoinController, "computeInstanceParam", planFragmentId, 1, params, false);
         Assert.assertEquals(1, params.instanceExecParams.size());
         StringBuilder sb = new StringBuilder();
         params.appendTo(sb);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to