This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 49d1671966e [fix](planner) query should be cancelled if limit reached (#44338) (#45223)
49d1671966e is described below

commit 49d1671966ed1d69f864eb90256e793bf4dc84d2
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Mon Dec 9 21:59:54 2024 -0800

    [fix](planner) query should be cancelled if limit reached (#44338) (#45223)
    
    bp #44338
---
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 13 ++++-
 be/src/vec/exec/scan/vscanner.h                    |  2 +
 .../org/apache/doris/nereids/NereidsPlanner.java   |  5 --
 .../org/apache/doris/planner/OriginalPlanner.java  | 15 ------
 .../java/org/apache/doris/planner/Planner.java     |  6 ---
 .../main/java/org/apache/doris/qe/Coordinator.java | 25 ++++-----
 .../main/java/org/apache/doris/qe/LimitUtils.java  | 54 ++++++++++++++++++++
 .../java/org/apache/doris/qe/LimitUtilsTest.java   | 59 ++++++++++++++++++++++
 8 files changed, 137 insertions(+), 42 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 385b581d2a5..3750d8b40b4 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -268,7 +268,7 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
             }
 
             size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
-            size_t raw_bytes_read = 0; bool first_read = true;
+            size_t raw_bytes_read = 0; bool first_read = true; int64_t limit = 
scanner->limit();
             while (!eos && raw_bytes_read < raw_bytes_threshold) {
                 if (UNLIKELY(ctx->done())) {
                     eos = true;
@@ -322,6 +322,17 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     ctx->inc_block_usage(free_block->allocated_bytes());
                     
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
                 }
+                if (limit > 0 && limit < ctx->batch_size()) {
+                    // If this scanner has limit, and less than batch size,
+                    // return immediately and no need to wait 
raw_bytes_threshold.
+                    // This can save time that each scanner may only return a 
small number of rows,
+                    // but rows are enough from all scanners.
+                    // If not break, the query like "select * from tbl where 
id=1 limit 10"
+                    // may scan a lot data when the "id=1"'s filter ratio is 
high.
+                    // If limit is larger than batch size, this rule is 
skipped,
+                    // to avoid user specify a large limit and causing too 
much small blocks.
+                    break;
+                }
             } // end for while
 
             if (UNLIKELY(!status.ok())) {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 6c4f3294ce1..bb68055e1f0 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -156,6 +156,8 @@ public:
         _query_statistics = query_statistics;
     }
 
+    int64_t limit() const { return _limit; }
+
 protected:
     void _discard_conjuncts() {
         for (auto& conjunct : _conjuncts) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index d365ff912de..c9985911670 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -719,11 +719,6 @@ public class NereidsPlanner extends Planner {
         return plan;
     }
 
-    @Override
-    public boolean isBlockQuery() {
-        return true;
-    }
-
     @Override
     public DescriptorTable getDescTable() {
         return descTable;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index c8ef2e22662..f78b0735a1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -79,10 +79,6 @@ public class OriginalPlanner extends Planner {
         this.analyzer = analyzer;
     }
 
-    public boolean isBlockQuery() {
-        return isBlockQuery;
-    }
-
     public PlannerContext getPlannerContext() {
         return plannerContext;
     }
@@ -276,17 +272,6 @@ public class OriginalPlanner extends Planner {
 
         if (queryStmt instanceof SelectStmt) {
             SelectStmt selectStmt = (SelectStmt) queryStmt;
-            if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != 
null) {
-                isBlockQuery = true;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("this is block query");
-                }
-            } else {
-                isBlockQuery = false;
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("this isn't block query");
-                }
-            }
             if (selectStmt.isTwoPhaseReadOptEnabled()) {
                 // Optimize query like `SELECT ... FROM <tbl> WHERE ... ORDER 
BY ... LIMIT ...`
                 if (singleNodePlan instanceof SortNode
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 5617ad57e8f..cfcd27af8fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -44,8 +44,6 @@ public abstract class Planner {
 
     protected ArrayList<PlanFragment> fragments = Lists.newArrayList();
 
-    protected boolean isBlockQuery = false;
-
     protected TQueryOptions queryOptions;
 
     public abstract List<ScanNode> getScanNodes();
@@ -116,10 +114,6 @@ public abstract class Planner {
         return fragments;
     }
 
-    public boolean isBlockQuery() {
-        return isBlockQuery;
-    }
-
     public TQueryOptions getQueryOptions() {
         return queryOptions;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 78493a46ad1..0f6bc0212d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -234,8 +234,6 @@ public class Coordinator implements CoordInterface {
     // same as backend_exec_states_.size() after Exec()
     private final Set<TUniqueId> instanceIds = Sets.newHashSet();
 
-    private final boolean isBlockQuery;
-
     private int numReceivedRows = 0;
 
     private List<String> deltaUrls;
@@ -331,7 +329,6 @@ public class Coordinator implements CoordInterface {
     // Used for query/insert/test
     public Coordinator(ConnectContext context, Analyzer analyzer, Planner 
planner) {
         this.context = context;
-        this.isBlockQuery = planner.isBlockQuery();
         this.queryId = context.queryId();
         this.fragments = planner.getFragments();
         this.scanNodes = planner.getScanNodes();
@@ -374,7 +371,6 @@ public class Coordinator implements CoordInterface {
     // Constructor of Coordinator is too complicated.
     public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable 
descTable, List<PlanFragment> fragments,
             List<ScanNode> scanNodes, String timezone, boolean 
loadZeroTolerance, boolean enableProfile) {
-        this.isBlockQuery = true;
         this.jobId = jobId;
         this.queryId = queryId;
         this.descTable = descTable.toThrift();
@@ -1206,23 +1202,22 @@ public class Coordinator implements CoordInterface {
             numReceivedRows += resultBatch.getBatch().getRowsSize();
         }
 
+        // if reached limit rows, cancel this query immediately
+        // to avoid BE from reading more data.
+        // ATTN: if change here, also need to change the same logic in 
QueryProcessor.getNext();
+        Long limitRows = fragments.get(0).getPlanRoot().getLimit();
+        boolean reachedLimit = LimitUtils.cancelIfReachLimit(
+                resultBatch, limitRows, numReceivedRows, this::cancelInternal);
+
         if (resultBatch.isEos()) {
             receivers.remove(receiver);
             if (receivers.isEmpty()) {
                 returnedAllResults = true;
-            } else {
+            } else if (!reachedLimit) {
+                // if reachedLimit is true, which means this query has been 
cancelled.
+                // so no need to set eos to false again.
                 resultBatch.setEos(false);
             }
-
-            // if this query is a block query do not cancel.
-            Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
-            boolean hasLimit = numLimitRows > 0;
-            if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && 
numReceivedRows >= numLimitRows) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("no block query, return num >= limit rows, need 
cancel");
-                }
-                cancelInternal(new Status(TStatusCode.LIMIT_REACH, "query 
reach limit"));
-            }
         }
 
         if (!returnedAllResults) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java
new file mode 100644
index 00000000000..cbbe5c71a0f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.Status;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.function.Consumer;
+
+/**
+ * This is a utility class for limit related operations.
+ * Because current there are 2 places need to check limit rows, so put the 
logic here for unification.
+ *  - Coordinator.getNext();
+ *  - QueryProcessor.getNext();
+ */
+public class LimitUtils {
+    private static final Logger LOG = LogManager.getLogger(LimitUtils.class);
+    private static final Status LIMIT_REACH_STATUS = new 
Status(TStatusCode.LIMIT_REACH, "query reach limit");
+
+    // if reached limit rows, cancel this query immediately
+    // to avoid BE from reading more data.
+    public static boolean cancelIfReachLimit(RowBatch resultBatch, long 
limitRows, long numReceivedRows,
+            Consumer<Status> cancelFunc) {
+        boolean reachedLimit = false;
+        if (limitRows > 0 && numReceivedRows >= limitRows) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("reach limit rows: {}, received rows: {}, cancel 
query", limitRows, numReceivedRows);
+            }
+            cancelFunc.accept(LIMIT_REACH_STATUS);
+            // set this
+            resultBatch.setEos(true);
+            reachedLimit = true;
+        }
+        return reachedLimit;
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java
new file mode 100644
index 00000000000..012fbad18a5
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+
+import org.apache.doris.common.Status;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.function.Consumer;
+
+public class LimitUtilsTest {
+
+    private static int res = 0;
+
+    @Test
+    public void testUpperBound() {
+        Consumer<Status> cancelFunc = batch -> res = 666;
+        RowBatch rowBatch = new RowBatch();
+        rowBatch.setEos(false);
+        // - no limit
+        Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 0, 10, 
cancelFunc));
+        Assert.assertFalse(rowBatch.isEos());
+        Assert.assertEquals(0, res);
+
+        // - not reach limit
+        Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 10, 1, 
cancelFunc));
+        Assert.assertFalse(rowBatch.isEos());
+        Assert.assertEquals(0, res);
+
+        // - reach limit
+        Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 10, 
cancelFunc));
+        Assert.assertTrue(rowBatch.isEos());
+        Assert.assertEquals(666, res);
+
+        // - reach limit
+        res = 0;
+        rowBatch.setEos(false);
+        Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 100, 
cancelFunc));
+        Assert.assertTrue(rowBatch.isEos());
+        Assert.assertEquals(666, res);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to