platoneko commented on code in PR #37630:
URL: https://github.com/apache/doris/pull/37630#discussion_r1678802435


##########
cloud/src/recycler/s3_accessor.h:
##########
@@ -29,11 +29,14 @@ class S3Client;
 
 namespace doris::cloud {
 class ObjectStoreInfoPB;
+class SimpleThreadPool;
 
 enum class S3RateLimitType;
 extern int reset_s3_rate_limiter(S3RateLimitType type, size_t max_speed, 
size_t max_burst,
                                  size_t limit);
 
+extern std::shared_ptr<SimpleThreadPool> worker_pool;

Review Comment:
   No need to declare this symbol `extern`.



##########
cloud/src/recycler/s3_accessor.cpp:
##########
@@ -249,7 +250,15 @@ int S3Accessor::create(S3Conf conf, 
std::shared_ptr<S3Accessor>* accessor) {
     return (*accessor)->init();
 }
 
+std::shared_ptr<SimpleThreadPool> worker_pool;

Review Comment:
   static std::shared_ptr<SimpleThreadPool> worker_pool;



##########
cloud/src/recycler/recycler.h:
##########
@@ -83,11 +92,13 @@ class Recycler {
 
     WhiteBlackList instance_filter_;
     std::unique_ptr<Checker> checker_;
+    std::unique_ptr<RecyclerThreadPoolGroup> _thread_pool_group;

Review Comment:
   use `std::shared_ptr`



##########
cloud/src/recycler/util.h:
##########
@@ -17,15 +17,112 @@
 
 #pragma once
 
+#include <bthread/countdown_event.h>
 #include <fmt/core.h>
 #include <gen_cpp/cloud.pb.h>
+#include <glog/logging.h>
 
+#include <future>
 #include <string>
 
+#include "common/simple_thread_pool.h"
+
 namespace doris::cloud {
 
 class TxnKv;
 
+template <typename T>
+class SyncExecutor {
+public:
+    SyncExecutor(
+            SimpleThreadPool* pool, std::string name_tag,
+            std::function<bool(const T&)> cancel = [](const T& /**/) { return 
false; })
+            : _pool(pool), _cancel(std::move(cancel)), 
_name_tag(std::move(name_tag)) {}
+    auto add(std::function<T()> callback) -> SyncExecutor<T>& {
+        auto task = std::make_unique<Task>(std::move(callback), _cancel, 
_count);
+        _count.add_count();
+        // The actual task logic would be wrapped by one promise and passed to 
the threadpool.
+        // The result would be returned by the future once the task is 
finished.
+        // Or the task would be invalid if the whole task is cancelled.
+        int r = _pool->submit([this, t = task.get()]() { (*t)(_stop_token); });
+        CHECK(r == 0);
+        _res.emplace_back(std::move(task));
+        return *this;
+    }
+    std::vector<T> when_all(bool* finished) {
+        timespec current_time;
+        auto current_time_second = time(nullptr);
+        current_time.tv_sec = current_time_second + 300;
+        current_time.tv_nsec = 0;
+        auto msg = fmt::format("{} has already taken 5 min", _name_tag);
+        while (0 != _count.timed_wait(current_time)) {
+            current_time.tv_sec += 300;
+            LOG(WARNING) << msg;
+        }
+        *finished = !_stop_token;
+        std::vector<T> res;
+        res.reserve(_res.size());
+        for (auto& task : _res) {
+            if (!task->valid()) {
+                *finished = false;
+                return res;
+            }
+            res.emplace_back((*task).get());
+        }
+        return res;
+    }
+    void reset() {
+        _res.clear();
+        _stop_token = false;
+    }
+
+private:
+    class Task {
+    public:
+        Task(std::function<T()> callback, std::function<bool(const T&)> cancel,
+             bthread::CountdownEvent& count)
+                : _callback(std::move(callback)),
+                  _cancel(std::move(cancel)),
+                  _count(count),
+                  _fut(_pro.get_future()) {}
+        void operator()(std::atomic_bool& stop_token) {
+            std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
+                                                                  [&](int*) { 
_count.signal(); });
+            if (stop_token) {
+                _valid = false;
+                return;
+            }
+            T t = _callback();
+            // We'll return this task result to user even if this task return 
error
+            // So we don't set _valid to false here
+            if (_cancel(t)) {
+                stop_token = true;
+            }
+            _pro.set_value(std::move(t));
+        }
+        bool valid() { return _valid; }
+        T get() { return _fut.get(); }
+
+    private:
+        // It's guarantted that the valid function can only be called inside 
SyncExecutor's `when_all()` function
+        // and only be called when the _count.timed_wait function returned. So 
there would be no data race for
+        // _valid then it doesn't need to be one atomic bool.
+        bool _valid = true;
+        std::function<T()> _callback;
+        std::function<bool(const T&)> _cancel;
+        std::promise<T> _pro;
+        bthread::CountdownEvent& _count;
+        std::future<T> _fut;
+    };
+    std::vector<std::unique_ptr<Task>> _res;
+    // use CountdownEvent to do periodically log using 
CountdownEvent::time_wait()
+    bthread::CountdownEvent _count {0};
+    std::atomic_bool _stop_token {false};
+    SimpleThreadPool* _pool;

Review Comment:
   use `shared_ptr<SimpleThreadPool>`



##########
cloud/src/recycler/obj_storage_client.h:
##########
@@ -51,6 +51,12 @@ class ObjectListIterator {
     virtual std::optional<ObjectMeta> next() = 0;
 };
 
+class SimpleThreadPool;
+struct ObjClientIoOptions {

Review Comment:
   rename to ObjClientOptions



##########
cloud/src/recycler/util.h:
##########


Review Comment:
   Consider extracting `SyncExecutor` into sync_executor.h



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to