This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch resource_ctx
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a0f81123e23d5874da13727d47c5d32ae1421769
Author: yiguolei <guo...@selectdb.com>
AuthorDate: Fri Dec 6 20:34:17 2024 +0800

    f
---
 .../runtime/workload_group/memory_controller.cpp   | 58 ----------------
 be/src/runtime/workload_management/cpu_context.h   | 51 ++++++++++++++
 be/src/runtime/workload_management/io_context.h    | 81 ++++++++++++++++++++++
 .../memory_context.h}                              | 38 +++++-----
 .../resource_context.h                             | 35 ++++------
 5 files changed, 160 insertions(+), 103 deletions(-)

diff --git a/be/src/runtime/workload_group/memory_controller.cpp 
b/be/src/runtime/workload_group/memory_controller.cpp
deleted file mode 100644
index 130f95bc59d..00000000000
--- a/be/src/runtime/workload_group/memory_controller.cpp
+++ /dev/null
@@ -1,58 +0,0 @@
-
-
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/workload_group/memory_controller.h"
-
-#include <map>
-#include <mutex>
-#include <ostream>
-#include <utility>
-
-#include "common/logging.h"
-
-namespace doris {
-
-void MemoryController::Stats::reset() {
-    revoke_attempts_ = 0;
-    revoke_wait_time_ms_ = 0;
-    revoked_bytes_ = 0;
-}
-
-MemoryController::Stats& MemoryController::Stats::operator+=(const 
MemoryController::Stats& other) {
-    revoke_attempts_ += other.revoke_attempts_;
-    revoke_wait_time_ms_ += other.revoke_wait_time_ms_;
-    revoked_bytes_ += other.revoked_bytes_;
-    return *this;
-}
-
-MemoryController::Stats MemoryController::Stats::operator-(const Stats& other) 
const {
-    Stats result;
-    result.revoke_attempts_ = revoke_attempts_ - other.revoke_attempts_;
-    result.revoke_wait_time_ms_ = revoke_wait_time_ms_ - 
other.revoke_wait_time_ms_;
-    result.revoked_bytes_ = revoked_bytes_ - other.revoked_bytes_;
-    return result;
-}
-
-std::string MemoryController::Stats::debug_string() const {
-    return fmt::format("revoke_attempts_ {} revoke_wait_time_ms_ {} 
revoked_bytes_ {}",
-                       revoke_attempts_, revoke_wait_time_ms_,
-                       PrettyPrinter::print(revoked_bytes_, TUnit::BYTES));
-}
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/cpu_context.h 
b/be/src/runtime/workload_management/cpu_context.h
new file mode 100644
index 00000000000..05b2c94a64e
--- /dev/null
+++ b/be/src/runtime/workload_management/cpu_context.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+// CPU part of a resource context. For now it only provides a stats holder
+// type and an empty hook for binding the running thread to a workload group.
+class CPUContext : public std::enable_shared_from_this<CPUContext> {
+    ENABLE_FACTORY_CREATOR(CPUContext);
+
+public:
+    // Holder for CPU execution statistics.
+    class CPUStats {
+    public:
+        // Only declared here; the definition is not part of this header.
+        std::string debug_string();
+        // TODO: add CPU-stats related methods here.
+    };
+
+    CPUContext() = default;
+    virtual ~CPUContext() = default;
+
+    // Bind the current thread to a cgroup; only some load threads should do this.
+    // The body is intentionally empty at this stage.
+    void bind_workload_group() {}
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/io_context.h 
b/be/src/runtime/workload_management/io_context.h
new file mode 100644
index 00000000000..b6e883264a1
--- /dev/null
+++ b/be/src/runtime/workload_management/io_context.h
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+// IO part of a resource context: owns the IO statistics counters type and
+// exposes an accessor for the IO throttle.
+class IOContext : public std::enable_shared_from_this<IOContext> {
+    ENABLE_FACTORY_CREATOR(IOContext);
+
+public:
+    // Used to collect io execution stats.
+    // Every counter is a std::atomic<int64_t>, so individual reads and
+    // increments are atomic; no ordering between different counters is implied.
+    class IOStats {
+    public:
+        IOStats() = default;
+        virtual ~IOStats() = default;
+
+        // Read accessors: each returns the current value of its counter.
+        int64_t scan_rows() const { return scan_rows_.load(); }
+        int64_t scan_bytes() const { return scan_bytes_.load(); }
+        int64_t scan_bytes_from_local_storage() const {
+            return scan_bytes_from_local_storage_.load();
+        }
+        int64_t scan_bytes_from_remote_storage() const {
+            return scan_bytes_from_remote_storage_.load();
+        }
+        int64_t returned_rows() const { return returned_rows_.load(); }
+        int64_t shuffle_send_bytes() const { return shuffle_send_bytes_.load(); }
+        int64_t shuffle_send_rows() const { return shuffle_send_rows_.load(); }
+
+        // Increment helpers: atomically add `delta` to the counter and return
+        // the updated value. (Previously these only computed `counter + delta`
+        // and never stored it, so the counters stayed at zero.)
+        int64_t incr_scan_rows(int64_t delta) { return scan_rows_ += delta; }
+        int64_t incr_scan_bytes(int64_t delta) { return scan_bytes_ += delta; }
+        int64_t incr_scan_bytes_from_local_storage(int64_t delta) {
+            return scan_bytes_from_local_storage_ += delta;
+        }
+        int64_t incr_scan_bytes_from_remote_storage(int64_t delta) {
+            return scan_bytes_from_remote_storage_ += delta;
+        }
+        int64_t incr_returned_rows(int64_t delta) { return returned_rows_ += delta; }
+        int64_t incr_shuffle_send_bytes(int64_t delta) { return shuffle_send_bytes_ += delta; }
+        int64_t incr_shuffle_send_rows(int64_t delta) { return shuffle_send_rows_ += delta; }
+
+        // Only declared here; the definition is not part of this header.
+        std::string debug_string();
+
+    private:
+        std::atomic<int64_t> scan_rows_ = 0;
+        std::atomic<int64_t> scan_bytes_ = 0;
+        std::atomic<int64_t> scan_bytes_from_local_storage_ = 0;
+        std::atomic<int64_t> scan_bytes_from_remote_storage_ = 0;
+        // Number of rows returned by the query.
+        // Only set once by the result sink when closing.
+        std::atomic<int64_t> returned_rows_ = 0;
+        std::atomic<int64_t> shuffle_send_bytes_ = 0;
+        std::atomic<int64_t> shuffle_send_rows_ = 0;
+    };
+
+    IOContext() = default;
+    virtual ~IOContext() = default;
+
+    // No throttle is stored in this class yet, so return nullptr explicitly.
+    // The previous empty body flowed off the end of a non-void function,
+    // which is undefined behaviour when called. Callers must handle nullptr.
+    // TODO: return the real throttle once it is wired into this context.
+    IOThrottle* get_io_throttle() { return nullptr; }
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_group/memory_controller.h 
b/be/src/runtime/workload_management/memory_context.h
similarity index 72%
rename from be/src/runtime/workload_group/memory_controller.h
rename to be/src/runtime/workload_management/memory_context.h
index a27e1b5f60f..7e62c1bc687 100644
--- a/be/src/runtime/workload_group/memory_controller.h
+++ b/be/src/runtime/workload_management/memory_context.h
@@ -30,43 +30,44 @@
 
 namespace doris {
 
-class MemoryController : public std::enable_shared_from_this<MemoryController> 
{
-    ENABLE_FACTORY_CREATOR(MemoryController);
+class MemoryContext : public std::enable_shared_from_this<MemoryContext> {
+    ENABLE_FACTORY_CREATOR(MemoryContext);
 
 public:
     // Used to collect memory execution stats.
-    class Stats {
+    class MemoryStats {
     public:
-        Stats() = default;
-        virtual ~Stats() = default;
-
-        void reset();
+        MemoryStats() = default;
+        virtual ~MemoryStats() = default;
         std::string debug_string();
         int64_t revoke_attempts() { return revoke_attempts_; }
         int64_t revoke_wait_time_ms() { return revoke_wait_time_ms_; }
         int64_t revoked_bytes() { return revoked_bytes_; }
-
-        void incr_revoke_attempts(int64_t delta) { revoke_attempts_ + delta; }
-        void incr_revoke_wait_time_ms(int64_t delta) { return 
revoke_wait_time_ms_ + delta; }
-        void incr_revoked_bytes(int64_t delta) { return revoked_bytes_ + 
delta; }
+        int64_t max_peak_memory_bytes() { return max_peak_memory_bytes_; }
+        int64_t current_used_memory_bytes() { return 
current_used_memory_bytes_; }
 
     private:
+        // Maximum memory peak for all backends.
+        // only set once by result sink when closing.
+        std::atomic<int64_t> max_peak_memory_bytes_ = 0;
+        std::atomic<int64_t> current_used_memory_bytes_ = 0;
         // The total number of times that the revoke method is called.
         std::atomic<int64_t> revoke_attempts_ = 0;
-
         // The time that waiting for revoke finished.
         std::atomic<int64_t> revoke_wait_time_ms_ = 0;
-
         // The revoked bytes
         std::atomic<int64_t> revoked_bytes_ = 0;
     };
 
 public:
-    MemoryController(std::shared_ptr<MemtrackerLimiter> memtracker)
+    MemoryContext(std::shared_ptr<MemtrackerLimiter> memtracker)
             : memtracker_limiter_(memtracker) {}
 
-    virtual ~MemoryController() = default;
+    virtual ~MemoryContext() = default;
 
+    MemtrackerLimiter* get_memtracker_limiter() {}
+
+    // The following methods are related to spilling to disk.
     // Compute the number of bytes could be released.
     virtual int64_t revokable_bytes() { return 0; }
 
@@ -79,13 +80,6 @@ public:
 
     virtual Status leave_arbitration(Status reason) { return Status::OK(); }
 
-    // Return related workload group if exists, maybe return null if not bind
-    // to a workload group.
-    virtual WorkloadGroupPtr workload_group();
-
-    // Cancel the related task
-    virtual Status cancel(Status cancel_reason) { return Status::OK(); }
-
 private:
     // std::weak_ptr<WorkloadGroup> workload_group_;
     std::weak_ptr<MemtrackerLimiter> memtracker_limiter_;
diff --git a/be/src/runtime/workload_group/resource_context.h 
b/be/src/runtime/workload_management/resource_context.h
similarity index 66%
rename from be/src/runtime/workload_group/resource_context.h
rename to be/src/runtime/workload_management/resource_context.h
index aec718ad5d4..7473d6e3afa 100644
--- a/be/src/runtime/workload_group/resource_context.h
+++ b/be/src/runtime/workload_management/resource_context.h
@@ -30,30 +30,15 @@
 
 namespace doris {
 
-class WorkloadGroupController {
+// Any task that allows cancellation should implement this class.
+class ResourceReclaimer {
 public:
-    WorkloadGroupController() = default;
-    virtual ~WorkloadGroupController() = default;
-
-    virtual Status bind_workload_group() = 0;
-    virtual WorkloadGroupPtr workload_group() = 0;
-};
-
-class IOController {
-    class IOStats {};
-};
-
-class CPUController {
-    class CPUStats {};
-};
-
-class NetworkController {
-    class NetStats {};
+    virtual Status cancel(Status cancel_reason) { return Status::OK(); }
 };
 
 // Every task should have its own resource context. And BE may adjust the 
resource
 // context during running.
-// ResourceContext will bind to the running thread's thread local when the 
task is running.
+// Workload group will hold the resource context and do some control work.
 class ResourceContext : public std::enable_shared_from_this<ResourceContext> {
     ENABLE_FACTORY_CREATOR(ResourceContext);
 
@@ -61,12 +46,16 @@ public:
     ResourceContext() = default;
     virtual ~ResourceContext() = default;
 
+    void set_workload_group() {
+        // update all child context's workload group property
+    }
+
 private:
     // The controller's init value is nullptr, it means the resource context 
will ignore this controller.
-    std::shared_ptr<WorkloadGroupController> _workload_group_controller = 
nullptr;
-    std::shared_ptr<MemoryController> _memory_controller = nullptr;
-    std::shared_ptr<WorkloadGroupController> _workload_group_controller = 
nullptr;
-    std::shared_ptr<IOController> _io_controller = nullptr;
+    std::shared_ptr<CPUContext> _cpu_context = nullptr;
+    std::shared_ptr<MemoryContext> _memory_context = nullptr;
+    std::shared_ptr<IOContext> _io_context = nullptr;
+    std::shared_ptr<ResourceReclaimer> _reclaimer = nullptr;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to