This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 89ff353ebe2 [opt](scanner) Control the degree of parallelism of 
scanner when only limit involved #39927 (#42079)
89ff353ebe2 is described below

commit 89ff353ebe220c09f747cd44f7b13e762bc5e2e9
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Mon Nov 11 23:13:17 2024 +0800

    [opt](scanner) Control the degree of parallelism of scanner when only limit 
involved #39927 (#42079)
    
    cherry pick from #39927
    
    ---------
    
    Co-authored-by: zhiqiang-hhhh <hezhiqi...@flywheels.com>
---
 be/src/pipeline/exec/scan_operator.cpp             | 30 ++++++++++++++++++++++
 .../java/org/apache/doris/planner/ScanNode.java    | 16 ++++++++++--
 .../java/org/apache/doris/qe/SessionVariable.java  | 24 +++++++++++++++++
 3 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 32943c4d44e..a59098c0fee 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -45,6 +45,8 @@
 
 namespace doris::pipeline {
 
+const static int32_t ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT = 
10000;
+
 #define RETURN_IF_PUSH_DOWN(stmt, status)    \
     if (pdt == PushDownType::UNACCEPTABLE) { \
         status = stmt;                       \
@@ -1186,6 +1188,34 @@ Status ScanOperatorX<LocalStateType>::init(const 
TPlanNode& tnode, RuntimeState*
     if (tnode.__isset.topn_filter_source_node_ids) {
         topn_filter_source_node_ids = tnode.topn_filter_source_node_ids;
     }
+
+    // The first branch is kept for compatibility with the old version of the 
FE
+    if 
(!query_options.__isset.enable_adaptive_pipeline_task_serial_read_on_limit) {
+        if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
+            // Which means the request could be fullfilled in a single segment 
iterator request.
+            if (tnode.limit > 0 &&
+                tnode.limit <= 
ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT) {
+                _should_run_serial = true;
+            }
+        }
+    } else {
+        
DCHECK(query_options.__isset.adaptive_pipeline_task_serial_read_on_limit);
+        // The set of enable_adaptive_pipeline_task_serial_read_on_limit
+        // is checked in previous branch.
+        if (query_options.enable_adaptive_pipeline_task_serial_read_on_limit) {
+            int32_t adaptive_pipeline_task_serial_read_on_limit =
+                    ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT;
+            if 
(query_options.__isset.adaptive_pipeline_task_serial_read_on_limit) {
+                adaptive_pipeline_task_serial_read_on_limit =
+                        
query_options.adaptive_pipeline_task_serial_read_on_limit;
+            }
+
+            if (tnode.limit > 0 && tnode.limit <= 
adaptive_pipeline_task_serial_read_on_limit) {
+                _should_run_serial = true;
+            }
+        }
+    }
+
     return Status::OK();
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 9f28424ccc5..85c8de68b8c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -739,8 +739,20 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
     }
 
     public boolean shouldUseOneInstance(ConnectContext ctx) {
-        long limitRowsForSingleInstance = ctx == null ? 10000 : 
ctx.getSessionVariable().limitRowsForSingleInstance;
-        return hasLimit() && getLimit() < limitRowsForSingleInstance && 
conjuncts.isEmpty();
+        int adaptivePipelineTaskSerialReadOnLimit = 10000;
+
+        if (ctx != null) {
+            if 
(ctx.getSessionVariable().enableAdaptivePipelineTaskSerialReadOnLimit) {
+                adaptivePipelineTaskSerialReadOnLimit = 
ctx.getSessionVariable().adaptivePipelineTaskSerialReadOnLimit;
+            } else {
+                return false;
+            }
+        } else {
+            // No connection context, typically for broker load.
+        }
+
+        // For UniqueKey table, we will use multiple instance.
+        return hasLimit() && getLimit() <= 
adaptivePipelineTaskSerialReadOnLimit && conjuncts.isEmpty();
     }
 
     // In cloud mode, meta read lock is not enough to keep a snapshot of the 
partition versions.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c745e8913f1..164ec1d1168 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -683,6 +683,11 @@ public class SessionVariable implements Serializable, 
Writable {
      */
     public static final String ENABLE_AUTO_CREATE_WHEN_OVERWRITE = 
"enable_auto_create_when_overwrite";
 
+    public static final String 
ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
+                                    
"enable_adaptive_pipeline_task_serial_read_on_limit";
+    public static final String ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
+                                    
"adaptive_pipeline_task_serial_read_on_limit";
+
     /**
      * If set false, user couldn't submit analyze SQL and FE won't allocate 
any related resources.
      */
@@ -2273,6 +2278,22 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_COOLDOWN_REPLICA_AFFINITY, needForward 
= true)
     public boolean enableCooldownReplicaAffinity = true;
 
+    @VariableMgr.VarAttr(name = 
ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, 
description = {
+        "开启后将会允许自动调整 pipeline task 的并发数。当 scan 节点没有过滤条件,且 limit 参数小于"
+            + "adaptive_pipeline_task_serial_read_on_limit 中指定的行数时,scan 
的并行度将会被设置为 1",
+        "When enabled, the pipeline task concurrency will be adjusted 
automatically. When the scan node has no filter "
+            + "conditions and the limit parameter is less than the number of 
rows specified in "
+            + "adaptive_pipeline_task_serial_read_on_limit, the parallelism of 
the scan will be set to 1."
+    })
+    public boolean enableAdaptivePipelineTaskSerialReadOnLimit = true;
+
+    @VariableMgr.VarAttr(name = ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, 
needForward = true, description = {
+        "当 enable_adaptive_pipeline_task_serial_read_on_limit 开启时,scan 
的并行度将会被设置为 1 的行数阈值",
+            "When enable_adaptive_pipeline_task_serial_read_on_limit is 
enabled, "
+            + "the number of rows at which the parallelism of the scan will be 
set to 1."
+    })
+    public int adaptivePipelineTaskSerialReadOnLimit = 10000;
+
     public void setEnableEsParallelScroll(boolean enableESParallelScroll) {
         this.enableESParallelScroll = enableESParallelScroll;
     }
@@ -3911,6 +3932,9 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setKeepCarriageReturn(keepCarriageReturn);
         
tResult.setEnableAutoCreateWhenOverwrite(enableAutoCreateWhenOverwrite);
 
+        
tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit);
+        
tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit);
+
         tResult.setOrcTinyStripeThresholdBytes(orcTinyStripeThresholdBytes);
         tResult.setOrcMaxMergeDistanceBytes(orcMaxMergeDistanceBytes);
         tResult.setOrcOnceMaxReadBytes(orcOnceMaxReadBytes);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to