This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 49d1671966e [fix](planner) query should be cancelled if limit reached (#44338) (#45223) 49d1671966e is described below commit 49d1671966ed1d69f864eb90256e793bf4dc84d2 Author: Mingyu Chen (Rayner) <morning...@163.com> AuthorDate: Mon Dec 9 21:59:54 2024 -0800 [fix](planner) query should be cancelled if limit reached (#44338) (#45223) bp #44338 --- be/src/vec/exec/scan/scanner_scheduler.cpp | 13 ++++- be/src/vec/exec/scan/vscanner.h | 2 + .../org/apache/doris/nereids/NereidsPlanner.java | 5 -- .../org/apache/doris/planner/OriginalPlanner.java | 15 ------ .../java/org/apache/doris/planner/Planner.java | 6 --- .../main/java/org/apache/doris/qe/Coordinator.java | 25 ++++----- .../main/java/org/apache/doris/qe/LimitUtils.java | 54 ++++++++++++++++++++ .../java/org/apache/doris/qe/LimitUtilsTest.java | 59 ++++++++++++++++++++++ 8 files changed, 137 insertions(+), 42 deletions(-) diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 385b581d2a5..3750d8b40b4 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -268,7 +268,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, } size_t raw_bytes_threshold = config::doris_scanner_row_bytes; - size_t raw_bytes_read = 0; bool first_read = true; + size_t raw_bytes_read = 0; bool first_read = true; int64_t limit = scanner->limit(); while (!eos && raw_bytes_read < raw_bytes_threshold) { if (UNLIKELY(ctx->done())) { eos = true; @@ -322,6 +322,17 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, ctx->inc_block_usage(free_block->allocated_bytes()); scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes); } + if (limit > 0 && limit < ctx->batch_size()) { + // If this scanner has limit, and less than batch size, + // return immediately and no need to wait raw_bytes_threshold. 
+ // This saves time when each scanner returns only a small number of rows + // but all scanners together have already produced enough rows. + // Without this break, a query like "select * from tbl where id=1 limit 10" + // may scan a lot of data when the filter ratio of "id=1" is high. + // If limit is larger than batch size, this rule is skipped + // to avoid producing too many small blocks when the user specifies a large limit. + break; + } } // end for while if (UNLIKELY(!status.ok())) { diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 6c4f3294ce1..bb68055e1f0 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -156,6 +156,8 @@ public: _query_statistics = query_statistics; } + int64_t limit() const { return _limit; } + protected: void _discard_conjuncts() { for (auto& conjunct : _conjuncts) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index d365ff912de..c9985911670 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -719,11 +719,6 @@ public class NereidsPlanner extends Planner { return plan; } - @Override - public boolean isBlockQuery() { - return true; - } - @Override public DescriptorTable getDescTable() { return descTable; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index c8ef2e22662..f78b0735a1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -79,10 +79,6 @@ public class OriginalPlanner extends Planner { this.analyzer = analyzer; } - public boolean isBlockQuery() { - return isBlockQuery; - } - public PlannerContext getPlannerContext() { return plannerContext; } @@ -276,17
+272,6 @@ public class OriginalPlanner extends Planner { if (queryStmt instanceof SelectStmt) { SelectStmt selectStmt = (SelectStmt) queryStmt; - if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) { - isBlockQuery = true; - if (LOG.isDebugEnabled()) { - LOG.debug("this is block query"); - } - } else { - isBlockQuery = false; - if (LOG.isDebugEnabled()) { - LOG.debug("this isn't block query"); - } - } if (selectStmt.isTwoPhaseReadOptEnabled()) { // Optimize query like `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...` if (singleNodePlan instanceof SortNode diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java index 5617ad57e8f..cfcd27af8fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java @@ -44,8 +44,6 @@ public abstract class Planner { protected ArrayList<PlanFragment> fragments = Lists.newArrayList(); - protected boolean isBlockQuery = false; - protected TQueryOptions queryOptions; public abstract List<ScanNode> getScanNodes(); @@ -116,10 +114,6 @@ public abstract class Planner { return fragments; } - public boolean isBlockQuery() { - return isBlockQuery; - } - public TQueryOptions getQueryOptions() { return queryOptions; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 78493a46ad1..0f6bc0212d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -234,8 +234,6 @@ public class Coordinator implements CoordInterface { // same as backend_exec_states_.size() after Exec() private final Set<TUniqueId> instanceIds = Sets.newHashSet(); - private final boolean isBlockQuery; - private int numReceivedRows = 0; private List<String> deltaUrls; @@ -331,7 +329,6 @@ public class 
Coordinator implements CoordInterface { // Used for query/insert/test public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.context = context; - this.isBlockQuery = planner.isBlockQuery(); this.queryId = context.queryId(); this.fragments = planner.getFragments(); this.scanNodes = planner.getScanNodes(); @@ -374,7 +371,6 @@ public class Coordinator implements CoordInterface { // Constructor of Coordinator is too complicated. public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments, List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance, boolean enableProfile) { - this.isBlockQuery = true; this.jobId = jobId; this.queryId = queryId; this.descTable = descTable.toThrift(); @@ -1206,23 +1202,22 @@ public class Coordinator implements CoordInterface { numReceivedRows += resultBatch.getBatch().getRowsSize(); } + // if reached limit rows, cancel this query immediately + // to avoid BE from reading more data. + // ATTN: if change here, also need to change the same logic in QueryProcessor.getNext(); + Long limitRows = fragments.get(0).getPlanRoot().getLimit(); + boolean reachedLimit = LimitUtils.cancelIfReachLimit( + resultBatch, limitRows, numReceivedRows, this::cancelInternal); + if (resultBatch.isEos()) { receivers.remove(receiver); if (receivers.isEmpty()) { returnedAllResults = true; - } else { + } else if (!reachedLimit) { + // if reachedLimit is true, which means this query has been cancelled. + // so no need to set eos to false again. resultBatch.setEos(false); } - - // if this query is a block query do not cancel. 
- Long numLimitRows = fragments.get(0).getPlanRoot().getLimit(); - boolean hasLimit = numLimitRows > 0; - if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) { - if (LOG.isDebugEnabled()) { - LOG.debug("no block query, return num >= limit rows, need cancel"); - } - cancelInternal(new Status(TStatusCode.LIMIT_REACH, "query reach limit")); - } } if (!returnedAllResults) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java b/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java new file mode 100644 index 00000000000..cbbe5c71a0f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/LimitUtils.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.common.Status; +import org.apache.doris.thrift.TStatusCode; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.function.Consumer; + +/** + * This is a utility class for limit related operations. + * Because there are currently 2 places that need to check limit rows, the logic is put here for unification.
+ * - Coordinator.getNext(); + * - QueryProcessor.getNext(); + */ +public class LimitUtils { + private static final Logger LOG = LogManager.getLogger(LimitUtils.class); + private static final Status LIMIT_REACH_STATUS = new Status(TStatusCode.LIMIT_REACH, "query reach limit"); + + // if reached limit rows, cancel this query immediately + // to avoid BE from reading more data. + public static boolean cancelIfReachLimit(RowBatch resultBatch, long limitRows, long numReceivedRows, + Consumer<Status> cancelFunc) { + boolean reachedLimit = false; + if (limitRows > 0 && numReceivedRows >= limitRows) { + if (LOG.isDebugEnabled()) { + LOG.debug("reach limit rows: {}, received rows: {}, cancel query", limitRows, numReceivedRows); + } + cancelFunc.accept(LIMIT_REACH_STATUS); + // set this + resultBatch.setEos(true); + reachedLimit = true; + } + return reachedLimit; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java new file mode 100644 index 00000000000..012fbad18a5 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/LimitUtilsTest.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.qe; + + +import org.apache.doris.common.Status; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.function.Consumer; + +public class LimitUtilsTest { + + private static int res = 0; + + @Test + public void testUpperBound() { + Consumer<Status> cancelFunc = batch -> res = 666; + RowBatch rowBatch = new RowBatch(); + rowBatch.setEos(false); + // - no limit + Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 0, 10, cancelFunc)); + Assert.assertFalse(rowBatch.isEos()); + Assert.assertEquals(0, res); + + // - not reach limit + Assert.assertFalse(LimitUtils.cancelIfReachLimit(rowBatch, 10, 1, cancelFunc)); + Assert.assertFalse(rowBatch.isEos()); + Assert.assertEquals(0, res); + + // - reach limit + Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 10, cancelFunc)); + Assert.assertTrue(rowBatch.isEos()); + Assert.assertEquals(666, res); + + // - reach limit + res = 0; + rowBatch.setEos(false); + Assert.assertTrue(LimitUtils.cancelIfReachLimit(rowBatch, 10, 100, cancelFunc)); + Assert.assertTrue(rowBatch.isEos()); + Assert.assertEquals(666, res); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org