tberghammer updated this revision to Diff 37498.
tberghammer added a comment.

Addressing comments from the discussion by destroying the threads not in use
while keeping a global thread pool with at most hardware_concurrency threads.

IMO this update also simplifies the implementation of the ThreadPool by
removing the condition_variable.

A possible future improvement is to destroy the threads only if they are idle
for a given time (e.g. 50 ms) so we still don't have them hanging around while
we can avoid the possibly unnecessary thread creation and destruction in case
the tasks are coming in slowly.


http://reviews.llvm.org/D13727

Files:
  include/lldb/Utility/TaskPool.h
  source/Utility/CMakeLists.txt
  source/Utility/TaskPool.cpp
  unittests/Utility/CMakeLists.txt
  unittests/Utility/TaskPoolTest.cpp

Index: unittests/Utility/TaskPoolTest.cpp
===================================================================
--- /dev/null
+++ unittests/Utility/TaskPoolTest.cpp
@@ -0,0 +1,62 @@
+#include "gtest/gtest.h"
+
+#include "lldb/Utility/TaskPool.h"
+
+TEST (TaskPoolTest, AddTask)
+{
+    // Each task computes x * x + 1; the futures are read back in a different order than queued.
+    auto square_plus_one = [](int value) { return value * value + 1; };
+    auto result_for_1 = TaskPool::AddTask(square_plus_one, 1);
+    auto result_for_2 = TaskPool::AddTask(square_plus_one, 2);
+    auto result_for_3 = TaskPool::AddTask(square_plus_one, 3);
+    auto result_for_4 = TaskPool::AddTask(square_plus_one, 4);
+
+    ASSERT_EQ (10, result_for_3.get());
+    ASSERT_EQ ( 2, result_for_1.get());
+    ASSERT_EQ (17, result_for_4.get());
+    ASSERT_EQ ( 5, result_for_2.get());
+}
+
+TEST (TaskPoolTest, RunTasks)
+{
+    // One output slot per task; the slots are only inspected after RunTasks has returned.
+    std::vector<int> results(4);
+    auto store_square_plus_one = [](int value, int& out) { out = value * value + 1; };
+
+    TaskPool::RunTasks(
+        [store_square_plus_one, &results]() { store_square_plus_one(1, results[0]); },
+        [store_square_plus_one, &results]() { store_square_plus_one(2, results[1]); },
+        [store_square_plus_one, &results]() { store_square_plus_one(3, results[2]); },
+        [store_square_plus_one, &results]() { store_square_plus_one(4, results[3]); }
+    );
+
+    ASSERT_EQ ( 2, results[0]);
+    ASSERT_EQ ( 5, results[1]);
+    ASSERT_EQ (10, results[2]);
+    ASSERT_EQ (17, results[3]);
+}
+
+TEST (TaskPoolTest, TaskRunner)
+{
+    // Every task returns (x, x * x), so a result can be validated without knowing its order.
+    auto make_square_pair = [](int value) { return std::make_pair(value, value * value); };
+
+    TaskRunner<std::pair<int, int>> runner;
+    runner.AddTask(make_square_pair, 1);
+    runner.AddTask(make_square_pair, 2);
+    runner.AddTask(make_square_pair, 3);
+    runner.AddTask(make_square_pair, 4);
+
+    // Drain the runner; an invalid future signals that no task is left.
+    int completed = 0;
+    for (auto done = runner.WaitForNextCompletedTask();
+         done.valid();
+         done = runner.WaitForNextCompletedTask())
+    {
+        ++completed;
+        std::pair<int, int> square_pair = done.get();
+        ASSERT_EQ (square_pair.first * square_pair.first, square_pair.second);
+    }
+
+    ASSERT_EQ(4, completed);
+}
Index: unittests/Utility/CMakeLists.txt
===================================================================
--- unittests/Utility/CMakeLists.txt
+++ unittests/Utility/CMakeLists.txt
@@ -1,4 +1,5 @@
 add_lldb_unittest(UtilityTests
   StringExtractorTest.cpp
+  TaskPoolTest.cpp
   UriParserTest.cpp
   )
