This is an automated email from the ASF dual-hosted git repository. lichaoyong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 1cf0fb9 Use ThreadPool to refactor MemTableFlushExecutor (#2931) 1cf0fb9 is described below commit 1cf0fb9117a38f94c88d4ca4be9daed6f690af1a Author: lichaoyong <lichaoy...@baidu.com> AuthorDate: Tue Feb 18 18:39:04 2020 +0800 Use ThreadPool to refactor MemTableFlushExecutor (#2931) 1. MemTableFlushExecutor maintains a ThreadPool to receive FlushTasks. 2. FlushToken is used to separate tasks from different tablets. Every DeltaWriter of a tablet constructs a FlushToken; tasks within a FlushToken are handled serially, while tasks in different FlushTokens are handled concurrently. 3. I have removed the thread limit on data_dir, because I/O is not the main time consumer of the flush thread. Much of the time is consumed in CPU decoding and compression. --- be/src/exec/olap_scan_node.cpp | 1 - be/src/olap/delta_writer.cpp | 21 ++- be/src/olap/delta_writer.h | 4 +- be/src/olap/memtable_flush_executor.cpp | 162 +++++---------------- be/src/olap/memtable_flush_executor.h | 131 ++++------------- be/src/runtime/dpp_sink.cpp | 2 +- be/src/runtime/exec_env.h | 5 +- be/src/runtime/exec_env_init.cpp | 3 +- be/src/runtime/fragment_mgr.cpp | 4 +- be/src/runtime/fragment_mgr.h | 4 +- be/src/runtime/routine_load/data_consumer_group.h | 4 +- .../routine_load/routine_load_task_executor.h | 4 +- be/src/runtime/runtime_state.h | 1 - be/src/runtime/tablets_channel.h | 2 +- be/src/service/internal_service.h | 4 +- be/src/util/priority_thread_pool.hpp | 8 +- be/src/util/thread_pool.hpp | 154 -------------------- be/test/olap/skiplist_test.cpp | 4 +- be/test/runtime/load_channel_mgr_test.cpp | 1 + 19 files changed, 102 insertions(+), 417 deletions(-) diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index cc3ebd9..db769c6 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -36,7 +36,6 @@ #include "runtime/string_value.h" #include "runtime/tuple_row.h" #include "util/runtime_profile.h" 
-#include "util/thread_pool.hpp" #include "util/debug_util.h" #include "util/priority_thread_pool.hpp" #include "agent/cgroups_mgr.h" diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 1aea7d6..0a89bc4 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -50,10 +50,9 @@ DeltaWriter::~DeltaWriter() { return; } - if (_flush_handler != nullptr) { + if (_flush_token != nullptr) { // cancel and wait all memtables in flush queue to be finished - _flush_handler->cancel(); - _flush_handler->wait(); + _flush_token->cancel(); } if (_tablet != nullptr) { @@ -150,8 +149,7 @@ OLAPStatus DeltaWriter::init() { _reset_mem_table(); // create flush handler - RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_handler( - _tablet->data_dir()->path_hash(), &_flush_handler)); + RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token)); _is_init = true; return OLAP_SUCCESS; @@ -175,7 +173,7 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) { } OLAPStatus DeltaWriter::_flush_memtable_async() { - return _flush_handler->submit(_mem_table); + return _flush_token->submit(_mem_table); } OLAPStatus DeltaWriter::flush_memtable_and_wait() { @@ -190,7 +188,7 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() { // this means there should be at least one memtable in flush queue. } // wait all memtables in flush queue to be flushed. 
- RETURN_NOT_OK(_flush_handler->wait()); + RETURN_NOT_OK(_flush_token->wait()); return OLAP_SUCCESS; } @@ -218,7 +216,7 @@ OLAPStatus DeltaWriter::close() { OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) { DCHECK(_is_init) << "delta writer is supposed be to initialized before close_wait() being called"; // return error if previous flush failed - RETURN_NOT_OK(_flush_handler->wait()); + RETURN_NOT_OK(_flush_token->wait()); DCHECK_EQ(_mem_tracker->consumption(), 0); // use rowset meta manager to save meta @@ -268,7 +266,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf _delta_written_success = true; - const FlushStatistic& stat = _flush_handler->get_stats(); + const FlushStatistic& stat = _flush_token->get_stats(); LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() << ", stats: " << stat; return OLAP_SUCCESS; } @@ -278,10 +276,9 @@ OLAPStatus DeltaWriter::cancel() { return OLAP_SUCCESS; } _mem_table.reset(); - if (_flush_handler != nullptr) { + if (_flush_token != nullptr) { // cancel and wait all memtables in flush queue to be finished - _flush_handler->cancel(); - _flush_handler->wait(); + _flush_token->cancel(); } DCHECK_EQ(_mem_tracker->consumption(), 0); return OLAP_SUCCESS; diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index dbf1326..64828d5 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -24,7 +24,7 @@ namespace doris { -class FlushHandler; +class FlushToken; class MemTable; class MemTracker; class Schema; @@ -106,7 +106,7 @@ private: bool _delta_written_success; StorageEngine* _storage_engine; - std::shared_ptr<FlushHandler> _flush_handler; + std::unique_ptr<FlushToken> _flush_token; std::unique_ptr<MemTracker> _mem_tracker; }; diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 51d5f23..00bf34e 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ 
b/be/src/olap/memtable_flush_executor.cpp @@ -24,146 +24,62 @@ #include "olap/memtable.h" #include "runtime/exec_env.h" #include "runtime/mem_tracker.h" +#include "util/scoped_cleanup.h" namespace doris { -OLAPStatus FlushHandler::submit(std::shared_ptr<MemTable> memtable) { - RETURN_NOT_OK(_last_flush_status.load()); - MemTableFlushContext ctx; - ctx.memtable = std::move(memtable); - ctx.flush_handler = this->shared_from_this(); - _counter_cond.inc(); - VLOG(5) << "submitting " << *(ctx.memtable) << " to flush queue " << _flush_queue_idx; - RETURN_NOT_OK(_flush_executor->_push_memtable(_flush_queue_idx, ctx)); - return OLAP_SUCCESS; -} - -OLAPStatus FlushHandler::wait() { - // wait all submitted tasks to be finished or cancelled - _counter_cond.block_wait(); - return _last_flush_status.load(); -} - -void FlushHandler::on_flush_finished(const FlushResult& res) { - if (res.flush_status != OLAP_SUCCESS) { - _last_flush_status.store(res.flush_status); - } else { - _stats.flush_time_ns.fetch_add(res.flush_time_ns); - _stats.flush_count.fetch_add(1); - } - _counter_cond.dec(); +std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { + os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000 + << ", flush count=" << stat.flush_count << ")"; + return os; } -OLAPStatus MemTableFlushExecutor::create_flush_handler( - size_t path_hash, std::shared_ptr<FlushHandler>* flush_handler) { - size_t flush_queue_idx = _get_queue_idx(path_hash); - flush_handler->reset(new FlushHandler(flush_queue_idx, this)); +OLAPStatus FlushToken::submit(std::shared_ptr<MemTable> memtable) { + _flush_token->submit_func(boost::bind(boost::mem_fn(&FlushToken::_flush_memtable), this, memtable)); return OLAP_SUCCESS; } -void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) { - int32_t data_dir_num = data_dirs.size(); - _thread_num_per_store = std::max(1, config::flush_thread_num_per_store); - _num_threads = data_dir_num * _thread_num_per_store; - - // create flush 
queues - for (int i = 0; i < _num_threads; ++i) { - BlockingQueue<MemTableFlushContext>* queue = new BlockingQueue<MemTableFlushContext>(10); - _flush_queues.push_back(queue); - } - // create thread pool - _flush_pool = new ThreadPool(_num_threads, 1); - for (int32_t i = 0; i < _num_threads; ++i) { - _flush_pool->offer(std::bind<void>(&MemTableFlushExecutor::_flush_memtable, this, i)); - } - - // _path_map saves the path hash to current idx of flush queue. - // eg. - // there are 4 data stores, each store has 2 work thread. - // so there are 8(= 4 * 2) queues in _flush_queues. - // and the path hash of the 4 paths are mapped to idx 0, 2, 4, 6. - int32_t group = 0; - for (auto store : data_dirs) { - _path_map[store->path_hash()] = group; - group += _thread_num_per_store; - } +void FlushToken::cancel() { + _flush_token->shutdown(); } -MemTableFlushExecutor::~MemTableFlushExecutor() { - // shutdown queues - for (auto queue : _flush_queues) { - queue->shutdown(); - } - - // shutdown thread pool - _flush_pool->shutdown(); - _flush_pool->join(); - - // delete queue - for (auto queue : _flush_queues) { - delete queue; - } - _flush_queues.clear(); - - delete _flush_pool; +OLAPStatus FlushToken::wait() { + _flush_token->wait(); + return _flush_status; } -size_t MemTableFlushExecutor::_get_queue_idx(size_t path_hash) { - std::lock_guard<SpinLock> l(_lock); - size_t cur_idx = _path_map[path_hash]; - size_t group = cur_idx / _thread_num_per_store; - size_t next_idx = group * _thread_num_per_store + ((cur_idx + 1) % _thread_num_per_store); - DCHECK(next_idx < _num_threads); - _path_map[path_hash] = next_idx; - return cur_idx; -} - -OLAPStatus MemTableFlushExecutor::_push_memtable(int32_t queue_idx, MemTableFlushContext& ctx) { - if (!_flush_queues[queue_idx]->blocking_put(ctx)) { - return OLAP_ERR_OTHER_ERROR; +void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable) { + MonotonicStopWatch timer; + timer.start(); + _flush_status = memtable->flush(); + 
SCOPED_CLEANUP({ + memtable.reset(); + }); + if (_flush_status != OLAP_SUCCESS) { + return; } - return OLAP_SUCCESS; + _stats.flush_time_ns += timer.elapsed_time(); + _stats.flush_count++; + _stats.flush_size_bytes += memtable->memory_usage(); + LOG(INFO) << "flushed " << *(memtable) << " in " << _stats.flush_time_ns / 1000 / 1000 + << " ms, status=" << _flush_status; } -void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) { - while (true) { - MemTableFlushContext ctx; - if (!_flush_queues[queue_idx]->blocking_get(&ctx)) { - // queue is empty and shutdown, end of thread - return; - } - - // if last flush of this tablet already failed, just skip - if (ctx.flush_handler->is_cancelled()) { - VLOG(5) << "skip flushing " << *(ctx.memtable) << " due to cancellation"; - // must release memtable before notifying - ctx.memtable.reset(); - ctx.flush_handler->on_flush_cancelled(); - continue; - } - - // flush the memtable - VLOG(5) << "begin to flush " << *(ctx.memtable); - FlushResult res; - MonotonicStopWatch timer; - timer.start(); - res.flush_status = ctx.memtable->flush(); - res.flush_time_ns = timer.elapsed_time(); - res.flush_size_bytes = ctx.memtable->memory_usage(); - VLOG(5) << "flushed " << *(ctx.memtable) << " in " << res.flush_time_ns / 1000 / 1000 - << " ms, status=" << res.flush_status; - // must release memtable before notifying - ctx.memtable.reset(); - // callback - ctx.flush_handler->on_flush_finished(res); - } +void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) { + int32_t data_dir_num = data_dirs.size(); + size_t min_threads = std::max(1, config::flush_thread_num_per_store); + size_t max_threads = data_dir_num * min_threads; + ThreadPoolBuilder("MemTableFlushThreadPool") + .set_min_threads(min_threads) + .set_max_threads(max_threads) + .build(&_flush_pool); } -std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { - os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000 - << ", flush count=" << 
stat.flush_count << ")"; - return os; +// create a flush token +OLAPStatus MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* flush_token) { + flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL))); + return OLAP_SUCCESS; } -} // end of namespac +} // namespace doris diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index ef84e29..bb58e3f 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -17,19 +17,13 @@ #pragma once -#include <atomic> #include <cstdint> #include <memory> -#include <queue> #include <vector> -#include <unordered_map> #include <utility> -#include "util/blocking_queue.hpp" -#include "util/counter_cond_variable.hpp" -#include "util/spinlock.h" -#include "util/thread_pool.hpp" #include "olap/olap_define.h" +#include "util/threadpool.h" namespace doris { @@ -38,132 +32,65 @@ class DeltaWriter; class ExecEnv; class MemTable; -// The context for a memtable to be flushed. -class FlushHandler; -struct MemTableFlushContext { - // memtable to be flushed - std::shared_ptr<MemTable> memtable; - // flush handler from a delta writer. - // use shared ptr because flush_handler may be deleted before this - // memtable being flushed. so we need to make sure the flush_handler - // is alive until this memtable being flushed. - std::shared_ptr<FlushHandler> flush_handler; -}; - -// the flush result of a single memtable flush -struct FlushResult { - OLAPStatus flush_status; - int64_t flush_time_ns = 0; - int64_t flush_size_bytes = 0; -}; - // the statistic of a certain flush handler. 
// use atomic because it may be updated by multi threads struct FlushStatistic { - std::atomic<std::int64_t> flush_time_ns = {0}; - std::atomic<std::int64_t> flush_count= {0}; + int64_t flush_time_ns = 0; + int64_t flush_count= 0; + int64_t flush_size_bytes = 0; }; std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); -class MemTableFlushExecutor; - -// flush handler is for flushing memtables in a delta writer -// This class must be wrapped by std::shared_ptr, or you will get bad_weak_ptr exception -// when calling submit(); -class FlushHandler : public std::enable_shared_from_this<FlushHandler> { +// A thin wrapper of ThreadPoolToken to submit task. +class FlushToken { public: - FlushHandler(int32_t flush_queue_idx, MemTableFlushExecutor* flush_executor) : - _flush_queue_idx(flush_queue_idx), - _last_flush_status(OLAP_SUCCESS), - _counter_cond(0), - _flush_executor(flush_executor), - _is_cancelled(false) { - } - - // submit a memtable to flush. return error if some previous submitted MemTable has failed - OLAPStatus submit(std::shared_ptr<MemTable> memtable); - // wait for all memtables submitted by itself to be finished. + explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token) + : _flush_status(OLAP_SUCCESS), + _flush_token(std::move(flush_pool_token)) {} + + OLAPStatus submit(std::shared_ptr<MemTable> mem_table); + + // error has happpens, so we cancel this token + // And remove all tasks in the queue. + void cancel(); + + // wait all tasks in token to be completed. OLAPStatus wait(); + // get flush operations' statistics const FlushStatistic& get_stats() const { return _stats; } - bool is_cancelled() { - return _last_flush_status.load() != OLAP_SUCCESS || _is_cancelled.load(); - } - void cancel() { _is_cancelled.store(true); } - - // These on_xxx() methods are callback when flush finishes or cancels, user should - // not call them directly. - // called when a memtable is finished by executor. 
- void on_flush_finished(const FlushResult& res); - // called when a flush memtable execution is cancelled - void on_flush_cancelled() { - _counter_cond.dec(); - } - private: - // flush queue idx in memtable flush executor - int32_t _flush_queue_idx; - // the flush status of last memtable - std::atomic<OLAPStatus> _last_flush_status; - // used to wait/notify the memtable flush execution - CounterCondVariable _counter_cond; + void _flush_memtable(std::shared_ptr<MemTable> mem_table); + OLAPStatus _flush_status; + std::unique_ptr<ThreadPoolToken> _flush_token; FlushStatistic _stats; - MemTableFlushExecutor* _flush_executor; - - // the caller of the flush handler can set this variable to notify that the - // uppper application is already cancelled. - std::atomic<bool> _is_cancelled; }; // MemTableFlushExecutor is responsible for flushing memtables to disk. -// Each data directory has a specified number of worker threads and each thread will correspond -// to a queue. The only job of each worker thread is to take memtable from its corresponding -// flush queue and writes the data to disk. -// -// NOTE: User SHOULD NOT call method of this class directly, use pattern should be: +// It encapsulate a ThreadPool to handle all tasks. +// Usage Example: // ... // std::shared_ptr<FlushHandler> flush_handler; -// memTableFlushExecutor.create_flush_handler(path_hash, &flush_handler); +// memTableFlushExecutor.create_flush_token(path_hash, &flush_handler); // ... -// flush_handler->submit(memtable) +// flush_token->submit(memtable) // ... class MemTableFlushExecutor { public: MemTableFlushExecutor() {} - ~MemTableFlushExecutor(); + ~MemTableFlushExecutor() {} // init should be called after storage engine is opened, // because it needs path hash of each data dir. 
void init(const std::vector<DataDir*>& data_dirs); - // create a flush handler to access the flush executor - OLAPStatus create_flush_handler(size_t path_hash, std::shared_ptr<FlushHandler>* flush_handler); + OLAPStatus create_flush_token(std::unique_ptr<FlushToken>* flush_token); + private: - friend class FlushHandler; - - // given the path hash, return the next idx of flush queue. - // eg. - // path A is mapped to idx 0 and 1, so each time get_queue_idx(A) is called, - // 0 and 1 will returned alternately. - size_t _get_queue_idx(size_t path_hash); - - // push the memtable to specified flush queue - OLAPStatus _push_memtable(int32_t queue_idx, MemTableFlushContext& ctx); - - void _flush_memtable(int32_t queue_idx); - - int32_t _thread_num_per_store; - int32_t _num_threads; - ThreadPool* _flush_pool; - // the size of this vector should equal to _num_threads - std::vector<BlockingQueue<MemTableFlushContext>*> _flush_queues; - // lock to protect _path_map - SpinLock _lock; - // path hash -> queue idx of _flush_queues; - std::unordered_map<size_t, size_t> _path_map; + std::unique_ptr<ThreadPool> _flush_pool; }; } // end namespace diff --git a/be/src/runtime/dpp_sink.cpp b/be/src/runtime/dpp_sink.cpp index 43d3a0b..bea1c96 100644 --- a/be/src/runtime/dpp_sink.cpp +++ b/be/src/runtime/dpp_sink.cpp @@ -35,7 +35,7 @@ #include "gen_cpp/Types_types.h" #include "util/countdown_latch.h" #include "util/debug_util.h" -#include "util/thread_pool.hpp" +#include "util/priority_thread_pool.hpp" #include "olap/field.h" namespace doris { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 125f039..076b904 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -47,7 +47,6 @@ class ResultQueueMgr; class TMasterInfo; class LoadChannelMgr; class TestExecEnv; -class ThreadPool; class ThreadResourceMgr; class TmpFileMgr; class WebPageHandler; @@ -106,7 +105,7 @@ public: PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; } 
ThreadResourceMgr* thread_mgr() { return _thread_mgr; } PriorityThreadPool* thread_pool() { return _thread_pool; } - ThreadPool* etl_thread_pool() { return _etl_thread_pool; } + PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; } CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; } FragmentMgr* fragment_mgr() { return _fragment_mgr; } TMasterInfo* master_info() { return _master_info; } @@ -157,7 +156,7 @@ private: PoolMemTrackerRegistry* _pool_mem_trackers = nullptr; ThreadResourceMgr* _thread_mgr = nullptr; PriorityThreadPool* _thread_pool = nullptr; - ThreadPool* _etl_thread_pool = nullptr; + PriorityThreadPool* _etl_thread_pool = nullptr; CgroupsMgr* _cgroups_mgr = nullptr; FragmentMgr* _fragment_mgr = nullptr; TMasterInfo* _master_info = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 8b4a28c..47cc38b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -55,7 +55,6 @@ #include "util/brpc_stub_cache.h" #include "util/priority_thread_pool.hpp" #include "agent/cgroups_mgr.h" -#include "util/thread_pool.hpp" #include "gen_cpp/BackendService.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/TPaloBrokerService.h" @@ -86,7 +85,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _thread_pool = new PriorityThreadPool( config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_queue_size); - _etl_thread_pool = new ThreadPool( + _etl_thread_pool = new PriorityThreadPool( config::etl_thread_pool_size, config::etl_thread_pool_queue_size); _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index a60dc47..fec4c5d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -411,7 +411,7 @@ Status FragmentMgr::exec_plan_fragment( } static void* fragment_executor(void* param) { - ThreadPool::WorkFunction* func = 
(ThreadPool::WorkFunction*)param; + PriorityThreadPool::WorkFunction* func = (PriorityThreadPool::WorkFunction*)param; (*func)(); delete func; return nullptr; @@ -469,7 +469,7 @@ Status FragmentMgr::exec_plan_fragment( int ret = pthread_create(&id, nullptr, fragment_executor, - new ThreadPool::WorkFunction( + new PriorityThreadPool::WorkFunction( std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb))); if (ret != 0) { std::string err_msg("Could not create thread."); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 5eb04db..74b25c4 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -29,7 +29,7 @@ #include "gen_cpp/DorisExternalService_types.h" #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" -#include "util/thread_pool.hpp" +#include "util/priority_thread_pool.hpp" #include "util/hash_util.hpp" #include "http/rest_monitor_iface.h" @@ -90,7 +90,7 @@ private: bool _stop; std::thread _cancel_thread; // every job is a pool - ThreadPool _thread_pool; + PriorityThreadPool _thread_pool; }; diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h index 52b756b..9083835 100644 --- a/be/src/runtime/routine_load/data_consumer_group.h +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -19,7 +19,7 @@ #include "runtime/routine_load/data_consumer.h" #include "util/blocking_queue.hpp" -#include "util/thread_pool.hpp" +#include "util/priority_thread_pool.hpp" namespace doris { @@ -59,7 +59,7 @@ protected: UniqueId _grp_id; std::vector<std::shared_ptr<DataConsumer>> _consumers; // thread pool to run each consumer in multi thread - ThreadPool _thread_pool; + PriorityThreadPool _thread_pool; // mutex to protect counter. // the counter is init as the number of consumers. // once a consumer is done, decrease the counter. 
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index 2bc88f4..3700e80 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -22,7 +22,7 @@ #include <mutex> #include "runtime/routine_load/data_consumer_pool.h" -#include "util/thread_pool.hpp" +#include "util/priority_thread_pool.hpp" #include "util/uid_util.h" #include "gen_cpp/internal_service.pb.h" @@ -74,7 +74,7 @@ private: private: ExecEnv* _exec_env; - ThreadPool _thread_pool; + PriorityThreadPool _thread_pool; DataConsumerPool _data_consumer_pool; std::mutex _lock; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 9589035..688e7fb 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -51,7 +51,6 @@ class DateTimeValue; class MemTracker; class DataStreamRecvr; class ResultBufferMgr; -class ThreadPool; class DiskIoMgrs; class TmpFileMgr; class BufferedBlockMgr; diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 25c6cb1..cb865cf 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -23,7 +23,7 @@ #include "runtime/descriptors.h" #include "runtime/mem_tracker.h" #include "util/bitmap.h" -#include "util/thread_pool.hpp" +#include "util/priority_thread_pool.hpp" #include "util/uid_util.h" #include "gen_cpp/Types_types.h" diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 957bb79..e27c2c3 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -20,7 +20,7 @@ #include "common/status.h" #include "gen_cpp/internal_service.pb.h" #include "gen_cpp/palo_internal_service.pb.h" -#include "util/thread_pool.hpp" +#include "util/priority_thread_pool.hpp" namespace brpc { class Controller; @@ -90,7 +90,7 @@ private: Status _exec_plan_fragment(brpc::Controller* cntl); 
private: ExecEnv* _exec_env; - ThreadPool _tablet_worker_pool; + PriorityThreadPool _tablet_worker_pool; }; } diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp index faa00ba..68f50d7 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/priority_thread_pool.hpp @@ -56,7 +56,6 @@ public: // capacity available. // -- work_function: the function to run every time an item is consumed from the queue PriorityThreadPool(uint32_t num_threads, uint32_t queue_size) : - _thread_num(num_threads), _work_queue(queue_size), _shutdown(false) { for (int i = 0; i < num_threads; ++i) { @@ -88,6 +87,11 @@ public: return _work_queue.blocking_put(task); } + bool offer(WorkFunction func) { + PriorityThreadPool::Task task = {0, func}; + return _work_queue.blocking_put(task); + } + // Shuts the thread pool down, causing the work queue to cease accepting offered work // and the worker threads to terminate once they have processed their current work item. // Returns once the shutdown flag has been set, does not wait for the threads to @@ -145,8 +149,6 @@ private: return _shutdown; } - uint32_t _thread_num; - // Queue on which work items are held until a thread is available to process them in // FIFO order. BlockingPriorityQueue<Task> _work_queue; diff --git a/be/src/util/thread_pool.hpp b/be/src/util/thread_pool.hpp deleted file mode 100644 index 9aa1feb..0000000 --- a/be/src/util/thread_pool.hpp +++ /dev/null @@ -1,154 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_COMMON_UTIL_THREAD_POOL_HPP -#define DORIS_BE_SRC_COMMON_UTIL_THREAD_POOL_HPP - -#include "util/blocking_queue.hpp" - -#include <boost/thread.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/bind/mem_fn.hpp> - -namespace doris { - -// Simple threadpool which processes items (of type T) in parallel which were placed on a -// blocking queue by Offer(). Each item is processed by a single user-supplied method. -class ThreadPool { -public: - // Signature of a work-processing function. Takes the integer id of the thread which is - // calling it (ids run from 0 to num_threads - 1) and a reference to the item to - // process. - typedef boost::function<void ()> WorkFunction; - - // Creates a new thread pool and start num_threads threads. - // -- num_threads: how many threads are part of this pool - // -- queue_size: the maximum size of the queue on which work items are offered. If the - // queue exceeds this size, subsequent calls to Offer will block until there is - // capacity available. 
- // -- work_function: the function to run every time an item is consumed from the queue - ThreadPool(uint32_t num_threads, uint32_t queue_size) : - _work_queue(queue_size), - _shutdown(false) { - for (int i = 0; i < num_threads; ++i) { - _threads.create_thread( - boost::bind<void>(boost::mem_fn(&ThreadPool::work_thread), this, i)); - } - } - - // Destructor ensures that all threads are terminated before this object is freed - // otherwise they may continue to run and reference member variables - ~ThreadPool() { - shutdown(); - join(); - } - - // Blocking operation that puts a work item on the queue. If the queue is full, blocks - // until there is capacity available. - // - // 'work' is copied into the work queue, but may be referenced at any time in the - // future. Therefore the caller needs to ensure that any data referenced by work (if T - // is, e.g., a pointer type) remains valid until work has been processed, and it's up to - // the caller to provide their own signalling mechanism to detect this (or to wait until - // after DrainAndShutdown returns). - // - // Returns true if the work item was successfully added to the queue, false otherwise - // (which typically means that the thread pool has already been shut down). - bool offer(WorkFunction func) { - return _work_queue.blocking_put(func); - } - - // Shuts the thread pool down, causing the work queue to cease accepting offered work - // and the worker threads to terminate once they have processed their current work item. - // Returns once the shutdown flag has been set, does not wait for the threads to - // terminate. - void shutdown() { - { - boost::lock_guard<boost::mutex> l(_lock); - _shutdown = true; - } - _work_queue.shutdown(); - } - - // Blocks until all threads are finished. Shutdown does not need to have been called, - // since it may be called on a separate thread. 
- void join() { - _threads.join_all(); - } - - uint32_t get_queue_size() const { - return _work_queue.get_size(); - } - - // Blocks until the work queue is empty, and then calls Shutdown to stop the worker - // threads and Join to wait until they are finished. - // Any work Offer()'ed during drain_and_shutdown may or may not be processed. - void drain_and_shutdown() { - { - boost::unique_lock<boost::mutex> l(_lock); - - while (_work_queue.get_size() != 0) { - _empty_cv.wait(l); - } - } - shutdown(); - join(); - } - -private: - // Driver method for each thread in the pool. Continues to read work from the queue - // until the pool is shutdown. - void work_thread(int thread_id) { - while (!is_shutdown()) { - WorkFunction work_function; - - if (_work_queue.blocking_get(&work_function)) { - work_function(); - } - - if (_work_queue.get_size() == 0) { - _empty_cv.notify_all(); - } - } - } - - // Returns value of _shutdown under a lock, forcing visibility to threads in the pool. - bool is_shutdown() { - boost::lock_guard<boost::mutex> l(_lock); - return _shutdown; - } - - // Queue on which work items are held until a thread is available to process them in - // FIFO order. - BlockingQueue<WorkFunction> _work_queue; - - // Collection of worker threads that process work from the queue. - boost::thread_group _threads; - - // Guards _shutdown and _empty_cv - boost::mutex _lock; - - // Set to true when threads should stop doing work and terminate. 
- bool _shutdown; - - // Signalled when the queue becomes empty - boost::condition_variable _empty_cv; -}; - -} - -#endif diff --git a/be/test/olap/skiplist_test.cpp b/be/test/olap/skiplist_test.cpp index 0e82465..bfaf738 100644 --- a/be/test/olap/skiplist_test.cpp +++ b/be/test/olap/skiplist_test.cpp @@ -28,7 +28,7 @@ #include "util/random.h" #include "util/condition_variable.h" #include "util/mutex.h" -#include "util/thread_pool.hpp" +#include "util/priority_thread_pool.hpp" namespace doris { @@ -411,7 +411,7 @@ static void run_concurrent(int run) { Random rnd(seed); const int N = 1000; const int kSize = 1000; - ThreadPool thread_pool(10, 100); + PriorityThreadPool thread_pool(10, 100); for (int i = 0; i < N; i++) { if ((i % 100) == 0) { fprintf(stderr, "Run %d of %d\n", i, N); diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp index 8417a13..bda51c4 100644 --- a/be/test/runtime/load_channel_mgr_test.cpp +++ b/be/test/runtime/load_channel_mgr_test.cpp @@ -32,6 +32,7 @@ #include "runtime/descriptor_helper.h" #include "util/thrift_util.h" #include "olap/delta_writer.h" +#include "olap/memtable_flush_executor.h" #include "olap/schema.h" #include "olap/storage_engine.h" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org