This is an automated email from the ASF dual-hosted git repository. jiafengzheng pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new ca2944701e [dev-1.1.2](parquet-reader) fix deadlock of parquet reader prefetch thread (#12292) ca2944701e is described below commit ca2944701edba5df39909aac46e6d559a650c919 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Fri Sep 2 13:01:52 2022 +0800 [dev-1.1.2](parquet-reader) fix deadlock of parquet reader prefetch thread (#12292) fix deadlock of parquet reader prefetch thread --- be/src/exec/parquet_reader.cpp | 11 +++++------ be/src/exec/parquet_reader.h | 4 ++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index e65c1c5127..5e040611b8 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -103,8 +103,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*> RETURN_IF_ERROR(column_indices(tuple_slot_descs)); - std::thread thread(&ParquetReaderWrap::prefetch_batch, this, &thread_status); - thread.detach(); + _prefetch_thread = std::thread(&ParquetReaderWrap::prefetch_batch, this); // read batch RETURN_IF_ERROR(read_next_batch()); @@ -134,7 +133,9 @@ void ParquetReaderWrap::close() { // must wait the pre_fetch thread finish. // because it may still use ParquetReader to read data, which may cause // heap-after-use bug. 
- thread_status.get_future().get(); + if (_prefetch_thread.joinable()) { + _prefetch_thread.join(); + } arrow::Status st = _parquet->Close(); if (!st.ok()) { LOG(WARNING) << "close parquet file error: " << st.ToString(); @@ -541,7 +542,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>& return read_record_batch(tuple_slot_descs, eof); } -void ParquetReaderWrap::prefetch_batch(std::promise<Status>* status) { +void ParquetReaderWrap::prefetch_batch() { auto insert_batch = [this](const auto& batch) { std::unique_lock<std::mutex> lock(_mtx); while (!_closed && _queue.size() == _max_queue_size) { @@ -572,8 +573,6 @@ void ParquetReaderWrap::prefetch_batch(std::promise<Status>* status) { std::for_each(batches.begin(), batches.end(), insert_batch); current_group++; } - // the status' value is meaningless, just for notifying that thread is done. - status->set_value(Status::OK()); } Status ParquetReaderWrap::read_next_batch() { diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index f56a56061c..96dcd55af9 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -104,7 +104,7 @@ private: int32_t* wbtyes); private: - void prefetch_batch(std::promise<Status>* status); + void prefetch_batch(); Status read_next_batch(); private: @@ -137,7 +137,7 @@ private: std::list<std::shared_ptr<arrow::RecordBatch>> _queue; const size_t _max_queue_size = config::parquet_reader_max_buffer_size; - std::promise<Status> thread_status; + std::thread _prefetch_thread; }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org