This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 459f75073fb [pipelineX](dependency) remove OrDependency (#27242) 459f75073fb is described below commit 459f75073fba512232769362a4172f12a12923d9 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Nov 20 13:05:34 2023 +0800 [pipelineX](dependency) remove OrDependency (#27242) --- be/src/pipeline/exec/es_scan_operator.cpp | 4 +- be/src/pipeline/exec/file_scan_operator.cpp | 4 +- be/src/pipeline/exec/meta_scan_operator.cpp | 2 +- be/src/pipeline/exec/olap_scan_operator.cpp | 6 +-- be/src/pipeline/exec/scan_operator.cpp | 47 ++++++--------------- be/src/pipeline/exec/scan_operator.h | 63 +++++++++++++++++----------- be/src/pipeline/pipeline_x/dependency.cpp | 33 --------------- be/src/pipeline/pipeline_x/dependency.h | 35 ++-------------- be/src/pipeline/pipeline_x/pipeline_x_task.h | 4 -- be/src/vec/exec/scan/pip_scanner_context.h | 26 ++++-------- be/src/vec/exec/scan/scanner_context.cpp | 4 +- be/src/vec/exec/scan/scanner_context.h | 14 +++---- 12 files changed, 80 insertions(+), 162 deletions(-) diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index 9b41155a22b..b9112917954 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -55,7 +55,7 @@ Status EsScanLocalState::_init_profile() { Status EsScanLocalState::_process_conjuncts() { RETURN_IF_ERROR(Base::_process_conjuncts()); - if (Base::_eos_dependency->read_blocked_by() == nullptr) { + if (Base::_scan_dependency->eos()) { return Status::OK(); } @@ -66,7 +66,7 @@ Status EsScanLocalState::_process_conjuncts() { Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) { if (_scan_ranges.empty()) { - Base::_eos_dependency->set_ready_for_read(); + Base::_scan_dependency->set_eos(); return Status::OK(); } diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 019f5813a42..369ad607c6f 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -32,7 +32,7 @@ namespace doris::pipeline { Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) { if (_scan_ranges.empty()) { - Base::_eos_dependency->set_ready_for_read(); + Base::_scan_dependency->set_eos(); return Status::OK(); } @@ -95,7 +95,7 @@ Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { Status FileScanLocalState::_process_conjuncts() { RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts()); - if (Base::_eos_dependency->read_blocked_by() == nullptr) { + if (Base::_scan_dependency->eos()) { return Status::OK(); } // TODO: Push conjuncts down to reader. diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp index c2f9bc7e428..2de19bb2ced 100644 --- a/be/src/pipeline/exec/meta_scan_operator.cpp +++ b/be/src/pipeline/exec/meta_scan_operator.cpp @@ -22,7 +22,7 @@ namespace doris::pipeline { Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) { - if (Base::_eos_dependency->read_blocked_by() == nullptr) { + if (Base::_scan_dependency->eos()) { return Status::OK(); } diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index a77dba4c00a..acda79bdfb3 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -133,7 +133,7 @@ Status OlapScanLocalState::_init_profile() { Status OlapScanLocalState::_process_conjuncts() { SCOPED_TIMER(_process_conjunct_timer); RETURN_IF_ERROR(ScanLocalState::_process_conjuncts()); - if (ScanLocalState::_eos_dependency->read_blocked_by() == nullptr) { + if (ScanLocalState::_scan_dependency->eos()) { return Status::OK(); } RETURN_IF_ERROR(_build_key_ranges_and_filters()); @@ -213,7 +213,7 @@ bool OlapScanLocalState::_storage_no_merge() { Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) { if (_scan_ranges.empty()) { - ScanLocalState::_eos_dependency->set_ready_for_read(); + ScanLocalState::_scan_dependency->set_eos(); return Status::OK(); } SCOPED_TIMER(_scanner_init_timer); @@ -408,7 +408,7 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() { iter->second)); } if (eos) { - ScanLocalState::_eos_dependency->set_ready_for_read(); + ScanLocalState::_scan_dependency->set_eos(); } for (auto& iter : _colname_to_value_range) { diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index d2953d3593b..300ebea995b 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -122,12 +122,9 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); - _source_dependency = OrDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(), + _scan_dependency = ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(), PipelineXLocalState<>::_parent->node_id()); - _eos_dependency = EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(), - PipelineXLocalState<>::_parent->node_id()); - _source_dependency->add_child(_eos_dependency); auto& p = _parent->cast<typename Derived::Parent>(); set_scan_ranges(state, info.scan_ranges); _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size()); @@ -174,11 +171,10 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) { RETURN_IF_ERROR(_acquire_runtime_filter()); RETURN_IF_ERROR(_process_conjuncts()); - auto status = - _eos_dependency->read_blocked_by() == nullptr ? Status::OK() : _prepare_scanners(); + auto status = _scan_dependency->eos() ? Status::OK() : _prepare_scanners(); if (_scanner_ctx) { _finish_dependency->should_finish_after_check(); - DCHECK(_eos_dependency->read_blocked_by() != nullptr && _num_scanners->value() > 0); + DCHECK(!_scan_dependency->eos() && _num_scanners->value() > 0); RETURN_IF_ERROR(_scanner_ctx->init()); RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); } @@ -266,7 +262,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() { std::visit( [&](auto&& range) { if (range.is_empty_value_range()) { - _eos_dependency->set_ready_for_read(); + _scan_dependency->set_eos(); } }, it.second.second); @@ -559,7 +555,7 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr, constant_val = const_cast<char*>(const_column->get_data_at(0).data); if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) { *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE; - _eos_dependency->set_ready_for_read(); + _scan_dependency->set_eos(); } } else if (const vectorized::ColumnVector<vectorized::UInt8>* bool_column = check_and_get_column<vectorized::ColumnVector<vectorized::UInt8>>( @@ -576,7 +572,7 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr, constant_val = const_cast<char*>(bool_column->get_data_at(0).data); if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) { *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE; - _eos_dependency->set_ready_for_read(); + _scan_dependency->set_eos(); } } else { LOG(WARNING) << "Constant predicate in scan node should return a bool column with " @@ -773,7 +769,7 @@ Status ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate( HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); auto fn_name = std::string(""); if (!is_fixed_range && state->null_in_set) { - _eos_dependency->set_ready_for_read(); + _scan_dependency->set_eos(); } while (iter->has_next()) { // column not in (nullptr) is always true @@ -1166,7 +1162,7 @@ Status ScanLocalState<Derived>::_prepare_scanners() { std::list<vectorized::VScannerSPtr> scanners; RETURN_IF_ERROR(_init_scanners(&scanners)); if (scanners.empty()) { - _eos_dependency->set_ready_for_read(); + _scan_dependency->set_eos(); } else { COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size())); RETURN_IF_ERROR(_start_scanners(scanners)); @@ -1181,14 +1177,8 @@ Status ScanLocalState<Derived>::_start_scanners( _scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners, p.limit(), state()->scan_queue_mem_limit(), p._col_distribute_ids, 1); - _scanner_done_dependency = ScannerDoneDependency::create_shared(p.operator_id(), p.node_id()); - _source_dependency->add_child(_scanner_done_dependency); - _data_ready_dependency = - DataReadyDependency::create_shared(p.operator_id(), p.node_id(), _scanner_ctx.get()); - _source_dependency->add_child(_data_ready_dependency); - - _scanner_ctx->set_dependency(_data_ready_dependency, _scanner_done_dependency, - _finish_dependency); + _scan_dependency->set_scanner_ctx(_scanner_ctx.get()); + _scanner_ctx->set_dependency(_scan_dependency, _finish_dependency); return Status::OK(); } @@ -1340,23 +1330,12 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) { return Status::OK(); } SCOPED_TIMER(_close_timer); - if (_data_ready_dependency) { - COUNTER_UPDATE(_wait_for_data_timer, _data_ready_dependency->read_watcher_elapse_time()); - COUNTER_UPDATE(exec_time_counter(), _data_ready_dependency->read_watcher_elapse_time()); - } - if (_eos_dependency) { - COUNTER_SET(_wait_for_eos_timer, _eos_dependency->read_watcher_elapse_time()); - COUNTER_UPDATE(exec_time_counter(), _eos_dependency->read_watcher_elapse_time()); - } - if (_scanner_done_dependency) { - COUNTER_SET(_wait_for_scanner_done_timer, - _scanner_done_dependency->read_watcher_elapse_time()); - COUNTER_UPDATE(exec_time_counter(), _scanner_done_dependency->read_watcher_elapse_time()); - } + SCOPED_TIMER(exec_time_counter()); if (_scanner_ctx.get()) { _scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), state); } + COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->read_watcher_elapse_time()); return PipelineXLocalState<>::close(state); } @@ -1391,7 +1370,7 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized: } } - if (local_state._eos_dependency->read_blocked_by() == nullptr) { + if (local_state._scan_dependency->eos()) { source_state = SourceState::FINISHED; return Status::OK(); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 66543dc7ffd..f058225580d 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -56,38 +56,56 @@ public: Status try_close(RuntimeState* state) override; }; -class EosDependency final : public Dependency { +class ScanDependency final : public Dependency { public: - ENABLE_FACTORY_CREATOR(EosDependency); - EosDependency(int id, int node_id) : Dependency(id, node_id, "EosDependency") {} - void* shared_state() override { return nullptr; } -}; - -class ScannerDoneDependency final : public Dependency { -public: - ENABLE_FACTORY_CREATOR(ScannerDoneDependency); - ScannerDoneDependency(int id, int node_id) : Dependency(id, node_id, "ScannerDoneDependency") {} - void* shared_state() override { return nullptr; } -}; - -class DataReadyDependency final : public Dependency { -public: - ENABLE_FACTORY_CREATOR(DataReadyDependency); - DataReadyDependency(int id, int node_id, vectorized::ScannerContext* scanner_ctx) - : Dependency(id, node_id, "DataReadyDependency"), _scanner_ctx(scanner_ctx) {} + ENABLE_FACTORY_CREATOR(ScanDependency); + ScanDependency(int id, int node_id) + : Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr) {} void* shared_state() override { return nullptr; } // TODO(gabriel): [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { - if (_scanner_ctx->get_num_running_scanners() == 0 && _scanner_ctx->should_be_scheduled()) { + if (_scanner_ctx && _scanner_ctx->get_num_running_scanners() == 0 && + _scanner_ctx->should_be_scheduled()) { _scanner_ctx->reschedule_scanner_ctx(); } return Dependency::read_blocked_by(task); } + void block_reading() override { + if (_eos) { + return; + } + if (_scanner_done) { + return; + } + Dependency::block_reading(); + } + + bool eos() const { return _eos.load(); } + void set_eos() { + if (_eos) { + return; + } + _eos = true; + Dependency::set_ready_for_read(); + } + + void set_scanner_done() { + if (_scanner_done) { + return; + } + _scanner_done = true; + Dependency::set_ready_for_read(); + } + + void set_scanner_ctx(vectorized::ScannerContext* scanner_ctx) { _scanner_ctx = scanner_ctx; } + private: vectorized::ScannerContext* _scanner_ctx; + std::atomic<bool> _eos {false}; + std::atomic<bool> _scanner_done {false}; }; class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::RuntimeFilterConsumer { @@ -128,10 +146,7 @@ protected: virtual Status _init_profile() = 0; std::atomic<bool> _opened {false}; - std::shared_ptr<EosDependency> _eos_dependency; - std::shared_ptr<OrDependency> _source_dependency; - std::shared_ptr<ScannerDoneDependency> _scanner_done_dependency; - std::shared_ptr<DataReadyDependency> _data_ready_dependency; + std::shared_ptr<ScanDependency> _scan_dependency; std::shared_ptr<RuntimeProfile> _scanner_profile; RuntimeProfile::Counter* _scanner_sched_counter = nullptr; @@ -203,7 +218,7 @@ class ScanLocalState : public ScanLocalStateBase { int64_t get_push_down_count() override; - Dependency* dependency() override { return _source_dependency.get(); } + Dependency* dependency() override { return _scan_dependency.get(); } protected: template <typename LocalStateType> diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index a06fadef03b..2e43007dee7 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -170,28 +170,6 @@ WriteDependency* WriteDependency::write_blocked_by(PipelineXTask* task) { return ready_for_write ? nullptr : this; } -Dependency* OrDependency::read_blocked_by(PipelineXTask* task) { - // TODO(gabriel): - for (auto& child : _children) { - auto* cur_res = child->read_blocked_by(nullptr); - if (cur_res == nullptr) { - return nullptr; - } - } - return this; -} - -WriteDependency* OrDependency::write_blocked_by(PipelineXTask* task) { - for (auto& child : _children) { - CHECK(child->is_write_dependency()); - auto* cur_res = ((WriteDependency*)child.get())->write_blocked_by(nullptr); - if (cur_res == nullptr) { - return nullptr; - } - } - return this; -} - template Status HashJoinDependency::extract_join_column<true>( vectorized::Block&, COW<vectorized::IColumn>::mutable_ptr<vectorized::ColumnVector<unsigned char>>&, @@ -250,17 +228,6 @@ std::string AndDependency::debug_string(int indentation_level) { return fmt::to_string(debug_string_buffer); } -std::string OrDependency::debug_string(int indentation_level) { - fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[", - std::string(indentation_level * 2, ' '), _name, _node_id); - for (auto& child : _children) { - fmt::format_to(debug_string_buffer, "{}, \n", child->debug_string(indentation_level = 1)); - } - fmt::format_to(debug_string_buffer, "{}]", std::string(indentation_level * 2, ' ')); - return fmt::to_string(debug_string_buffer); -} - Status AggDependency::reset_hash_table() { return std::visit( [&](auto&& agg_method) { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 11a8975b306..f6a37766525 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -55,7 +55,6 @@ public: : _id(id), _node_id(node_id), _name(std::move(name)), _ready_for_read(false) {} virtual ~Dependency() = default; - virtual bool is_or_dep() { return false; } [[nodiscard]] int id() const { return _id; } [[nodiscard]] virtual std::string name() const { return _name; } virtual void* shared_state() = 0; @@ -280,37 +279,6 @@ public: } }; -class OrDependency final : public WriteDependency { -public: - ENABLE_FACTORY_CREATOR(OrDependency); - OrDependency(int id, int node_id) : WriteDependency(id, node_id, "OrDependency") {} - - [[nodiscard]] std::string name() const override { - fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}[", _name); - for (auto& child : _children) { - fmt::format_to(debug_string_buffer, "{}, ", child->name()); - } - fmt::format_to(debug_string_buffer, "]"); - return fmt::to_string(debug_string_buffer); - } - - void* shared_state() override { return nullptr; } - - std::string debug_string(int indentation_level = 0) override; - - bool is_or_dep() override { return true; } - - [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override; - - [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override; - - void add_child(std::shared_ptr<Dependency> child) override { - WriteDependency::add_child(child); - child->set_parent(weak_from_this()); - } -}; - struct FakeSharedState {}; struct FakeDependency final : public WriteDependency { public: @@ -681,6 +649,9 @@ public: } void set_eos() { + if (_eos) { + return; + } _eos = true; WriteDependency::set_ready_for_read(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index bc50f1e89de..04c5ddc1974 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -136,10 +136,6 @@ public: return _use_blocking_queue || get_state() == PipelineTaskState::BLOCKED_FOR_DEPENDENCY; } void set_use_blocking_queue(bool use_blocking_queue) { - if (_blocked_dep->is_or_dep()) { - _use_blocking_queue = true; - return; - } _use_blocking_queue = use_blocking_queue; } diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index f02e07a6f86..fe00a9489aa 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -50,14 +50,6 @@ public: _col_distribute_ids(col_distribute_ids), _need_colocate_distribute(!_col_distribute_ids.empty()) {} - void set_dependency(std::shared_ptr<DataReadyDependency> dependency, - std::shared_ptr<ScannerDoneDependency> scanner_done_dependency, - std::shared_ptr<FinishDependency> finish_dependency) override { - _data_dependency = dependency; - _scanner_done_dependency = scanner_done_dependency; - _finish_dependency = finish_dependency; - } - Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, int id, bool wait = false) override { { @@ -84,8 +76,8 @@ public: *block = std::move(_blocks_queues[id].front()); _blocks_queues[id].pop_front(); - if (_blocks_queues[id].empty() && _data_dependency) { - _data_dependency->block_reading(); + if (_blocks_queues[id].empty() && _dependency) { + _dependency->block_reading(); } } _current_used_bytes -= (*block)->allocated_bytes(); @@ -157,8 +149,8 @@ public: for (int j = i; j < block_size; j += queue_size) { _blocks_queues[queue].emplace_back(std::move(blocks[j])); } - if (_data_dependency) { - _data_dependency->set_ready_for_read(); + if (_dependency) { + _dependency->set_ready_for_read(); } } _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0; @@ -209,8 +201,8 @@ public: _blocks_queues[i].emplace_back(std::move(_colocate_blocks[i])); _colocate_mutable_blocks[i]->clear(); } - if (_data_dependency) { - _data_dependency->set_ready_for_read(); + if (_dependency) { + _dependency->set_ready_for_read(); } } } @@ -237,8 +229,6 @@ private: std::vector<std::unique_ptr<vectorized::MutableBlock>> _colocate_mutable_blocks; std::vector<std::unique_ptr<std::mutex>> _colocate_block_mutexs; - std::shared_ptr<DataReadyDependency> _data_dependency = nullptr; - void _add_rows_colocate_blocks(vectorized::Block* block, int loc, const std::vector<int>& rows) { int row_wait_add = rows.size(); @@ -265,8 +255,8 @@ private: std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]); _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc])); } - if (_data_dependency) { - _data_dependency->set_ready_for_read(); + if (_dependency) { + _dependency->set_ready_for_read(); } _colocate_blocks[loc] = get_free_block(); _colocate_mutable_blocks[loc]->set_muatable_columns( diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index ef6a9d415de..b2c481871ae 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -430,8 +430,8 @@ bool ScannerContext::no_schedule() { } void ScannerContext::_set_scanner_done() { - if (_scanner_done_dependency) { - _scanner_done_dependency->set_ready_for_read(); + if (_dependency) { + _dependency->set_scanner_done(); } } diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 10b4775ceff..a0702960ac1 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -43,9 +43,8 @@ class TupleDescriptor; namespace pipeline { class ScanLocalStateBase; -class ScannerDoneDependency; +class ScanDependency; class FinishDependency; -class DataReadyDependency; } // namespace pipeline namespace taskgroup { @@ -106,10 +105,11 @@ public: return _process_status; } - virtual void set_dependency( - std::shared_ptr<pipeline::DataReadyDependency> dependency, - std::shared_ptr<pipeline::ScannerDoneDependency> scanner_done_dependency, - std::shared_ptr<pipeline::FinishDependency> finish_dependency) {} + void set_dependency(std::shared_ptr<pipeline::ScanDependency> dependency, + std::shared_ptr<pipeline::FinishDependency> finish_dependency) { + _dependency = dependency; + _finish_dependency = finish_dependency; + } // Called by ScanNode. // Used to notify the scheduler that this ScannerContext can stop working. @@ -283,7 +283,7 @@ protected: RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr; - std::shared_ptr<pipeline::ScannerDoneDependency> _scanner_done_dependency = nullptr; + std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr; std::shared_ptr<pipeline::FinishDependency> _finish_dependency = nullptr; }; } // namespace vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org