This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 39b5682d59 [Pipeline](shared_scan_opt) Support shared scan opt in pipeline exec engine
39b5682d59 is described below

commit 39b5682d59e798fa7fb60315dbf823aa34888891
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon Mar 13 10:33:57 2023 +0800

    [Pipeline](shared_scan_opt) Support shared scan opt in pipeline exec engine
---
 be/src/common/status.h                             |   4 +
 be/src/exprs/runtime_filter.cpp                    |  50 ----------
 be/src/pipeline/exec/scan_operator.cpp             |  20 ++--
 be/src/pipeline/pipeline_fragment_context.cpp      |   3 +
 be/src/pipeline/pipeline_task.cpp                  |   3 +
 be/src/runtime/query_fragments_ctx.h               |   7 ++
 be/src/runtime/runtime_state.h                     |   6 ++
 be/src/vec/CMakeLists.txt                          |   3 +-
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |   7 +-
 be/src/vec/exec/scan/pip_scanner_context.h         |  62 ++++++++++--
 be/src/vec/exec/scan/scanner_context.cpp           | 105 ++++++++++++---------
 be/src/vec/exec/scan/scanner_context.h             |  49 ++++------
 be/src/vec/exec/scan/vscan_node.cpp                | 105 +++++++++++++++------
 be/src/vec/exec/scan/vscan_node.h                  |  12 ++-
 be/src/vec/functions/function_timestamp.cpp        |   2 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |   2 +-
 be/src/vec/runtime/shared_scanner_controller.h     |  69 ++++++++++++++
 .../properties/ChildOutputPropertyDeriver.java     |   5 +-
 .../org/apache/doris/planner/OlapScanNode.java     |   3 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  25 +++--
 .../java/org/apache/doris/qe/SessionVariable.java  |   1 +
 gensrc/thrift/PaloInternalService.thrift           |   4 +
 .../data/nereids_syntax_p0/grouping_sets.out       |  11 ++-
 .../nereids_syntax_p0/sub_query_correlated.out     |  90 +++++++++---------
 .../sub_query_diff_old_optimize.out                |  12 +--
 .../correctness_p0/test_colocate_join.groovy       |   2 +-
 .../duplicate/storage/test_dup_tab_char.groovy     |   2 +-
 .../nereids_function_p0/gen_function/gen.groovy    |   2 +-
 .../suites/nereids_syntax_p0/grouping_sets.groovy  |   4 +-
 .../suites/nereids_syntax_p0/set_operation.groovy  |   6 +-
 .../nereids_syntax_p0/sub_query_correlated.groovy  |  46 ++++-----
 .../sub_query_diff_old_optimize.groovy             |   4 +-
 .../performance_p0/redundant_conjuncts.groovy      |   4 +-
 .../conditional_functions/test_query_limit.groovy  |   6 +-
 .../window_functions/test_window_fn.groovy         |  20 ++--
 35 files changed, 472 insertions(+), 284 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index e4fdc27911..2c0da39b8e 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -246,6 +246,7 @@ E(SEGCOMPACTION_INIT_READER, -3117);
 E(SEGCOMPACTION_INIT_WRITER, -3118);
 E(SEGCOMPACTION_FAILED, -3119);
 E(PIP_WAIT_FOR_RF, -3120);
+E(PIP_WAIT_FOR_SC, -3121);
 E(INVERTED_INDEX_INVALID_PARAMETERS, -6000);
 E(INVERTED_INDEX_NOT_SUPPORTED, -6001);
 E(INVERTED_INDEX_CLUCENE_ERROR, -6002);
@@ -383,6 +384,7 @@ public:
     ERROR_CTOR(EndOfFile, END_OF_FILE)
     ERROR_CTOR(InternalError, INTERNAL_ERROR)
     ERROR_CTOR(WaitForRf, PIP_WAIT_FOR_RF)
+    ERROR_CTOR(WaitForScannerContext, PIP_WAIT_FOR_SC)
     ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
     ERROR_CTOR(Cancelled, CANCELLED)
     ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
@@ -402,6 +404,8 @@ public:

     bool ok() const { return _code == ErrorCode::OK; }

+    bool is_blocked_by_sc() const { return _code == ErrorCode::PIP_WAIT_FOR_SC; }
+
     bool is_io_error() const {
         return ErrorCode::IO_ERROR == _code || ErrorCode::READ_UNENOUGH == _code ||
                ErrorCode::CHECKSUM_ERROR == _code || ErrorCode::FILE_DATA_ERROR == _code ||
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 7ec1f5a21c..5db63116cd 100644
---
a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -43,56 +43,6 @@ #include "vec/runtime/shared_hash_table_controller.h" namespace doris { -// PrimitiveType->TExprNodeType -// TODO: use constexpr if we use c++14 -TExprNodeType::type get_expr_node_type(PrimitiveType type) { - switch (type) { - case TYPE_BOOLEAN: - return TExprNodeType::BOOL_LITERAL; - - case TYPE_TINYINT: - case TYPE_SMALLINT: - case TYPE_INT: - case TYPE_BIGINT: - return TExprNodeType::INT_LITERAL; - - case TYPE_LARGEINT: - return TExprNodeType::LARGE_INT_LITERAL; - break; - - case TYPE_NULL: - return TExprNodeType::NULL_LITERAL; - - case TYPE_FLOAT: - case TYPE_DOUBLE: - case TYPE_TIME: - case TYPE_TIMEV2: - return TExprNodeType::FLOAT_LITERAL; - break; - - case TYPE_DECIMAL32: - case TYPE_DECIMAL64: - case TYPE_DECIMAL128I: - case TYPE_DECIMALV2: - return TExprNodeType::DECIMAL_LITERAL; - - case TYPE_DATETIME: - case TYPE_DATEV2: - case TYPE_DATETIMEV2: - return TExprNodeType::DATE_LITERAL; - - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_HLL: - case TYPE_OBJECT: - case TYPE_STRING: - return TExprNodeType::STRING_LITERAL; - - default: - DCHECK(false) << "Invalid type."; - return TExprNodeType::NULL_LITERAL; - } -} // PrimitiveType-> PColumnType // TODO: use constexpr if we use c++14 diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index f673ca1afa..6451927f77 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -25,13 +25,21 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator) bool ScanOperator::can_read() { - if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->no_schedule()) { - // _eos: need eos - // _scanner_ctx->done(): need finish - // _scanner_ctx->no_schedule(): should schedule _scanner_ctx - return true; + if (!_node->_opened) { + if (_node->_should_create_scanner || _node->ready_to_open()) { + return true; + } else { + return false; + } } else { - return !_node->_scanner_ctx->empty_in_queue(); // there are some blocks to process + if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->no_schedule()) { + // _eos: need eos + // _scanner_ctx->done(): need finish + // _scanner_ctx->no_schedule(): should schedule _scanner_ctx + return true; + } else { + return _node->ready_to_read(); // there are some blocks to process + } } } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 727383d1a5..c2852b67a9 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -344,6 +344,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re if (request.__isset.load_job_id) { _runtime_state->set_load_job_id(request.load_job_id); } + if (request.__isset.shared_scan_opt) { + _runtime_state->set_shared_scan_opt(request.shared_scan_opt); + } if (request.query_options.__isset.is_report_success) { fragment_context->set_is_report_success(request.query_options.is_report_success); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 6f5732121e..fecb7d61b3 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -135,6 +135,9 @@ Status PipelineTask::execute(bool* eos) { if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) { set_state(PipelineTaskState::BLOCKED_FOR_RF); return Status::OK(); + } else if (st.is<ErrorCode::PIP_WAIT_FOR_SC>()) { + 
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); + return Status::OK(); } RETURN_IF_ERROR(st); } diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h index 389cecb860..098bcce063 100644 --- a/be/src/runtime/query_fragments_ctx.h +++ b/be/src/runtime/query_fragments_ctx.h @@ -33,6 +33,7 @@ #include "util/threadpool.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/shared_hash_table_controller.h" +#include "vec/runtime/shared_scanner_controller.h" namespace doris { @@ -46,6 +47,7 @@ public: : fragment_num(total_fragment_num), timeout_second(-1), _exec_env(exec_env) { _start_time = DateTimeValue::local_time(); _shared_hash_table_controller.reset(new vectorized::SharedHashTableController()); + _shared_scanner_controller.reset(new vectorized::SharedScannerController()); } ~QueryFragmentsCtx() { @@ -122,6 +124,10 @@ public: return _shared_hash_table_controller; } + std::shared_ptr<vectorized::SharedScannerController> get_shared_scanner_controller() { + return _shared_scanner_controller; + } + vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; } public: @@ -167,6 +173,7 @@ private: std::atomic<bool> _is_cancelled {false}; std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller; + std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller; vectorized::RuntimePredicate _runtime_predicate; }; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index d309390d6a..7ded1c66fc 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -88,6 +88,7 @@ public: bool abort_on_default_limit_exceeded() const { return _query_options.abort_on_default_limit_exceeded; } + int query_parallel_instance_num() const { return _query_options.parallel_instance; } int max_errors() const { return _query_options.max_errors; } int query_timeout() const { return _query_options.query_timeout; } int insert_timeout() const { return _query_options.insert_timeout; } @@ -200,6 +201,10 @@ public: int64_t load_job_id() const { return _load_job_id; } + void set_shared_scan_opt(bool shared_scan_opt) { _shared_scan_opt = shared_scan_opt; } + + bool shared_scan_opt() const { return _shared_scan_opt; } + const std::string get_error_log_file_path() const { return _error_log_file_path; } // append error msg and error line to file when loading data. 
@@ -453,6 +458,7 @@ private: std::string _db_name; std::string _load_dir; int64_t _load_job_id; + bool _shared_scan_opt = false; // mini load int64_t _normal_row_number; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 29bea35d47..12343adf80 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -320,8 +320,7 @@ set(VEC_FILES if (WITH_MYSQL) set(VEC_FILES ${VEC_FILES} - exec/scan/mysql_scanner.cpp - ) + exec/scan/mysql_scanner.cpp) endif () add_library(Vec STATIC diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 8761ca380c..55babf3066 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -411,10 +411,9 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) { TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err); if (tablet == nullptr) { - std::stringstream ss; - ss << "failed to get tablet: " << tablet_id << ", reason: " << err; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); + auto err_str = fmt::format("failed to get tablet: {}, reason: {}", tablet_id, err); + LOG(WARNING) << err_str; + return Status::InternalError(err_str); } std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges; diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 57d458d770..0e418256c2 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -33,20 +33,68 @@ public: : vectorized::ScannerContext(state, parent, input_tuple_desc, output_tuple_desc, scanners, limit, max_bytes_in_blocks_queue) {} - void _update_block_queue_empty() override { _blocks_queue_empty = _blocks_queue.empty(); } + Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, + int id, bool wait = false) override { + { + std::unique_lock<std::mutex> l(_transfer_lock); + if (state->is_cancelled()) { + _process_status = Status::Cancelled("cancelled"); + } - Status get_block_from_queue(vectorized::BlockUPtr* block, bool* eos, - bool wait = false) override { - return vectorized::ScannerContext::get_block_from_queue(block, eos, false); + if (!_process_status.ok()) { + return _process_status; + } + } + + { + std::unique_lock<std::mutex> l(*_queue_mutexs[id]); + if (!_blocks_queues[id].empty()) { + *block = std::move(_blocks_queues[id].front()); + _blocks_queues[id].pop_front(); + return Status::OK(); + } else { + *eos = _is_finished || _should_stop; + } + } + return Status::OK(); } // We should make those method lock free. bool done() override { return _is_finished || _should_stop || _status_error; } - bool no_schedule() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; } - bool empty_in_queue() override { return _blocks_queue_empty; } + + void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) override { + const int queue_size = _queue_mutexs.size(); + const int block_size = blocks.size(); + for (int i = 0; i < queue_size && i < block_size; ++i) { + int queue = _next_queue_to_feed; + { + std::lock_guard<std::mutex> l(*_queue_mutexs[queue]); + for (int j = i; j < block_size; j += queue_size) { + _blocks_queues[queue].emplace_back(std::move(blocks[j])); + } + } + _next_queue_to_feed = queue + 1 < queue_size ? 
queue + 1 : 0; + } + } + + bool empty_in_queue(int id) override { + std::unique_lock<std::mutex> l(*_queue_mutexs[id]); + return _blocks_queues[id].empty(); + } + + void set_max_queue_size(int max_queue_size) override { + for (int i = 0; i < max_queue_size; ++i) { + _blocks_queue_empty.emplace_back(true); + _queue_mutexs.emplace_back(new std::mutex); + _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>()); + } + } private: - std::atomic_bool _blocks_queue_empty = true; + int _next_queue_to_feed = 0; + std::vector<bool> _blocks_queue_empty; + std::vector<std::unique_ptr<std::mutex>> _queue_mutexs; + std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues; }; } // namespace pipeline } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 70a962b8ea..16d8971227 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -23,24 +23,53 @@ #include "runtime/runtime_state.h" #include "util/threadpool.h" #include "vec/core/block.h" -#include "vec/exec/scan/scanner_scheduler.h" #include "vec/exec/scan/vscan_node.h" #include "vec/exec/scan/vscanner.h" namespace doris::vectorized { +ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::VScanNode* parent, + const doris::TupleDescriptor* input_tuple_desc, + const doris::TupleDescriptor* output_tuple_desc, + const std::list<VScanner*>& scanners_, int64_t limit_, + int64_t max_bytes_in_blocks_queue_) + : _state(state_), + _parent(parent), + _input_tuple_desc(input_tuple_desc), + _output_tuple_desc(output_tuple_desc), + _process_status(Status::OK()), + _batch_size(state_->batch_size()), + limit(limit_), + _max_bytes_in_queue(max_bytes_in_blocks_queue_), + _scanner_scheduler(state_->exec_env()->scanner_scheduler()), + _scanners(scanners_) { + ctx_id = UniqueId::gen_uid().to_string(); + if (_scanners.empty()) { + _is_finished = true; + } +} + +// After init function call, should not access _parent Status ScannerContext::init() { _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc; // 1. Calculate max concurrency // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4 // should find a more reasonable value. - _max_thread_num = - std::min(config::doris_scanner_thread_pool_thread_num / 4, (int32_t)_scanners.size()); + _max_thread_num = _state->shared_scan_opt() ? config::doris_scanner_thread_pool_thread_num + : config::doris_scanner_thread_pool_thread_num / 4; + _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size()); // For select * from table limit 10; should just use one thread. if (_parent->should_run_serial()) { _max_thread_num = 1; } + _scanner_profile = _parent->_scanner_profile; + _scanner_sched_counter = _parent->_scanner_sched_counter; + _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter; + _free_blocks_memory_usage = _parent->_free_blocks_memory_usage; + _newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num; + _queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage; + _scanner_wait_batch_timer = _parent->_scanner_wait_batch_timer; // 2. Calculate how many blocks need to be preallocated. // The calculation logic is as follows: // 1. Assuming that at most M rows can be scanned in one scan(config::doris_scanner_row_num), @@ -50,8 +79,8 @@ Status ScannerContext::init() { auto doris_scanner_row_num = limit == -1 ? 
config::doris_scanner_row_num : std::min(static_cast<int64_t>(config::doris_scanner_row_num), limit); - int real_block_size = limit == -1 ? _state->batch_size() - : std::min(static_cast<int64_t>(_state->batch_size()), limit); + int real_block_size = + limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit); _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size; auto pre_alloc_block_count = _max_thread_num * _block_per_scanner; @@ -64,7 +93,7 @@ Status ScannerContext::init() { free_blocks_memory_usage += block->allocated_bytes(); _free_blocks.emplace_back(std::move(block)); } - _parent->_free_blocks_memory_usage->add(free_blocks_memory_usage); + _free_blocks_memory_usage->add(free_blocks_memory_usage); #ifndef BE_TEST // 3. get thread token @@ -91,20 +120,20 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block) { if (!_free_blocks.empty()) { auto block = std::move(_free_blocks.back()); _free_blocks.pop_back(); - _parent->_free_blocks_memory_usage->add(-block->allocated_bytes()); + _free_blocks_memory_usage->add(-block->allocated_bytes()); return block; } } *has_free_block = false; - COUNTER_UPDATE(_parent->_newly_create_free_blocks_num, 1); - return std::make_unique<vectorized::Block>(_real_tuple_desc->slots(), _state->batch_size(), + COUNTER_UPDATE(_newly_create_free_blocks_num, 1); + return std::make_unique<vectorized::Block>(_real_tuple_desc->slots(), _batch_size, true /*ignore invalid slots*/); } void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) { block->clear_column_data(); - _parent->_free_blocks_memory_usage->add(block->allocated_bytes()); + _free_blocks_memory_usage->add(block->allocated_bytes()); std::lock_guard l(_free_blocks_lock); _free_blocks.emplace_back(std::move(block)); } @@ -117,18 +146,18 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& _blocks_queue.push_back(std::move(b)); } blocks.clear(); - _update_block_queue_empty(); _blocks_queue_added_cv.notify_one(); - _parent->_queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue); + _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue); } -bool ScannerContext::empty_in_queue() { - std::unique_lock l(_transfer_lock); +bool ScannerContext::empty_in_queue(int id) { + std::unique_lock<std::mutex> l(_transfer_lock); return _blocks_queue.empty(); } -Status ScannerContext::get_block_from_queue(vectorized::BlockUPtr* block, bool* eos, bool wait) { - std::unique_lock l(_transfer_lock); +Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, + bool* eos, int id, bool wait) { + std::unique_lock<std::mutex> l(_transfer_lock); // Normally, the scanner scheduler will schedule ctx. // But when the amount of data in the blocks queue exceeds the upper limit, // the scheduler will stop scheduling. @@ -137,18 +166,18 @@ Status ScannerContext::get_block_from_queue(vectorized::BlockUPtr* block, bool* // data can be continuously fetched. 
if (_has_enough_space_in_blocks_queue() && _num_running_scanners == 0) { _num_scheduling_ctx++; - _state->exec_env()->scanner_scheduler()->submit(this); + _scanner_scheduler->submit(this); } // Wait for block from queue if (wait) { - SCOPED_TIMER(_parent->_scanner_wait_batch_timer); + SCOPED_TIMER(_scanner_wait_batch_timer); while (!(!_blocks_queue.empty() || _is_finished || !_process_status.ok() || - _state->is_cancelled())) { + state->is_cancelled())) { _blocks_queue_added_cv.wait(l); } } - if (_state->is_cancelled()) { + if (state->is_cancelled()) { _process_status = Status::Cancelled("cancelled"); } @@ -159,10 +188,9 @@ Status ScannerContext::get_block_from_queue(vectorized::BlockUPtr* block, bool* if (!_blocks_queue.empty()) { *block = std::move(_blocks_queue.front()); _blocks_queue.pop_front(); - _update_block_queue_empty(); auto block_bytes = (*block)->allocated_bytes(); _cur_bytes_in_queue -= block_bytes; - _parent->_queued_blocks_memory_usage->add(-block_bytes); + _queued_blocks_memory_usage->add(-block_bytes); return Status::OK(); } else { *eos = _is_finished; @@ -181,9 +209,9 @@ bool ScannerContext::set_status_on_error(const Status& status) { return false; } -Status ScannerContext::_close_and_clear_scanners() { - std::unique_lock l(_scanners_lock); - if (_state->enable_profile()) { +Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState* state) { + std::unique_lock<std::mutex> l(_scanners_lock); + if (state->enable_profile()) { std::stringstream scanner_statistics; std::stringstream scanner_rows_read; scanner_statistics << "["; @@ -207,13 +235,12 @@ Status ScannerContext::_close_and_clear_scanners() { } scanner_statistics << "]"; scanner_rows_read << "]"; - _parent->_scanner_profile->add_info_string("PerScannerRunningTime", - scanner_statistics.str()); - _parent->_scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str()); + node->_scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str()); + node->_scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str()); } // Only unfinished scanners here for (auto scanner : _scanners) { - scanner->close(_state); + scanner->close(state); // Scanners are in ObjPool in ScanNode, // so no need to delete them here. } @@ -221,8 +248,8 @@ Status ScannerContext::_close_and_clear_scanners() { return Status::OK(); } -void ScannerContext::clear_and_join() { - std::unique_lock l(_transfer_lock); +void ScannerContext::clear_and_join(VScanNode* node, RuntimeState* state) { + std::unique_lock<std::mutex> l(_transfer_lock); do { if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) { break; @@ -239,14 +266,10 @@ void ScannerContext::clear_and_join() { } // Must wait all running scanners stop running. // So that we can make sure to close all scanners. 
- _close_and_clear_scanners(); + _close_and_clear_scanners(node, state); - COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling); - COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling); _blocks_queue.clear(); _free_blocks.clear(); - - return; } bool ScannerContext::no_schedule() { @@ -273,7 +296,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) { std::lock_guard l(_transfer_lock); _num_scheduling_ctx++; - auto submit_st = _state->exec_env()->scanner_scheduler()->submit(this); + auto submit_st = _scanner_scheduler->submit(this); if (!submit_st.ok()) { _num_scheduling_ctx--; } @@ -285,11 +308,6 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) { // same scanner. if (scanner->need_to_close() && scanner->set_counted_down() && (--_num_unfinished_scanners) == 0) { - // ATTN: this 2 counters will be set at close() again, which is the final values. - // But we set them here because the counter set at close() can not send to FE's profile. - // So we set them here, and the counter value may be little less than final values. - COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling); - COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling); _is_finished = true; _blocks_queue_added_cv.notify_one(); } @@ -306,7 +324,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru if (_has_enough_space_in_blocks_queue()) { // If there are enough space in blocks queue, // the scanner number depends on the _free_blocks numbers - std::lock_guard l(_free_blocks_lock); + std::lock_guard f(_free_blocks_lock); thread_slot_num = _free_blocks.size() / _block_per_scanner; thread_slot_num += (_free_blocks.size() % _block_per_scanner != 0); thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners); @@ -340,7 +358,6 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru } } } - return; } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 09500d59cf..1285467a5d 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -32,7 +32,6 @@ namespace doris { class PriorityThreadPool; class ThreadPool; class ThreadPoolToken; -class ScannerScheduler; namespace vectorized { @@ -51,35 +50,21 @@ class ScannerContext { public: ScannerContext(RuntimeState* state_, VScanNode* parent, const TupleDescriptor* input_tuple_desc, const TupleDescriptor* output_tuple_desc, const std::list<VScanner*>& scanners_, - int64_t limit_, int64_t max_bytes_in_blocks_queue_) - : _state(state_), - _parent(parent), - _input_tuple_desc(input_tuple_desc), - _output_tuple_desc(output_tuple_desc), - _process_status(Status::OK()), - limit(limit_), - _max_bytes_in_queue(max_bytes_in_blocks_queue_), - _scanners(scanners_) { - ctx_id = UniqueId::gen_uid().to_string(); - if (_scanners.empty()) { - _is_finished = true; - } - } + int64_t limit_, int64_t max_bytes_in_blocks_queue_); virtual ~ScannerContext() = default; - Status init(); vectorized::BlockUPtr get_free_block(bool* has_free_block); void return_free_block(std::unique_ptr<vectorized::Block> block); // Append blocks from scanners to the blocks queue. - void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks); - + virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks); // Get next block from blocks queue. 
Called by ScanNode // Set eos to true if there is no more data to read. // And if eos is true, the block returned must be nullptr. - virtual Status get_block_from_queue(vectorized::BlockUPtr* block, bool* eos, bool wait = true); + virtual Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, + bool* eos, int id, bool wait = true); // When a scanner complete a scan, this method will be called // to return the scanner to the list for next scheduling. @@ -121,7 +106,7 @@ public: void get_next_batch_of_scanners(std::list<VScanner*>* current_run); - void clear_and_join(); + void clear_and_join(VScanNode* node, RuntimeState* state); virtual bool no_schedule(); @@ -129,12 +114,14 @@ public: RuntimeState* state() { return _state; } - void incr_num_ctx_scheduling(int64_t num) { _num_ctx_scheduling += num; } - void incr_num_scanner_scheduling(int64_t num) { _num_scanner_scheduling += num; } + void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); } + void incr_num_scanner_scheduling(int64_t num) { _scanner_sched_counter->update(num); } VScanNode* parent() { return _parent; } - virtual bool empty_in_queue(); + virtual bool empty_in_queue(int id); + + virtual void set_max_queue_size(int max_queue_size) {}; // the unique id of this context std::string ctx_id; @@ -143,15 +130,12 @@ public: std::vector<bthread_t> _btids; private: - Status _close_and_clear_scanners(); + Status _close_and_clear_scanners(VScanNode* node, RuntimeState* state); inline bool _has_enough_space_in_blocks_queue() const { return _cur_bytes_in_queue < _max_bytes_in_queue / 2; } - // do nothing here, we only do update on pip_scanner_context - virtual void _update_block_queue_empty() {} - protected: RuntimeState* _state; VScanNode* _parent; @@ -198,6 +182,7 @@ protected: doris::Mutex _free_blocks_lock; std::vector<vectorized::BlockUPtr> _free_blocks; + int _batch_size; // The limit from SQL's limit clause int64_t limit; @@ -221,6 +206,7 @@ protected: // The max limit bytes of blocks in blocks queue int64_t _max_bytes_in_queue; + doris::vectorized::ScannerScheduler* _scanner_scheduler; // List "scanners" saves all "unfinished" scanners. // The scanner scheduler will pop scanners from this list, run scanner, // and then if the scanner is not finished, will be pushed back to this list. 
@@ -230,8 +216,13 @@ protected: std::vector<int64_t> _finished_scanner_runtime; std::vector<int64_t> _finished_scanner_rows_read; - int64_t _num_ctx_scheduling = 0; - int64_t _num_scanner_scheduling = 0; + std::shared_ptr<RuntimeProfile> _scanner_profile; + RuntimeProfile::Counter* _scanner_sched_counter = nullptr; + RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr; + RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr; + RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = nullptr; + RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; + RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index e7b46ca659..f019df4b60 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -66,6 +66,8 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) { Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); _state = state; + _is_pipeline_scan = state->enable_pipeline_exec(); + _shared_scan_opt = state->shared_scan_opt(); const TQueryOptions& query_options = state->query_options(); if (query_options.__isset.max_scan_key_num) { @@ -92,6 +94,21 @@ Status VScanNode::prepare(RuntimeState* state) { for (auto& rf_ctx : _runtime_filter_ctxs) { rf_ctx.runtime_filter->init_profile(_runtime_profile.get()); } + + if (_is_pipeline_scan) { + if (_shared_scan_opt) { + _shared_scanner_controller = + state->get_query_fragments_ctx()->get_shared_scanner_controller(); + auto [should_create_scanner, queue_id] = + _shared_scanner_controller->should_build_scanner_and_queue_id(id()); + _should_create_scanner = should_create_scanner; + _context_queue_id = queue_id; + } else { + _should_create_scanner = true; + _context_queue_id = 0; + } + } + return Status::OK(); } @@ -113,18 +130,37 @@ Status VScanNode::alloc_resource(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::alloc_resource(state)); RETURN_IF_ERROR(_acquire_runtime_filter()); RETURN_IF_ERROR(_process_conjuncts()); - if (_eos) { - return Status::OK(); - } - std::list<VScanner*> scanners; - RETURN_IF_ERROR(_init_scanners(&scanners)); - if (scanners.empty()) { - _eos = true; + if (_is_pipeline_scan) { + if (_should_create_scanner) { + auto status = !_eos ? _prepare_scanners() : Status::OK(); + if (_scanner_ctx) { + DCHECK(!_eos && _num_scanners->value() > 0); + _scanner_ctx->set_max_queue_size( + _shared_scan_opt ? std::max(state->query_parallel_instance_num(), 1) : 1); + RETURN_IF_ERROR( + _state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); + } + if (_shared_scan_opt) { + _shared_scanner_controller->set_scanner_context(id(), + _eos ? nullptr : _scanner_ctx); + } + RETURN_IF_ERROR(status); + } else if (_shared_scanner_controller->scanner_context_is_ready(id())) { + _scanner_ctx = _shared_scanner_controller->get_scanner_context(id()); + if (!_scanner_ctx) { + _eos = true; + } + } else { + return Status::WaitForScannerContext("Need wait for scanner context create"); + } } else { - COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size())); - RETURN_IF_ERROR(_start_scanners(scanners)); + RETURN_IF_ERROR(!_eos ? 
_prepare_scanners() : Status::OK()); + if (_scanner_ctx) { + RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); + } } + RETURN_IF_CANCELLED(state); _opened = true; return Status::OK(); @@ -163,7 +199,7 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* } vectorized::BlockUPtr scan_block = nullptr; - RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(&scan_block, eos)); + RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, &scan_block, eos, _context_queue_id)); if (*eos) { DCHECK(scan_block == nullptr); return Status::OK(); @@ -184,12 +220,6 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* Status VScanNode::_init_profile() { // 1. counters for scan node - auto* memory_usage = _runtime_profile->create_child("MemoryUsage", true, true); - _runtime_profile->add_child(memory_usage, false, nullptr); - _queued_blocks_memory_usage = - memory_usage->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES); - _free_blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES); - _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT); _total_throughput_counter = runtime_profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter); @@ -201,30 +231,36 @@ Status VScanNode::_init_profile() { _scanner_profile.reset(new RuntimeProfile("VScanner")); runtime_profile()->add_child(_scanner_profile.get(), true, nullptr); + auto* memory_usage = _scanner_profile->create_child("MemoryUsage", true, true); + _runtime_profile->add_child(memory_usage, false, nullptr); + _queued_blocks_memory_usage = + memory_usage->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES); + _free_blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES); + _newly_create_free_blocks_num = + ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT); + // time of transfer thread to wait for block from scan thread + _scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, "ScannerBatchWaitTime"); + _scanner_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerSchedCount", TUnit::UNIT); + _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerCtxSchedCount", TUnit::UNIT); + _scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime"); _scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScannerCpuTime"); _prefilter_timer = ADD_TIMER(_scanner_profile, "ScannerPrefilterTime"); _convert_block_timer = ADD_TIMER(_scanner_profile, "ScannerConvertBlockTime"); _filter_timer = ADD_TIMER(_scanner_profile, "ScannerFilterTime"); - _scanner_sched_counter = ADD_COUNTER(_runtime_profile, "ScannerSchedCount", TUnit::UNIT); - _scanner_ctx_sched_counter = ADD_COUNTER(_runtime_profile, "ScannerCtxSchedCount", TUnit::UNIT); - // time of transfer thread to wait for block from scan thread - _scanner_wait_batch_timer = ADD_TIMER(_runtime_profile, "ScannerBatchWaitTime"); // time of scan thread to wait for worker thread of the thread pool _scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime"); _pre_alloc_free_blocks_num = ADD_COUNTER(_runtime_profile, "PreAllocFreeBlocksNum", TUnit::UNIT); - _newly_create_free_blocks_num = - ADD_COUNTER(_runtime_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT); _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT); return Status::OK(); } Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) { - if (_state->enable_pipeline_exec()) { + if 
(_is_pipeline_scan) { _scanner_ctx.reset(new pipeline::PipScannerContext(_state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(), _state->query_options().mem_limit / 20)); @@ -234,7 +270,6 @@ Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) { _state->query_options().mem_limit / 20)); } RETURN_IF_ERROR(_scanner_ctx->init()); - RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); return Status::OK(); } @@ -374,10 +409,12 @@ Status VScanNode::close(RuntimeState* state) { void VScanNode::release_resource(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::release_resource"); if (_scanner_ctx.get()) { - // stop and wait the scanner scheduler to be done - // _scanner_ctx may not be created for some short circuit case. - _scanner_ctx->set_should_stop(); - _scanner_ctx->clear_and_join(); + if (!state->enable_pipeline_exec() || _should_create_scanner) { + // stop and wait the scanner scheduler to be done + // _scanner_ctx may not be created for some short circuit case. + _scanner_ctx->set_should_stop(); + _scanner_ctx->clear_and_join(this, state); + } } for (auto& ctx : _runtime_filter_ctxs) { @@ -1318,4 +1355,16 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate* return PushDownType::ACCEPTABLE; } +Status VScanNode::_prepare_scanners() { + std::list<VScanner*> scanners; + RETURN_IF_ERROR(_init_scanners(&scanners)); + if (scanners.empty()) { + _eos = true; + } else { + COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size())); + RETURN_IF_ERROR(_start_scanners(scanners)); + } + + return Status::OK(); +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 23b1e59b51..51dc2edb1b 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -103,6 +103,8 @@ public: Status try_close(); bool should_run_serial() const { return _should_run_serial; } + bool ready_to_open() { return _shared_scanner_controller->scanner_context_is_ready(id()); } + bool ready_to_read() { return !_scanner_ctx->empty_in_queue(_context_queue_id); } enum class PushDownType { // The predicate can not be pushed down to data source @@ -178,8 +180,12 @@ protected: // Only predicate on key column can be pushed down. virtual bool _is_key_column(const std::string& col_name) { return false; } + Status _prepare_scanners(); + protected: RuntimeState* _state; + bool _is_pipeline_scan = false; + bool _shared_scan_opt = false; // For load scan node, there should be both input and output tuple descriptor. // For query scan node, there is only output_tuple_desc. 
TupleId _input_tuple_id = -1; @@ -267,7 +273,11 @@ protected: int64_t _limit_per_scanner = -1; protected: - std::unique_ptr<RuntimeProfile> _scanner_profile; + std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller; + bool _should_create_scanner = false; + int _context_queue_id = -1; + + std::shared_ptr<RuntimeProfile> _scanner_profile; // rows read from the scanner (including those discarded by (pre)filters) RuntimeProfile::Counter* _rows_read_counter; diff --git a/be/src/vec/functions/function_timestamp.cpp b/be/src/vec/functions/function_timestamp.cpp index 4ec1b30cc1..0ba8ec5f84 100644 --- a/be/src/vec/functions/function_timestamp.cpp +++ b/be/src/vec/functions/function_timestamp.cpp @@ -417,7 +417,7 @@ struct UnixTimeStampDateImpl { static Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, size_t result, size_t input_rows_count) { - const ColumnPtr col_source = block.get_by_position(arguments[0]).column; + const ColumnPtr& col_source = block.get_by_position(arguments[0]).column; auto col_result = ColumnVector<Int32>::create(); auto null_map = ColumnVector<UInt8>::create(); auto& col_result_data = col_result->get_data(); diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index a6cf99edca..4c579f1d91 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -85,4 +85,4 @@ private: }; } // namespace vectorized -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/runtime/shared_scanner_controller.h b/be/src/vec/runtime/shared_scanner_controller.h new file mode 100644 index 0000000000..5fb2244c82 --- /dev/null +++ b/be/src/vec/runtime/shared_scanner_controller.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <condition_variable> +#include <map> +#include <memory> +#include <mutex> +#include <vector> + +#include "vec/core/block.h" +#include "vec/exec/scan/scanner_context.h" + +namespace doris::vectorized { + +class SharedScannerController { +public: + std::pair<bool, int> should_build_scanner_and_queue_id(int my_node_id) { + std::lock_guard<std::mutex> lock(_mutex); + auto it = _scanner_parallel.find(my_node_id); + + if (it == _scanner_parallel.cend()) { + _scanner_parallel.insert({my_node_id, 0}); + return {true, 0}; + } else { + auto queue_id = it->second; + _scanner_parallel[my_node_id] = queue_id + 1; + return {false, queue_id + 1}; + } + } + + void set_scanner_context(int my_node_id, + const std::shared_ptr<ScannerContext> scanner_context) { + std::lock_guard<std::mutex> lock(_mutex); + _scanner_context.insert({my_node_id, scanner_context}); + } + + bool scanner_context_is_ready(int my_node_id) { + std::lock_guard<std::mutex> lock(_mutex); + return _scanner_context.find(my_node_id) != _scanner_context.end(); + } + + std::shared_ptr<ScannerContext> get_scanner_context(int my_node_id) { + std::lock_guard<std::mutex> lock(_mutex); + return _scanner_context[my_node_id]; + } + +private: + std::mutex _mutex; + std::map<int /*node id*/, int /*parallel*/> _scanner_parallel; + std::map<int /*node id*/, std::shared_ptr<ScannerContext>> _scanner_context; +}; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 4623ba311e..467bdc004c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -44,6 +44,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggrega import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.JoinUtils; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -218,7 +219,9 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, @Override public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanContext context) { // TODO: find a better way to handle both tablet num == 1 and colocate table together in future - if (!olapScan.getTable().isColocateTable() && olapScan.getScanTabletNum() == 1) { + if (!olapScan.getTable().isColocateTable() && olapScan.getScanTabletNum() == 1 + && (!ConnectContext.get().getSessionVariable().enablePipelineEngine() + || ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() == 1)) { return PhysicalProperties.GATHER; } else if (olapScan.getDistributionSpec() instanceof DistributionSpecHash) { return PhysicalProperties.createHash((DistributionSpecHash) olapScan.getDistributionSpec()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index d53697a338..1821b55829 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1118,6 +1118,9 @@ public class OlapScanNode extends ScanNode { @Override 
public int getNumInstances() { + if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) { + return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); + } return result.size(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index d35bcec8c0..18b7ae9cab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1622,6 +1622,7 @@ public class Coordinator { bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(), parallelExecInstanceNum, params); } else { + params.sharedScanOpt = true; // case A for (Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : fragmentExecParamsMap.get( fragment.getFragmentId()).scanRangeAssignment.entrySet()) { @@ -1630,13 +1631,22 @@ public class Coordinator { for (Integer planNodeId : value.keySet()) { List<TScanRangeParams> perNodeScanRanges = value.get(planNodeId); - int expectedInstanceNum = 1; - if (parallelExecInstanceNum > 1) { - //the scan instance num should not larger than the tablets num - expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum); + List<List<TScanRangeParams>> perInstanceScanRanges = Lists.newArrayList(); + if (!enablePipelineEngine) { + int expectedInstanceNum = 1; + if (parallelExecInstanceNum > 1) { + //the scan instance num should not larger than the tablets num + expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum); + } + perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges, + expectedInstanceNum); + } else { + int expectedInstanceNum = Math.min(parallelExecInstanceNum, + leftMostNode.getNumInstances()); + for (int j = 0; j < Math.max(expectedInstanceNum, 1); j++) { + perInstanceScanRanges.add(perNodeScanRanges); + } } - List<List<TScanRangeParams>> perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges, - expectedInstanceNum); LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size()); @@ -3034,6 +3044,8 @@ public class Coordinator { public List<FInstanceExecParam> instanceExecParams = Lists.newArrayList(); public FragmentScanRangeAssignment scanRangeAssignment = new FragmentScanRangeAssignment(); + public boolean sharedScanOpt = false; + public FragmentExecParams(PlanFragment fragment) { this.fragment = fragment; } @@ -3125,6 +3137,7 @@ public class Coordinator { fragment.isTransferQueryStatisticsWithEveryBatch()); params.setFragment(fragment.toThrift()); params.setLocalParams(Lists.newArrayList()); + params.setSharedScanOpt(sharedScanOpt); res.put(instanceExecParam.host, params); } TPipelineFragmentParams params = res.get(instanceExecParam.host); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index ad471791ea..8dfd08a05e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1593,6 +1593,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setCodegenLevel(codegenLevel); tResult.setBeExecVersion(Config.be_exec_version); tResult.setEnablePipelineEngine(enablePipelineEngine); + tResult.setParallelInstance(parallelExecInstanceNum); tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary); 
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery); tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index cafcf1044d..d799b28945 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -201,9 +201,12 @@ struct TQueryOptions { // For debug purpose, skip delete bitmap when reading data 63: optional bool skip_delete_bitmap = false + 64: optional bool dry_run_query = false 65: optional bool enable_common_expr_pushdown = false; + + 66: optional i32 parallel_instance = 1 } @@ -598,6 +601,7 @@ struct TPipelineFragmentParams { 22: optional TGlobalDict global_dict // scan node could use the global dict to encode the string value to an integer 23: optional Planner.TPlanFragment fragment 24: list<TPipelineInstanceParams> local_params + 25: optional bool shared_scan_opt = false; } struct TPipelineFragmentParamsList { diff --git a/regression-test/data/nereids_syntax_p0/grouping_sets.out b/regression-test/data/nereids_syntax_p0/grouping_sets.out index 6c18f54e2b..bbc2997e3c 100644 --- a/regression-test/data/nereids_syntax_p0/grouping_sets.out +++ b/regression-test/data/nereids_syntax_p0/grouping_sets.out @@ -159,16 +159,16 @@ 4 3 18 -- !select3 -- -\N \N 24 \N \N 6 +\N \N 24 \N 1 1 \N 2 3 \N 3 4 \N 4 2 \N 6 5 \N 9 3 -1 \N 24 1 \N 6 +1 \N 24 1 1 1 1 2 3 1 3 4 @@ -216,13 +216,13 @@ -- !select7 -- 1 -2 -3 -4 1 2 +2 +3 3 4 +4 -- !select1 -- a 1 @@ -258,3 +258,4 @@ all 1 2 1 2 1 2 2 + diff --git a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out index 80640f7372..b7e57d2613 100644 --- a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out +++ b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out @@ -167,23 +167,23 @@ 24 4 -- !in_subquery_with_order -- -1 3 1 2 +1 3 2 5 -3 3 20 2 22 3 24 4 +3 3 -- !exists_subquery_with_order -- -1 3 1 2 +1 3 2 4 -3 4 -3 3 20 2 22 3 24 4 +3 3 +3 4 -- !scalar_subquery_with_limit -- 20 2 @@ -200,13 +200,13 @@ 20 -- !case_when_subquery -- -4.0 -4.0 20.0 20.0 20.0 20.0 20.0 +4.0 +4.0 -- !in -- 1 2 @@ -244,106 +244,106 @@ 3 4 -- !hash_join_with_other_conjuncts1 -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 +2 5 3 3 +3 4 -- !hash_join_with_other_conjuncts2 -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 +2 5 3 3 +3 4 -- !hash_join_with_other_conjuncts3 -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 +2 5 3 3 +3 4 -- !hash_join_with_other_conjuncts4 -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 +2 5 3 3 +3 4 -- !same_subquery_in_conjuncts -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 +2 5 3 3 +3 4 -- !two_subquery_in_one_conjuncts -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 +2 5 3 3 +3 4 -- !multi_subquery_in_and_scalry -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 +2 5 3 3 +3 4 -- !multi_subquery_in_and_exist -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 +2 5 3 3 +3 4 -- !multi_subquery_in_and_exist_sum -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 -3 3 +2 5 20 2 22 3 24 4 +3 3 +3 4 -- !multi_subquery_in_and_in -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 +2 5 3 3 +3 4 -- !multi_subquery_scalar_and_exist -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 -3 3 +2 5 20 2 22 3 24 4 +3 3 +3 4 -- !multi_subquery_scalar_and_scalar -- -1 3 1 2 -2 5 +1 3 2 4 -3 4 +2 5 3 3 +3 4 -- !multi_subquery_in_first_or_in_and_in -- 3 3 diff --git a/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out b/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out index 2b66c921cb..91b55d0c89 100644 --- 
a/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out +++ b/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out @@ -1,21 +1,21 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !exists_subquery_with_limit -- -1 3 1 2 +1 3 2 4 -3 4 -3 3 20 2 22 3 24 4 +3 3 +3 4 -- !exists_subquery_with_order_and_limit -- -1 3 1 2 +1 3 2 4 -3 4 -3 3 20 2 22 3 24 4 +3 3 +3 4 diff --git a/regression-test/suites/correctness_p0/test_colocate_join.groovy b/regression-test/suites/correctness_p0/test_colocate_join.groovy index 45e4e57bd3..e8bd7206b8 100644 --- a/regression-test/suites/correctness_p0/test_colocate_join.groovy +++ b/regression-test/suites/correctness_p0/test_colocate_join.groovy @@ -161,7 +161,7 @@ suite("test_colocate_join") { (20220101, 101, 202, 200, 100);""" explain { - sql("select " + + sql("select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ " + " sum_col1,sum_col2 " + "from " + "(select datekey,sum(sum_col1) as sum_col1 from test_query_colocate where datekey=20220101 group by datekey) t1 " + diff --git a/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy b/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy index dd1eb207b4..cfbef017db 100644 --- a/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy +++ b/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy @@ -59,4 +59,4 @@ PROPERTIES ( sql "drop table if exists ${table1}" -} \ No newline at end of file +} diff --git a/regression-test/suites/nereids_function_p0/gen_function/gen.groovy b/regression-test/suites/nereids_function_p0/gen_function/gen.groovy index 22b97ff220..4ca054ab25 100644 --- a/regression-test/suites/nereids_function_p0/gen_function/gen.groovy +++ b/regression-test/suites/nereids_function_p0/gen_function/gen.groovy @@ -49,4 +49,4 @@ suite("nereids_gen_fn") { qt_sql_explode_split_outer_Varchar_Varchar_notnull ''' select id, e from fn_test lateral view explode_split_outer('a, b, c, d', ',') lv as e order by id, e''' -} \ No newline at end of file +} diff --git a/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy b/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy index 8e6cc6e5c7..d5298d154e 100644 --- a/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy +++ b/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy @@ -121,7 +121,7 @@ suite("test_nereids_grouping_sets") { rollup(k1_, k2) order by k1_, k2 """ - qt_select3 "select 1 as k, k3, sum(k1) from groupingSetsTable group by cube(k, k3) order by k, k3" + qt_select3 "select 1 as k, k3, sum(k1) as sum_k1 from groupingSetsTable group by cube(k, k3) order by k, k3, sum_k1" qt_select4 """ select k2, concat(k5, k6) as k_concat, sum(k1) from groupingSetsTable group by @@ -230,7 +230,7 @@ suite("test_nereids_grouping_sets") { from grouping_sum_table ) T - ) T2; + ) T2 order by a; """ order_qt_select1 """ diff --git a/regression-test/suites/nereids_syntax_p0/set_operation.groovy b/regression-test/suites/nereids_syntax_p0/set_operation.groovy index 2e06f2aa78..c6838f9986 100644 --- a/regression-test/suites/nereids_syntax_p0/set_operation.groovy +++ b/regression-test/suites/nereids_syntax_p0/set_operation.groovy @@ -227,7 +227,7 @@ suite("test_nereids_set_operation") { } qt_union39 """(select k1 from setOperationTable order by k1) union all (select k1 from setOperationTableNotNullable order by k1) order by k1;""" - qt_union40 """ + 
order_qt_union40 """ SELECT k1 FROM setOperationTable WHERE k2 = 2 INTERSECT SELECT k1 FROM setOperationTable WHERE k1 = 1 @@ -235,7 +235,7 @@ suite("test_nereids_set_operation") { SELECT k1 FROM setOperationTable WHERE k3 = 2 """ - qt_union41 """ + order_qt_union41 """ SELECT k1 FROM setOperationTable WHERE k2 = 1 EXCEPT SELECT k1 FROM setOperationTable WHERE k3 = 2 @@ -245,7 +245,7 @@ suite("test_nereids_set_operation") { SELECT k1 FROM setOperationTable WHERE k2 > 0) """ - qt_union42 """ + order_qt_union42 """ SELECT k1 FROM setOperationTable WHERE k2 = 1 EXCEPT SELECT k1 FROM setOperationTable WHERE k3 = 2 diff --git a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy index 637e53223e..1d153405ea 100644 --- a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy +++ b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy @@ -236,35 +236,35 @@ suite ("sub_query_correlated") { """*/ //----------subquery with order---------- - qt_scalar_subquery_with_order """ + order_qt_scalar_subquery_with_order """ select * from sub_query_correlated_subquery1 where sub_query_correlated_subquery1.k1 > (select sum(sub_query_correlated_subquery3.k3) a from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by a); """ - qt_in_subquery_with_order """ + order_qt_in_subquery_with_order """ select * from sub_query_correlated_subquery1 where sub_query_correlated_subquery1.k1 not in (select sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by k2); """ - qt_exists_subquery_with_order """ + order_qt_exists_subquery_with_order """ select * from sub_query_correlated_subquery1 where exists (select sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by k2); """ //----------subquery with limit---------- - qt_scalar_subquery_with_limit """ + order_qt_scalar_subquery_with_limit """ select * from sub_query_correlated_subquery1 where sub_query_correlated_subquery1.k1 > (select sum(sub_query_correlated_subquery3.k3) a from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 limit 1); """ //----------subquery with order and limit---------- - qt_scalar_subquery_with_order_and_limit """ + order_qt_scalar_subquery_with_order_and_limit """ select * from sub_query_correlated_subquery1 where sub_query_correlated_subquery1.k1 > (select sum(sub_query_correlated_subquery3.k3) a from sub_query_correlated_subquery3 where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by a limit 1); """ //---------subquery with Disjunctions------------- - qt_scalar_subquery_with_disjunctions """ + order_qt_scalar_subquery_with_disjunctions """ SELECT DISTINCT k1 FROM sub_query_correlated_subquery1 i1 WHERE ((SELECT count(*) FROM sub_query_correlated_subquery1 WHERE ((k1 = i1.k1) AND (k2 = 2)) or ((k1 = i1.k1) AND (k2 = 1)) ) > 0); """ //--------subquery case when----------- - qt_case_when_subquery """ + order_qt_case_when_subquery """ SELECT CASE WHEN ( SELECT COUNT(*) / 2 @@ -298,86 +298,86 @@ suite ("sub_query_correlated") { SELECT * FROM sub_query_correlated_subquery1 WHERE EXISTS (SELECT k1 FROM sub_query_correlated_subquery3 WHERE k1 > 10) OR k1 < 10; """ - 
qt_hash_join_with_other_conjuncts1 """ + order_qt_hash_join_with_other_conjuncts1 """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 > sub_query_correlated_subquery3.k3) OR k1 < 10 ORDER BY k1; """ - qt_hash_join_with_other_conjuncts2 """ + order_qt_hash_join_with_other_conjuncts2 """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 < sub_query_correlated_subquery3.k3) OR k1 < 10 ORDER BY k1; """ - qt_hash_join_with_other_conjuncts3 """ + order_qt_hash_join_with_other_conjuncts3 """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 > sub_query_correlated_subquery3.k3) OR k1 < 11 ORDER BY k1; """ - qt_hash_join_with_other_conjuncts4 """ + order_qt_hash_join_with_other_conjuncts4 """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 < sub_query_correlated_subquery3.k3) OR k1 < 11 ORDER BY k1; """ - qt_same_subquery_in_conjuncts """ + order_qt_same_subquery_in_conjuncts """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 FROM sub_query_correlated_subquery3) OR k1 IN (SELECT k1 FROM sub_query_correlated_subquery3) OR k1 < 10 ORDER BY k1; """ - qt_two_subquery_in_one_conjuncts """ + order_qt_two_subquery_in_one_conjuncts """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 FROM sub_query_correlated_subquery3) OR k1 IN (SELECT k3 FROM sub_query_correlated_subquery3) OR k1 < 10 ORDER BY k1; """ - qt_multi_subquery_in_and_scalry """ + order_qt_multi_subquery_in_and_scalry """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) OR k1 < (SELECT sum(k1) FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.v1) OR k1 < 10 ORDER BY k1; """ - qt_multi_subquery_in_and_exist """ + order_qt_multi_subquery_in_and_exist """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) OR exists (SELECT k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.v1) OR k1 < 10 ORDER BY k1; """ - qt_multi_subquery_in_and_exist_sum """ + order_qt_multi_subquery_in_and_exist_sum """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) OR exists (SELECT sum(k1) FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.v1) OR k1 < 10 ORDER BY k1; """ - qt_multi_subquery_in_and_in """ + order_qt_multi_subquery_in_and_in """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) OR k2 in (SELECT k2 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.v1) OR k1 < 10 ORDER BY k1; """ - qt_multi_subquery_scalar_and_exist """ + order_qt_multi_subquery_scalar_and_exist """ SELECT * FROM 
sub_query_correlated_subquery1 WHERE k1 < (SELECT sum(k1) FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) OR exists (SELECT sum(k1) FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.v1) OR k1 < 10 ORDER BY k1; """ - qt_multi_subquery_scalar_and_scalar """ + order_qt_multi_subquery_scalar_and_scalar """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 < (SELECT sum(k1) FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) OR k2 < (SELECT sum(k1) FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.v1) OR k1 < 10 ORDER BY k1; """ - qt_multi_subquery_in_first_or_in_and_in """ + order_qt_multi_subquery_in_first_or_in_and_in """ SELECT * FROM sub_query_correlated_subquery1 WHERE (k1 in (SELECT k2 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) or k2 in (SELECT k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1)) and k1 in (SELECT k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) """ - qt_multi_subquery_in_second_or_in_and_in """ + order_qt_multi_subquery_in_second_or_in_and_in """ SELECT * FROM sub_query_correlated_subquery1 WHERE k1 in (SELECT k2 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) or k2 in (SELECT k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) and k1 in (SELECT k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1) """ - qt_multi_subquery_scalar_and_in_or_scalar_and_exists_agg """ + order_qt_multi_subquery_scalar_and_in_or_scalar_and_exists_agg """ SELECT * FROM sub_query_correlated_subquery1 WHERE ((k1 != (SELECT sum(k1) FROM sub_query_correlated_subquery3) and k1 = 1 OR k1 < 10) and k1 = 10 and k1 = 15) and (k1 IN (SELECT k1 FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.k1) OR k1 < (SELECT sum(k1) FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.k1)) and exists (SELECT sum(k1) FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.k1); """ - qt_multi_subquery_scalar_and_in_or_scalar_and_exists """ + order_qt_multi_subquery_scalar_and_in_or_scalar_and_exists """ SELECT * FROM sub_query_correlated_subquery1 WHERE ((k1 != (SELECT sum(k1) FROM sub_query_correlated_subquery3) and k1 = 1 OR k1 < 10) and k1 = 10 and k1 = 15) and (k1 IN (SELECT k1 FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.k1) OR k1 < (SELECT sum(k1) FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 = sub_query_correlated_subquery3.k1)) diff --git a/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy b/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy index 4ee579f839..6d5bc11a5a 100644 --- a/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy +++ b/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy @@ -159,7 +159,7 @@ suite 
("sub_query_diff_old_optimize") { """*/ //----------subquery with limit---------- - qt_exists_subquery_with_limit """ + order_qt_exists_subquery_with_limit """ select * from sub_query_diff_old_optimize_subquery1 where exists (select sub_query_diff_old_optimize_subquery3.k3 from sub_query_diff_old_optimize_subquery3 where sub_query_diff_old_optimize_subquery3.v2 = sub_query_diff_old_optimize_subquery1.k2 limit 1); """ @@ -172,7 +172,7 @@ suite ("sub_query_diff_old_optimize") { } //----------subquery with order and limit------- - qt_exists_subquery_with_order_and_limit """ + order_qt_exists_subquery_with_order_and_limit """ select * from sub_query_diff_old_optimize_subquery1 where exists (select sub_query_diff_old_optimize_subquery3.k3 from sub_query_diff_old_optimize_subquery3 where sub_query_diff_old_optimize_subquery3.v2 = sub_query_diff_old_optimize_subquery1.k2 order by k1 limit 1); """ diff --git a/regression-test/suites/performance_p0/redundant_conjuncts.groovy b/regression-test/suites/performance_p0/redundant_conjuncts.groovy index 86035612c3..4027c02aaf 100644 --- a/regression-test/suites/performance_p0/redundant_conjuncts.groovy +++ b/regression-test/suites/performance_p0/redundant_conjuncts.groovy @@ -32,10 +32,10 @@ suite("redundant_conjuncts") { """ qt_redundant_conjuncts """ - EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=2) */ v1 FROM redundant_conjuncts WHERE k1 = 1 AND k1 = 1; + EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=2, parallel_fragment_exec_instance_num = 1) */ v1 FROM redundant_conjuncts WHERE k1 = 1 AND k1 = 1; """ qt_redundant_conjuncts_gnerated_by_extract_common_filter """ - EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=100) */ v1 FROM redundant_conjuncts WHERE k1 = 1 OR k1 = 2; + EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=100, parallel_fragment_exec_instance_num = 1) */ v1 FROM redundant_conjuncts WHERE k1 = 1 OR k1 = 2; """ } diff --git a/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy b/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy index eadab28672..87becfe020 100644 --- a/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy +++ b/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy @@ -73,15 +73,15 @@ suite("test_query_limit", "query,p0") { qt_limit16 "select * from (select * from ${tableName} order by k1, k2, k3, k4 limit 1, 2) a limit 2, 2" qt_limit17 "select * from (select * from ${tableName} order by k1, k2, k3, k4 limit 1, 2) a limit 2, 3" test { - sql "select * from ${tableName} limit 1, 10" + sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=4)*/ * from ${tableName} limit 1, 10" rowNum 2 } test { - sql "select * from ${tableName} limit 2, 10" + sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=4)*/ * from ${tableName} limit 2, 10" rowNum 1 } test { - sql "select * from ${tableName} limit 3, 10" + sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=4)*/ * from ${tableName} limit 3, 10" rowNum 0 } } diff --git a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy index 0e517f1d7e..181f597849 100644 --- a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy +++ b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy 
@@ -123,7 +123,7 @@ suite("test_window_fn") { select first_value(salary) over(order by enroll_date range between unbounded preceding and UNBOUNDED following), last_value(salary) over(order by enroll_date range between unbounded preceding and UNBOUNDED following), salary, enroll_date from ${tbName1} order by salary, enroll_date; """ qt_sql """ - SELECT first_value(ten) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10; + SELECT first_value(ten) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten; """ qt_sql """ SELECT first_value(unique1) over (order by four range between current row and unbounded following), @@ -190,7 +190,7 @@ suite("test_window_fn") { """ qt_sql """ SELECT depname, empno, salary, rank() OVER (PARTITION BY depname ORDER BY salary, empno) - FROM ${tbName1} ORDER BY rank() OVER (PARTITION BY depname ORDER BY salary, empno); + FROM ${tbName1} ORDER BY rank() OVER (PARTITION BY depname ORDER BY salary, empno), depname; """ qt_sql """ SELECT sum(salary) as s, row_number() OVER (ORDER BY depname) as r, sum(sum(salary)) OVER (ORDER BY depname DESC) as ss @@ -207,10 +207,10 @@ suite("test_window_fn") { SELECT row_number() OVER (ORDER BY unique2) FROM ${tbName2} WHERE unique2 < 10; """ qt_sql """ - SELECT rank() OVER (PARTITION BY four ORDER BY ten) AS rank_1, ten, four FROM ${tbName2} WHERE unique2 < 10; + SELECT rank() OVER (PARTITION BY four ORDER BY ten) AS rank_1, ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten; """ qt_sql """ - SELECT dense_rank() OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10; + SELECT dense_rank() OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten; """ qt_sql """ select ten, sum(unique1) + sum(unique2) as res, rank() over (order by sum(unique1) + sum(unique2)) as rank from ${tbName2} group by ten order by ten; @@ -256,7 +256,7 @@ suite("test_window_fn") { SELECT count(1) OVER (PARTITION BY four) as c, four FROM (SELECT * FROM ${tbName2} WHERE two = 1)s WHERE unique2 < 10 order by c, four; """ qt_sql """ - SELECT avg(four) OVER (PARTITION BY four ORDER BY thousand / 100) FROM ${tbName2} WHERE unique2 < 10; + SELECT avg(four) OVER (PARTITION BY four ORDER BY thousand / 100) FROM ${tbName2} WHERE unique2 < 10 order by four; """ qt_sql """ SELECT count(1) OVER (PARTITION BY four) FROM (SELECT * FROM ${tbName2} WHERE FALSE)s; @@ -278,19 +278,19 @@ suite("test_window_fn") { // lag qt_sql """ - SELECT lag(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10; + SELECT lag(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten; """ // lead qt_sql """ - SELECT lead(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10; + SELECT lead(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten; """ qt_sql """ - SELECT lead(ten * 2, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10; + SELECT lead(ten * 2, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten; """ qt_sql """ - SELECT lead(ten * 2, 1, -1) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10; + SELECT lead(ten * 2, 1, -1) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 
10 order by four, ten; """ @@ -353,7 +353,7 @@ suite("test_window_fn") { qt_sql_window_last_value """ select u_id, u_city, u_salary, last_value(u_salary) over (partition by u_city order by u_id rows between unbounded preceding and 1 preceding) last_value_test - from example_window_tb; + from example_window_tb order by u_id; """ sql "DROP TABLE IF EXISTS example_window_tb;"
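The regression-test edits above follow one recurring pattern: result checks whose row order is not fixed by an ORDER BY are switched from the qt_ prefix to order_qt_ (the framework then compares the result order-insensitively against the recorded .out file), or the query is given a /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ hint so a single fragment instance keeps the plan and row order deterministic. A minimal sketch of that pattern, assuming a hypothetical suite and table name that are not part of this commit:

    suite("shared_scan_result_order_example") {
        // Row order from a parallel scan is not guaranteed, so use the
        // order-insensitive check when the query has no ORDER BY of its own.
        order_qt_no_explicit_order """
            select k1, k2 from example_table where k2 = 1
        """

        // Alternatively, pin the instance count so the plan and the returned
        // row order stay stable across runs.
        qt_single_instance """
            select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ k1, k2
            from example_table where k2 = 1
        """
    }

Either form keeps the recorded .out results valid when the engine is free to return rows in a different order.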