github-actions[bot] commented on code in PR #26879: URL: https://github.com/apache/doris/pull/26879#discussion_r1392597780
########## be/src/pipeline/pipeline_x/dependency.cpp: ########## @@ -294,6 +415,12 @@ std::vector<uint16_t> HashJoinDependency::convert_block_to_null(vectorized::Bloc return results; } +void SetSharedState::set_probe_finished_children(int child_id) { Review Comment: warning: method 'set_probe_finished_children' can be made static [readability-convert-member-functions-to-static] be/src/pipeline/pipeline_x/dependency.h:796: ```diff - void set_probe_finished_children(int child_id); + static void set_probe_finished_children(int child_id); ``` ########## be/src/pipeline/pipeline_x/dependency.h: ########## @@ -145,72 +146,82 @@ class WriteDependency : public Dependency { return _write_dependency_watcher.elapsed_time(); } - [[nodiscard]] virtual WriteDependency* write_blocked_by() { + [[nodiscard]] virtual WriteDependency* write_blocked_by(PipelineXTask* task) { + std::unique_lock<std::mutex> lc(_task_lock); if (config::enable_fuzzy_mode && !_ready_for_write && _should_log(_write_dependency_watcher.elapsed_time())) { LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " " - << id(); + << id() << " block tasks: " << _blocked_task.size() + << "read :" << _ready_for_read << " write :" << _ready_for_write; + } + if (!_ready_for_write && task) { + add_block_task(task); } return _ready_for_write ? nullptr : this; } - virtual void set_ready_for_write() { - if (_ready_for_write) { - return; - } - _write_dependency_watcher.stop(); - _ready_for_write = true; - } + virtual void set_ready_for_write(); virtual void block_writing() { _ready_for_write = false; } + std::string debug_string(int indentation_level = 0) override; + void add_block_task(PipelineXTask* task) override; + protected: + friend class Dependency; std::atomic<bool> _ready_for_write; MonotonicStopWatch _write_dependency_watcher; + +private: + std::vector<PipelineXTask*> _write_blocked_task; }; class FinishDependency final : public Dependency { public: FinishDependency(int id, int node_id, std::string name) - : Dependency(id, name), _ready_to_finish(true), _node_id(node_id) {} + : Dependency(id, node_id, name), _ready_to_finish(true) {} ~FinishDependency() override = default; + void should_finish_after_check() { _ready_to_finish = false; } void start_finish_watcher() { for (auto& child : _children) { ((FinishDependency*)child.get())->start_finish_watcher(); } _finish_dependency_watcher.start(); } + bool is_finish_dependency() override { return true; } + [[nodiscard]] int64_t finish_watcher_elapse_time() { return _finish_dependency_watcher.elapsed_time(); } - [[nodiscard]] FinishDependency* finish_blocked_by() { + [[nodiscard]] FinishDependency* finish_blocked_by(PipelineXTask* task) { + std::unique_lock<std::mutex> lc(_task_lock); if (config::enable_fuzzy_mode && !_ready_to_finish && _should_log(_finish_dependency_watcher.elapsed_time())) { LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " " - << _node_id; + << _node_id << " block tasks: " << _blocked_task.size(); } - return _ready_to_finish ? nullptr : this; - } - - void set_ready_to_finish() { - if (_ready_to_finish) { - return; + if (!_ready_to_finish && task) { + add_block_task(task); } - _finish_dependency_watcher.stop(); - _ready_to_finish = true; + return _ready_to_finish ? nullptr : this; } - void block_finishing() { _ready_to_finish = false; } + void set_ready_to_finish(); void* shared_state() override { return nullptr; } + std::string debug_string(int indentation_level = 0) override; + + void add_block_task(PipelineXTask* task) override; protected: - std::atomic<bool> _ready_to_finish; + bool _ready_to_finish; Review Comment: warning: use default member initializer for '_ready_to_finish' [modernize-use-default-member-init] be/src/pipeline/pipeline_x/dependency.h:181: ```diff - : Dependency(id, node_id, name), _ready_to_finish(true) {} + : Dependency(id, node_id, name), {} ``` ```suggestion bool _ready_to_finish{true}; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org