This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8a562aeb779 [opt](nereids) recover adoptive bucket shuffle (#39598)
8a562aeb779 is described below

commit 8a562aeb77942e65298526afb65f540df6b88fbd
Author: xzj7019 <131111794+xzj7...@users.noreply.github.com>
AuthorDate: Wed Aug 21 09:26:53 2024 +0800

    [opt](nereids) recover adoptive bucket shuffle (#39598)
    
    ## Proposed changes
    
    pick from https://github.com/apache/doris/pull/36784
    
    Co-authored-by: xiongzhongjian <xiongzhongj...@selectdb.com>
---
 .../properties/ChildrenPropertiesRegulator.java    | 58 +++++++++++++++++++---
 .../java/org/apache/doris/qe/SessionVariable.java  |  9 ----
 .../bs_downgrade_shape/query19.out                 |  5 +-
 .../bs_downgrade_shape/query44.out                 |  4 +-
 .../bs_downgrade_shape/query54.out                 |  2 +-
 .../bs_downgrade_shape/query56.out                 |  7 ++-
 .../bs_downgrade_shape/query6.out                  |  5 +-
 .../bs_downgrade_shape/query61.out                 |  7 ++-
 .../bs_downgrade_shape/query68.out                 | 10 ++--
 .../bs_downgrade_shape/query8.out                  |  5 +-
 .../bs_downgrade_shape/query91.out                 |  7 ++-
 .../bs_downgrade_shape/query95.out                 | 15 +++---
 .../shape/query13.out                              |  5 +-
 .../shape/query51.out                              |  8 +--
 .../shape/query85.out                              |  5 +-
 .../rf_prune/query13.out                           |  5 +-
 .../rf_prune/query61.out                           |  2 +-
 .../rf_prune/query85.out                           |  5 +-
 .../nereids_tpcds_shape_sf100_p0/shape/query13.out |  5 +-
 .../nereids_tpcds_shape_sf100_p0/shape/query61.out |  2 +-
 .../nereids_tpcds_shape_sf100_p0/shape/query85.out |  5 +-
 .../correctness_p0/test_bucket_shuffle_join.groovy |  1 +
 .../suites/nereids_p0/hint/fix_leading.groovy      |  1 +
 .../suites/nereids_p0/hint/multi_leading.groovy    |  1 +
 .../suites/nereids_p0/hint/test_distribute.groovy  |  1 +
 .../suites/nereids_p0/hint/test_leading.groovy     |  1 +
 .../nereids_p0/join/bucket_shuffle_join.groovy     |  2 +
 .../bs_downgrade_shape/query13.groovy              |  1 -
 .../bs_downgrade_shape/query19.groovy              |  1 -
 .../bs_downgrade_shape/query44.groovy              |  1 -
 .../bs_downgrade_shape/query45.groovy              |  1 -
 .../bs_downgrade_shape/query54.groovy              |  1 -
 .../bs_downgrade_shape/query56.groovy              |  1 -
 .../bs_downgrade_shape/query6.groovy               |  1 -
 .../bs_downgrade_shape/query61.groovy              |  1 -
 .../bs_downgrade_shape/query68.groovy              |  1 -
 .../bs_downgrade_shape/query8.groovy               |  1 -
 .../bs_downgrade_shape/query91.groovy              |  1 -
 .../bs_downgrade_shape/query95.groovy              |  1 -
 39 files changed, 112 insertions(+), 83 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 9e795486f7a..c83c0d3582e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
 import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.SortPhase;
@@ -39,6 +40,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
@@ -201,12 +203,53 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
         return true;
     }
 
-    private boolean isBucketShuffleDownGrade(DistributionSpecHash srcSideSpec) 
{
-        boolean isBucketShuffleDownGrade = 
ConnectContext.get().getSessionVariable().isEnableBucketShuffleDownGrade();
-        if (!isBucketShuffleDownGrade) {
+    private boolean isBucketShuffleDownGrade(Plan oneSidePlan, 
DistributionSpecHash otherSideSpec) {
+        // improper to do bucket shuffle join:
+        // oneSide:
+        //      - base table and tablets' number is small enough (< 
paraInstanceNum)
+        // otherSide:
+        //      - ShuffleType.EXECUTION_BUCKETED
+        boolean isEnableBucketShuffleJoin = 
ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin();
+        if (!isEnableBucketShuffleJoin) {
+            return true;
+        } else if (otherSideSpec.getShuffleType() != 
ShuffleType.EXECUTION_BUCKETED
+                || !(oneSidePlan instanceof GroupPlan)) {
             return false;
         } else {
-            return srcSideSpec.getShuffleType() == 
ShuffleType.EXECUTION_BUCKETED;
+            PhysicalOlapScan candidate = 
findDownGradeBucketShuffleCandidate((GroupPlan) oneSidePlan);
+            if (candidate == null || candidate.getTable() == null
+                    || candidate.getTable().getDefaultDistributionInfo() == 
null) {
+                return false;
+            } else {
+                int prunedPartNum = candidate.getSelectedPartitionIds().size();
+                int bucketNum = 
candidate.getTable().getDefaultDistributionInfo().getBucketNum();
+                int totalBucketNum = prunedPartNum * bucketNum;
+                int backEndNum = Math.max(1, 
ConnectContext.get().getEnv().getClusterInfo()
+                        .getBackendsNumber(true));
+                int paraNum = Math.max(1, 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
+                int totalParaNum = Math.min(10, backEndNum * paraNum);
+                return totalBucketNum < totalParaNum;
+            }
+        }
+    }
+
+    private PhysicalOlapScan findDownGradeBucketShuffleCandidate(GroupPlan 
groupPlan) {
+        if (groupPlan == null || groupPlan.getGroup() == null
+                || groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
+            return null;
+        } else {
+            Plan targetPlan = 
groupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
+            while (targetPlan != null
+                    && (targetPlan instanceof PhysicalProject || targetPlan 
instanceof PhysicalFilter)
+                    && !((GroupPlan) 
targetPlan.child(0)).getGroup().getPhysicalExpressions().isEmpty()) {
+                targetPlan = ((GroupPlan) targetPlan.child(0)).getGroup()
+                        .getPhysicalExpressions().get(0).getPlan();
+            }
+            if (targetPlan == null || !(targetPlan instanceof 
PhysicalOlapScan)) {
+                return null;
+            } else {
+                return (PhysicalOlapScan) targetPlan;
+            }
         }
     }
 
@@ -243,6 +286,9 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
             throw new RuntimeException("should not come here, two children of 
shuffle join should all be shuffle");
         }
 
+        Plan leftChild = hashJoin.child(0);
+        Plan rightChild = hashJoin.child(1);
+
         DistributionSpecHash leftHashSpec = (DistributionSpecHash) 
leftDistributionSpec;
         DistributionSpecHash rightHashSpec = (DistributionSpecHash) 
rightDistributionSpec;
 
@@ -263,7 +309,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
                     ShuffleType.EXECUTION_BUCKETED, leftHashSpec, 
rightHashSpec,
                     (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
                     (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec()));
-        } else if (isBucketShuffleDownGrade(rightHashSpec)) {
+        } else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
             updatedForLeft = Optional.of(calAnotherSideRequired(
                     ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
                     (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
@@ -272,7 +318,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
                     ShuffleType.EXECUTION_BUCKETED, leftHashSpec, 
rightHashSpec,
                     (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
                     (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec()));
-        } else if (isBucketShuffleDownGrade(leftHashSpec)) {
+        } else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
             updatedForLeft = Optional.of(calAnotherSideRequired(
                     ShuffleType.EXECUTION_BUCKETED, rightHashSpec, 
leftHashSpec,
                     (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 9e5525a27fa..5ca5bf1d36c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -260,8 +260,6 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_AGG_STATE = "enable_agg_state";
 
-    public static final String ENABLE_BUCKET_SHUFFLE_DOWNGRADE = 
"enable_bucket_shuffle_downgrade";
-
     public static final String ENABLE_RPC_OPT_FOR_PIPELINE = 
"enable_rpc_opt_for_pipeline";
 
     public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = 
"enable_single_distinct_column_opt";
@@ -850,9 +848,6 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN, varType = 
VariableAnnotation.EXPERIMENTAL_ONLINE)
     public boolean enableBucketShuffleJoin = true;
 
-    @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_DOWNGRADE, needForward = 
true)
-    public boolean enableBucketShuffleDownGrade = false;
-
     /**
      * explode function row count enlarge factor.
      */
@@ -2552,10 +2547,6 @@ public class SessionVariable implements Serializable, 
Writable {
         return enableBucketShuffleJoin;
     }
 
-    public boolean isEnableBucketShuffleDownGrade() {
-        return enableBucketShuffleDownGrade;
-    }
-
     public boolean isEnableOdbcTransaction() {
         return enableOdbcTransaction;
     }
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query19.out
 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query19.out
index 6b7d023e3be..6b8a3bdf11c 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query19.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query19.out
@@ -12,9 +12,8 @@ PhysicalResultSink
 ------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk 
= store.s_store_sk)) otherCondition=(( not (substring(ca_zip, 1, 5) = 
substring(s_zip, 1, 5)))) build RFs:RF4 s_store_sk->[ss_store_sk]
 --------------------PhysicalProject
 ----------------------hashJoin[INNER_JOIN] 
hashCondition=((customer.c_current_addr_sk = customer_address.ca_address_sk)) 
otherCondition=() build RFs:RF3 c_current_addr_sk->[ca_address_sk]
-------------------------PhysicalDistribute[DistributionSpecHash]
---------------------------PhysicalProject
-----------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
+------------------------PhysicalProject
+--------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------PhysicalProject
 ----------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF2 ss_customer_sk->[c_customer_sk]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query44.out
 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query44.out
index ea4ea67293e..4f1a1be1c25 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query44.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query44.out
@@ -11,7 +11,7 @@ PhysicalResultSink
 ----------------hashJoin[INNER_JOIN] hashCondition=((i1.i_item_sk = 
asceding.item_sk)) otherCondition=() build RFs:RF1 item_sk->[i_item_sk]
 ------------------PhysicalProject
 --------------------PhysicalOlapScan[item] apply RFs: RF1
-------------------PhysicalDistribute[DistributionSpecReplicated]
+------------------PhysicalDistribute[DistributionSpecHash]
 --------------------PhysicalProject
 ----------------------filter((rnk < 11))
 ------------------------PhysicalWindow
@@ -44,7 +44,7 @@ PhysicalResultSink
 ----------------hashJoin[INNER_JOIN] hashCondition=((i2.i_item_sk = 
descending.item_sk)) otherCondition=() build RFs:RF0 item_sk->[i_item_sk]
 ------------------PhysicalProject
 --------------------PhysicalOlapScan[item] apply RFs: RF0
-------------------PhysicalDistribute[DistributionSpecReplicated]
+------------------PhysicalDistribute[DistributionSpecHash]
 --------------------PhysicalProject
 ----------------------filter((rnk < 11))
 ------------------------PhysicalWindow
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query54.out
 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query54.out
index be67d64e1d3..2d25f4d29ff 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query54.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query54.out
@@ -28,7 +28,7 @@ PhysicalResultSink
 ----------------------------------------------hashJoin[INNER_JOIN] 
hashCondition=((my_customers.c_current_addr_sk = 
customer_address.ca_address_sk)) otherCondition=() build RFs:RF3 
c_current_addr_sk->[ca_address_sk]
 ------------------------------------------------PhysicalProject
 
--------------------------------------------------PhysicalOlapScan[customer_address]
 apply RFs: RF3 RF4 RF5
-------------------------------------------------PhysicalDistribute[DistributionSpecReplicated]
+------------------------------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------------------------------PhysicalProject
 ----------------------------------------------------hashAgg[GLOBAL]
 
------------------------------------------------------PhysicalDistribute[DistributionSpecHash]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query56.out
 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query56.out
index b74545fe806..e88a319658f 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query56.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query56.out
@@ -70,10 +70,9 @@ PhysicalResultSink
 ----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_bill_addr_sk = customer_address.ca_address_sk)) 
otherCondition=() build RFs:RF11 ws_bill_addr_sk->[ca_address_sk]
-----------------------------PhysicalDistribute[DistributionSpecHash]
-------------------------------PhysicalProject
---------------------------------filter((customer_address.ca_gmt_offset = 
-6.00))
-----------------------------------PhysicalOlapScan[customer_address] apply 
RFs: RF11
+----------------------------PhysicalProject
+------------------------------filter((customer_address.ca_gmt_offset = -6.00))
+--------------------------------PhysicalOlapScan[customer_address] apply RFs: 
RF11
 ----------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_item_sk = item.i_item_sk)) otherCondition=() build 