Index: source/Utility/TaskPool.cpp
===================================================================
--- /dev/null
+++ source/Utility/TaskPool.cpp
@@ -0,0 +1,88 @@
+//===--------------------- TaskPool.cpp -------------------------*- C++ -*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include "lldb/Utility/TaskPool.h"
+
+namespace
+{
+    // Implementation behind TaskPool: a task queue drained by detached worker threads.
+    class TaskPoolImpl
+    {
+    public:
+        static TaskPoolImpl&
+        GetInstance(); // Accessor for the single pool instance.
+
+        void
+        AddTask(std::function<void()>&& task_fn); // Queues task_fn for execution on a worker.
+
+    private:
+        explicit TaskPoolImpl(uint32_t num_threads); // Private: created only via GetInstance().
+
+        static void
+        Worker(TaskPoolImpl* pool); // Thread entry point; runs the queued tasks of pool.
+
+        std::queue<std::function<void()>> m_tasks;        // Tasks waiting to be run.
+        std::mutex                        m_tasks_mutex;  // Guards m_tasks and m_thread_count.
+        uint32_t                          m_thread_count; // Number of live worker threads.
+    };
+} // end of anonymous namespace
+
+TaskPoolImpl&
+TaskPoolImpl::GetInstance() // Lazily creates the one shared pool and hands back a reference.
+{
+    static TaskPoolImpl s_shared_pool(std::thread::hardware_concurrency());
+    return s_shared_pool;
+}
+
+void
+TaskPool::AddTaskImpl(std::function<void()>&& task_fn) // Non-template backend of AddTask.
+{
+    TaskPoolImpl::GetInstance().AddTask(std::move(task_fn)); // Forward to the shared pool.
+}
+
+TaskPoolImpl::TaskPoolImpl(uint32_t /*num_threads*/) : // Unused: AddTask caps the worker count.
+    m_thread_count(0) // No worker thread exists until the first task is queued.
+{
+}
+
+void
+TaskPoolImpl::AddTask(std::function<void()>&& task_fn)
+{
+    // hardware_concurrency() returns 0 when the value is not computable. Always allow at least
+    // one worker, otherwise no thread would ever be started and queued tasks would never run.
+    static const uint32_t hw_threads = std::thread::hardware_concurrency();
+    static const uint32_t max_threads = hw_threads != 0 ? hw_threads : 1;
+    std::unique_lock<std::mutex> lock(m_tasks_mutex);
+    m_tasks.emplace(std::move(task_fn));
+    if (m_thread_count >= max_threads)
+        return; // Every allowed worker is alive and re-checks the queue before it exits.
+    m_thread_count++; // Reserve the slot under the lock, before the thread exists.
+    lock.unlock();
+    std::thread (Worker, this).detach(); // Start the worker without holding the queue lock.
+}
+
+void
+TaskPoolImpl::Worker(TaskPoolImpl* pool)
+{
+    // Runs queued tasks one at a time and lets the thread end as soon as the queue is empty.
+    while (true)
+    {
+        std::unique_lock<std::mutex> lock(pool->m_tasks_mutex);
+        if (pool->m_tasks.empty())
+        {
+            pool->m_thread_count--; // Give the slot back under the lock AddTask also takes.
+            break;
+        }
+        // Move the task out of the queue instead of copying the std::function and its captures.
+        std::function<void()> f = std::move(pool->m_tasks.front());
+        pool->m_tasks.pop();
+        lock.unlock(); // Never hold the queue lock while the task itself runs.
+        f();
+    }
+}
Index: source/Utility/CMakeLists.txt
===================================================================
--- source/Utility/CMakeLists.txt
+++ source/Utility/CMakeLists.txt
@@ -14,6 +14,7 @@
   StringExtractor.cpp
   StringExtractorGDBRemote.cpp
   StringLexer.cpp
+  TaskPool.cpp
   TimeSpecTimeout.cpp
   UriParser.cpp
   )
