scott.smith created this revision. scott.smith added a project: LLDB. Herald added a subscriber: mgorny.
Remove the thread pool and for_each-like iteration functions. Keep RunTasks, which has no analog in llvm::parallel, but implement it using llvm::parallel. Repository: rL LLVM https://reviews.llvm.org/D33246 Files: include/lldb/Utility/TaskPool.h source/Plugins/SymbolFile/DWARF/SymbolFileDWARF.cpp source/Utility/CMakeLists.txt source/Utility/TaskPool.cpp unittests/Utility/TaskPoolTest.cpp
Index: unittests/Utility/TaskPoolTest.cpp =================================================================== --- unittests/Utility/TaskPoolTest.cpp +++ unittests/Utility/TaskPoolTest.cpp @@ -2,20 +2,6 @@ #include "lldb/Utility/TaskPool.h" -TEST(TaskPoolTest, AddTask) { - auto fn = [](int x) { return x * x + 1; }; - - auto f1 = TaskPool::AddTask(fn, 1); - auto f2 = TaskPool::AddTask(fn, 2); - auto f3 = TaskPool::AddTask(fn, 3); - auto f4 = TaskPool::AddTask(fn, 4); - - ASSERT_EQ(10, f3.get()); - ASSERT_EQ(2, f1.get()); - ASSERT_EQ(17, f4.get()); - ASSERT_EQ(5, f2.get()); -} - TEST(TaskPoolTest, RunTasks) { std::vector<int> r(4); @@ -29,15 +15,3 @@ ASSERT_EQ(10, r[2]); ASSERT_EQ(17, r[3]); } - -TEST(TaskPoolTest, TaskMap) { - int data[4]; - auto fn = [&data](int x) { data[x] = x * x; }; - - TaskMapOverInt(0, 4, fn); - - ASSERT_EQ(data[0], 0); - ASSERT_EQ(data[1], 1); - ASSERT_EQ(data[2], 4); - ASSERT_EQ(data[3], 9); -} Index: source/Utility/TaskPool.cpp =================================================================== --- source/Utility/TaskPool.cpp +++ /dev/null @@ -1,98 +0,0 @@ -//===--------------------- TaskPool.cpp -------------------------*- C++ -*-===// -// -// The LLVM Compiler Infrastructure -// -// This file is distributed under the University of Illinois Open Source -// License. See LICENSE.TXT for details. -// -//===----------------------------------------------------------------------===// - -#include "lldb/Utility/TaskPool.h" - -#include <cstdint> // for uint32_t -#include <queue> // for queue -#include <thread> // for thread - -namespace { -class TaskPoolImpl { -public: - static TaskPoolImpl &GetInstance(); - - void AddTask(std::function<void()> &&task_fn); - -private: - TaskPoolImpl(); - - static void Worker(TaskPoolImpl *pool); - - std::queue<std::function<void()>> m_tasks; - std::mutex m_tasks_mutex; - uint32_t m_thread_count; -}; - -} // end of anonymous namespace - -TaskPoolImpl &TaskPoolImpl::GetInstance() { - static TaskPoolImpl g_task_pool_impl; - return g_task_pool_impl; -} - -void TaskPool::AddTaskImpl(std::function<void()> &&task_fn) { - TaskPoolImpl::GetInstance().AddTask(std::move(task_fn)); -} - -TaskPoolImpl::TaskPoolImpl() : m_thread_count(0) {} - -void TaskPoolImpl::AddTask(std::function<void()> &&task_fn) { - static const uint32_t max_threads = std::thread::hardware_concurrency(); - - std::unique_lock<std::mutex> lock(m_tasks_mutex); - m_tasks.emplace(std::move(task_fn)); - if (m_thread_count < max_threads) { - m_thread_count++; - // Note that this detach call needs to happen with the m_tasks_mutex held. - // This prevents the thread - // from exiting prematurely and triggering a linux libc bug - // (https://sourceware.org/bugzilla/show_bug.cgi?id=19951). - std::thread(Worker, this).detach(); - } -} - -void TaskPoolImpl::Worker(TaskPoolImpl *pool) { - while (true) { - std::unique_lock<std::mutex> lock(pool->m_tasks_mutex); - if (pool->m_tasks.empty()) { - pool->m_thread_count--; - break; - } - - std::function<void()> f = pool->m_tasks.front(); - pool->m_tasks.pop(); - lock.unlock(); - - f(); - } -} - -void TaskMapOverInt(size_t begin, size_t end, - std::function<void(size_t)> const &func) { - std::atomic<size_t> idx{begin}; - size_t num_workers = - std::min<size_t>(end, std::thread::hardware_concurrency()); - - auto wrapper = [&idx, end, &func]() { - while (true) { - size_t i = idx.fetch_add(1); - if (i >= end) - break; - func(i); - } - }; - - std::vector<std::future<void>> futures; - futures.reserve(num_workers); - for (size_t i = 0; i < num_workers; i++) - futures.push_back(TaskPool::AddTask(wrapper)); - for (size_t i = 0; i < num_workers; i++) - futures[i].wait(); -} Index: source/Utility/CMakeLists.txt =================================================================== --- source/Utility/CMakeLists.txt +++ source/Utility/CMakeLists.txt @@ -26,7 +26,6 @@ StringExtractorGDBRemote.cpp StringLexer.cpp StringList.cpp - TaskPool.cpp TildeExpressionResolver.cpp UserID.cpp UriParser.cpp Index: source/Plugins/SymbolFile/DWARF/SymbolFileDWARF.cpp =================================================================== --- source/Plugins/SymbolFile/DWARF/SymbolFileDWARF.cpp +++ source/Plugins/SymbolFile/DWARF/SymbolFileDWARF.cpp @@ -11,6 +11,7 @@ // Other libraries and framework includes #include "llvm/Support/Casting.h" +#include "llvm/Support/Parallel.h" #include "llvm/Support/Threading.h" #include "lldb/Core/ArchSpec.h" @@ -1991,13 +1992,15 @@ // a DIE in one compile unit refers to another and the indexes accesses // those DIEs. //---------------------------------------------------------------------- - TaskMapOverInt(0, num_compile_units, extract_fn); + llvm::parallel::for_each_n(llvm::parallel::par, 0U, num_compile_units, + extract_fn); // Now create a task runner that can index each DWARF compile unit in a // separate // thread so we can index quickly. - TaskMapOverInt(0, num_compile_units, parser_fn); + llvm::parallel::for_each_n(llvm::parallel::par, 0U, num_compile_units, + parser_fn); auto finalize_fn = [](NameToDIE &index, std::vector<NameToDIE> &srcs) { for (auto &src : srcs) Index: include/lldb/Utility/TaskPool.h =================================================================== --- include/lldb/Utility/TaskPool.h +++ include/lldb/Utility/TaskPool.h @@ -10,82 +10,16 @@ #ifndef utility_TaskPool_h_ #define utility_TaskPool_h_ -#include <functional> // for bind, function -#include <future> -#include <list> -#include <memory> // for make_shared -#include <mutex> // for mutex, unique_lock, condition_variable -#include <type_traits> // for forward, result_of, move +#include <llvm/Support/Parallel.h> -// Global TaskPool class for running tasks in parallel on a set of worker thread -// created the first -// time the task pool is used. The TaskPool provide no guarantee about the order -// the task will be run -// and about what tasks will run in parallel. None of the task added to the task -// pool should block -// on something (mutex, future, condition variable) what will be set only by the -// completion of an -// other task on the task pool as they may run on the same thread sequentally. -class TaskPool { -public: - // Add a new task to the task pool and return a std::future belonging to the - // newly created task. - // The caller of this function has to wait on the future for this task to - // complete. - template <typename F, typename... Args> - static std::future<typename std::result_of<F(Args...)>::type> - AddTask(F &&f, Args &&... args); +namespace TaskPool { - // Run all of the specified tasks on the task pool and wait until all of them - // are finished - // before returning. This method is intended to be used for small number tasks - // where listing - // them as function arguments is acceptable. For running large number of tasks - // you should use - // AddTask for each task and then call wait() on each returned future. - template <typename... T> static void RunTasks(T &&... tasks); - -private: - TaskPool() = delete; - - template <typename... T> struct RunTaskImpl; - - static void AddTaskImpl(std::function<void()> &&task_fn); -}; - -template <typename F, typename... Args> -std::future<typename std::result_of<F(Args...)>::type> -TaskPool::AddTask(F &&f, Args &&... args) { - auto task_sp = std::make_shared< - std::packaged_task<typename std::result_of<F(Args...)>::type()>>( - std::bind(std::forward<F>(f), std::forward<Args>(args)...)); - - AddTaskImpl([task_sp]() { (*task_sp)(); }); - - return task_sp->get_future(); -} - -template <typename... T> void TaskPool::RunTasks(T &&... tasks) { - RunTaskImpl<T...>::Run(std::forward<T>(tasks)...); +template <typename... T> void RunTasks(T &&... tasks) { + std::function<void()> cbs[sizeof...(T)]{tasks...}; + llvm::parallel::for_each_n(llvm::parallel::par, static_cast<size_t>(0), + sizeof...(T), [&cbs](size_t idx) { cbs[idx](); }); } -template <typename Head, typename... Tail> -struct TaskPool::RunTaskImpl<Head, Tail...> { - static void Run(Head &&h, Tail &&... t) { - auto f = AddTask(std::forward<Head>(h)); - RunTaskImpl<Tail...>::Run(std::forward<Tail>(t)...); - f.wait(); - } -}; - -template <> struct TaskPool::RunTaskImpl<> { - static void Run() {} -}; - -// Run 'func' on every value from begin .. end-1. Each worker will grab -// 'batch_size' numbers at a time to work on, so for very fast functions, batch -// should be large enough to avoid too much cache line contention. -void TaskMapOverInt(size_t begin, size_t end, - std::function<void(size_t)> const &func); +} // namespace TaskPool #endif // #ifndef utility_TaskPool_h_
_______________________________________________ lldb-commits mailing list lldb-commits@lists.llvm.org http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits