This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e29d1252004 [fix](planner) query should be cancelled if limit reached (#44338) (#45222)
e29d1252004 is described below

commit e29d1252004a142551cae2959511eb66b7f21a73
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Mon Dec 9 22:22:24 2024 -0800

    [fix](planner) query should be cancelled if limit reached (#44338) (#45222)
    
    cherry-pick #44338
---
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 13 ++++++++
 be/src/vec/exec/scan/vscanner.h                    |  2 ++
 .../org/apache/doris/nereids/NereidsPlanner.java   |  5 ---
 .../org/apache/doris/planner/OriginalPlanner.java  | 15 ---------
 .../java/org/apache/doris/planner/Planner.java     |  6 ----
 .../main/java/org/apache/doris/qe/Coordinator.java | 36 +++++++++++-----------
 6 files changed, 33 insertions(+), 44 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index f7b6887d746..eef7acf1cb9 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -271,6 +271,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
     size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
     size_t raw_bytes_read = 0;
     bool first_read = true;
+    int64_t limit = scanner->limit();
     while (!eos && raw_bytes_read < raw_bytes_threshold) {
         if (UNLIKELY(ctx->done())) {
             eos = true;
@@ -319,6 +320,18 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
             ctx->inc_block_usage(free_block->allocated_bytes());
             scan_task->cached_blocks.push_back(std::move(free_block));
         }
+
+        if (limit > 0 && limit < ctx->batch_size()) {
+            // If this scanner has limit, and less than batch size,
+            // return immediately and no need to wait raw_bytes_threshold.
+            // This can save time that each scanner may only return a small 
number of rows,
+            // but rows are enough from all scanners.
+            // If not break, the query like "select * from tbl where id=1 
limit 10"
+            // may scan a lot data when the "id=1"'s filter ratio is high.
+            // If limit is larger than batch size, this rule is skipped,
+            // to avoid user specify a large limit and causing too much small 
blocks.
+            break;
+        }
     } // end for while
 
     if (UNLIKELY(!status.ok())) {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 03604621f05..acb715e6e47 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -167,6 +167,8 @@ public:
         _query_statistics = query_statistics;
     }
 
+    int64_t limit() const { return _limit; }
+
 protected:
     void _discard_conjuncts() {
         for (auto& conjunct : _conjuncts) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 7a8e29306c3..31ecae7f33d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -621,11 +621,6 @@ public class NereidsPlanner extends Planner {
         return plan;
     }
 
-    @Override
-    public boolean isBlockQuery() {
-        return true;
-    }
-
     @Override
     public DescriptorTable getDescTable() {
         return descTable;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 24433af00e2..3f0071680cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -80,10 +80,6 @@ public class OriginalPlanner extends Planner {
         this.analyzer = analyzer;
     }
 
-    public boolean isBlockQuery() {
-        return isBlockQuery;
-    }
-
     public PlannerContext getPlannerContext() {
         return plannerContext;
     }
@@ -274,17 +270,6 @@ public class OriginalPlanner extends Planner {
 
         if (queryStmt instanceof SelectStmt) {
             SelectStmt selectStmt = (SelectStmt) queryStmt;
-            if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != 
null) {
-                isBlockQuery = true;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("this is block query");
-                }
-            } else {
-                isBlockQuery = false;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("this isn't block query");
-                }
-            }
             // Check SelectStatement if optimization condition satisfied
             if (selectStmt.isPointQueryShortCircuit()) {
                 // Optimize for point query like: SELECT * FROM t1 WHERE pk1 = 
1 and pk2 = 2
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 0a7246f5e1c..0a22ec841dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -43,8 +43,6 @@ public abstract class Planner {
 
     protected ArrayList<PlanFragment> fragments = Lists.newArrayList();
 
-    protected boolean isBlockQuery = false;
-
     protected TQueryOptions queryOptions;
 
     public abstract List<ScanNode> getScanNodes();
@@ -115,10 +113,6 @@ public abstract class Planner {
         return fragments;
     }
 
-    public boolean isBlockQuery() {
-        return isBlockQuery;
-    }
-
     public TQueryOptions getQueryOptions() {
         return queryOptions;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 0f5a598420d..500ba8f22f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -240,8 +240,6 @@ public class Coordinator implements CoordInterface {
     // same as backend_exec_states_.size() after Exec()
     private final Set<TUniqueId> instanceIds = Sets.newHashSet();
 
-    private final boolean isBlockQuery;
-
     private int numReceivedRows = 0;
 
     private List<String> deltaUrls;
@@ -336,7 +334,6 @@ public class Coordinator implements CoordInterface {
     // Used for query/insert/test
     public Coordinator(ConnectContext context, Analyzer analyzer, Planner 
planner) {
         this.context = context;
-        this.isBlockQuery = planner.isBlockQuery();
         this.queryId = context.queryId();
         this.fragments = planner.getFragments();
         this.scanNodes = planner.getScanNodes();
@@ -379,7 +376,6 @@ public class Coordinator implements CoordInterface {
     // Constructor of Coordinator is too complicated.
     public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable 
descTable, List<PlanFragment> fragments,
             List<ScanNode> scanNodes, String timezone, boolean 
loadZeroTolerance, boolean enableProfile) {
-        this.isBlockQuery = true;
         this.jobId = jobId;
         this.queryId = queryId;
         this.descTable = descTable.toThrift();
@@ -1448,24 +1444,28 @@ public class Coordinator implements CoordInterface {
             }
         }
 
-        if (resultBatch.isEos()) {
-            this.returnedAllResults = true;
-
-            // if this query is a block query do not cancel.
-            Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
-            boolean hasLimit = numLimitRows > 0;
-            if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && 
numReceivedRows >= numLimitRows) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("no block query, return num >= limit rows, need 
cancel");
-                }
-                cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, 
"query reach limit");
+        if (resultBatch.getBatch() != null) {
+            numReceivedRows += resultBatch.getBatch().getRowsSize();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("number received rows: {}, {}", numReceivedRows, 
DebugUtil.printId(queryId));
             }
-            if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().dryRunQuery) {
+        }
+
+        if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().dryRunQuery) {
+            if (resultBatch.isEos()) {
                 numReceivedRows = 0;
                 numReceivedRows += 
resultBatch.getQueryStatistics().getReturnedRows();
             }
-        } else if (resultBatch.getBatch() != null) {
-            numReceivedRows += resultBatch.getBatch().getRowsSize();
+        }
+
+        Long limitRows = fragments.get(0).getPlanRoot().getLimit();
+        if (limitRows > 0 && numReceivedRows >= limitRows) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("reach limit rows: {}, received rows: {}, cancel 
query, {}",
+                        limitRows, numReceivedRows, 
DebugUtil.printId(queryId));
+            }
+            cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, 
"reach limit");
+            resultBatch.setEos(true);
         }
 
         return resultBatch;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to