This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new 236f394c3d [dev-1.1.2][fix](parquet-reader) fix concurrency bug (#11605) 236f394c3d is described below commit 236f394c3d83674eabed38f58de44055e0d6f2ea Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Wed Aug 10 14:46:33 2022 +0800 [dev-1.1.2][fix](parquet-reader) fix concurrency bug (#11605) * [fix](parquet-reader) fix concurrency bug Co-authored-by: morningman <morning...@apache.org> --- be/src/exec/parquet_reader.cpp | 79 ++++++++++++++++++++++++++--------------- be/src/exec/parquet_reader.h | 15 ++++++-- be/src/exec/parquet_scanner.cpp | 3 +- 3 files changed, 63 insertions(+), 34 deletions(-) diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index 61eb4d8e19..e65c1c5127 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -103,7 +103,7 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*> RETURN_IF_ERROR(column_indices(tuple_slot_descs)); - std::thread thread(&ParquetReaderWrap::prefetch_batch, this); + std::thread thread(&ParquetReaderWrap::prefetch_batch, this, &thread_status); thread.detach(); // read batch @@ -131,6 +131,10 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*> void ParquetReaderWrap::close() { _closed = true; _queue_writer_cond.notify_one(); + // must wait the pre_fetch thread finish. + // because it may still use ParquetReader to read data, which may cause + // heap-after-use bug. 
+ thread_status.get_future().get(); arrow::Status st = _parquet->Close(); if (!st.ok()) { LOG(WARNING) << "close parquet file error: " << st.ToString(); @@ -537,7 +541,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>& return read_record_batch(tuple_slot_descs, eof); } -void ParquetReaderWrap::prefetch_batch() { +void ParquetReaderWrap::prefetch_batch(std::promise<Status>* status) { auto insert_batch = [this](const auto& batch) { std::unique_lock<std::mutex> lock(_mtx); while (!_closed && _queue.size() == _max_queue_size) { @@ -552,22 +556,24 @@ void ParquetReaderWrap::prefetch_batch() { int current_group = 0; while (true) { if (_closed || current_group >= _total_groups) { - return; + break; } _status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch); if (!_status.ok()) { _closed = true; - return; + break; } arrow::RecordBatchVector batches; _status = _rb_batch->ReadAll(&batches); if (!_status.ok()) { _closed = true; - return; + break; } std::for_each(batches.begin(), batches.end(), insert_batch); current_group++; } + // the status' value is meaningless, just for notifying that thread is done. + status->set_value(Status::OK()); } Status ParquetReaderWrap::read_next_batch() { @@ -596,30 +602,36 @@ ParquetFile::~ParquetFile() { } arrow::Status ParquetFile::Close() { + std::lock_guard<std::mutex> lock(_lock); + if (_is_closed) { + return arrow::Status::OK(); + } + if (_file != nullptr) { _file->close(); delete _file; _file = nullptr; } + _is_closed = true; return arrow::Status::OK(); } bool ParquetFile::closed() const { - if (_file != nullptr) { - return _file->closed(); - } else { - return true; - } + return _is_closed; } -arrow::Result<int64_t> ParquetFile::Read(int64_t nbytes, void* buffer) { - return ReadAt(_pos, nbytes, buffer); +arrow::Status ParquetFile::Seek(int64_t position) { + _pos = position; + // NOTE: Only readat operation is used, so _file seek is not called here. 
+ return arrow::Status::OK(); } -arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, void* out) { +arrow::Result<int64_t> ParquetFile::Read(int64_t nbytes, void* out) { + if (_is_closed) { + return arrow::Status::IOError("Already closed"); + } int64_t reads = 0; int64_t bytes_read = 0; - _pos = position; while (nbytes > 0) { Status result = _file->readat(_pos, nbytes, &reads, out); if (!result.ok()) { @@ -637,25 +649,14 @@ arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, voi return bytes_read; } -arrow::Result<int64_t> ParquetFile::GetSize() { - return _file->size(); -} - -arrow::Status ParquetFile::Seek(int64_t position) { - _pos = position; - // NOTE: Only readat operation is used, so _file seek is not called here. - return arrow::Status::OK(); -} - -arrow::Result<int64_t> ParquetFile::Tell() const { - return _pos; -} - arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::Read(int64_t nbytes) { + if (_is_closed) { + return arrow::Status::IOError("Already closed"); + } auto buffer = arrow::AllocateBuffer(nbytes, arrow::default_memory_pool()); ARROW_RETURN_NOT_OK(buffer); std::shared_ptr<arrow::Buffer> read_buf = std::move(buffer.ValueOrDie()); - auto bytes_read = ReadAt(_pos, nbytes, read_buf->mutable_data()); + auto bytes_read = Read(nbytes, read_buf->mutable_data()); ARROW_RETURN_NOT_OK(bytes_read); // If bytes_read is equal with read_buf's capacity, we just assign if (bytes_read.ValueOrDie() == nbytes) { @@ -665,4 +666,24 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::Read(int64_t nbytes) } } +arrow::Result<int64_t> ParquetFile::ReadAt(int64_t position, int64_t nbytes, void* out) { + std::lock_guard<std::mutex> lock(_lock); + ARROW_RETURN_NOT_OK(Seek(position)); + return Read(nbytes, out); +} + +arrow::Result<std::shared_ptr<arrow::Buffer>> ParquetFile::ReadAt(int64_t position, int64_t nbytes) { + std::lock_guard<std::mutex> lock(_lock); + ARROW_RETURN_NOT_OK(Seek(position)); + return 
Read(nbytes); +} + +arrow::Result<int64_t> ParquetFile::GetSize() { + return _file->size(); +} + +arrow::Result<int64_t> ParquetFile::Tell() const { + return _pos; +} + } // namespace doris diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index 93f1a2b2dd..f56a56061c 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -37,6 +37,7 @@ #include <mutex> #include <string> #include <thread> +#include <future> #include "common/status.h" #include "common/config.h" @@ -59,18 +60,24 @@ class ParquetFile : public arrow::io::RandomAccessFile { public: ParquetFile(FileReader* file); ~ParquetFile() override; + + arrow::Status Seek(int64_t position) override; arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override; + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override; + arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override; + arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override; + arrow::Result<int64_t> GetSize() override; - arrow::Status Seek(int64_t position) override; - arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override; arrow::Result<int64_t> Tell() const override; arrow::Status Close() override; bool closed() const override; private: + std::mutex _lock; FileReader* _file; int64_t _pos = 0; + bool _is_closed = false; }; // Reader of broker parquet file @@ -97,7 +104,7 @@ private: int32_t* wbtyes); private: - void prefetch_batch(); + void prefetch_batch(std::promise<Status>* status); Status read_next_batch(); private: @@ -129,6 +136,8 @@ private: std::condition_variable _queue_writer_cond; std::list<std::shared_ptr<arrow::RecordBatch>> _queue; const size_t _max_queue_size = config::parquet_reader_max_buffer_size; + + std::promise<Status> thread_status; }; } // namespace doris diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 3295dc4bc7..b8e077c7e7 100644 --- 
a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -135,8 +135,7 @@ Status ParquetScanner::open_next_reader() { break; } case TFileType::FILE_S3: { - file_reader.reset(new BufferedReader( - _profile, new S3Reader(_params.properties, range.path, range.start_offset))); + file_reader.reset(new S3Reader(_params.properties, range.path, range.start_offset)); break; } default: { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org