This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new c9d90b636e8 [feature](pipelineX) add time unit when slow_dependency need to log (#26466)
c9d90b636e8 is described below

commit c9d90b636e82860694eb9a07ab9d69fa82fdbee5
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Mon Nov 6 19:58:07 2023 +0800

    [feature](pipelineX) add time unit when slow_dependency need to log (#26466)
---
 be/src/pipeline/exec/exchange_sink_operator.h |  2 +-
 be/src/pipeline/exec/scan_operator.h          |  2 +-
 be/src/pipeline/pipeline_x/dependency.h       | 28 ++++++++++++++++++++-------
 3 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index a0badc7561c..815d3930577 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -81,7 +81,7 @@ public:
 
     [[nodiscard]] WriteDependency* write_blocked_by() override {
         if (config::enable_fuzzy_mode && _available_block == 0 &&
-            _write_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+            _should_log(_write_dependency_watcher.elapsed_time())) {
             LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
                          << id();
         }
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index c71811c831d..68d006006f6 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -103,7 +103,7 @@ public:
             _scanner_ctx->reschedule_scanner_ctx();
         }
         if (config::enable_fuzzy_mode && !_ready_for_read &&
-            _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+            _should_log(_read_dependency_watcher.elapsed_time())) {
             LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
                          << id();
         }
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index d5349307e5b..1d575690f8e 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -46,7 +46,8 @@ class Dependency;
 using DependencySPtr = std::shared_ptr<Dependency>;
 
 static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 10 * 1000L * 1000L * 1000L;
-
+static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 5 * 1000L * 1000L * 1000L;
+static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
 class Dependency : public std::enable_shared_from_this<Dependency> {
 public:
     Dependency(int id, std::string name) : _id(id), _name(name), _ready_for_read(false) {}
@@ -73,7 +74,7 @@ public:
     // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
     [[nodiscard]] virtual Dependency* read_blocked_by() {
         if (config::enable_fuzzy_mode && !_ready_for_read &&
-            _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+            _should_log(_read_dependency_watcher.elapsed_time())) {
             LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
                          << id();
         }
@@ -102,6 +103,17 @@ public:
     void remove_first_child() { _children.erase(_children.begin()); }
 
 protected:
+    bool _should_log(uint64_t cur_time) {
+        if (cur_time < SLOW_DEPENDENCY_THRESHOLD) {
+            return false;
+        }
+        if ((cur_time - _last_log_time) < TIME_UNIT_DEPENDENCY_LOG) {
+            return false;
+        }
+        _last_log_time = cur_time;
+        return true;
+    }
+
+
     int _id;
     std::string _name;
     std::atomic<bool> _ready_for_read;
@@ -110,6 +122,8 @@ protected:
 
     std::weak_ptr<Dependency> _parent;
     std::list<std::shared_ptr<Dependency>> _children;
+
+    uint64_t _last_log_time = 0;
 };
 
 class WriteDependency : public Dependency {
@@ -133,7 +147,7 @@ public:
 
     [[nodiscard]] virtual WriteDependency* write_blocked_by() {
         if (config::enable_fuzzy_mode && !_ready_for_write &&
-            _write_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+            _should_log(_write_dependency_watcher.elapsed_time())) {
             LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
                          << id();
         }
@@ -174,7 +188,7 @@ public:
 
     [[nodiscard]] FinishDependency* finish_blocked_by() {
         if (config::enable_fuzzy_mode && !_ready_to_finish &&
-            _finish_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+            _should_log(_finish_dependency_watcher.elapsed_time())) {
             LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
                          << _node_id;
         }
@@ -726,7 +740,7 @@ public:
 
     [[nodiscard]] Dependency* read_blocked_by() override {
         if (config::enable_fuzzy_mode && !(_ready_for_read || _eos) &&
-            _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+            _should_log(_read_dependency_watcher.elapsed_time())) {
             LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
                          << id();
         }
@@ -845,7 +859,7 @@ public:
     // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
     [[nodiscard]] Dependency* read_blocked_by() override {
         if (config::enable_fuzzy_mode && !_set_state->ready_for_read &&
-            _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+            _should_log(_read_dependency_watcher.elapsed_time())) {
             LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
                          << id();
         }
@@ -905,7 +919,7 @@ public:
 
     Dependency* read_blocked_by() override {
         if (config::enable_fuzzy_mode && !_should_run() &&
-            _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+            _should_log(_read_dependency_watcher.elapsed_time())) {
             LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
                          << id();
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org