This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master in repository
https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 908f9cb7b9 [Improvement][ASAN] make BE can exit normally and ASAN memory leak checking work (#9620) 908f9cb7b9 is described below commit 908f9cb7b9993ee45ea420e120fd1db160ab02c3 Author: jacktengg <18241664+jackte...@users.noreply.github.com> AuthorDate: Wed May 18 07:40:57 2022 +0800 [Improvement][ASAN] make BE can exit normally and ASAN memory leak checking work (#9620) --- be/src/runtime/minidump.cpp | 2 +- be/src/util/priority_thread_pool.hpp | 19 +++++++++---------- be/src/util/priority_work_stealing_thread_pool.hpp | 19 +++++-------------- 3 files changed, 15 insertions(+), 25 deletions(-) diff --git a/be/src/runtime/minidump.cpp b/be/src/runtime/minidump.cpp index cf1f0611f6..1c66771403 100644 --- a/be/src/runtime/minidump.cpp +++ b/be/src/runtime/minidump.cpp @@ -104,7 +104,7 @@ bool Minidump::_minidump_cb(const google_breakpad::MinidumpDescriptor& descripto } void Minidump::stop() { - if (_stop) { + if (config::disable_minidump || _stop) { return; } _stop = true; diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp index 20d90b3f41..616cb2475f 100644 --- a/be/src/util/priority_thread_pool.hpp +++ b/be/src/util/priority_thread_pool.hpp @@ -52,7 +52,6 @@ public: // -- queue_size: the maximum size of the queue on which work items are offered. If the // queue exceeds this size, subsequent calls to Offer will block until there is // capacity available. - // -- work_function: the function to run every time an item is consumed from the queue PriorityThreadPool(uint32_t num_threads, uint32_t queue_size) : _work_queue(queue_size), _shutdown(false) { for (int i = 0; i < num_threads; ++i) { @@ -118,6 +117,15 @@ public: protected: virtual bool is_shutdown() { return _shutdown; } + // Collection of worker threads that process work from the queue. 
+ ThreadGroup _threads; + + // Guards _empty_cv + std::mutex _lock; + + // Signalled when the queue becomes empty + std::condition_variable _empty_cv; + private: // Driver method for each thread in the pool. Continues to read work from the queue // until the pool is shutdown. @@ -137,17 +145,8 @@ private: // FIFO order. BlockingPriorityQueue<Task> _work_queue; - // Collection of worker threads that process work from the queue. - ThreadGroup _threads; - - // Guards _empty_cv - std::mutex _lock; - // Set to true when threads should stop doing work and terminate. std::atomic<bool> _shutdown; - - // Signalled when the queue becomes empty - std::condition_variable _empty_cv; }; } // namespace doris diff --git a/be/src/util/priority_work_stealing_thread_pool.hpp b/be/src/util/priority_work_stealing_thread_pool.hpp index c2f717a18f..937012722f 100644 --- a/be/src/util/priority_work_stealing_thread_pool.hpp +++ b/be/src/util/priority_work_stealing_thread_pool.hpp @@ -35,7 +35,6 @@ public: // -- queue_size: the maximum size of the queue on which work items are offered. If the // queue exceeds this size, subsequent calls to Offer will block until there is // capacity available. - // -- work_function: the function to run every time an item is consumed from the queue PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, uint32_t queue_size) : PriorityThreadPool(0, 0) { DCHECK_GT(num_queues, 0); @@ -50,6 +49,11 @@ public: } } + virtual ~PriorityWorkStealingThreadPool() { + shutdown(); + join(); + } + // Blocking operation that puts a work item on the queue. If the queue is full, blocks // until there is capacity available. // @@ -79,10 +83,6 @@ public: } } - // Blocks until all threads are finished. shutdown does not need to have been called, - // since it may be called on a separate thread. 
- void join() override { _threads.join_all(); } - uint32_t get_queue_size() const override { uint32_t size = 0; for (auto work_queue : _work_queues) { @@ -141,15 +141,6 @@ private: // Queue on which work items are held until a thread is available to process them in // FIFO order. std::vector<std::shared_ptr<BlockingPriorityQueue<Task>>> _work_queues; - - // Collection of worker threads that process work from the queues. - ThreadGroup _threads; - - // Guards _empty_cv - std::mutex _lock; - - // Signalled when the queue becomes empty - std::condition_variable _empty_cv; }; } // namespace doris \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org