This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new c6622dbe459 [opt](coord) use single instance only with small limit 
#33888 (#33903)
c6622dbe459 is described below

commit c6622dbe45968b5ed1246b640e78c7d978908d9c
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Apr 19 22:22:08 2024 +0800

    [opt](coord) use single instance only with small limit #33888 (#33903)
---
 .../src/main/java/org/apache/doris/planner/ScanNode.java      |  5 +++--
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java |  4 ++--
 .../src/main/java/org/apache/doris/qe/SessionVariable.java    | 11 +++++++++++
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 99184df1453..1c5ee53e34b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -708,7 +708,8 @@ public abstract class ScanNode extends PlanNode {
         return isKeySearch() || !enableShardScan;
     }
 
-    public boolean shouldUseOneInstance() {
-        return hasLimit() && conjuncts.isEmpty();
+    public boolean shouldUseOneInstance(ConnectContext ctx) {
+        long limitRowsForSingleInstance = ctx == null ? 10000 : 
ctx.getSessionVariable().limitRowsForSingleInstance;
+        return hasLimit() && getLimit() < limitRowsForSingleInstance && 
conjuncts.isEmpty();
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index cd3b8f38770..164072c0b5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1759,7 +1759,7 @@ public class Coordinator implements CoordInterface {
                             }
                             // if have limit and no conjuncts, only need 1 
instance to save cpu and
                             // mem resource
-                            if (node.isPresent() && 
node.get().shouldUseOneInstance()) {
+                            if (node.isPresent() && 
node.get().shouldUseOneInstance(ConnectContext.get())) {
                                 expectedInstanceNum = 1;
                             }
 
@@ -1772,7 +1772,7 @@ public class Coordinator implements CoordInterface {
                             expectedInstanceNum = 
Math.max(expectedInstanceNum, 1);
                             // if have limit and conjuncts, only need 1 
instance to save cpu and
                             // mem resource
-                            if (node.isPresent() && 
node.get().shouldUseOneInstance()) {
+                            if (node.isPresent() && 
node.get().shouldUseOneInstance(ConnectContext.get())) {
                                 expectedInstanceNum = 1;
                             }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index efcfec7570e..633389812e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -461,6 +461,9 @@ public class SessionVariable implements Serializable, 
Writable {
     );
 
     public static final String ENABLE_STATS = "enable_stats";
+
+    public static final String LIMIT_ROWS_FOR_SINGLE_INSTANCE = 
"limit_rows_for_single_instance";
+
     /**
      * If set false, user couldn't submit analyze SQL and FE won't allocate 
any related resources.
      */
@@ -1355,6 +1358,14 @@ public class SessionVariable implements Serializable, 
Writable {
                     "当变量为true时,show processlist命令展示所有fe的连接"})
     public boolean showAllFeConnection = false;
 
+    @VariableMgr.VarAttr(name = LIMIT_ROWS_FOR_SINGLE_INSTANCE,
+            description = {"当一个 ScanNode 上没有过滤条件,且 limit 值小于这个阈值时,"
+                    + "系统会将这个算子的并发度调整为1,以减少简单查询的扇出",
+                    "When a ScanNode has no filter conditions and the limit 
value is less than this threshold, "
+                            + "the system will adjust the concurrency of this 
operator to 1 "
+                            + "to reduce the fan-out of simple queries"})
+    public long limitRowsForSingleInstance = 10000;
+
     public Set<Integer> getIgnoredRuntimeFilterIds() {
         return Arrays.stream(ignoreRuntimeFilterIds.split(",[\\s]*"))
                 .map(v -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to