This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 571608dee59742811864ccc9bc161c6ae4fabd67 Author: Lightman <31928846+lchangli...@users.noreply.github.com> AuthorDate: Thu May 19 23:52:01 2022 +0800 [Enhancement] improve parquet reader via arrow's prefetch and multi thread (#9472) * add ArrowReaderProperties to parquet::arrow::FileReader * support prefetch batch --- be/src/common/config.h | 3 + be/src/exec/parquet_reader.cpp | 137 ++++++++++++++++++++++++++++------------- be/src/exec/parquet_reader.h | 23 ++++++- 3 files changed, 117 insertions(+), 46 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index b37fbc048a..48582beabe 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -715,6 +715,9 @@ CONF_Validator(string_type_length_soft_limit_bytes, // is greater than object_pool_buffer_size, release the object in the unused_object_pool. CONF_Int32(object_pool_buffer_size, "100"); +// ParquetReaderWrap prefetch buffer size +CONF_Int32(parquet_reader_max_buffer_size, "50"); + } // namespace config } // namespace doris diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index ddb3531e17..61eb4d8e19 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -18,9 +18,15 @@ #include <arrow/array.h> #include <arrow/status.h> +#include <arrow/type_fwd.h> #include <time.h> +#include <algorithm> +#include <mutex> +#include <thread> + #include "common/logging.h" +#include "common/status.h" #include "exec/file_reader.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/TPaloBrokerService.h" @@ -44,9 +50,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int32_t num_of_col _current_line_of_group(0), _current_line_of_batch(0) { _parquet = std::shared_ptr<ParquetFile>(new ParquetFile(file_reader)); - _properties = parquet::ReaderProperties(); - _properties.enable_buffered_stream(); - _properties.set_buffer_size(65535); } ParquetReaderWrap::~ParquetReaderWrap() { @@ -55,10 +58,23 @@
ParquetReaderWrap::~ParquetReaderWrap() { Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, const std::string& timezone) { try { - // new file reader for parquet file - auto st = parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), - parquet::ParquetFileReader::Open(_parquet, _properties), &_reader); + parquet::ArrowReaderProperties arrow_reader_properties = + parquet::default_arrow_reader_properties(); + arrow_reader_properties.set_pre_buffer(true); + arrow_reader_properties.set_use_threads(true); + // Open Parquet file reader + auto reader_builder = parquet::arrow::FileReaderBuilder(); + reader_builder.properties(arrow_reader_properties); + + auto st = reader_builder.Open(_parquet); + + if (!st.ok()) { + LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString(); + return Status::InternalError("Failed to create file reader"); + } + + st = reader_builder.Build(&_reader); + if (!st.ok()) { LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString(); return Status::InternalError("Failed to create file reader"); @@ -85,31 +101,23 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*> _timezone = timezone; - if (_current_line_of_group == 0) { // the first read - RETURN_IF_ERROR(column_indices(tuple_slot_descs)); - // read batch - arrow::Status status = _reader->GetRecordBatchReader({_current_group}, - _parquet_column_ids, &_rb_batch); - if (!status.ok()) { - LOG(WARNING) << "Get RecordBatch Failed. " << status.ToString(); - return Status::InternalError(status.ToString()); - } - status = _rb_batch->ReadNext(&_batch); - if (!status.ok()) { - LOG(WARNING) << "The first read record. 
" << status.ToString(); - return Status::InternalError(status.ToString()); - } - _current_line_of_batch = 0; - //save column type - std::shared_ptr<arrow::Schema> field_schema = _batch->schema(); - for (int i = 0; i < _parquet_column_ids.size(); i++) { - std::shared_ptr<arrow::Field> field = field_schema->field(i); - if (!field) { - LOG(WARNING) << "Get field schema failed. Column order:" << i; - return Status::InternalError(status.ToString()); - } - _parquet_column_type.emplace_back(field->type()->id()); + RETURN_IF_ERROR(column_indices(tuple_slot_descs)); + + std::thread thread(&ParquetReaderWrap::prefetch_batch, this); + thread.detach(); + + // read batch + RETURN_IF_ERROR(read_next_batch()); + _current_line_of_batch = 0; + //save column type + std::shared_ptr<arrow::Schema> field_schema = _batch->schema(); + for (int i = 0; i < _parquet_column_ids.size(); i++) { + std::shared_ptr<arrow::Field> field = field_schema->field(i); + if (!field) { + LOG(WARNING) << "Get field schema failed. 
Column order:" << i; + return Status::InternalError(_status.ToString()); } + _parquet_column_type.emplace_back(field->type()->id()); } return Status::OK(); } catch (parquet::ParquetException& e) { @@ -121,6 +129,8 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*> } void ParquetReaderWrap::close() { + _closed = true; + _queue_writer_cond.notify_one(); arrow::Status st = _parquet->Close(); if (!st.ok()) { LOG(WARNING) << "close parquet file error: " << st.ToString(); @@ -195,25 +205,15 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>& _rows_of_group = _file_metadata->RowGroup(_current_group) ->num_rows(); //get rows of the current row group // read batch - arrow::Status status = - _reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch); - if (!status.ok()) { - return Status::InternalError("Get RecordBatchReader Failed."); - } - status = _rb_batch->ReadNext(&_batch); - if (!status.ok()) { - return Status::InternalError("Read Batch Error With Libarrow."); - } + RETURN_IF_ERROR(read_next_batch()); _current_line_of_batch = 0; } else if (_current_line_of_batch >= _batch->num_rows()) { VLOG_DEBUG << "read_record_batch, current group id:" << _current_group << " current line of batch:" << _current_line_of_batch << " is larger than batch size:" << _batch->num_rows() << ". 
start to read next batch"; - arrow::Status status = _rb_batch->ReadNext(&_batch); - if (!status.ok()) { - return Status::InternalError("Read Batch Error With Libarrow."); - } + // read batch + RETURN_IF_ERROR(read_next_batch()); _current_line_of_batch = 0; } return Status::OK(); @@ -537,6 +537,55 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>& return read_record_batch(tuple_slot_descs, eof); } +void ParquetReaderWrap::prefetch_batch() { + auto insert_batch = [this](const auto& batch) { + std::unique_lock<std::mutex> lock(_mtx); + while (!_closed && _queue.size() == _max_queue_size) { + _queue_writer_cond.wait_for(lock, std::chrono::seconds(1)); + } + if (UNLIKELY(_closed)) { + return; + } + _queue.push_back(batch); + _queue_reader_cond.notify_one(); + }; + int current_group = 0; + while (true) { + if (_closed || current_group >= _total_groups) { + return; + } + _status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch); + if (!_status.ok()) { + _closed = true; + return; + } + arrow::RecordBatchVector batches; + _status = _rb_batch->ReadAll(&batches); + if (!_status.ok()) { + _closed = true; + return; + } + std::for_each(batches.begin(), batches.end(), insert_batch); + current_group++; + } +} + +Status ParquetReaderWrap::read_next_batch() { + std::unique_lock<std::mutex> lock(_mtx); + while (!_closed && _queue.empty()) { + _queue_reader_cond.wait_for(lock, std::chrono::seconds(1)); + } + + if (UNLIKELY(_closed)) { + return Status::InternalError(_status.message()); + } + + _batch = _queue.front(); + _queue.pop_front(); + _queue_writer_cond.notify_one(); + return Status::OK(); +} + ParquetFile::ParquetFile(FileReader* file) : _file(file) {} ParquetFile::~ParquetFile() { diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index 2bd5b5a802..93f1a2b2dd 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -22,6 +22,7 @@ #include <arrow/io/api.h> #include 
<arrow/io/file.h> #include <arrow/io/interfaces.h> +#include <arrow/status.h> #include <parquet/api/reader.h> #include <parquet/api/writer.h> #include <parquet/arrow/reader.h> @@ -29,10 +30,16 @@ #include <parquet/exception.h> #include <stdint.h> +#include <atomic> +#include <condition_variable> +#include <list> #include <map> +#include <mutex> #include <string> +#include <thread> #include "common/status.h" +#include "common/config.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/Types_types.h" @@ -51,7 +58,7 @@ class FileReader; class ParquetFile : public arrow::io::RandomAccessFile { public: ParquetFile(FileReader* file); - virtual ~ParquetFile(); + ~ParquetFile() override; arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override; arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override; arrow::Result<int64_t> GetSize() override; @@ -89,9 +96,12 @@ private: Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf, int32_t* wbtyes); +private: + void prefetch_batch(); + Status read_next_batch(); + private: const int32_t _num_of_columns_from_file; - parquet::ReaderProperties _properties; std::shared_ptr<ParquetFile> _parquet; // parquet file reader object @@ -110,6 +120,15 @@ private: int _current_line_of_batch; std::string _timezone; + +private: + std::atomic<bool> _closed = false; + arrow::Status _status; + std::mutex _mtx; + std::condition_variable _queue_reader_cond; + std::condition_variable _queue_writer_cond; + std::list<std::shared_ptr<arrow::RecordBatch>> _queue; + const size_t _max_queue_size = config::parquet_reader_max_buffer_size; }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org