RFs:RF10 i_item_sk->[ws_item_sk]
 --------------------------------PhysicalProject
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query6.out
 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query6.out
index b2169d25149..889a34c0eb0 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query6.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query6.out
@@ -11,9 +11,8 @@ PhysicalResultSink
 ----------------hashAgg[LOCAL]
 ------------------PhysicalProject
 --------------------hashJoin[INNER_JOIN] hashCondition=((a.ca_address_sk = 
c.c_current_addr_sk)) otherCondition=() build RFs:RF5 
c_current_addr_sk->[ca_address_sk]
-----------------------PhysicalDistribute[DistributionSpecHash]
-------------------------PhysicalProject
---------------------------PhysicalOlapScan[customer_address] apply RFs: RF5
+----------------------PhysicalProject
+------------------------PhysicalOlapScan[customer_address] apply RFs: RF5
 ----------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] hashCondition=((c.c_customer_sk 
= s.ss_customer_sk)) otherCondition=() build RFs:RF4 
ss_customer_sk->[c_customer_sk]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query61.out
 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query61.out
index 115af0703b4..9a6422ac468 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query61.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query61.out
@@ -9,10 +9,9 @@ PhysicalResultSink
 ------------hashAgg[LOCAL]
 --------------PhysicalProject
 ----------------hashJoin[INNER_JOIN] 
hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) 
otherCondition=() build RFs:RF10 c_current_addr_sk->[ca_address_sk]
-------------------PhysicalDistribute[DistributionSpecHash]
---------------------PhysicalProject
-----------------------filter((customer_address.ca_gmt_offset = -7.00))
-------------------------PhysicalOlapScan[customer_address] apply RFs: RF10
+------------------PhysicalProject
+--------------------filter((customer_address.ca_gmt_offset = -7.00))
+----------------------PhysicalOlapScan[customer_address] apply RFs: RF10
 ------------------PhysicalDistribute[DistributionSpecHash]
 --------------------PhysicalProject
 ----------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) 
otherCondition=() build RFs:RF9 ss_customer_sk->[c_customer_sk]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query68.out
 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query68.out
index dc8b5303dfd..a8a295480f8 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query68.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query68.out
@@ -6,9 +6,8 @@ PhysicalResultSink
 ------PhysicalTopN[LOCAL_SORT]
 --------PhysicalProject
 ----------hashJoin[INNER_JOIN] hashCondition=((customer.c_current_addr_sk = 
current_addr.ca_address_sk)) otherCondition=(( not (ca_city = bought_city))) 
build RFs:RF5 c_current_addr_sk->[ca_address_sk]
-------------PhysicalDistribute[DistributionSpecHash]
---------------PhysicalProject
-----------------PhysicalOlapScan[customer_address] apply RFs: RF5
+------------PhysicalProject
+--------------PhysicalOlapScan[customer_address] apply RFs: RF5
 ------------PhysicalDistribute[DistributionSpecHash]
 --------------PhysicalProject
 ----------------hashJoin[INNER_JOIN] hashCondition=((dn.ss_customer_sk = 
customer.c_customer_sk)) otherCondition=() build RFs:RF4 
ss_customer_sk->[c_customer_sk]
@@ -20,9 +19,8 @@ PhysicalResultSink
 ----------------------hashAgg[LOCAL]
 ------------------------PhysicalProject
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_addr_sk = customer_address.ca_address_sk)) 
otherCondition=() build RFs:RF3 ss_addr_sk->[ca_address_sk]
-----------------------------PhysicalDistribute[DistributionSpecHash]
-------------------------------PhysicalProject
---------------------------------PhysicalOlapScan[customer_address] apply RFs: 
RF3
+----------------------------PhysicalProject
+------------------------------PhysicalOlapScan[customer_address] apply RFs: RF3
 ----------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------PhysicalProject
 --------------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk)) 
