This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch resource_ctx
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a8b41b6cac8926cfad8ca5d41bbbf1ad701aa0e0
Author: yiguolei <guo...@selectdb.com>
AuthorDate: Mon Dec 2 20:25:55 2024 +0800

    first pr for resource context
---
 .../runtime/workload_group/memory_controller.cpp   | 58 +++++++++++++
 be/src/runtime/workload_group/memory_controller.h  | 94 ++++++++++++++++++++++
 be/src/runtime/workload_group/resource_context.h   | 70 ++++++++++++++++
 3 files changed, 222 insertions(+)

diff --git a/be/src/runtime/workload_group/memory_controller.cpp 
b/be/src/runtime/workload_group/memory_controller.cpp
new file mode 100644
index 00000000000..130f95bc59d
--- /dev/null
+++ b/be/src/runtime/workload_group/memory_controller.cpp
@@ -0,0 +1,58 @@
+
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/workload_group/memory_controller.h"
+
+#include <map>
+#include <mutex>
+#include <ostream>
+#include <utility>
+
+#include "common/logging.h"
+
+namespace doris {
+
+void MemoryController::Stats::reset() {
+    revoke_attempts_ = 0;
+    revoke_wait_time_ms_ = 0;
+    revoked_bytes_ = 0;
+}
+
+MemoryController::Stats& MemoryController::Stats::operator+=(const 
MemoryController::Stats& other) {
+    revoke_attempts_ += other.revoke_attempts_;
+    revoke_wait_time_ms_ += other.revoke_wait_time_ms_;
+    revoked_bytes_ += other.revoked_bytes_;
+    return *this;
+}
+
+MemoryController::Stats MemoryController::Stats::operator-(const Stats& other) 
const {
+    Stats result;
+    result.revoke_attempts_ = revoke_attempts_ - other.revoke_attempts_;
+    result.revoke_wait_time_ms_ = revoke_wait_time_ms_ - 
other.revoke_wait_time_ms_;
+    result.revoked_bytes_ = revoked_bytes_ - other.revoked_bytes_;
+    return result;
+}
+
+std::string MemoryController::Stats::debug_string() const {
+    return fmt::format("revoke_attempts_ {} revoke_wait_time_ms_ {} 
revoked_bytes_ {}",
+                       revoke_attempts_, revoke_wait_time_ms_,
+                       PrettyPrinter::print(revoked_bytes_, TUnit::BYTES));
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_group/memory_controller.h 
b/be/src/runtime/workload_group/memory_controller.h
new file mode 100644
index 00000000000..a27e1b5f60f
--- /dev/null
+++ b/be/src/runtime/workload_group/memory_controller.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+class MemoryController : public std::enable_shared_from_this<MemoryController> 
{
+    ENABLE_FACTORY_CREATOR(MemoryController);
+
+public:
+    // Used to collect memory execution stats.
+    class Stats {
+    public:
+        Stats() = default;
+        virtual ~Stats() = default;
+
+        void reset();
+        std::string debug_string();
+        int64_t revoke_attempts() { return revoke_attempts_; }
+        int64_t revoke_wait_time_ms() { return revoke_wait_time_ms_; }
+        int64_t revoked_bytes() { return revoked_bytes_; }
+
+        void incr_revoke_attempts(int64_t delta) { revoke_attempts_ + delta; }
+        void incr_revoke_wait_time_ms(int64_t delta) { return 
revoke_wait_time_ms_ + delta; }
+        void incr_revoked_bytes(int64_t delta) { return revoked_bytes_ + 
delta; }
+
+    private:
+        // The total number of times that the revoke method is called.
+        std::atomic<int64_t> revoke_attempts_ = 0;
+
+        // The time that waiting for revoke finished.
+        std::atomic<int64_t> revoke_wait_time_ms_ = 0;
+
+        // The revoked bytes
+        std::atomic<int64_t> revoked_bytes_ = 0;
+    };
+
+public:
+    MemoryController(std::shared_ptr<MemtrackerLimiter> memtracker)
+            : memtracker_limiter_(memtracker) {}
+
+    virtual ~MemoryController() = default;
+
+    // Compute the number of bytes could be released.
+    virtual int64_t revokable_bytes() { return 0; }
+
+    virtual bool ready_do_revoke() { return true; }
+
+    // Begin to do revoke memory task.
+    virtual Status revoke(int64_t bytes) { return Status::OK(); }
+
+    virtual Status enter_arbitration(Status reason) { return Status::OK(); }
+
+    virtual Status leave_arbitration(Status reason) { return Status::OK(); }
+
+    // Return related workload group if exists, maybe return null if not bind
+    // to a workload group.
+    virtual WorkloadGroupPtr workload_group();
+
+    // Cancel the related task
+    virtual Status cancel(Status cancel_reason) { return Status::OK(); }
+
+private:
+    // std::weak_ptr<WorkloadGroup> workload_group_;
+    std::weak_ptr<MemtrackerLimiter> memtracker_limiter_;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_group/resource_context.h 
b/be/src/runtime/workload_group/resource_context.h
new file mode 100644
index 00000000000..a492ad31a19
--- /dev/null
+++ b/be/src/runtime/workload_group/resource_context.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+class WorkloadGroupController {
+public:
+    WorkloadGroupController() = default;
+    virtual ~WorkloadGroupController() = default;
+
+    virtual Status bind_workload_group() = 0;
+    virtual WorkloadGroupPtr workload_group() = 0;
+};
+
+class IOController {
+    class IOStats {};
+};
+
+class CPUController {
+    class CPUStats {};
+};
+
+class NetworkController {
+    class NetStats {};
+};
+
+// Every task should have its own resource context. And BE may adjust the 
resource
+// context during running.
+// ResourceContext will bind to the running thread's thread local when the 
task is running.
+class ResourceContext : public std::enable_shared_from_this<ResourceContext> {
+    ENABLE_FACTORY_CREATOR(ResourceContext);
+
+public:
+    ResourceContext() = default;
+    virtual ~ResourceContext() = default;
+
+private:
+    // The controller's init value is nullptr, it means the resource context 
will ignore this controller.
+    std::shared_ptr<WorkloadGroupController> _workload_group_controller = 
nullptr;
+    std::shared_ptr<MemoryController> _memory_controller = nullptr;
+};
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to