This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch resource_ctx in repository https://gitbox.apache.org/repos/asf/doris.git
commit a0f81123e23d5874da13727d47c5d32ae1421769 Author: yiguolei <guo...@selectdb.com> AuthorDate: Fri Dec 6 20:34:17 2024 +0800 f --- .../runtime/workload_group/memory_controller.cpp | 58 ---------------- be/src/runtime/workload_management/cpu_context.h | 51 ++++++++++++++ be/src/runtime/workload_management/io_context.h | 81 ++++++++++++++++++++++ .../memory_context.h} | 38 +++++----- .../resource_context.h | 35 ++++------ 5 files changed, 160 insertions(+), 103 deletions(-) diff --git a/be/src/runtime/workload_group/memory_controller.cpp b/be/src/runtime/workload_group/memory_controller.cpp deleted file mode 100644 index 130f95bc59d..00000000000 --- a/be/src/runtime/workload_group/memory_controller.cpp +++ /dev/null @@ -1,58 +0,0 @@ - - -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/workload_group/memory_controller.h" - -#include <map> -#include <mutex> -#include <ostream> -#include <utility> - -#include "common/logging.h" - -namespace doris { - -void MemoryController::Stats::reset() { - revoke_attempts_ = 0; - revoke_wait_time_ms_ = 0; - revoked_bytes_ = 0; -} - -MemoryController::Stats& MemoryController::Stats::operator+=(const MemoryController::Stats& other) { - revoke_attempts_ += other.revoke_attempts_; - revoke_wait_time_ms_ += other.revoke_wait_time_ms_; - revoked_bytes_ += other.revoked_bytes_; - return *this; -} - -MemoryController::Stats MemoryController::Stats::operator-(const Stats& other) const { - Stats result; - result.revoke_attempts_ = revoke_attempts_ - other.revoke_attempts_; - result.revoke_wait_time_ms_ = revoke_wait_time_ms_ - other.revoke_wait_time_ms_; - result.revoked_bytes_ = revoked_bytes_ - other.revoked_bytes_; - return result; -} - -std::string MemoryController::Stats::debug_string() const { - return fmt::format("revoke_attempts_ {} revoke_wait_time_ms_ {} revoked_bytes_ {}", - revoke_attempts_, revoke_wait_time_ms_, - PrettyPrinter::print(revoked_bytes_, TUnit::BYTES)); -} - -} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/cpu_context.h b/be/src/runtime/workload_management/cpu_context.h new file mode 100644 index 00000000000..05b2c94a64e --- /dev/null +++ b/be/src/runtime/workload_management/cpu_context.h @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stddef.h> +#include <stdint.h> + +#include <atomic> +#include <memory> +#include <queue> +#include <shared_mutex> +#include <string> + +#include "common/status.h" + +namespace doris { + +class CPUContext : public std::enable_shared_from_this<CPUContext> { + ENABLE_FACTORY_CREATOR(CPUContext); + +public: + // Used to collect cpu execution stats. + class CPUStats { + public: + std::string debug_string(); + // Should add some cpu stats relared method here. + }; + +public: + CPUContext() {} + virtual ~CPUContext() = default; + // Bind current thread to cgroup, only some load thread should do this. + void bind_workload_group() {} +}; + +} // namespace doris diff --git a/be/src/runtime/workload_management/io_context.h b/be/src/runtime/workload_management/io_context.h new file mode 100644 index 00000000000..b6e883264a1 --- /dev/null +++ b/be/src/runtime/workload_management/io_context.h @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stddef.h> +#include <stdint.h> + +#include <atomic> +#include <memory> +#include <queue> +#include <shared_mutex> +#include <string> + +#include "common/status.h" + +namespace doris { + +class IOContext : public std::enable_shared_from_this<IOContext> { + ENABLE_FACTORY_CREATOR(IOContext); + +public: + // Used to collect io execution stats. + class IOStats { + public: + IOStats() = default; + virtual ~IOStats() = default; + int64_t scan_rows() { return scan_rows_; } + int64_t scan_bytes() { return scan_bytes_; } + int64_t scan_bytes_from_local_storage() { return scan_bytes_from_local_storage_; } + int64_t scan_bytes_from_remote_storage() { return scan_bytes_from_remote_storage_; } + int64_t returned_rows() { return returned_rows_; } + int64_t shuffle_send_bytes() { return shuffle_send_bytes_; } + int64_t shuffle_send_rows() { return shuffle_send_rows_; } + + int64_t incr_scan_rows(int64_t delta) { return scan_rows_ + delta; } + int64_t incr_scan_bytes(int64_t delta) { return scan_bytes_ + delta; } + int64_t incr_scan_bytes_from_local_storage(int64_t delta) { + return scan_bytes_from_local_storage_ + delta; + } + int64_t incr_scan_bytes_from_remote_storage(int64_t delta) { + return scan_bytes_from_remote_storage_ + delta; + } + int64_t incr_returned_rows(int64_t delta) { return returned_rows_ + delta; } + int64_t incr_shuffle_send_bytes(int64_t delta) { return shuffle_send_bytes_ + delta; } + int64_t incr_shuffle_send_rows(int64_t delta) { return shuffle_send_rows_ + delta; } + std::string debug_string(); + + private: + std::atomic<int64_t> scan_rows_ = 0; + std::atomic<int64_t> scan_bytes_ = 0; + std::atomic<int64_t> scan_bytes_from_local_storage_ = 0; + std::atomic<int64_t> scan_bytes_from_remote_storage_ = 0; + // number rows returned by query. + // only set once by result sink when closing. + std::atomic<int64_t> returned_rows_ = 0; + std::atomic<int64_t> shuffle_send_bytes_ = 0; + std::atomic<int64_t> shuffle_send_rows_ = 0; + }; + +public: + IOContext() {} + virtual ~IOContext() = default; + IOThrottle* get_io_throttle() {} +}; + +} // namespace doris diff --git a/be/src/runtime/workload_group/memory_controller.h b/be/src/runtime/workload_management/memory_context.h similarity index 72% rename from be/src/runtime/workload_group/memory_controller.h rename to be/src/runtime/workload_management/memory_context.h index a27e1b5f60f..7e62c1bc687 100644 --- a/be/src/runtime/workload_group/memory_controller.h +++ b/be/src/runtime/workload_management/memory_context.h @@ -30,43 +30,44 @@ namespace doris { -class MemoryController : public std::enable_shared_from_this<MemoryController> { - ENABLE_FACTORY_CREATOR(MemoryController); +class MemoryContext : public std::enable_shared_from_this<MemoryContext> { + ENABLE_FACTORY_CREATOR(MemoryContext); public: // Used to collect memory execution stats. - class Stats { + class MemoryStats { public: - Stats() = default; - virtual ~Stats() = default; - - void reset(); + MemoryStats() = default; + virtual ~MemoryStats() = default; std::string debug_string(); int64_t revoke_attempts() { return revoke_attempts_; } int64_t revoke_wait_time_ms() { return revoke_wait_time_ms_; } int64_t revoked_bytes() { return revoked_bytes_; } - - void incr_revoke_attempts(int64_t delta) { revoke_attempts_ + delta; } - void incr_revoke_wait_time_ms(int64_t delta) { return revoke_wait_time_ms_ + delta; } - void incr_revoked_bytes(int64_t delta) { return revoked_bytes_ + delta; } + int64_t max_peak_memory_bytes() { return max_peak_memory_bytes_; } + int64_t current_used_memory_bytes() { return current_used_memory_bytes_; } private: + // Maximum memory peak for all backends. + // only set once by result sink when closing. + std::atomic<int64_t> max_peak_memory_bytes_ = 0; + std::atomic<int64_t> current_used_memory_bytes_ = 0; // The total number of times that the revoke method is called. std::atomic<int64_t> revoke_attempts_ = 0; - // The time that waiting for revoke finished. std::atomic<int64_t> revoke_wait_time_ms_ = 0; - // The revoked bytes std::atomic<int64_t> revoked_bytes_ = 0; }; public: - MemoryController(std::shared_ptr<MemtrackerLimiter> memtracker) + MemoryContext(std::shared_ptr<MemtrackerLimiter> memtracker) : memtracker_limiter_(memtracker) {} - virtual ~MemoryController() = default; + virtual ~MemoryContext() = default; + MemtrackerLimiter* get_memtracker_limiter() {} + + // Following method is related with spill disk. // Compute the number of bytes could be released. virtual int64_t revokable_bytes() { return 0; } @@ -79,13 +80,6 @@ public: virtual Status leave_arbitration(Status reason) { return Status::OK(); } - // Return related workload group if exists, maybe return null if not bind - // to a workload group. - virtual WorkloadGroupPtr workload_group(); - - // Cancel the related task - virtual Status cancel(Status cancel_reason) { return Status::OK(); } - private: // std::weak_ptr<WorkloadGroup> workload_group_; std::weak_ptr<MemtrackerLimiter> memtracker_limiter_; diff --git a/be/src/runtime/workload_group/resource_context.h b/be/src/runtime/workload_management/resource_context.h similarity index 66% rename from be/src/runtime/workload_group/resource_context.h rename to be/src/runtime/workload_management/resource_context.h index aec718ad5d4..7473d6e3afa 100644 --- a/be/src/runtime/workload_group/resource_context.h +++ b/be/src/runtime/workload_management/resource_context.h @@ -30,30 +30,15 @@ namespace doris { -class WorkloadGroupController { +// Any task that allow cancel should implement this class. +class ResourceReclaimer { public: - WorkloadGroupController() = default; - virtual ~WorkloadGroupController() = default; - - virtual Status bind_workload_group() = 0; - virtual WorkloadGroupPtr workload_group() = 0; -}; - -class IOController { - class IOStats {}; -}; - -class CPUController { - class CPUStats {}; -}; - -class NetworkController { - class NetStats {}; + virtual Status cancel(Status cancel_reason) { return Status::OK(); } }; // Every task should have its own resource context. And BE may adjust the resource // context during running. -// ResourceContext will bind to the running thread's thread local when the task is running. +// Workload group will hold the resource context and do some control work. class ResourceContext : public std::enable_shared_from_this<ResourceContext> { ENABLE_FACTORY_CREATOR(ResourceContext); @@ -61,12 +46,16 @@ public: ResourceContext() = default; virtual ~ResourceContext() = default; + void set_workload_group() { + // update all child context's workload group property + } + private: // The controller's init value is nullptr, it means the resource context will ignore this controller. - std::shared_ptr<WorkloadGroupController> _workload_group_controller = nullptr; - std::shared_ptr<MemoryController> _memory_controller = nullptr; - std::shared_ptr<WorkloadGroupController> _workload_group_controller = nullptr; - std::shared_ptr<IOController> _io_controller = nullptr; + std::shared_ptr<CPUContext> _cpu_context = nullptr; + std::shared_ptr<MemoryContext> _memory_context = nullptr; + std::shared_ptr<IOContext> _io_context = nullptr; + std::shared_ptr<ResourceReclaimer> _reclaimer = nullptr; }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org