This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 17d939b [Bug] Fix scanner threads heap-use-after-free (#5111) 17d939b is described below commit 17d939b789eaf7afc84defd2496b5990decf577f Author: HuangWei <huang...@apache.org> AuthorDate: Mon Jan 4 09:28:51 2021 +0800 [Bug] Fix scanner threads heap-use-after-free (#5111) Scanner threads may be running and using the member vars of OlapScanNode, when the OlapScanNode has already been destroyed. We can use `_running_thread` to be the last accessed member variable. And `transfer_thread` needs to wait for `_running_thread==0`. After `transfer_thread` joined, `OlapScanNode::close()` can continue. --- be/src/exec/olap_scan_node.cpp | 26 +++++++++++++++++++++----- be/src/exec/olap_scan_node.h | 4 ++-- be/src/exec/olap_scanner.cpp | 1 - 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index ab09df1..52635d4 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -59,7 +59,6 @@ OlapScanNode::OlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _status(Status::OK()), _resource_info(nullptr), _buffered_bytes(0), - _running_thread(0), _eval_conjuncts_fn(nullptr) {} OlapScanNode::~OlapScanNode() {} @@ -1251,12 +1250,22 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { state->resource_pool()->release_thread_token(true); VLOG(1) << "TransferThread finish."; - std::unique_lock<std::mutex> l(_row_batches_lock); - _transfer_done = true; - _row_batch_added_cv.notify_all(); + { + std::unique_lock<std::mutex> l(_row_batches_lock); + _transfer_done = true; + _row_batch_added_cv.notify_all(); + } + + std::unique_lock<std::mutex> l(_scan_batches_lock); + _scan_thread_exit_cv.wait(l, [this] { return _running_thread == 0; }); + VLOG(1) << "Scanner threads have been exited. TransferThread exit."; } void OlapScanNode::scanner_thread(OlapScanner* scanner) { + // Do not use ScopedTimer.
There is no guarantee that, the counter + // (_scan_cpu_timer, the class member) is not destroyed after `_running_thread==0`. + ThreadCpuStopWatch cpu_watch; + cpu_watch.start(); Status status = Status::OK(); bool eos = false; RuntimeState* state = scanner->runtime_state(); @@ -1345,7 +1354,6 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { if (!eos) { _olap_scanners.push_front(scanner); } - _running_thread--; } if (eos) { // close out of batches lock. we do this before _progress update @@ -1359,7 +1367,15 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { _scanner_done = true; } } + + _scan_cpu_timer->update(cpu_watch.elapsed_time()); _scan_batch_added_cv.notify_one(); + + // The transfer thead will wait for `_running_thread==0`, to make sure all scanner threads won't access class members. + // Do not access class members after this code. + std::unique_lock<std::mutex> l(_scan_batches_lock); + _running_thread--; + _scan_thread_exit_cv.notify_one(); } Status OlapScanNode::add_one_batch(RowBatchInterface* row_batch) { diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 83ffbd7..b677c86 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -232,7 +232,8 @@ private: std::mutex _scan_batches_lock; std::condition_variable _scan_batch_added_cv; - int32_t _scanner_task_finish_count; + int64_t _running_thread = 0; + std::condition_variable _scan_thread_exit_cv; std::list<RowBatchInterface*> _scan_row_batches; @@ -260,7 +261,6 @@ private: TResourceInfo* _resource_info; int64_t _buffered_bytes; - int64_t _running_thread; EvalConjunctsFn _eval_conjuncts_fn; bool _need_agg_finalize = true; diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 4e7b63d..96c82b5 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -253,7 +253,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { int64_t raw_rows_threshold = raw_rows_read() 
+ config::doris_scanner_row_num; { SCOPED_TIMER(_parent->_scan_timer); - SCOPED_CPU_TIMER(_parent->_scan_cpu_timer); while (true) { // Batch is full, break if (batch->is_full()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org