This is an automated email from the ASF dual-hosted git repository.

lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b0ad66  [runtime] Replace the thread pool in FragmentMgr (#4057)
9b0ad66 is described below

commit 9b0ad66b78c8860de16d2bb0df41f53b5f0a2359
Author: HuangWei <huang...@apache.org>
AuthorDate: Wed Jul 15 10:03:48 2020 +0800

    [runtime] Replace the thread pool in FragmentMgr (#4057)
---
 be/src/common/config.h                    |   5 +-
 be/src/runtime/fragment_mgr.cpp           | 114 ++++++++++++++----------------
 be/src/runtime/fragment_mgr.h             |   4 +-
 be/src/runtime/plan_fragment_executor.cpp |   4 ++
 be/src/runtime/plan_fragment_executor.h   |   5 ++
 be/test/runtime/fragment_mgr_test.cpp     |  36 +++++++++-
 6 files changed, 99 insertions(+), 69 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 3adf389..4915732 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -322,8 +322,9 @@ namespace config {
     CONF_mInt32(olap_table_sink_send_interval_ms, "10");
 
     // Fragment thread pool
-    CONF_Int32(fragment_pool_thread_num, "64");
-    CONF_Int32(fragment_pool_queue_size, "1024");
+    CONF_Int32(fragment_pool_thread_num_min, "64");
+    CONF_Int32(fragment_pool_thread_num_max, "512");
+    CONF_Int32(fragment_pool_queue_size, "2048");
 
     //for cast
     // CONF_Bool(cast, "true");
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 30ec73b..9413f82 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -23,30 +23,32 @@
 #include <gperftools/profiler.h>
 #include <boost/bind.hpp>
 
-#include <thrift/protocol/TDebugProtocol.h>
 #include "agent/cgroups_mgr.h"
 #include "common/object_pool.h"
 #include "common/resource_tls.h"
-#include "gen_cpp/DataSinks_types.h"
-#include "gen_cpp/FrontendService.h"
-#include "gen_cpp/HeartbeatService.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "gen_cpp/QueryPlanExtra_types.h"
-#include "gen_cpp/Types_types.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/client_cache.h"
-#include "runtime/datetime_value.h"
-#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "runtime/plan_fragment_executor.h"
 #include "service/backend_options.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/exec_env.h"
+#include "runtime/datetime_value.h"
+#include "util/stopwatch.hpp"
 #include "util/debug_util.h"
 #include "util/doris_metrics.h"
-#include "util/stopwatch.hpp"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
 #include "util/url_coding.h"
+#include "runtime/client_cache.h"
+#include "runtime/descriptors.h"
+#include "gen_cpp/HeartbeatService.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+#include "gen_cpp/DataSinks_types.h"
+#include "gen_cpp/Types_types.h"
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/QueryPlanExtra_types.h"
+#include <thrift/protocol/TDebugProtocol.h>
+#include "util/threadpool.h"
+#include "gutil/strings/substitute.h"
 
 namespace doris {
 
@@ -87,6 +89,8 @@ public:
 
     Status execute();
 
+    Status cancel_before_execute();
+
     Status cancel(const PPlanFragmentCancelReason& reason);
 
     TUniqueId fragment_instance_id() const {
@@ -219,6 +223,13 @@ Status FragmentExecState::execute() {
     return Status::OK();
 }
 
+Status FragmentExecState::cancel_before_execute() {
+    // set status as 'abort', cuz cancel() won't effect the status arg of DataSink::close().
+    _executor.set_abort();
+    _executor.cancel();
+    return Status::OK();
+}
+
 Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason) {
     std::lock_guard<std::mutex> l(_status_lock);
     RETURN_IF_ERROR(_exec_status);
@@ -364,26 +375,31 @@ void FragmentExecState::coordinator_callback(
     }
 }
 
-FragmentMgr::FragmentMgr(ExecEnv* exec_env) :
-        _exec_env(exec_env),
-        _fragment_map(),
-        _stop(false),
-        _cancel_thread(std::bind<void>(&FragmentMgr::cancel_worker, this)),
-        // TODO(zc): we need a better thread-pool
-        // now one user can use all the thread pool, others have no resource.
-        _thread_pool(config::fragment_pool_thread_num, config::fragment_pool_queue_size) {
+FragmentMgr::FragmentMgr(ExecEnv* exec_env)
+        : _exec_env(exec_env),
+          _fragment_map(),
+          _stop(false),
+          _cancel_thread(std::bind<void>(&FragmentMgr::cancel_worker, this)) {
     REGISTER_GAUGE_DORIS_METRIC(plan_fragment_count, [this]() {
         std::lock_guard<std::mutex> lock(_lock);
         return _fragment_map.size();
     });
+    // TODO(zc): we need a better thread-pool
+    // now one user can use all the thread pool, others have no resource.
+    ThreadPoolBuilder("FragmentMgrThreadPool")
+            .set_min_threads(config::fragment_pool_thread_num_min)
+            .set_max_threads(config::fragment_pool_thread_num_max)
+            .set_max_queue_size(config::fragment_pool_queue_size)
+            .build(&_thread_pool);
 }
 
 FragmentMgr::~FragmentMgr() {
     // stop thread
     _stop = true;
     _cancel_thread.join();
-    // Stop all the worker
-    _thread_pool.drain_and_shutdown();
+    // Stop all the worker, should wait for a while?
+    // _thread_pool->wait_for();
+    _thread_pool->shutdown();
 
     // Only me can delete
     {
@@ -421,13 +437,6 @@ Status FragmentMgr::exec_plan_fragment(
     return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
 }
 
-static void* fragment_executor(void* param) {
-    PriorityThreadPool::WorkFunction* func = (PriorityThreadPool::WorkFunction*)param;
-    (*func)();
-    delete func;
-    return nullptr;
-}
-
 Status FragmentMgr::exec_plan_fragment(
         const TExecPlanFragmentParams& params,
         FinishCallback cb) {
@@ -448,7 +457,7 @@ Status FragmentMgr::exec_plan_fragment(
             _exec_env,
             params.coord));
     RETURN_IF_ERROR(exec_state->prepare(params));
-    bool use_pool = true;
+
     {
         std::lock_guard<std::mutex> lock(_lock);
         auto iter = _fragment_map.find(fragment_instance_id);
@@ -458,38 +467,19 @@ Status FragmentMgr::exec_plan_fragment(
         }
         // register exec_state before starting exec thread
         _fragment_map.insert(std::make_pair(fragment_instance_id, exec_state));
-
-        // Now, we the fragement is
-        if (_fragment_map.size() >= config::fragment_pool_thread_num) {
-            use_pool = false;
-        }
     }
 
-    if (use_pool) {
-        if (!_thread_pool.offer(
-                boost::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb))) {
-            {
-                // Remove the exec state added
-                std::lock_guard<std::mutex> lock(_lock);
-                _fragment_map.erase(fragment_instance_id);
-            }
-            return Status::InternalError("Put planfragment to failed.");
-        }
-    } else {
-        pthread_t id;
-        int ret = pthread_create(&id,
-                       nullptr,
-                       fragment_executor,
-                       new PriorityThreadPool::WorkFunction(
-                           std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb)));
-        if (ret != 0) {
-            std::string err_msg("Could not create thread.");
-            err_msg.append(strerror(ret));
-            err_msg.append(",");
-            err_msg.append(std::to_string(ret));
-            return Status::InternalError(err_msg);
+    auto st = _thread_pool->submit_func(
+            std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb));
+    if (!st.ok()) {
+        {
+            // Remove the exec state added
+            std::lock_guard<std::mutex> lock(_lock);
+            _fragment_map.erase(fragment_instance_id);
         }
-        pthread_detach(id);
+        exec_state->cancel_before_execute();
+        return Status::InternalError(strings::Substitute(
+                "Put planfragment to thread pool failed. err = $0", st.get_error_msg()));
     }
 
     return Status::OK();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 88ad8da..70c430c 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -29,7 +29,6 @@
 #include "gen_cpp/DorisExternalService_types.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
-#include "util/priority_thread_pool.hpp"
 #include "util/hash_util.hpp"
 #include "http/rest_monitor_iface.h"
 
@@ -40,6 +39,7 @@ class FragmentExecState;
 class TExecPlanFragmentParams;
 class TUniqueId;
 class PlanFragmentExecutor;
+class ThreadPool;
 
 std::string to_load_error_http_path(const std::string& file_name);
 
@@ -90,7 +90,7 @@ private:
     bool _stop;
     std::thread _cancel_thread;
     // every job is a pool
-    PriorityThreadPool _thread_pool;
+    std::unique_ptr<ThreadPool> _thread_pool;
 };
 
 }
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index de076eb..d55dd19 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -510,6 +510,10 @@ void PlanFragmentExecutor::cancel() {
     _runtime_state->exec_env()->result_mgr()->cancel(_runtime_state->fragment_instance_id());
 }
 
+void PlanFragmentExecutor::set_abort() {
+    update_status(Status::Aborted("Execution aborted before start"));
+}
+
 const RowDescriptor& PlanFragmentExecutor::row_desc() {
     return _plan->row_desc();
 }
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 2251ea4..bd4b0a2 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -114,6 +114,11 @@ public:
     // in open()/get_next().
     void close();
 
+    // Abort this execution. Must be called if we skip running open().
+    // It will let DataSink node closed with error status, to avoid use resources which created in open() phase.
+    // DataSink node should distinguish Aborted status from other error status.
+    void set_abort();
+
     // Initiate cancellation. Must not be called until after prepare() returned.
     void cancel();
 
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
index 683ee59..a8b0ce7 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -29,6 +29,7 @@ namespace doris {
 
 static Status s_prepare_status;
 static Status s_open_status;
+static int s_abort_cnt;
 // Mock used for this unittest
 PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
                                            const report_status_callback& report_status_cb)
@@ -47,6 +48,11 @@ Status PlanFragmentExecutor::open() {
 
 void PlanFragmentExecutor::cancel() {}
 
+void PlanFragmentExecutor::set_abort() {
+    LOG(INFO) << "Plan Aborted";
+    s_abort_cnt++;
+}
+
 void PlanFragmentExecutor::close() {}
 
 class FragmentMgrTest : public testing::Test {
@@ -57,9 +63,9 @@ protected:
     virtual void SetUp() {
         s_prepare_status = Status::OK();
         s_open_status = Status::OK();
-        LOG(INFO) << "fragment_pool_thread_num=" << config::fragment_pool_thread_num
-                  << ", pool_size=" << config::fragment_pool_queue_size;
-        config::fragment_pool_thread_num = 32;
+
+        config::fragment_pool_thread_num_min = 32;
+        config::fragment_pool_thread_num_max = 32;
         config::fragment_pool_queue_size = 1024;
     }
     virtual void TearDown() {}
@@ -117,6 +123,30 @@ TEST_F(FragmentMgrTest, PrepareFailed) {
     ASSERT_FALSE(mgr.exec_plan_fragment(params).ok());
 }
 
+TEST_F(FragmentMgrTest, OfferPoolFailed) {
+    config::fragment_pool_thread_num_min = 1;
+    config::fragment_pool_thread_num_max = 1;
+    config::fragment_pool_queue_size = 0;
+    s_abort_cnt = 0;
+    FragmentMgr mgr(nullptr);
+
+    TExecPlanFragmentParams params;
+    params.params.fragment_instance_id = TUniqueId();
+    params.params.fragment_instance_id.__set_hi(100);
+    params.params.fragment_instance_id.__set_lo(200);
+    ASSERT_TRUE(mgr.exec_plan_fragment(params).ok());
+
+    // the first plan open will cost 50ms, so the next 3 plans will be aborted.
+    for (int i = 1; i < 4; ++i) {
+        TExecPlanFragmentParams params;
+        params.params.fragment_instance_id = TUniqueId();
+        params.params.fragment_instance_id.__set_hi(100 + i);
+        params.params.fragment_instance_id.__set_lo(200);
+        ASSERT_FALSE(mgr.exec_plan_fragment(params).ok());
+    }
+    ASSERT_EQ(3, s_abort_cnt);
+}
+
 } // namespace doris
 
 int main(int argc, char** argv) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to