This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 459f75073fb [pipelineX](dependency) remove OrDependency (#27242)
459f75073fb is described below

commit 459f75073fba512232769362a4172f12a12923d9
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Nov 20 13:05:34 2023 +0800

    [pipelineX](dependency) remove OrDependency (#27242)
---
 be/src/pipeline/exec/es_scan_operator.cpp    |  4 +-
 be/src/pipeline/exec/file_scan_operator.cpp  |  4 +-
 be/src/pipeline/exec/meta_scan_operator.cpp  |  2 +-
 be/src/pipeline/exec/olap_scan_operator.cpp  |  6 +--
 be/src/pipeline/exec/scan_operator.cpp       | 47 ++++++---------------
 be/src/pipeline/exec/scan_operator.h         | 63 +++++++++++++++++-----------
 be/src/pipeline/pipeline_x/dependency.cpp    | 33 ---------------
 be/src/pipeline/pipeline_x/dependency.h      | 35 ++--------------
 be/src/pipeline/pipeline_x/pipeline_x_task.h |  4 --
 be/src/vec/exec/scan/pip_scanner_context.h   | 26 ++++--------
 be/src/vec/exec/scan/scanner_context.cpp     |  4 +-
 be/src/vec/exec/scan/scanner_context.h       | 14 +++----
 12 files changed, 80 insertions(+), 162 deletions(-)

diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp
index 9b41155a22b..b9112917954 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -55,7 +55,7 @@ Status EsScanLocalState::_init_profile() {
 
 Status EsScanLocalState::_process_conjuncts() {
     RETURN_IF_ERROR(Base::_process_conjuncts());
-    if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+    if (Base::_scan_dependency->eos()) {
         return Status::OK();
     }
 
@@ -66,7 +66,7 @@ Status EsScanLocalState::_process_conjuncts() {
 
 Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
     if (_scan_ranges.empty()) {
-        Base::_eos_dependency->set_ready_for_read();
+        Base::_scan_dependency->set_eos();
         return Status::OK();
     }
 
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index 019f5813a42..369ad607c6f 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -32,7 +32,7 @@ namespace doris::pipeline {
 
 Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
     if (_scan_ranges.empty()) {
-        Base::_eos_dependency->set_ready_for_read();
+        Base::_scan_dependency->set_eos();
         return Status::OK();
     }
 
@@ -95,7 +95,7 @@ Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
 
 Status FileScanLocalState::_process_conjuncts() {
     RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts());
-    if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+    if (Base::_scan_dependency->eos()) {
         return Status::OK();
     }
     // TODO: Push conjuncts down to reader.
diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp
index c2f9bc7e428..2de19bb2ced 100644
--- a/be/src/pipeline/exec/meta_scan_operator.cpp
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -22,7 +22,7 @@
 namespace doris::pipeline {
 
 Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
-    if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+    if (Base::_scan_dependency->eos()) {
         return Status::OK();
     }
 
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index a77dba4c00a..acda79bdfb3 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -133,7 +133,7 @@ Status OlapScanLocalState::_init_profile() {
 Status OlapScanLocalState::_process_conjuncts() {
     SCOPED_TIMER(_process_conjunct_timer);
     RETURN_IF_ERROR(ScanLocalState::_process_conjuncts());
-    if (ScanLocalState::_eos_dependency->read_blocked_by() == nullptr) {
+    if (ScanLocalState::_scan_dependency->eos()) {
         return Status::OK();
     }
     RETURN_IF_ERROR(_build_key_ranges_and_filters());
@@ -213,7 +213,7 @@ bool OlapScanLocalState::_storage_no_merge() {
 
 Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
     if (_scan_ranges.empty()) {
-        ScanLocalState::_eos_dependency->set_ready_for_read();
+        ScanLocalState::_scan_dependency->set_eos();
         return Status::OK();
     }
     SCOPED_TIMER(_scanner_init_timer);
@@ -408,7 +408,7 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
                     iter->second));
         }
         if (eos) {
-            ScanLocalState::_eos_dependency->set_ready_for_read();
+            ScanLocalState::_scan_dependency->set_eos();
         }
 
         for (auto& iter : _colname_to_value_range) {
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index d2953d3593b..300ebea995b 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -122,12 +122,9 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
 
-    _source_dependency = OrDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
+    _scan_dependency = ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
                                                      PipelineXLocalState<>::_parent->node_id());
 
-    _eos_dependency = EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
-                                                   PipelineXLocalState<>::_parent->node_id());
-    _source_dependency->add_child(_eos_dependency);
     auto& p = _parent->cast<typename Derived::Parent>();
     set_scan_ranges(state, info.scan_ranges);
     _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@@ -174,11 +171,10 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     RETURN_IF_ERROR(_acquire_runtime_filter());
     RETURN_IF_ERROR(_process_conjuncts());
 
-    auto status =
-            _eos_dependency->read_blocked_by() == nullptr ? Status::OK() : _prepare_scanners();
+    auto status = _scan_dependency->eos() ? Status::OK() : _prepare_scanners();
     if (_scanner_ctx) {
         _finish_dependency->should_finish_after_check();
-        DCHECK(_eos_dependency->read_blocked_by() != nullptr && _num_scanners->value() > 0);
+        DCHECK(!_scan_dependency->eos() && _num_scanners->value() > 0);
         RETURN_IF_ERROR(_scanner_ctx->init());
         RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
     }
@@ -266,7 +262,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() {
         std::visit(
                 [&](auto&& range) {
                     if (range.is_empty_value_range()) {
-                        _eos_dependency->set_ready_for_read();
+                        _scan_dependency->set_eos();
                     }
                 },
                 it.second.second);
@@ -559,7 +555,7 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
             constant_val = const_cast<char*>(const_column->get_data_at(0).data);
             if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) {
                 *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
-                _eos_dependency->set_ready_for_read();
+                _scan_dependency->set_eos();
             }
         } else if (const vectorized::ColumnVector<vectorized::UInt8>* bool_column =
                            check_and_get_column<vectorized::ColumnVector<vectorized::UInt8>>(
@@ -576,7 +572,7 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
                 constant_val = const_cast<char*>(bool_column->get_data_at(0).data);
                 if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) {
                     *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
-                    _eos_dependency->set_ready_for_read();
+                    _scan_dependency->set_eos();
                 }
             } else {
                 LOG(WARNING) << "Constant predicate in scan node should return a bool column with "
@@ -773,7 +769,7 @@ Status ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate(
         HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
         auto fn_name = std::string("");
         if (!is_fixed_range && state->null_in_set) {
-            _eos_dependency->set_ready_for_read();
+            _scan_dependency->set_eos();
         }
         while (iter->has_next()) {
             // column not in (nullptr) is always true
@@ -1166,7 +1162,7 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
     std::list<vectorized::VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
     if (scanners.empty()) {
-        _eos_dependency->set_ready_for_read();
+        _scan_dependency->set_eos();
     } else {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
         RETURN_IF_ERROR(_start_scanners(scanners));
@@ -1181,14 +1177,8 @@ Status ScanLocalState<Derived>::_start_scanners(
     _scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners,
                                                     p.limit(), state()->scan_queue_mem_limit(),
                                                     p._col_distribute_ids, 1);
-    _scanner_done_dependency = ScannerDoneDependency::create_shared(p.operator_id(), p.node_id());
-    _source_dependency->add_child(_scanner_done_dependency);
-    _data_ready_dependency =
-            DataReadyDependency::create_shared(p.operator_id(), p.node_id(), _scanner_ctx.get());
-    _source_dependency->add_child(_data_ready_dependency);
-
-    _scanner_ctx->set_dependency(_data_ready_dependency, _scanner_done_dependency,
-                                 _finish_dependency);
+    _scan_dependency->set_scanner_ctx(_scanner_ctx.get());
+    _scanner_ctx->set_dependency(_scan_dependency, _finish_dependency);
     return Status::OK();
 }
 
@@ -1340,23 +1330,12 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
         return Status::OK();
     }
     SCOPED_TIMER(_close_timer);
-    if (_data_ready_dependency) {
-        COUNTER_UPDATE(_wait_for_data_timer, _data_ready_dependency->read_watcher_elapse_time());
-        COUNTER_UPDATE(exec_time_counter(), _data_ready_dependency->read_watcher_elapse_time());
-    }
-    if (_eos_dependency) {
-        COUNTER_SET(_wait_for_eos_timer, _eos_dependency->read_watcher_elapse_time());
-        COUNTER_UPDATE(exec_time_counter(), _eos_dependency->read_watcher_elapse_time());
-    }
-    if (_scanner_done_dependency) {
-        COUNTER_SET(_wait_for_scanner_done_timer,
-                    _scanner_done_dependency->read_watcher_elapse_time());
-        COUNTER_UPDATE(exec_time_counter(), _scanner_done_dependency->read_watcher_elapse_time());
-    }
+
     SCOPED_TIMER(exec_time_counter());
     if (_scanner_ctx.get()) {
         _scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), state);
     }
+    COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->read_watcher_elapse_time());
 
     return PipelineXLocalState<>::close(state);
 }
@@ -1391,7 +1370,7 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
         }
     }
 
-    if (local_state._eos_dependency->read_blocked_by() == nullptr) {
+    if (local_state._scan_dependency->eos()) {
         source_state = SourceState::FINISHED;
         return Status::OK();
     }
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 66543dc7ffd..f058225580d 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -56,38 +56,56 @@ public:
     Status try_close(RuntimeState* state) override;
 };
 
-class EosDependency final : public Dependency {
+class ScanDependency final : public Dependency {
 public:
-    ENABLE_FACTORY_CREATOR(EosDependency);
-    EosDependency(int id, int node_id) : Dependency(id, node_id, "EosDependency") {}
-    void* shared_state() override { return nullptr; }
-};
-
-class ScannerDoneDependency final : public Dependency {
-public:
-    ENABLE_FACTORY_CREATOR(ScannerDoneDependency);
-    ScannerDoneDependency(int id, int node_id) : Dependency(id, node_id, "ScannerDoneDependency") {}
-    void* shared_state() override { return nullptr; }
-};
-
-class DataReadyDependency final : public Dependency {
-public:
-    ENABLE_FACTORY_CREATOR(DataReadyDependency);
-    DataReadyDependency(int id, int node_id, vectorized::ScannerContext* scanner_ctx)
-            : Dependency(id, node_id, "DataReadyDependency"), _scanner_ctx(scanner_ctx) {}
+    ENABLE_FACTORY_CREATOR(ScanDependency);
+    ScanDependency(int id, int node_id)
+            : Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr) {}
 
     void* shared_state() override { return nullptr; }
 
     // TODO(gabriel):
     [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
-        if (_scanner_ctx->get_num_running_scanners() == 0 && _scanner_ctx->should_be_scheduled()) {
+        if (_scanner_ctx && _scanner_ctx->get_num_running_scanners() == 0 &&
+            _scanner_ctx->should_be_scheduled()) {
             _scanner_ctx->reschedule_scanner_ctx();
         }
         return Dependency::read_blocked_by(task);
     }
 
+    void block_reading() override {
+        if (_eos) {
+            return;
+        }
+        if (_scanner_done) {
+            return;
+        }
+        Dependency::block_reading();
+    }
+
+    bool eos() const { return _eos.load(); }
+    void set_eos() {
+        if (_eos) {
+            return;
+        }
+        _eos = true;
+        Dependency::set_ready_for_read();
+    }
+
+    void set_scanner_done() {
+        if (_scanner_done) {
+            return;
+        }
+        _scanner_done = true;
+        Dependency::set_ready_for_read();
+    }
+
+    void set_scanner_ctx(vectorized::ScannerContext* scanner_ctx) { _scanner_ctx = scanner_ctx; }
+
 private:
     vectorized::ScannerContext* _scanner_ctx;
+    std::atomic<bool> _eos {false};
+    std::atomic<bool> _scanner_done {false};
 };
 
 class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::RuntimeFilterConsumer {
@@ -128,10 +146,7 @@ protected:
     virtual Status _init_profile() = 0;
 
     std::atomic<bool> _opened {false};
-    std::shared_ptr<EosDependency> _eos_dependency;
-    std::shared_ptr<OrDependency> _source_dependency;
-    std::shared_ptr<ScannerDoneDependency> _scanner_done_dependency;
-    std::shared_ptr<DataReadyDependency> _data_ready_dependency;
+    std::shared_ptr<ScanDependency> _scan_dependency;
 
     std::shared_ptr<RuntimeProfile> _scanner_profile;
     RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
@@ -203,7 +218,7 @@ class ScanLocalState : public ScanLocalStateBase {
 
     int64_t get_push_down_count() override;
 
-    Dependency* dependency() override { return _source_dependency.get(); }
+    Dependency* dependency() override { return _scan_dependency.get(); }
 
 protected:
     template <typename LocalStateType>
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index a06fadef03b..2e43007dee7 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -170,28 +170,6 @@ WriteDependency* WriteDependency::write_blocked_by(PipelineXTask* task) {
     return ready_for_write ? nullptr : this;
 }
 
-Dependency* OrDependency::read_blocked_by(PipelineXTask* task) {
-    // TODO(gabriel):
-    for (auto& child : _children) {
-        auto* cur_res = child->read_blocked_by(nullptr);
-        if (cur_res == nullptr) {
-            return nullptr;
-        }
-    }
-    return this;
-}
-
-WriteDependency* OrDependency::write_blocked_by(PipelineXTask* task) {
-    for (auto& child : _children) {
-        CHECK(child->is_write_dependency());
-        auto* cur_res = ((WriteDependency*)child.get())->write_blocked_by(nullptr);
-        if (cur_res == nullptr) {
-            return nullptr;
-        }
-    }
-    return this;
-}
-
 template Status HashJoinDependency::extract_join_column<true>(
         vectorized::Block&,
         COW<vectorized::IColumn>::mutable_ptr<vectorized::ColumnVector<unsigned char>>&,
@@ -250,17 +228,6 @@ std::string AndDependency::debug_string(int indentation_level) {
     return fmt::to_string(debug_string_buffer);
 }
 
-std::string OrDependency::debug_string(int indentation_level) {
-    fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
-                   std::string(indentation_level * 2, ' '), _name, _node_id);
-    for (auto& child : _children) {
-        fmt::format_to(debug_string_buffer, "{}, \n", child->debug_string(indentation_level = 1));
-    }
-    fmt::format_to(debug_string_buffer, "{}]", std::string(indentation_level * 2, ' '));
-    return fmt::to_string(debug_string_buffer);
-}
-
 Status AggDependency::reset_hash_table() {
     return std::visit(
             [&](auto&& agg_method) {
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 11a8975b306..f6a37766525 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -55,7 +55,6 @@ public:
             : _id(id), _node_id(node_id), _name(std::move(name)), _ready_for_read(false) {}
     virtual ~Dependency() = default;
 
-    virtual bool is_or_dep() { return false; }
     [[nodiscard]] int id() const { return _id; }
     [[nodiscard]] virtual std::string name() const { return _name; }
     virtual void* shared_state() = 0;
@@ -280,37 +279,6 @@ public:
     }
 };
 
-class OrDependency final : public WriteDependency {
-public:
-    ENABLE_FACTORY_CREATOR(OrDependency);
-    OrDependency(int id, int node_id) : WriteDependency(id, node_id, "OrDependency") {}
-
-    [[nodiscard]] std::string name() const override {
-        fmt::memory_buffer debug_string_buffer;
-        fmt::format_to(debug_string_buffer, "{}[", _name);
-        for (auto& child : _children) {
-            fmt::format_to(debug_string_buffer, "{}, ", child->name());
-        }
-        fmt::format_to(debug_string_buffer, "]");
-        return fmt::to_string(debug_string_buffer);
-    }
-
-    void* shared_state() override { return nullptr; }
-
-    std::string debug_string(int indentation_level = 0) override;
-
-    bool is_or_dep() override { return true; }
-
-    [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override;
-
-    [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override;
-
-    void add_child(std::shared_ptr<Dependency> child) override {
-        WriteDependency::add_child(child);
-        child->set_parent(weak_from_this());
-    }
-};
-
 struct FakeSharedState {};
 struct FakeDependency final : public WriteDependency {
 public:
@@ -681,6 +649,9 @@ public:
     }
 
     void set_eos() {
+        if (_eos) {
+            return;
+        }
         _eos = true;
         WriteDependency::set_ready_for_read();
     }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index bc50f1e89de..04c5ddc1974 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -136,10 +136,6 @@ public:
         return _use_blocking_queue || get_state() == PipelineTaskState::BLOCKED_FOR_DEPENDENCY;
     }
     void set_use_blocking_queue(bool use_blocking_queue) {
-        if (_blocked_dep->is_or_dep()) {
-            _use_blocking_queue = true;
-            return;
-        }
         _use_blocking_queue = use_blocking_queue;
     }
 
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index f02e07a6f86..fe00a9489aa 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -50,14 +50,6 @@ public:
               _col_distribute_ids(col_distribute_ids),
               _need_colocate_distribute(!_col_distribute_ids.empty()) {}
 
-    void set_dependency(std::shared_ptr<DataReadyDependency> dependency,
-                        std::shared_ptr<ScannerDoneDependency> scanner_done_dependency,
-                        std::shared_ptr<FinishDependency> finish_dependency) override {
-        _data_dependency = dependency;
-        _scanner_done_dependency = scanner_done_dependency;
-        _finish_dependency = finish_dependency;
-    }
-
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
                                 int id, bool wait = false) override {
         {
@@ -84,8 +76,8 @@ public:
             *block = std::move(_blocks_queues[id].front());
             _blocks_queues[id].pop_front();
 
-            if (_blocks_queues[id].empty() && _data_dependency) {
-                _data_dependency->block_reading();
+            if (_blocks_queues[id].empty() && _dependency) {
+                _dependency->block_reading();
             }
         }
         _current_used_bytes -= (*block)->allocated_bytes();
@@ -157,8 +149,8 @@ public:
                     for (int j = i; j < block_size; j += queue_size) {
                         _blocks_queues[queue].emplace_back(std::move(blocks[j]));
                     }
-                    if (_data_dependency) {
-                        _data_dependency->set_ready_for_read();
+                    if (_dependency) {
+                        _dependency->set_ready_for_read();
                     }
                 }
                 _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
@@ -209,8 +201,8 @@ public:
                     _blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
                     _colocate_mutable_blocks[i]->clear();
                 }
-                if (_data_dependency) {
-                    _data_dependency->set_ready_for_read();
+                if (_dependency) {
+                    _dependency->set_ready_for_read();
                 }
             }
         }
@@ -237,8 +229,6 @@ private:
     std::vector<std::unique_ptr<vectorized::MutableBlock>> _colocate_mutable_blocks;
     std::vector<std::unique_ptr<std::mutex>> _colocate_block_mutexs;
 
-    std::shared_ptr<DataReadyDependency> _data_dependency = nullptr;
-
     void _add_rows_colocate_blocks(vectorized::Block* block, int loc,
                                    const std::vector<int>& rows) {
         int row_wait_add = rows.size();
@@ -265,8 +255,8 @@ private:
                     std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
                     _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
                 }
-                if (_data_dependency) {
-                    _data_dependency->set_ready_for_read();
+                if (_dependency) {
+                    _dependency->set_ready_for_read();
                 }
                 _colocate_blocks[loc] = get_free_block();
                 _colocate_mutable_blocks[loc]->set_muatable_columns(
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index ef6a9d415de..b2c481871ae 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -430,8 +430,8 @@ bool ScannerContext::no_schedule() {
 }
 
 void ScannerContext::_set_scanner_done() {
-    if (_scanner_done_dependency) {
-        _scanner_done_dependency->set_ready_for_read();
+    if (_dependency) {
+        _dependency->set_scanner_done();
     }
 }
 
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 10b4775ceff..a0702960ac1 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -43,9 +43,8 @@ class TupleDescriptor;
 
 namespace pipeline {
 class ScanLocalStateBase;
-class ScannerDoneDependency;
+class ScanDependency;
 class FinishDependency;
-class DataReadyDependency;
 } // namespace pipeline
 
 namespace taskgroup {
@@ -106,10 +105,11 @@ public:
         return _process_status;
     }
 
-    virtual void set_dependency(
-            std::shared_ptr<pipeline::DataReadyDependency> dependency,
-            std::shared_ptr<pipeline::ScannerDoneDependency> scanner_done_dependency,
-            std::shared_ptr<pipeline::FinishDependency> finish_dependency) {}
+    void set_dependency(std::shared_ptr<pipeline::ScanDependency> dependency,
+                        std::shared_ptr<pipeline::FinishDependency> finish_dependency) {
+        _dependency = dependency;
+        _finish_dependency = finish_dependency;
+    }
 
     // Called by ScanNode.
     // Used to notify the scheduler that this ScannerContext can stop working.
@@ -283,7 +283,7 @@ protected:
     RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
 
-    std::shared_ptr<pipeline::ScannerDoneDependency> _scanner_done_dependency = nullptr;
+    std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
     std::shared_ptr<pipeline::FinishDependency> _finish_dependency = nullptr;
 };
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to