This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 49d1671966e [fix](planner) query should be cancelled if limit reached
(#44338) (#45223)
49d1671966e is described below
commit 49d1671966ed1d69f864eb90256e793bf4dc84d2
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Dec 9 21:59:54 2024 -0800
[fix](planner) query should be cancelled if limit reached (#44338) (#45223)
bp #44338
---
be/src/vec/exec/scan/scanner_scheduler.cpp | 13 ++++-
be/src/vec/exec/scan/vscanner.h | 2 +
.../org/apache/doris/nereids/NereidsPlanner.java | 5 --
.../org/apache/doris/planner/OriginalPlanner.java | 15 ------
.../java/org/apache/doris/planner/Planner.java | 6 ---
.../main/java/org/apache/doris/qe/Coordinator.java | 25 ++++-----
.../main/java/org/apache/doris/qe/LimitUtils.java | 54 ++++++++++++++++++++
.../java/org/apache/doris/qe/LimitUtilsTest.java | 59 ++++++++++++++++++++++
8 files changed, 137 insertions(+), 42 deletions(-)
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 385b581d2a5..3750d8b40b4 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -268,7 +268,7 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
- size_t raw_bytes_read = 0; bool first_read = true;
+ size_t raw_bytes_read = 0; bool first_read = true; int64_t limit =
scanner->limit();
while (!eos && raw_bytes_read < raw_bytes_threshold) {
if (UNLIKELY(ctx->done())) {
eos = true;
@@ -322,6 +322,17 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
+ if (limit > 0 && limit < ctx->batch_size()) {
+ // If this scanner has a limit smaller than the batch size,
+ // return immediately instead of waiting to reach
raw_bytes_threshold.
+ // This saves time when each scanner returns only a
small number of rows,
+ // but all scanners together already return enough rows.
+ // Without this break, a query like "select * from tbl where
id=1 limit 10"
+ // may scan a lot of data when the "id=1" filter ratio is
high.
+ // If the limit is larger than the batch size, this rule is
skipped,
+ // to avoid producing too many small blocks when the user
specifies a large limit.
+ break;
+ }
} // end for while
if (UNLIKELY(!status.ok())) {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 6c4f3294ce1..bb68055e1f0 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -156,6 +156,8 @@ public:
_query_statistics = query_statistics;
}
+ int64_t limit() const { return _limit; }
+
protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index d365ff912de..c9985911670 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -719,11 +719,6 @@ public class NereidsPlanner extends Planner {
return plan;
}
- @Override
- public boolean isBlockQuery() {
- return true;
- }
-
@Override
public DescriptorTable getDescTable() {
return descTable;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index c8ef2e22662..f78b0735a1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -79,10 +79,6 @@ public class OriginalPlanner extends Planner {
this.analyzer = analyzer;
}
- public boolean isBlockQuery() {
- return isBlockQuery;
- }
-
public PlannerContext getPlannerContext() {
return plannerContext;
}
@@ -276,17 +272,6 @@ public class OriginalPlanner extends Planner {
if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
- if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() !=
null) {
- isBlockQuery = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("this is block query");
- }
- } else {
- isBlockQuery = false;
- if (LOG.isDebugEnabled()) {
- LOG.debug("this isn't block query");
- }
- }
if (selectStmt.isTwoPhaseReadOptEnabled()) {
// Optimize query like `SELECT ... FROM <tbl> WHERE ... ORDER
BY ... LIMIT ...`
if (singleNodePlan instanceof SortNode
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 5617ad57e8f..cfcd27af8fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -44,8 +44,6 @@ public abstract class Planner {
protected ArrayList<PlanFragment> fragments = Lists.newArrayList();
- protected boolean isBlockQuery = false;
-
protected TQueryOptions queryOptions;
public abstract List<ScanNode> getScanNodes();
@@ -116,10 +114,6 @@ public abstract class Planner {
return fragments;
}
- public boolean isBlockQuery() {
- return isBlockQuery;
- }
-
public TQueryOptions getQueryOptions() {
return queryOptions;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 78493a46ad1..0f6bc0212d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -234,8 +234,6 @@ public class Coordinator implements CoordInterface {
// same as backend_exec_states_.size() after Exec()
private final Set<TUniqueId> instanceIds = Sets.newHashSet();
- private final boolean isBlockQuery;
-
private int numReceivedRows = 0;
private List<String> deltaUrls;
@@ -331,7 +329,6 @@ public class Coordinator implements CoordInterface {
// Used for query/insert/test
public Coordinator(ConnectContext context, Analyzer analyzer, Planner
planner) {
this.context = context;
- this.isBlockQuery = planner.isBlockQuery();
this.queryId = context.queryId();
this.fragments = planner.getFragments();
this.scanNodes = planner.getScanNodes();
@@ -374,7 +371,6 @@ public class Coordinator implements CoordInterface {
// Constructor of Coordinator is too complicated.
public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable
descTable, List<PlanFragment> fragments,
List<ScanNode> scanNodes, String timezone, boolean
loadZeroTolerance, boolean enableProfile) {
- this.isBlockQuery = true;
this.jobId = jobId;
this.queryId = queryId;
this.descTable = descTable.toThrift();
@@ -1206,23 +1202,22 @@ public class Coordinator implements CoordInterface {
numReceivedRows += resultBatch.getBatch().getRowsSize();
}
+ // if the limit rows have been reached, cancel this query immediately
+ // to stop BE from reading more data.
+ // ATTN: if you change this, also change the same logic in
QueryProcessor.getNext();
+ Long limitRows = fragments.get(0).getPlanRoot().getLimit();
+ boolean reachedLimit = LimitUtils.cancelIfReachLimit(
+ resultBatch, limitRows, numReceivedRows, this::cancelInternal);
+
if (resultBatch.isEos()) {
receivers.remove(receiver);
if (receivers.isEmpty()) {
returnedAllResults = true;
- } else {
+ } else if (!reachedLimit) {
+ // if reachedLimit is true, this query has already been
cancelled,
+ // so eos must not be reset to false.
resultBatch.setEos(false);
}
-
- // if this query is a block query do not cancel.
- Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
- boolean hasLimit = numLimitRows > 0;
- if (!isBlockQuery && instanceIds.size() > 1 && hasLimit &&
numReceivedRows >= numLimitRows) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("no block query, return num >= limit rows, need
cancel");
- }
- cancelInternal(new Status(TStatusCode.LIMIT_REACH, "query
reach limit"));
- }
}
if (!returnedAllResults) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java
new file mode 100644
index 00000000000..cbbe5c71a0f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.common.Status;
+import org.apache.doris.thrift.TStatusCode;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.function.Consumer;
+
+/**
+ * Utility class for limit-related operations.
+ * There are currently 2 places that need to check limit rows, so the shared
logic lives here:
+ * - Coordinator.getNext();
+ * - QueryProcessor.getNext();
+ */
+public class LimitUtils {
+ private static final Logger LOG = LogManager.getLogger(LimitUtils.class);
+ private static final Status LIMIT_REACH_STATUS = new
Status(TStatusCode.LIMIT_REACH, "query reach limit");
+
+ // If limitRows > 0 and numReceivedRows >= limitRows: call cancelFunc, set eos on resultBatch
+ // and return true, so BE stops reading more data. Otherwise change nothing and return false.
+ public static boolean cancelIfReachLimit(RowBatch resultBatch, long
limitRows, long numReceivedRows,
+ Consumer<Status> cancelFunc) {
+ boolean reachedLimit = false;
+ if (limitRows > 0 && numReceivedRows >= limitRows) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("reach limit rows: {}, received rows: {}, cancel
query", limitRows, numReceivedRows);
+ }
+ cancelFunc.accept(LIMIT_REACH_STATUS);
+ // mark this batch as the last one (eos)
+ resultBatch.setEos(true);
+ reachedLimit = true;
+ }
+ return reachedLimit;
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java
new file mode 100644
index 00000000000..012fbad18a5
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+
+import org.apache.doris.common.Status;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.function.Consumer;
+
+public class LimitUtilsTest {
+
+ private static int res = 0;
+
+ @Test
+ public void testUpperBound() {
+ Consumer<Status> cancelFunc = batch -> res = 666;
+ RowBatch rowBatch = new RowBatch();
+ rowBatch.setEos(false);
+ // - no limit (limitRows == 0): not cancelled, eos untouched
+ Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 0, 10,
cancelFunc));
+ Assert.assertFalse(rowBatch.isEos());
+ Assert.assertEquals(0, res);
+
+ // - below limit (1 < 10): not cancelled, eos untouched
+ Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 10, 1,
cancelFunc));
+ Assert.assertFalse(rowBatch.isEos());
+ Assert.assertEquals(0, res);
+
+ // - exactly at limit (10 == 10): cancelFunc invoked and eos set
+ Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 10,
cancelFunc));
+ Assert.assertTrue(rowBatch.isEos());
+ Assert.assertEquals(666, res);
+
+ // - over limit (100 > 10): reset res/eos first, then expect cancel again
+ res = 0;
+ rowBatch.setEos(false);
+ Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 100,
cancelFunc));
+ Assert.assertTrue(rowBatch.isEos());
+ Assert.assertEquals(666, res);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]