This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new e29d1252004 [fix](planner) query should be cancelled if limit reached (#44338) (#45222) e29d1252004 is described below commit e29d1252004a142551cae2959511eb66b7f21a73 Author: Mingyu Chen (Rayner) <morning...@163.com> AuthorDate: Mon Dec 9 22:22:24 2024 -0800 [fix](planner) query should be cancelled if limit reached (#44338) (#45222) cherry-pick #44338 --- be/src/vec/exec/scan/scanner_scheduler.cpp | 13 ++++++++ be/src/vec/exec/scan/vscanner.h | 2 ++ .../org/apache/doris/nereids/NereidsPlanner.java | 5 --- .../org/apache/doris/planner/OriginalPlanner.java | 15 --------- .../java/org/apache/doris/planner/Planner.java | 6 ---- .../main/java/org/apache/doris/qe/Coordinator.java | 36 +++++++++++----------- 6 files changed, 33 insertions(+), 44 deletions(-) diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index f7b6887d746..eef7acf1cb9 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -271,6 +271,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, size_t raw_bytes_threshold = config::doris_scanner_row_bytes; size_t raw_bytes_read = 0; bool first_read = true; + int64_t limit = scanner->limit(); while (!eos && raw_bytes_read < raw_bytes_threshold) { if (UNLIKELY(ctx->done())) { eos = true; @@ -319,6 +320,18 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, ctx->inc_block_usage(free_block->allocated_bytes()); scan_task->cached_blocks.push_back(std::move(free_block)); } + + if (limit > 0 && limit < ctx->batch_size()) { + // If this scanner has limit, and less than batch size, + // return immediately and no need to wait raw_bytes_threshold. + // This can save time that each scanner may only return a small number of rows, + // but rows are enough from all scanners. + // If not break, the query like "select * from tbl where id=1 limit 10" + // may scan a lot data when the "id=1"'s filter ratio is high. + // If limit is larger than batch size, this rule is skipped, + // to avoid user specify a large limit and causing too much small blocks. + break; + } } // end for while if (UNLIKELY(!status.ok())) { diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 03604621f05..acb715e6e47 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -167,6 +167,8 @@ public: _query_statistics = query_statistics; } + int64_t limit() const { return _limit; } + protected: void _discard_conjuncts() { for (auto& conjunct : _conjuncts) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 7a8e29306c3..31ecae7f33d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -621,11 +621,6 @@ public class NereidsPlanner extends Planner { return plan; } - @Override - public boolean isBlockQuery() { - return true; - } - @Override public DescriptorTable getDescTable() { return descTable; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 24433af00e2..3f0071680cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -80,10 +80,6 @@ public class OriginalPlanner extends Planner { this.analyzer = analyzer; } - public boolean isBlockQuery() { - return isBlockQuery; - } - public PlannerContext getPlannerContext() { return plannerContext; } @@ -274,17 +270,6 @@ public class OriginalPlanner extends Planner { if (queryStmt instanceof SelectStmt) { SelectStmt selectStmt = (SelectStmt) queryStmt; - if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) { - isBlockQuery = true; - if (LOG.isDebugEnabled()) { - LOG.debug("this is block query"); - } - } else { - isBlockQuery = false; - if (LOG.isDebugEnabled()) { - LOG.debug("this isn't block query"); - } - } // Check SelectStatement if optimization condition satisfied if (selectStmt.isPointQueryShortCircuit()) { // Optimize for point query like: SELECT * FROM t1 WHERE pk1 = 1 and pk2 = 2 diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java index 0a7246f5e1c..0a22ec841dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java @@ -43,8 +43,6 @@ public abstract class Planner { protected ArrayList<PlanFragment> fragments = Lists.newArrayList(); - protected boolean isBlockQuery = false; - protected TQueryOptions queryOptions; public abstract List<ScanNode> getScanNodes(); @@ -115,10 +113,6 @@ public abstract class Planner { return fragments; } - public boolean isBlockQuery() { - return isBlockQuery; - } - public TQueryOptions getQueryOptions() { return queryOptions; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0f5a598420d..500ba8f22f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -240,8 +240,6 @@ public class Coordinator implements CoordInterface { // same as backend_exec_states_.size() after Exec() private final Set<TUniqueId> instanceIds = Sets.newHashSet(); - private final boolean isBlockQuery; - private int numReceivedRows = 0; private List<String> deltaUrls; @@ -336,7 +334,6 @@ public class Coordinator implements CoordInterface { // Used for query/insert/test public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.context = context; - this.isBlockQuery = planner.isBlockQuery(); this.queryId = context.queryId(); this.fragments = planner.getFragments(); this.scanNodes = planner.getScanNodes(); @@ -379,7 +376,6 @@ public class Coordinator implements CoordInterface { // Constructor of Coordinator is too complicated. public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments, List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance, boolean enableProfile) { - this.isBlockQuery = true; this.jobId = jobId; this.queryId = queryId; this.descTable = descTable.toThrift(); @@ -1448,24 +1444,28 @@ public class Coordinator implements CoordInterface { } } - if (resultBatch.isEos()) { - this.returnedAllResults = true; - - // if this query is a block query do not cancel. - Long numLimitRows = fragments.get(0).getPlanRoot().getLimit(); - boolean hasLimit = numLimitRows > 0; - if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) { - if (LOG.isDebugEnabled()) { - LOG.debug("no block query, return num >= limit rows, need cancel"); - } - cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, "query reach limit"); + if (resultBatch.getBatch() != null) { + numReceivedRows += resultBatch.getBatch().getRowsSize(); + if (LOG.isDebugEnabled()) { + LOG.debug("number received rows: {}, {}", numReceivedRows, DebugUtil.printId(queryId)); } - if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) { + } + + if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) { + if (resultBatch.isEos()) { numReceivedRows = 0; numReceivedRows += resultBatch.getQueryStatistics().getReturnedRows(); } - } else if (resultBatch.getBatch() != null) { - numReceivedRows += resultBatch.getBatch().getRowsSize(); + } + + Long limitRows = fragments.get(0).getPlanRoot().getLimit(); + if (limitRows > 0 && numReceivedRows >= limitRows) { + if (LOG.isDebugEnabled()) { + LOG.debug("reach limit rows: {}, received rows: {}, cancel query, {}", + limitRows, numReceivedRows, DebugUtil.printId(queryId)); + } + cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, "reach limit"); + resultBatch.setEos(true); } return resultBatch; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org