otherCondition=() build RFs:RF2 hd_demo_sk->[ss_hdemo_sk]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query8.out
 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query8.out
index c7b568a824f..8368b9deff6 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query8.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query8.out
@@ -33,9 +33,8 @@ PhysicalResultSink
 ----------------------------------hashAgg[LOCAL]
 ------------------------------------PhysicalProject
 --------------------------------------hashJoin[INNER_JOIN] 
hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) 
otherCondition=() build RFs:RF0 c_current_addr_sk->[ca_address_sk]
-----------------------------------------PhysicalDistribute[DistributionSpecHash]
-------------------------------------------PhysicalProject
---------------------------------------------PhysicalOlapScan[customer_address] 
apply RFs: RF0
+----------------------------------------PhysicalProject
+------------------------------------------PhysicalOlapScan[customer_address] 
apply RFs: RF0
 
----------------------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------------------PhysicalProject
 
--------------------------------------------filter((customer.c_preferred_cust_flag
 = 'Y'))
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query91.out
 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query91.out
index 42598f25208..595d1fe0dcc 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query91.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query91.out
@@ -18,10 +18,9 @@ PhysicalResultSink
 ------------------------------PhysicalOlapScan[catalog_returns] apply RFs: RF3 
RF4 RF5
 ----------------------------PhysicalDistribute[DistributionSpecReplicated]
 ------------------------------hashJoin[INNER_JOIN] 
hashCondition=((customer_address.ca_address_sk = customer.c_current_addr_sk)) 
otherCondition=() build RFs:RF2 c_current_addr_sk->[ca_address_sk]
---------------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------------PhysicalProject
-------------------------------------filter((customer_address.ca_gmt_offset = 
-7.00))
---------------------------------------PhysicalOlapScan[customer_address] apply 
RFs: RF2
+--------------------------------PhysicalProject
+----------------------------------filter((customer_address.ca_gmt_offset = 
-7.00))
+------------------------------------PhysicalOlapScan[customer_address] apply 
RFs: RF2
 --------------------------------PhysicalDistribute[DistributionSpecHash]
 ----------------------------------hashJoin[INNER_JOIN] 
hashCondition=((household_demographics.hd_demo_sk = 
customer.c_current_hdemo_sk)) otherCondition=() build RFs:RF1 
hd_demo_sk->[c_current_hdemo_sk]
 ------------------------------------hashJoin[INNER_JOIN] 
hashCondition=((customer_demographics.cd_demo_sk = 
customer.c_current_cdemo_sk)) otherCondition=() build RFs:RF0 
cd_demo_sk->[c_current_cdemo_sk]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query95.out
 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query95.out
