This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 236f394c3d [dev-1.1.2][fix](parquet-reader) fix concurrency bug (#11605)
236f394c3d is described below

commit 236f394c3d83674eabed38f58de44055e0d6f2ea
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Aug 10 14:46:33 2022 +0800

    [dev-1.1.2][fix](parquet-reader) fix concurrency bug (#11605)
    
    * [fix](parquet-reader) fix concurrency bug
    
    Co-authored-by: morningman <morning...@apache.org>
---
 be/src/exec/parquet_reader.cpp  | 79 ++++++++++++++++++++++++++---------------
 be/src/exec/parquet_reader.h    | 15 ++++++--
 be/src/exec/parquet_scanner.cpp |  3 +-
 3 files changed, 63 insertions(+), 34 deletions(-)

diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp
index 61eb4d8e19..e65c1c5127 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/parquet_reader.cpp
@@ -103,7 +103,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
 
         RETURN_IF_ERROR(column_indices(tuple_slot_descs));
 
-        std::thread thread(&ParquetReaderWrap::prefetch_batch, this);
+        std::thread thread(&ParquetReaderWrap::prefetch_batch, this, &thread_status);
         thread.detach();
 
         // read batch
@@ -131,6 +131,10 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
 void ParquetReaderWrap::close() {
     _closed = true;
     _queue_writer_cond.notify_one();
+    // Must wait for the prefetch thread to finish, because it may still be
+    // using the ParquetReader to read data, which could cause a
+    // heap-use-after-free bug.
+    thread_status.get_future().get();
     arrow::Status st = _parquet->Close();
     if (!st.ok()) {
         LOG(WARNING) << "close parquet file error: " << st.ToString();
@@ -537,7 +541,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
     return read_record_batch(tuple_slot_descs, eof);
 }
 
-void ParquetReaderWrap::prefetch_batch() {
+void ParquetReaderWrap::prefetch_batch(std::promise<Status>* status) {
     auto insert_batch = [this](const auto& batch) {
         std::unique_lock<std::mutex> lock(_mtx);
         while (!_closed && _queue.size() == _max_queue_size) {
@@ -552,22 +556,24 @@ void ParquetReaderWrap::prefetch_batch() {
     int current_group = 0;
     while (true) {
         if (_closed || current_group >= _total_groups) {
-            return;
+            break;
         }
         _status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch);
         if (!_status.ok()) {
             _closed = true;
-            return;
+            break;
         }
         arrow::RecordBatchVector batches;
         _status = _rb_batch->ReadAll(&batches);
         if (!_status.ok()) {
             _closed = true;
-            return;
+            break;
         }
         std::for_each(batches.begin(), batches.end(), insert_batch);
         current_group++;
     }
+    // The status value itself is meaningless; it only signals that the thread is done.
+    status->set_value(Status::OK());
 }
 
 Status ParquetReaderWrap::read_next_batch() {
@@ -596,30 +602,36 @@ ParquetFile::~ParquetFile() {
 }
 
 arrow::Status ParquetFile::Close() {
+    std::lock_guard<std::mutex> lock(_lock);
+    if (_is_closed) {
+        return arrow::Status::OK();
+    }
+
     if (_file != nullptr) {
         _file->close();
         delete _file;
         _file = nullptr;
     }
+    _is_closed = true;
     return arrow::Status::OK();
 }
 
 bool ParquetFile::closed() const {
-    if (_file != nullptr) {
-        return _file->closed();
-    } else {
-        return true;
-    }
+    return _is_closed;
 }
 
-arrow::Result<int64_t> ParquetFile::Read(int64_t nbytes, void* buffer) {
-    return ReadAt(_pos, nbytes, buffer);
+arrow::Status ParquetFile::Seek(int64_t position) {
+    _pos = position;
+    // NOTE: Only readat operation is used, so _file seek is not called here.
+    return arrow::Status::OK();
 }
 
-arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
+arrow::Result<int64_t> ParquetFile::Read(int64_t nbytes, void* out) {
+    if (_is_closed) {
+        return arrow::Status::IOError("Already closed");
+    }
     int64_t reads = 0;
     int64_t bytes_read = 0;
-    _pos = position;
     while (nbytes > 0) {
         Status result = _file->readat(_pos, nbytes, &reads, out);
         if (!result.ok()) {
@@ -637,25 +649,14 @@ arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, voi
     return bytes_read;
 }
 
-arrow::Result<int64_t> ParquetFile::GetSize() {
-    return _file->size();
-}
-
-arrow::Status ParquetFile::Seek(int64_t position) {
-    _pos = position;
-    // NOTE: Only readat operation is used, so _file seek is not called here.
-    return arrow::Status::OK();
-}
-
-arrow::Result<int64_t> ParquetFile::Tell() const {
-    return _pos;
-}
-
 arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::Read(int64_t nbytes) {
+    if (_is_closed) {
+        return arrow::Status::IOError("Already closed");
+    }
     auto buffer = arrow::AllocateBuffer(nbytes, arrow::default_memory_pool());
     ARROW_RETURN_NOT_OK(buffer);
     std::shared_ptr<arrow::Buffer> read_buf = std::move(buffer.ValueOrDie());
-    auto bytes_read = ReadAt(_pos, nbytes, read_buf->mutable_data());
+    auto bytes_read = Read(nbytes, read_buf->mutable_data());
     ARROW_RETURN_NOT_OK(bytes_read);
     // If bytes_read is equal with read_buf's capacity, we just assign
     if (bytes_read.ValueOrDie() == nbytes) {
@@ -665,4 +666,24 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::Read(int64_t nbytes)
     }
 }
 
+arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
+    std::lock_guard<std::mutex> lock(_lock);
+    ARROW_RETURN_NOT_OK(Seek(position));
+    return Read(nbytes, out);
+}
+
+arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::ReadAt(int64_t position, int64_t nbytes) {
+    std::lock_guard<std::mutex> lock(_lock);
+    ARROW_RETURN_NOT_OK(Seek(position));
+    return Read(nbytes);
+}
+
+arrow::Result<int64_t> ParquetFile::GetSize() {
+    return _file->size();
+}
+
+arrow::Result<int64_t> ParquetFile::Tell() const {
+    return _pos;
+}
+
 } // namespace doris
diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h
index 93f1a2b2dd..f56a56061c 100644
--- a/be/src/exec/parquet_reader.h
+++ b/be/src/exec/parquet_reader.h
@@ -37,6 +37,7 @@
 #include <mutex>
 #include <string>
 #include <thread>
+#include <future>
 
 #include "common/status.h"
 #include "common/config.h"
@@ -59,18 +60,24 @@ class ParquetFile : public arrow::io::RandomAccessFile {
 public:
     ParquetFile(FileReader* file);
     ~ParquetFile() override;
+
+    arrow::Status Seek(int64_t position) override;
     arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
+    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
+
     arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
+    arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
+
     arrow::Result<int64_t> GetSize() override;
-    arrow::Status Seek(int64_t position) override;
-    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
     arrow::Result<int64_t> Tell() const override;
     arrow::Status Close() override;
     bool closed() const override;
 
 private:
+    std::mutex _lock;
     FileReader* _file;
     int64_t _pos = 0;
+    bool _is_closed = false;
 };
 
 // Reader of broker parquet file
@@ -97,7 +104,7 @@ private:
                             int32_t* wbtyes);
 
 private:
-    void prefetch_batch();
+    void prefetch_batch(std::promise<Status>* status);
     Status read_next_batch();
 
 private:
@@ -129,6 +136,8 @@ private:
     std::condition_variable _queue_writer_cond;
     std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
     const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
+
+    std::promise<Status> thread_status;
 };
 
 } // namespace doris
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 3295dc4bc7..b8e077c7e7 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -135,8 +135,7 @@ Status ParquetScanner::open_next_reader() {
             break;
         }
         case TFileType::FILE_S3: {
-            file_reader.reset(new BufferedReader(
-                    _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
+            file_reader.reset(new S3Reader(_params.properties, range.path, range.start_offset));
             break;
         }
         default: {


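For readers following along, the core of this fix is a promise/future handshake between close() and the detached prefetch thread: the thread sets the promise when it exits, and close() blocks on the matching future before the reader releases any resources. Below is a minimal, self-contained sketch of that pattern. PrefetchDemo and its members are illustrative stand-ins, not code from the patch, and the sketch uses std::promise<bool> instead of the patch's std::promise<Status> to stay dependency-free.

#include <atomic>
#include <future>
#include <thread>

// Hypothetical stand-in for ParquetReaderWrap; it shows only the
// join-before-close pattern, not the real record-batch queue.
class PrefetchDemo {
public:
    void start() {
        // Detached worker, mirroring the patched init_parquet_reader().
        std::thread worker(&PrefetchDemo::prefetch, this, &_thread_status);
        worker.detach();
    }

    void close() {
        _closed = true;
        // Block until the detached worker has signalled completion; only then
        // is it safe to release resources the worker may still be reading.
        _thread_status.get_future().get();
    }

private:
    void prefetch(std::promise<bool>* status) {
        while (!_closed) {
            // ... read a row group and enqueue its record batches ...
        }
        // The value itself is irrelevant; setting it signals "thread finished".
        status->set_value(true);
    }

    std::atomic<bool> _closed{false};
    std::promise<bool> _thread_status;
};

int main() {
    PrefetchDemo demo;
    demo.start();
    demo.close(); // returns only after the prefetch thread has exited
    return 0;
}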
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
