This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new ca2944701e [dev-1.1.2](parquet-reader) fix deadlock of parquet reader 
prefetch thread (#12292)
ca2944701e is described below

commit ca2944701edba5df39909aac46e6d559a650c919
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Fri Sep 2 13:01:52 2022 +0800

    [dev-1.1.2](parquet-reader) fix deadlock of parquet reader prefetch thread 
(#12292)
    
     Fix deadlock of the parquet reader prefetch thread.
---
 be/src/exec/parquet_reader.cpp | 11 +++++------
 be/src/exec/parquet_reader.h   |  4 ++--
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp
index e65c1c5127..5e040611b8 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/parquet_reader.cpp
@@ -103,8 +103,7 @@ Status ParquetReaderWrap::init_parquet_reader(const 
std::vector<SlotDescriptor*>
 
         RETURN_IF_ERROR(column_indices(tuple_slot_descs));
 
-        std::thread thread(&ParquetReaderWrap::prefetch_batch, this, 
&thread_status);
-        thread.detach();
+        _prefetch_thread = std::thread(&ParquetReaderWrap::prefetch_batch, 
this);
 
         // read batch
         RETURN_IF_ERROR(read_next_batch());
@@ -134,7 +133,9 @@ void ParquetReaderWrap::close() {
     // must wait the pre_fetch thread finish.
     // because it may still use ParquetReader to read data, which may cause
     // heap-after-use bug.
-    thread_status.get_future().get();
+    if (_prefetch_thread.joinable()) {
+        _prefetch_thread.join();
+    }
     arrow::Status st = _parquet->Close();
     if (!st.ok()) {
         LOG(WARNING) << "close parquet file error: " << st.ToString();
@@ -541,7 +542,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const 
std::vector<SlotDescriptor*>&
     return read_record_batch(tuple_slot_descs, eof);
 }
 
-void ParquetReaderWrap::prefetch_batch(std::promise<Status>* status) {
+void ParquetReaderWrap::prefetch_batch() {
     auto insert_batch = [this](const auto& batch) {
         std::unique_lock<std::mutex> lock(_mtx);
         while (!_closed && _queue.size() == _max_queue_size) {
@@ -572,8 +573,6 @@ void 
ParquetReaderWrap::prefetch_batch(std::promise<Status>* status) {
         std::for_each(batches.begin(), batches.end(), insert_batch);
         current_group++;
     }
-    // the status' value is meaningless, just for notifying that thread is 
done.
-    status->set_value(Status::OK());
 }
 
 Status ParquetReaderWrap::read_next_batch() {
diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h
index f56a56061c..96dcd55af9 100644
--- a/be/src/exec/parquet_reader.h
+++ b/be/src/exec/parquet_reader.h
@@ -104,7 +104,7 @@ private:
                             int32_t* wbtyes);
 
 private:
-    void prefetch_batch(std::promise<Status>* status);
+    void prefetch_batch();
     Status read_next_batch();
 
 private:
@@ -137,7 +137,7 @@ private:
     std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
     const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
 
-    std::promise<Status> thread_status;
+    std::thread _prefetch_thread;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to