index 4311ae4d19d..cdd54a29fe9 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query95.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query95.out
@@ -19,14 +19,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
 --------------hashAgg[LOCAL]
 ----------------PhysicalProject
 ------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) 
otherCondition=() build RFs:RF6 
ws_order_number->[wr_order_number,ws_order_number]
---------------------PhysicalDistribute[DistributionSpecHash]
-----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) 
otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number]
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: 
RF5 RF6
---------------------------PhysicalDistribute[DistributionSpecHash]
-----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[web_returns] apply RFs: RF6
+--------------------PhysicalProject
+----------------------hashJoin[INNER_JOIN] 
hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) 
otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number]
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF5 
RF6
+------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------PhysicalProject
+----------------------------PhysicalOlapScan[web_returns] apply RFs: RF6
 --------------------PhysicalProject
 ----------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() 
build RFs:RF7 ws_order_number->[ws_order_number,ws_order_number]
 ------------------------PhysicalDistribute[DistributionSpecHash]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query13.out 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query13.out
index 0bb5e76d2be..ff71ac9d027 100644
--- a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query13.out
+++ b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query13.out
@@ -6,8 +6,9 @@ PhysicalResultSink
 ------hashAgg[LOCAL]
 --------PhysicalProject
 ----------hashJoin[INNER_JOIN] hashCondition=((store.s_store_sk = 
store_sales.ss_store_sk)) otherCondition=() build RFs:RF4 
ss_store_sk->[s_store_sk]
-------------PhysicalProject
---------------PhysicalOlapScan[store] apply RFs: RF4
+------------PhysicalDistribute[DistributionSpecHash]
+--------------PhysicalProject
+----------------PhysicalOlapScan[store] apply RFs: RF4
 ------------PhysicalDistribute[DistributionSpecHash]
 --------------PhysicalProject
 ----------------hashJoin[INNER_JOIN] 