Index: include/lldb/Utility/TaskPool.h
===================================================================
--- /dev/null
+++ include/lldb/Utility/TaskPool.h
@@ -0,0 +1,196 @@
+//===--------------------- TaskPool.h ---------------------------*- C++ -*-===//
+//
+//                     The LLVM Compiler Infrastructure
+//
+// This file is distributed under the University of Illinois Open Source
+// License. See LICENSE.TXT for details.
+//
+//===----------------------------------------------------------------------===//
+
+#include <cassert>
+#include <cstdint>
+#include <future>
+#include <list>
+#include <queue>
+#include <thread>
+#include <vector>
+
+// Global TaskPool class for running tasks in parallel on worker threads that are created on
+// demand when tasks are added. The TaskPool provides no guarantee about the order the tasks will
+// run in or about which tasks will run in parallel. None of the tasks added to the task pool
+// should block on something (mutex, future, condition variable) that will be set only by the
+// completion of another task on the task pool, as they may run sequentially on the same thread.
+class TaskPool
+{
+public:
+    // Add a new task to the thread pool and return a std::future belonging to the newly created
+    // task. The caller of this function has to wait on the future for this task to complete.
+    template<typename F, typename... Args>
+    static std::future<typename std::result_of<F(Args...)>::type>
+    AddTask(F&& f, Args&&... args);
+
+    // Run all of the specified tasks on the thread pool and wait until all of them are finished
+    // before returning. This method is intended for a small number of tasks where listing them
+    // as function arguments is acceptable. For running a large number of tasks you should use
+    // AddTask for each task and then call wait() on each returned future.
+    template<typename... T>
+    static void
+    RunTasks(T&&... tasks);
+
+private:
+    TaskPool() = delete; // Static-only interface: the class cannot be instantiated.
+
+    template<typename... T>
+    struct RunTaskImpl; // Recursive helper that implements RunTasks.
+
+    static void
+    AddTaskImpl(std::function<void()>&& task_fn); // Hands a type-erased task to the pool.
+};
+
+// Wrapper class around the global TaskPool implementation that makes it possible to create a set
+// of tasks and then wait for them to complete with the WaitForNextCompletedTask call. Use this
+// class only when WaitForNextCompletedTask is needed, because it adds no other functionality to
+// the TaskPool class and it has a very minor performance overhead.
+template <typename T> // The return type of the tasks that will be added to this task runner
+class TaskRunner
+{
+public:
+    // Add a task to the task runner, which also adds it to the global TaskPool. The function
+    // doesn't return the std::future for the task because it will be supplied by
+    // WaitForNextCompletedTask after the task is completed.
+    template<typename F, typename... Args>
+    void
+    AddTask(F&& f, Args&&... args);
+
+    // Wait for the next task in this task runner to finish and then return the std::future that
+    // belongs to the finished task. If there is no task in this task runner (neither pending nor
+    // completed) then this function will return an invalid future. Usually this function should
+    // be called in a loop processing the results of the tasks until it returns an invalid
+    // std::future, which means that all tasks in this task runner are completed.
+    std::future<T>
+    WaitForNextCompletedTask();
+
+    // Convenience method to wait for all tasks in this TaskRunner to finish. Do NOT use this class
+    // just because of this method. Use TaskPool instead and wait for each std::future returned by
+    // AddTask in a loop.
+    void
+    WaitForAllTasks();
+
+private:
+    std::list<std::future<T>> m_ready;   // Futures handed over by tasks that ran their callable.
+    std::list<std::future<T>> m_pending; // Futures of tasks that have not reported completion.
+    std::mutex                m_mutex;   // Guards m_ready and m_pending.
+    std::condition_variable   m_cv;      // Notified each time a future is added to m_ready.
+};
+
+template<typename F, typename... Args>
+std::future<typename std::result_of<F(Args...)>::type>
+TaskPool::AddTask(F&& f, Args&&... args)
+{
+    auto task_sp = std::make_shared<std::packaged_task<typename std::result_of<F(Args...)>::type()>>(
+        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
+    // Fetch the future before queuing: a worker may start running the task immediately.
+    auto task_future = task_sp->get_future();
+    AddTaskImpl([task_sp]() { (*task_sp)(); }); // Shared ownership keeps the closure copyable.
+    return task_future;
+}
+
+template<typename... T>
+void
+TaskPool::RunTasks(T&&... tasks)
+{
+    RunTaskImpl<T...>::Run(std::forward<T>(tasks)...); // Returns after every task was waited on.
+}
+
+template<typename Head, typename... Tail>
+struct TaskPool::RunTaskImpl<Head, Tail...>
+{
+    static void
+    Run(Head&& h, Tail&&... t)
+    {
+        auto head_future = AddTask(std::forward<Head>(h)); // Queue the first task right away.
+        RunTaskImpl<Tail...>::Run(std::forward<Tail>(t)...); // Queue and await the remaining ones.
+        head_future.wait(); // Block on the first task only after the rest has been handled.
+    }
+};
+
+template<>
+struct TaskPool::RunTaskImpl<>
+{
+    static void
+    Run() {} // End of the recursion: no task is left to queue.
+};
+
+template <typename T>
+template<typename F, typename... Args>
+void
+TaskRunner<T>::AddTask(F&& f, Args&&... args)
+{
+    // Lock first: the task moves *it into m_ready, which must not happen before *it is assigned.
+    std::unique_lock<std::mutex> lock(m_mutex);
+    auto it = m_pending.emplace(m_pending.end());
+    *it = TaskPool::AddTask(
+        [this, it](F f, Args... args)
+        {
+            // Own the result by value so it is moved (not copied) out of the task.
+            T r = f(std::forward<Args>(args)...);
+            std::unique_lock<std::mutex> lock(this->m_mutex);
+            this->m_ready.emplace_back(std::move(*it));
+            this->m_pending.erase(it);
+            lock.unlock();
+            this->m_cv.notify_one(); // Wake a WaitForNextCompletedTask caller, if any.
+            return r;
+        },
+        std::forward<F>(f),
+        std::forward<Args>(args)...);
+}
+
+template <>
+template<typename F, typename... Args>
+void
+TaskRunner<void>::AddTask(F&& f, Args&&... args)
+{
+    // The slot is created and filled under the lock so the task cannot touch it too early.
+    std::lock_guard<std::mutex> pending_guard(m_mutex);
+    auto slot = m_pending.emplace(m_pending.end());
+    *slot = TaskPool::AddTask(
+        [this, slot](F task, Args... task_args)
+        {
+            task(std::forward<Args>(task_args)...);
+            { // Hand this task's own future over to the ready list while holding the lock.
+                std::lock_guard<std::mutex> ready_guard(this->m_mutex);
+                this->m_ready.emplace_back(std::move(*slot));
+                this->m_pending.erase(slot);
+            }
+            this->m_cv.notify_one();
+        },
+        std::forward<F>(f),
+        std::forward<Args>(args)...);
+}
+
+template <typename T>
+std::future<T>
+TaskRunner<T>::WaitForNextCompletedTask()
+{
+    std::unique_lock<std::mutex> lock(m_mutex);
+    if (m_ready.empty() && m_pending.empty())
+        return std::future<T>(); // No more tasks
+
+    // The predicate overload returns at once if a task is already ready and otherwise sleeps
+    // until a finishing task has queued its future, ignoring spurious wake-ups.
+    m_cv.wait(lock, [this](){ return !this->m_ready.empty(); });
+    std::future<T> res = std::move(m_ready.front());
+    m_ready.pop_front();
+    lock.unlock();
+
+    // A task queues its future just before it returns, so the value may not be set yet.
+    res.wait();
+    return res; // Returning the local by name moves it; std::move here would only inhibit NRVO.
+}
+
+template <typename T>
+void
+TaskRunner<T>::WaitForAllTasks()
+{
+    while (WaitForNextCompletedTask().valid()) { /* discard each result until none is left */ }
+}
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to