This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1715a824dd [fix](nereids) fix partition dest overwrite bug when cte as 
bc right (#22177)
1715a824dd is described below

commit 1715a824dd046e617e084e14fdfabe2b4568fa83
Author: xzj7019 <131111794+xzj7...@users.noreply.github.com>
AuthorDate: Tue Jul 25 19:26:29 2023 +0800

    [fix](nereids) fix partition dest overwrite bug when cte as bc right 
(#22177)
    
    In the coordinator's current logic for computing CTE multicast fragment 
params, when the shared hash table for broadcast (bc) join is enabled, the 
number of destinations is the same as the number of BE hosts. However, the 
condition that decides whether to take the shared-hash-table bc code path is 
wrong: when a multicast's targets are mixed, with both bc and partition, the 
first bc info overwrites the following partition's info, i.e., the destination 
info ends up at the host level when it should be per instance. This 
will cause the hash partit [...]
---
 .../glue/translator/PhysicalPlanTranslator.java       |  4 ++--
 .../org/apache/doris/planner/DistributedPlanner.java  |  2 +-
 .../java/org/apache/doris/planner/ExchangeNode.java   | 10 ++++++++++
 .../java/org/apache/doris/planner/PlanFragment.java   | 10 ----------
 .../main/java/org/apache/doris/planner/PlanNode.java  | 19 +++++++++++++++++++
 .../main/java/org/apache/doris/qe/Coordinator.java    | 15 +++++++++++++--
 6 files changed, 45 insertions(+), 15 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index cc7993d6dd..76211137c7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1014,11 +1014,11 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             hashJoinNode.setColocate(true, "");
             leftFragment.setHasColocatePlanNode(true);
         } else if (JoinUtils.shouldBroadcastJoin(physicalHashJoin)) {
-            Preconditions.checkState(rightFragment.getPlanRoot() instanceof 
ExchangeNode,
+            Preconditions.checkState(rightPlanRoot instanceof ExchangeNode,
                     "right child of broadcast join must be ExchangeNode but it 
is " + rightFragment.getPlanRoot());
             Preconditions.checkState(rightFragment.getChildren().size() == 1,
                     "right child of broadcast join must have 1 child, but meet 
" + rightFragment.getChildren().size());
-            rightFragment.getChild(0).setRightChildOfBroadcastHashJoin(true);
+            ((ExchangeNode) 
rightPlanRoot).setRightChildOfBroadcastHashJoin(true);
             hashJoinNode.setDistributionMode(DistributionMode.BROADCAST);
         } else if (JoinUtils.shouldBucketShuffleJoin(physicalHashJoin)) {
             hashJoinNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 2c236f5f72..75ca717050 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -388,7 +388,7 @@ public class DistributedPlanner {
             node.setChild(0, leftChildFragment.getPlanRoot());
             connectChildFragment(node, 1, leftChildFragment, 
rightChildFragment);
             leftChildFragment.setPlanRoot(node);
-            rightChildFragment.setRightChildOfBroadcastHashJoin(true);
+            ((ExchangeNode) 
node.getChild(1)).setRightChildOfBroadcastHashJoin(true);
             return leftChildFragment;
         } else {
             
node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 6694a05219..f040af0042 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -63,6 +63,8 @@ public class ExchangeNode extends PlanNode {
     // exchange node. Null if this exchange does not merge sorted streams
     private SortInfo mergeInfo;
 
+    private boolean isRightChildOfBroadcastHashJoin = false;
+
     /**
      * use for Nereids only.
      */
@@ -184,4 +186,12 @@ public class ExchangeNode extends PlanNode {
     public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
         return prefix + "offset: " + offset + "\n";
     }
+
+    public boolean isRightChildOfBroadcastHashJoin() {
+        return isRightChildOfBroadcastHashJoin;
+    }
+
+    public void setRightChildOfBroadcastHashJoin(boolean value) {
+        isRightChildOfBroadcastHashJoin = value;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index c398ed7a8e..3d74bfc0df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -145,8 +145,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     // has colocate plan node
     private boolean hasColocatePlanNode = false;
 
-    private boolean isRightChildOfBroadcastHashJoin = false;
-
     /**
      * C'tor for fragment with specific partition; the output is by default 
broadcast.
      */
@@ -434,14 +432,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         return transferQueryStatisticsWithEveryBatch;
     }
 
-    public boolean isRightChildOfBroadcastHashJoin() {
-        return isRightChildOfBroadcastHashJoin;
-    }
-
-    public void setRightChildOfBroadcastHashJoin(boolean value) {
-        isRightChildOfBroadcastHashJoin = value;
-    }
-
     public int getFragmentSequenceNum() {
         if 
(ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) {
             return fragmentSequenceNum;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 568975b93d..a279af676e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -958,6 +958,25 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
         }
     }
 
+    /**
+     * find planNode recursively based on the planNodeId
+     */
+    public static PlanNode findPlanNodeFromPlanNodeId(PlanNode root, 
PlanNodeId id) {
+        if (root == null || root.getId() == null || id == null) {
+            return null;
+        } else if (root.getId().equals(id)) {
+            return root;
+        } else {
+            for (PlanNode child : root.getChildren()) {
+                PlanNode retNode = findPlanNodeFromPlanNodeId(child, id);
+                if (retNode != null) {
+                    return retNode;
+                }
+            }
+            return null;
+        }
+    }
+
     public String getPlanTreeExplainStr() {
         StringBuilder sb = new StringBuilder();
         sb.append("[").append(getId().asInt()).append(": 
").append(getPlanNodeName()).append("]");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7f0abff227..39c94320c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1294,6 +1294,9 @@ public class Coordinator {
         // compute destinations and # senders per exchange node
         // (the root fragment doesn't have a destination)
         for (FragmentExecParams params : fragmentExecParamsMap.values()) {
+            if (params.fragment instanceof MultiCastPlanFragment) {
+                continue;
+            }
             PlanFragment destFragment = params.fragment.getDestFragment();
             if (destFragment == null) {
                 // root plan fragment
@@ -1308,6 +1311,10 @@ public class Coordinator {
             // output at the moment
 
             PlanNodeId exchId = sink.getExchNodeId();
+            PlanNode exchNode = 
PlanNode.findPlanNodeFromPlanNodeId(destFragment.getPlanRoot(), exchId);
+            Preconditions.checkState(exchNode != null, "exchNode is null");
+            Preconditions.checkState(exchNode instanceof ExchangeNode,
+                    "exchNode is not ExchangeNode" + 
exchNode.getId().toString());
             // we might have multiple fragments sending to this exchange node
             // (distributed MERGE), which is why we need to add up the #senders
             if (destParams.perExchNumSenders.get(exchId.asInt()) == null) {
@@ -1357,7 +1364,7 @@ public class Coordinator {
                 }
             } else {
                 if (enablePipelineEngine && 
enableShareHashTableForBroadcastJoin
-                        && params.fragment.isRightChildOfBroadcastHashJoin()) {
+                        && ((ExchangeNode) 
exchNode).isRightChildOfBroadcastHashJoin()) {
                     // here choose the first instance to build hash table.
                     Map<TNetworkAddress, FInstanceExecParam> destHosts = new 
HashMap<>();
                     destParams.instanceExecParams.forEach(param -> {
@@ -1412,7 +1419,11 @@ public class Coordinator {
                 
multi.getDestFragmentList().get(i).setOutputPartition(params.fragment.getOutputPartition());
 
                 PlanNodeId exchId = sink.getExchNodeId();
+                PlanNode exchNode = 
PlanNode.findPlanNodeFromPlanNodeId(destFragment.getPlanRoot(), exchId);
                 
Preconditions.checkState(!destParams.perExchNumSenders.containsKey(exchId.asInt()));
+                Preconditions.checkState(exchNode != null, "exchNode is null");
+                Preconditions.checkState(exchNode instanceof ExchangeNode,
+                        "exchNode is not ExchangeNode" + 
exchNode.getId().toString());
                 if (destParams.perExchNumSenders.get(exchId.asInt()) == null) {
                     destParams.perExchNumSenders.put(exchId.asInt(), 
params.instanceExecParams.size());
                 } else {
@@ -1422,7 +1433,7 @@ public class Coordinator {
 
                 List<TPlanFragmentDestination> destinations = 
multiSink.getDestinations().get(i);
                 if (enablePipelineEngine && 
enableShareHashTableForBroadcastJoin
-                        && params.fragment.isRightChildOfBroadcastHashJoin()) {
+                        && ((ExchangeNode) 
exchNode).isRightChildOfBroadcastHashJoin()) {
                     // here choose the first instance to build hash table.
                     Map<TNetworkAddress, FInstanceExecParam> destHosts = new 
HashMap<>();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to