hashCondition=((customer_demographics.cd_demo_sk = store_sales.ss_cdemo_sk)) 
otherCondition=((((household_demographics.hd_dep_count = 1) AND 
((((customer_demographics.cd_marital_status = 'D') AND 
(customer_demographics.cd_education_status = 'Primary')) AND 
((store_sales.ss_sales_price >= 50.00) AND (store_sales.ss_sales_price <= 
100.00))) OR (((customer_demographics.cd_marital_status = 'W') AND 
(customer_demographics.cd_education_status = '2 yr Degree [...]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query51.out 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query51.out
index 06a6c3d034e..3cacbaeedc0 100644
--- a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query51.out
+++ b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query51.out
@@ -19,9 +19,9 @@ PhysicalResultSink
 --------------------------------PhysicalDistribute[DistributionSpecHash]
 ----------------------------------hashAgg[LOCAL]
 ------------------------------------PhysicalProject
---------------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF1 d_date_sk->[ws_sold_date_sk]
+--------------------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF1 d_date_sk->[ss_sold_date_sk]
 ----------------------------------------PhysicalProject
-------------------------------------------PhysicalOlapScan[web_sales] apply 
RFs: RF1
+------------------------------------------PhysicalOlapScan[store_sales] apply 
RFs: RF1
 
----------------------------------------PhysicalDistribute[DistributionSpecReplicated]
 ------------------------------------------PhysicalProject
 --------------------------------------------filter((date_dim.d_month_seq <= 
1223) and (date_dim.d_month_seq >= 1212))
@@ -35,9 +35,9 @@ PhysicalResultSink
 --------------------------------PhysicalDistribute[DistributionSpecHash]
 ----------------------------------hashAgg[LOCAL]
 ------------------------------------PhysicalProject
---------------------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ss_sold_date_sk]
+--------------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF0 d_date_sk->[ws_sold_date_sk]
 ----------------------------------------PhysicalProject
-------------------------------------------PhysicalOlapScan[store_sales] apply 
RFs: RF0
+------------------------------------------PhysicalOlapScan[web_sales] apply 
RFs: RF0
 
----------------------------------------PhysicalDistribute[DistributionSpecReplicated]
 ------------------------------------------PhysicalProject
 --------------------------------------------filter((date_dim.d_month_seq <= 
1223) and (date_dim.d_month_seq >= 1212))
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query85.out 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query85.out
index 8e6087c95b3..f5b12bcfd5e 100644
--- a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query85.out
+++ b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query85.out
@@ -10,8 +10,9 @@ PhysicalResultSink
 --------------hashAgg[LOCAL]
 ----------------PhysicalProject
 ------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_web_page_sk = web_page.wp_web_page_sk)) 
otherCondition=() build RFs:RF9 ws_web_page_sk->[wp_web_page_sk]
---------------------PhysicalProject
-----------------------PhysicalOlapScan[web_page] apply RFs: RF9
+--------------------PhysicalDistribute[DistributionSpecHash]
+----------------------PhysicalProject
+------------------------PhysicalOlapScan[web_page] apply RFs: RF9
 --------------------PhysicalDistribute[DistributionSpecHash]
 ----------------------PhysicalProject
 ------------------------hashJoin[INNER_JOIN] 
hashCondition=((reason.r_reason_sk = web_returns.wr_reason_sk)) 
otherCondition=() build RFs:RF8 r_reason_sk->[wr_reason_sk]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query13.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query13.out
index accd0ebbb14..8260c20c39c 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query13.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query13.out
@@ -6,8 +6,9 @@ PhysicalResultSink
 ------hashAgg[LOCAL]
 --------PhysicalProject
 ----------hashJoin[INNER_JOIN] hashCondition=((store.s_store_sk = 
store_sales.ss_store_sk)) otherCondition=() build RFs:RF4 
ss_store_sk->[s_store_sk]
-------------PhysicalProject
---------------PhysicalOlapScan[store] apply RFs: RF4
+------------PhysicalDistribute[DistributionSpecHash]
+--------------PhysicalProject
+----------------PhysicalOlapScan[store] apply RFs: RF4
 ------------PhysicalDistribute[DistributionSpecHash]
 --------------PhysicalProject
 ----------------hashJoin[INNER_JOIN] 
hashCondition=((customer_demographics.cd_demo_sk = store_sales.ss_cdemo_sk)) 
otherCondition=((((household_demographics.hd_dep_count = 1) AND 
((((customer_demographics.cd_marital_status = 'S') AND 
(customer_demographics.cd_education_status = 'College')) AND 
((store_sales.ss_sales_price >= 50.00) AND (store_sales.ss_sales_price <= 
100.00))) OR (((customer_demographics.cd_marital_status = 'M') AND 
(customer_demographics.cd_education_status = '4 yr Degree [...]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query61.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query61.out
index 8396f36855e..baed30c1705 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query61.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query61.out
@@ -28,7 +28,7 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((((promotion.p_channel_dmail = 'Y') 
OR (promotion.p_channel_email = 'Y')) OR (promotion.p_channel_tv = 'Y')))
 --------------------------------------PhysicalOlapScan[promotion] apply RFs: 
RF7
-----------------------------------PhysicalDistribute[DistributionSpecHash]
+----------------------------------PhysicalDistribute[DistributionSpecReplicated]
 ------------------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF6 ss_sold_date_sk->[d_date_sk]
 --------------------------------------PhysicalProject
 ----------------------------------------filter((date_dim.d_moy = 11) and 
(date_dim.d_year = 1999))
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query85.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query85.out
index 1e0d108c71a..24317f11fbd 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query85.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query85.out
@@ -18,8 +18,9 @@ PhysicalResultSink
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------PhysicalProject
 ----------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_web_page_sk = web_page.wp_web_page_sk)) 
otherCondition=() build RFs:RF7 ws_web_page_sk->[wp_web_page_sk]
-------------------------------PhysicalProject
---------------------------------PhysicalOlapScan[web_page] apply RFs: RF7
+------------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------------PhysicalProject
+----------------------------------PhysicalOlapScan[web_page] apply RFs: RF7
 ------------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------------PhysicalProject
 ----------------------------------hashJoin[INNER_JOIN] 
hashCondition=((cd1.cd_education_status = cd2.cd_education_status) and 
(cd1.cd_marital_status = cd2.cd_marital_status) and (cd2.cd_demo_sk = 
web_returns.wr_returning_cdemo_sk)) otherCondition=() build RFs:RF4 
wr_returning_cdemo_sk->[cd_demo_sk];RF5 
cd_marital_status->[cd_marital_status];RF6 
cd_education_status->[cd_education_status]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query13.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query13.out
index accd0ebbb14..8260c20c39c 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query13.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query13.out
@@ -6,8 +6,9 @@ PhysicalResultSink
 ------hashAgg[LOCAL]
 --------PhysicalProject
 ----------hashJoin[INNER_JOIN] hashCondition=((store.s_store_sk = 
store_sales.ss_store_sk)) otherCondition=() build RFs:RF4 
ss_store_sk->[s_store_sk]
-------------PhysicalProject
---------------PhysicalOlapScan[store] apply RFs: RF4
+------------PhysicalDistribute[DistributionSpecHash]
+--------------PhysicalProject
+----------------PhysicalOlapScan[store] apply RFs: RF4
 ------------PhysicalDistribute[DistributionSpecHash]
 --------------PhysicalProject
 ----------------hashJoin[INNER_JOIN] 
hashCondition=((customer_demographics.cd_demo_sk = store_sales.ss_cdemo_sk)) 
otherCondition=((((household_demographics.hd_dep_count = 1) AND 
((((customer_demographics.cd_marital_status = 'S') AND 
(customer_demographics.cd_education_status = 'College')) AND 
((store_sales.ss_sales_price >= 50.00) AND (store_sales.ss_sales_price <= 
100.00))) OR (((customer_demographics.cd_marital_status = 'M') AND 
(customer_demographics.cd_education_status = '4 yr Degree [...]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query61.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query61.out
index 8396f36855e..baed30c1705 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query61.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query61.out
@@ -28,7 +28,7 @@ PhysicalResultSink
 ----------------------------------PhysicalProject
 ------------------------------------filter((((promotion.p_channel_dmail = 'Y') 
OR (promotion.p_channel_email = 'Y')) OR (promotion.p_channel_tv = 'Y')))
 --------------------------------------PhysicalOlapScan[promotion] apply RFs: 
RF7
-----------------------------------PhysicalDistribute[DistributionSpecHash]
+----------------------------------PhysicalDistribute[DistributionSpecReplicated]
 ------------------------------------hashJoin[INNER_JOIN] 
hashCondition=((store_sales.ss_sold_date_sk = date_dim.d_date_sk)) 
otherCondition=() build RFs:RF6 ss_sold_date_sk->[d_date_sk]
 --------------------------------------PhysicalProject
 ----------------------------------------filter((date_dim.d_moy = 11) and 
(date_dim.d_year = 1999))
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query85.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query85.out
index 607b0f176f6..d64ad99a7a5 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query85.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query85.out
@@ -18,8 +18,9 @@ PhysicalResultSink
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------PhysicalProject
 ----------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_web_page_sk = web_page.wp_web_page_sk)) 
otherCondition=() build RFs:RF7 ws_web_page_sk->[wp_web_page_sk]
-------------------------------PhysicalProject
---------------------------------PhysicalOlapScan[web_page] apply RFs: RF7
+------------------------------PhysicalDistribute[DistributionSpecHash]
+--------------------------------PhysicalProject
+----------------------------------PhysicalOlapScan[web_page] apply RFs: RF7
 ------------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------------PhysicalProject
 ----------------------------------hashJoin[INNER_JOIN] 
hashCondition=((cd1.cd_education_status = cd2.cd_education_status) and 
(cd1.cd_marital_status = cd2.cd_marital_status) and (cd2.cd_demo_sk = 
web_returns.wr_returning_cdemo_sk)) otherCondition=() build RFs:RF4 
wr_returning_cdemo_sk->[cd_demo_sk];RF5 
cd_marital_status->[cd_marital_status];RF6 
cd_education_status->[cd_education_status]
diff --git 
a/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy 
b/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy
index dcb06a2bbd1..1ca2d6845de 100644
--- a/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy
+++ b/regression-test/suites/correctness_p0/test_bucket_shuffle_join.groovy
@@ -18,6 +18,7 @@
 suite("test_bucket_shuffle_join") {
 
     sql "set disable_join_reorder=true"
+    sql "set parallel_pipeline_task_num=1"
 
     sql """ DROP TABLE IF EXISTS `test_colo1` """
     sql """ DROP TABLE IF EXISTS `test_colo2` """
diff --git a/regression-test/suites/nereids_p0/hint/fix_leading.groovy 
b/regression-test/suites/nereids_p0/hint/fix_leading.groovy
index 7701f49fe6d..e7cb8c6c189 100644
--- a/regression-test/suites/nereids_p0/hint/fix_leading.groovy
+++ b/regression-test/suites/nereids_p0/hint/fix_leading.groovy
@@ -26,6 +26,7 @@ suite("fix_leading") {
     // setting planner to nereids
     sql 'set exec_mem_limit=21G'
     sql 'set be_number_for_test=1'
+    sql "set parallel_pipeline_task_num=1"
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     sql 'set enable_nereids_planner=true'
     sql 'set enable_fallback_to_original_planner=false'
diff --git a/regression-test/suites/nereids_p0/hint/multi_leading.groovy 
b/regression-test/suites/nereids_p0/hint/multi_leading.groovy
index bcefcfef141..0e22c0f455a 100644
--- a/regression-test/suites/nereids_p0/hint/multi_leading.groovy
+++ b/regression-test/suites/nereids_p0/hint/multi_leading.groovy
@@ -25,6 +25,7 @@ suite("multi_leading") {
     // setting planner to nereids
     sql 'set exec_mem_limit=21G'
     sql 'set be_number_for_test=1'
+    sql 'set parallel_pipeline_task_num=1'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     sql 'set enable_nereids_planner=true'
     sql "set ignore_shape_nodes='PhysicalProject'"
diff --git a/regression-test/suites/nereids_p0/hint/test_distribute.groovy 
b/regression-test/suites/nereids_p0/hint/test_distribute.groovy
index db53a3b4ffd..06ec5da5b3f 100644
--- a/regression-test/suites/nereids_p0/hint/test_distribute.groovy
+++ b/regression-test/suites/nereids_p0/hint/test_distribute.groovy
@@ -28,6 +28,7 @@ suite("test_distribute") {
     sql 'set enable_fallback_to_original_planner=false'
     sql 'set runtime_filter_mode=OFF'
     sql 'set be_number_for_test=1'
+    sql "set parallel_pipeline_task_num=1"
     
     // create tables
     sql """drop table if exists t1;"""
diff --git a/regression-test/suites/nereids_p0/hint/test_leading.groovy 
b/regression-test/suites/nereids_p0/hint/test_leading.groovy
index 44bfed0edcd..cbbff148c3a 100644
--- a/regression-test/suites/nereids_p0/hint/test_leading.groovy
+++ b/regression-test/suites/nereids_p0/hint/test_leading.groovy
@@ -26,6 +26,7 @@ suite("test_leading") {
     // setting planner to nereids
     sql 'set exec_mem_limit=21G'
     sql 'set be_number_for_test=1'
+    sql 'set parallel_pipeline_task_num=1'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     sql 'set enable_nereids_planner=true'
     sql "set ignore_shape_nodes='PhysicalProject'"
diff --git a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy 
b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy
index e5334010d7b..afce212ae98 100644
--- a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy
+++ b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy
@@ -18,6 +18,8 @@
 suite("bucket-shuffle-join") {
     sql "SET enable_nereids_planner=true"
     sql "SET enable_fallback_to_original_planner=false"
+    sql 'SET be_number_for_test=1'
+    sql 'SET parallel_pipeline_task_num=1'
     order_qt_test_bucket """
     select * from test_bucket_shuffle_join where rectime="2021-12-01 00:00:00" 
and id in (select k1 from test_join where k1 in (1,2))
     """
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query13.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query13.groovy
index e193a3c164a..02586d5c8ac 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query13.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query13.groovy
@@ -31,7 +31,6 @@ suite("query13") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """select avg(ss_quantity)
        ,avg(ss_ext_sales_price)
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query19.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query19.groovy
index 5f033fc2a39..5d8e7b72b96 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query19.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query19.groovy
@@ -31,7 +31,6 @@ suite("query19") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """select  i_brand_id brand_id, i_brand brand, i_manufact_id, 
i_manufact,
        sum(ss_ext_sales_price) ext_price
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query44.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query44.groovy
index 33623255505..2545983724a 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query44.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query44.groovy
@@ -31,7 +31,6 @@ suite("query44") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """select  asceding.rnk, i1.i_product_name best_performing, 
i2.i_product_name worst_performing
 from(select *
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query45.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query45.groovy
index bb6a1474645..7e7510ff6a0 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query45.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query45.groovy
@@ -31,7 +31,6 @@ suite("query45") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """select  ca_zip, ca_city, sum(ws_sales_price)
  from web_sales, customer, customer_address, date_dim, item
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query54.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query54.groovy
index 73aa5bc448e..9d22cc30b66 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query54.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query54.groovy
@@ -31,7 +31,6 @@ suite("query54") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """with my_customers as (
  select distinct c_customer_sk
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query56.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query56.groovy
index e52121652ad..b1c7169b0f4 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query56.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query56.groovy
@@ -31,7 +31,6 @@ suite("query56") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """with ss as (
  select i_item_id,sum(ss_ext_sales_price) total_sales
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query6.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query6.groovy
index 0c9aa79fdc0..5708cdf3572 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query6.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query6.groovy
@@ -31,7 +31,6 @@ suite("query6") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """select  a.ca_state state, count(*) cnt
  from customer_address a
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query61.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query61.groovy
index 934096e2dd0..c158a01bdea 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query61.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query61.groovy
@@ -31,7 +31,6 @@ suite("query61") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """select  promotions,total,cast(promotions as 
decimal(15,4))/cast(total as decimal(15,4))*100
 from
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query68.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query68.groovy
index 8254a07d9e4..0214be7a485 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query68.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query68.groovy
@@ -31,7 +31,6 @@ suite("query68") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """select  c_last_name
        ,c_first_name
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query8.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query8.groovy
index b80b49ca1d0..78597d354eb 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query8.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query8.groovy
@@ -31,7 +31,6 @@ suite("query8") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """select  s_store_name
       ,sum(ss_net_profit)
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query91.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query91.groovy
index 9111287fc7a..c8d78357912 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query91.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query91.groovy
@@ -31,7 +31,6 @@ suite("query91") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """select  
         cc_call_center_id Call_Center,
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query95.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query95.groovy
index e34c656e2cc..b3439e40240 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query95.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/bs_downgrade_shape/query95.groovy
@@ -31,7 +31,6 @@ suite("query95") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql 'set dump_nereids_memo=false'
-    sql 'set enable_bucket_shuffle_downgrade=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
     def ds = """with ws_wh as
 (select ws1.ws_order_number,ws1.ws_warehouse_sk wh1,ws2.ws_warehouse_sk wh2


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to