This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 89ff353ebe2 [opt](scanner) Control the degree of parallelism of scanner when only limit involved #39927 (#42079) 89ff353ebe2 is described below commit 89ff353ebe220c09f747cd44f7b13e762bc5e2e9 Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Mon Nov 11 23:13:17 2024 +0800 [opt](scanner) Control the degree of parallelism of scanner when only limit involved #39927 (#42079) cherry pick from #39927 --------- Co-authored-by: zhiqiang-hhhh <hezhiqi...@flywheels.com> --- be/src/pipeline/exec/scan_operator.cpp | 30 ++++++++++++++++++++++ .../java/org/apache/doris/planner/ScanNode.java | 16 ++++++++++-- .../java/org/apache/doris/qe/SessionVariable.java | 24 +++++++++++++++++ 3 files changed, 68 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 32943c4d44e..a59098c0fee 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -45,6 +45,8 @@ namespace doris::pipeline { +const static int32_t ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT = 10000; + #define RETURN_IF_PUSH_DOWN(stmt, status) \ if (pdt == PushDownType::UNACCEPTABLE) { \ status = stmt; \ @@ -1186,6 +1188,34 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState* if (tnode.__isset.topn_filter_source_node_ids) { topn_filter_source_node_ids = tnode.topn_filter_source_node_ids; } + + // The first branch is kept for compatibility with the old version of the FE + if (!query_options.__isset.enable_adaptive_pipeline_task_serial_read_on_limit) { + if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) { + // Which means the request could be fulfilled in a single segment iterator request. 
+ if (tnode.limit > 0 && + tnode.limit <= ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT) { + _should_run_serial = true; + } + } + } else { + DCHECK(query_options.__isset.adaptive_pipeline_task_serial_read_on_limit); + // The set of enable_adaptive_pipeline_task_serial_read_on_limit + // is checked in previous branch. + if (query_options.enable_adaptive_pipeline_task_serial_read_on_limit) { + int32_t adaptive_pipeline_task_serial_read_on_limit = + ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT; + if (query_options.__isset.adaptive_pipeline_task_serial_read_on_limit) { + adaptive_pipeline_task_serial_read_on_limit = + query_options.adaptive_pipeline_task_serial_read_on_limit; + } + + if (tnode.limit > 0 && tnode.limit <= adaptive_pipeline_task_serial_read_on_limit) { + _should_run_serial = true; + } + } + } + return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 9f28424ccc5..85c8de68b8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -739,8 +739,20 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { } public boolean shouldUseOneInstance(ConnectContext ctx) { - long limitRowsForSingleInstance = ctx == null ? 10000 : ctx.getSessionVariable().limitRowsForSingleInstance; - return hasLimit() && getLimit() < limitRowsForSingleInstance && conjuncts.isEmpty(); + int adaptivePipelineTaskSerialReadOnLimit = 10000; + + if (ctx != null) { + if (ctx.getSessionVariable().enableAdaptivePipelineTaskSerialReadOnLimit) { + adaptivePipelineTaskSerialReadOnLimit = ctx.getSessionVariable().adaptivePipelineTaskSerialReadOnLimit; + } else { + return false; + } + } else { + // No connection context, typically for broker load. + } + + // For UniqueKey table, we will use multiple instance. 
+ return hasLimit() && getLimit() <= adaptivePipelineTaskSerialReadOnLimit && conjuncts.isEmpty(); } // In cloud mode, meta read lock is not enough to keep a snapshot of the partition versions. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index c745e8913f1..164ec1d1168 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -683,6 +683,11 @@ public class SessionVariable implements Serializable, Writable { */ public static final String ENABLE_AUTO_CREATE_WHEN_OVERWRITE = "enable_auto_create_when_overwrite"; + public static final String ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT = + "enable_adaptive_pipeline_task_serial_read_on_limit"; + public static final String ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT = + "adaptive_pipeline_task_serial_read_on_limit"; + /** * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources. */ @@ -2273,6 +2278,22 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_COOLDOWN_REPLICA_AFFINITY, needForward = true) public boolean enableCooldownReplicaAffinity = true; + @VariableMgr.VarAttr(name = ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = { + "开启后将会允许自动调整 pipeline task 的并发数。当 scan 节点没有过滤条件,且 limit 参数小于" + + "adaptive_pipeline_task_serial_read_on_limit 中指定的行数时,scan 的并行度将会被设置为 1", + "When enabled, the pipeline task concurrency will be adjusted automatically. When the scan node has no filter " + + "conditions and the limit parameter is less than the number of rows specified in " + + "adaptive_pipeline_task_serial_read_on_limit, the parallelism of the scan will be set to 1." 
+ }) + public boolean enableAdaptivePipelineTaskSerialReadOnLimit = true; + + @VariableMgr.VarAttr(name = ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = { + "当 enable_adaptive_pipeline_task_serial_read_on_limit 开启时,scan 的并行度将会被设置为 1 的行数阈值", + "When enable_adaptive_pipeline_task_serial_read_on_limit is enabled, " + + "the number of rows at which the parallelism of the scan will be set to 1." + }) + public int adaptivePipelineTaskSerialReadOnLimit = 10000; + public void setEnableEsParallelScroll(boolean enableESParallelScroll) { this.enableESParallelScroll = enableESParallelScroll; } @@ -3911,6 +3932,9 @@ public class SessionVariable implements Serializable, Writable { tResult.setKeepCarriageReturn(keepCarriageReturn); tResult.setEnableAutoCreateWhenOverwrite(enableAutoCreateWhenOverwrite); + tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit); + tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit); + tResult.setOrcTinyStripeThresholdBytes(orcTinyStripeThresholdBytes); tResult.setOrcMaxMergeDistanceBytes(orcMaxMergeDistanceBytes); tResult.setOrcOnceMaxReadBytes(orcOnceMaxReadBytes); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org