This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e3b18b588ba [fix](recycler) Fix CountdownEvent error and hang (#45760)
e3b18b588ba is described below

commit e3b18b588ba0f737cf3a1c802e7cc444ec9d71e8
Author: abmdocrt <lianyuk...@selectdb.com>
AuthorDate: Tue Dec 24 15:16:03 2024 +0800

    [fix](recycler) Fix CountdownEvent error and hang (#45760)
    
    Fix CountdownEvent error "Invoking add_count() after wait() was invoked"
    
    ---------
    
    Co-authored-by: Gavin Chou <gavineaglec...@gmail.com>
---
 cloud/src/common/simple_thread_pool.h |  5 +-
 cloud/src/recycler/recycler.cpp       |  1 -
 cloud/src/recycler/sync_executor.h    | 26 +++++++++-
 cloud/test/util_test.cpp              | 90 +++++++++++++++++++++++++++++++++++
 4 files changed, 118 insertions(+), 4 deletions(-)
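
For readers unfamiliar with the error quoted in the commit title: bthread::CountdownEvent rejects count increases once wait() has been observed, so reusing the same event for a second batch of tasks trips that check. The sketch below is an illustration of the failure mode and of the reset-before-reuse fix, not the committed code; it assumes brpc's CountdownEvent interface (add_count/signal/wait/reset).

    bthread::CountdownEvent count(0);   // from <bthread/countdown_event.h>

    // first batch
    count.add_count();                  // one task pending
    // ... a worker thread finishes and calls count.signal();
    count.wait();                       // when_all(): block until the batch drains

    // second batch on the same event
    count.add_count();                  // error: "Invoking add_count() after wait() was invoked"

    // fix applied in this commit: reset the event before the executor is reused
    count.reset(0);
    count.add_count();                  // fine again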

diff --git a/cloud/src/common/simple_thread_pool.h b/cloud/src/common/simple_thread_pool.h
index e18d6787bf7..37a4cedbdad 100644
--- a/cloud/src/common/simple_thread_pool.h
+++ b/cloud/src/common/simple_thread_pool.h
@@ -19,6 +19,7 @@
 
 #include <atomic>
 #include <condition_variable>
+#include <iostream>
 #include <memory>
 #include <mutex>
 #include <thread>
@@ -154,8 +155,10 @@ private:
             }
             try {
                 job();
+            } catch (const std::exception& e) {
+                std::cerr << "exception happened when execute job. err: " << 
e.what() << std::endl;
             } catch (...) {
-                // do nothing
+                std::cerr << "exception happened when execute job." << 
std::endl;
             }
         }
     }
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 6877d7e433b..ca22b28e031 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -1278,7 +1278,6 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
             LOG_WARNING("failed to recycle tablet").tag("instance_id", 
instance_id_);
             return -1;
         }
-        sync_executor.reset();
         if (tablet_keys.empty() && tablet_idx_keys.empty()) return 0;
         // sort the vector using key's order
         std::sort(tablet_keys.begin(), tablet_keys.end(),
diff --git a/cloud/src/recycler/sync_executor.h b/cloud/src/recycler/sync_executor.h
index c84e5e22467..909f36a56c4 100644
--- a/cloud/src/recycler/sync_executor.h
+++ b/cloud/src/recycler/sync_executor.h
@@ -18,10 +18,12 @@
 #pragma once
 
 #include <bthread/countdown_event.h>
+#include <cpp/sync_point.h>
 #include <fmt/core.h>
 #include <gen_cpp/cloud.pb.h>
 #include <glog/logging.h>
 
+#include <chrono>
 #include <future>
 #include <string>
 
@@ -48,10 +50,12 @@ public:
         return *this;
     }
     std::vector<T> when_all(bool* finished) {
+        std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&](int*) { _reset(); });
         timespec current_time;
         auto current_time_second = time(nullptr);
         current_time.tv_sec = current_time_second + 300;
         current_time.tv_nsec = 0;
+        // Wait for all tasks to complete
         while (0 != _count.timed_wait(current_time)) {
             current_time.tv_sec += 300;
             LOG(WARNING) << _name_tag << " has already taken 5 min, cost: "
@@ -65,11 +69,26 @@ public:
                 *finished = false;
                 return res;
             }
-            res.emplace_back((*task).get());
+            size_t max_wait_ms = 10000;
+            TEST_SYNC_POINT_CALLBACK("SyncExecutor::when_all.set_wait_time", 
&max_wait_ms);
+            // _count.timed_wait has already ensured that all tasks are completed.
+            // The 10 seconds here is just waiting for the task results to be returned,
+            // so 10 seconds is more than enough.
+            auto status = task->wait_for(max_wait_ms);
+            if (status == std::future_status::ready) {
+                res.emplace_back(task->get());
+            } else {
+                *finished = false;
+                LOG(WARNING) << _name_tag << " task timed out after 10 
seconds";
+                return res;
+            }
         }
         return res;
     }
-    void reset() {
+
+private:
+    void _reset() {
+        _count.reset(0);
         _res.clear();
         _stop_token = false;
     }
@@ -98,6 +117,9 @@ private:
             }
             _pro.set_value(std::move(t));
         }
+        std::future_status wait_for(size_t milliseconds) {
+            return _fut.wait_for(std::chrono::milliseconds(milliseconds));
+        }
         bool valid() { return _valid; }
         T get() { return _fut.get(); }
 
diff --git a/cloud/test/util_test.cpp b/cloud/test/util_test.cpp
index c88ef555f82..e505b2b99a5 100644
--- a/cloud/test/util_test.cpp
+++ b/cloud/test/util_test.cpp
@@ -18,6 +18,7 @@
 #include "recycler/util.h"
 
 #include <chrono>
+#include <stdexcept>
 #include <string>
 #include <string_view>
 #include <thread>
@@ -28,6 +29,7 @@
 #include "common/logging.h"
 #include "common/simple_thread_pool.h"
 #include "common/string_util.h"
+#include "cpp/sync_point.h"
 #include "gtest/gtest.h"
 #include "recycler/recycler.h"
 #include "recycler/sync_executor.h"
@@ -235,3 +237,91 @@ TEST(UtilTest, normal) {
         std::for_each(res.begin(), res.end(), [&s](auto&& n) { ASSERT_EQ(s, n); });
     }
 }
+
+TEST(UtilTest, test_add_after_when_all) {
+    auto f = []() {
+        auto pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+        pool->start();
+        SyncExecutor<int> sync_executor(pool, "test add after when all: 
inside",
+                                        [](int k) { return k != 0; });
+        auto f1 = []() { return 0; };
+        sync_executor.add(f1);
+        bool finished = true;
+        std::vector<int> res = sync_executor.when_all(&finished);
+        sync_executor.add(f1);
+        res = sync_executor.when_all(&finished);
+        EXPECT_EQ(1, res.size());
+        EXPECT_EQ(finished, true);
+        std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); });
+        return 0;
+    };
+
+    auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    s3_producer_pool->start();
+    SyncExecutor<int> s3_sync_executor(s3_producer_pool, "test add after when 
all: outside",
+                                       [](int k) { return k != 0; });
+    s3_sync_executor.add(f);
+    bool finished = true;
+    std::vector<int> res = s3_sync_executor.when_all(&finished);
+    EXPECT_EQ(1, res.size());
+    EXPECT_EQ(finished, true);
+    std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); });
+}
+
+TEST(UtilTest, exception) {
+    auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    s3_producer_pool->start();
+    {
+        SyncExecutor<int> sync_executor(s3_producer_pool, "exception test",
+                                        [](int k) { return k != 0; });
+        auto f = []() {
+            throw(std::runtime_error("test exception"));
+            return 1;
+        };
+        sync_executor.add(f);
+        bool finished = true;
+        std::vector<int> res = sync_executor.when_all(&finished);
+        EXPECT_EQ(0, res.size());
+        EXPECT_EQ(finished, false);
+        std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(1, n); });
+    }
+}
+
+TEST(UtilTest, test_sync_executor) {
+    auto f = []() {
+        sleep(1);
+        auto pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+        pool->start();
+        SyncExecutor<int> sync_executor(pool, "test sync executor: inside",
+                                        [](int k) { return k != 0; });
+        auto f1 = []() { return 0; };
+        sync_executor.add(f1);
+        bool finished = true;
+        std::vector<int> res = sync_executor.when_all(&finished);
+        sync_executor.add(f1);
+        res = sync_executor.when_all(&finished);
+        EXPECT_EQ(1, res.size());
+        EXPECT_EQ(finished, true);
+        std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); });
+        return 0;
+    };
+    std::mutex go_mutex;
+
+    auto* sp = doris::SyncPoint::get_instance();
+    sp->set_call_back("SyncExecutor::when_all.set_wait_time", [&](auto&& args) 
{
+        std::unique_lock<std::mutex> _lock(go_mutex);
+        auto max_wait_time = *doris::try_any_cast<size_t*>(args[0]);
+        max_wait_time = 100;
+    });
+
+    auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    s3_producer_pool->start();
+    SyncExecutor<int> s3_sync_executor(s3_producer_pool, "test sync executor: 
outside",
+                                       [](int k) { return k != 0; });
+    s3_sync_executor.add(f);
+    bool finished = true;
+    std::vector<int> res = s3_sync_executor.when_all(&finished);
+    EXPECT_EQ(1, res.size());
+    EXPECT_EQ(finished, true);
+    std::for_each(res.begin(), res.end(), [](auto&& n) { EXPECT_EQ(0, n); });
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
