This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0ac5228c05 [feature-wip][multi-catalog]Support prefetch for orc file format (#11292) 0ac5228c05 is described below commit 0ac5228c0535fa42efed5ec4e1a1850c5e753da0 Author: huangzhaowei <huangzhaowei....@bytedance.com> AuthorDate: Tue Aug 2 11:01:15 2022 +0800 [feature-wip][multi-catalog]Support prefetch for orc file format (#11292) Refactor the prefetch code in parquet and support prefetch for orc file format --- be/src/exec/arrow/arrow_reader.cpp | 61 +++++++++++++++++++++++ be/src/exec/arrow/arrow_reader.h | 16 +++++- be/src/exec/arrow/orc_reader.cpp | 40 +++++++-------- be/src/exec/arrow/orc_reader.h | 3 +- be/src/exec/arrow/parquet_reader.cpp | 94 ++++++++---------------------------- be/src/exec/arrow/parquet_reader.h | 15 ++---- 6 files changed, 120 insertions(+), 109 deletions(-) diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp index 9d20697148..5d1785f744 100644 --- a/be/src/exec/arrow/arrow_reader.cpp +++ b/be/src/exec/arrow/arrow_reader.cpp @@ -48,6 +48,11 @@ ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, ArrowReaderWrap::~ArrowReaderWrap() { close(); + _closed = true; + _queue_writer_cond.notify_one(); + if (_thread.joinable()) { + _thread.join(); + } } void ArrowReaderWrap::close() { @@ -76,6 +81,62 @@ Status ArrowReaderWrap::column_indices(const std::vector<SlotDescriptor*>& tuple return Status::OK(); } +Status ArrowReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) { + std::unique_lock<std::mutex> lock(_mtx); + while (!_closed && _queue.empty()) { + if (_batch_eof) { + _include_column_ids.clear(); + *eof = true; + _batch_eof = false; + return Status::OK(); + } + _queue_reader_cond.wait_for(lock, std::chrono::seconds(1)); + } + if (UNLIKELY(_closed)) { + return Status::InternalError(_status.message()); + } + *batch = _queue.front(); + _queue.pop_front(); + _queue_writer_cond.notify_one(); + return Status::OK(); +} + +void ArrowReaderWrap::prefetch_batch() { + auto insert_batch = [this](const auto& batch) { + std::unique_lock<std::mutex> lock(_mtx); + while (!_closed && _queue.size() == _max_queue_size) { + _queue_writer_cond.wait_for(lock, std::chrono::seconds(1)); + } + if (UNLIKELY(_closed)) { + return; + } + _queue.push_back(batch); + _queue_reader_cond.notify_one(); + }; + int current_group = _current_group; + int total_groups = _total_groups; + while (true) { + if (_closed || current_group >= total_groups) { + _batch_eof = true; + _queue_reader_cond.notify_one(); + return; + } + if (filter_row_group(current_group)) { + current_group++; + continue; + } + + arrow::RecordBatchVector batches; + read_batches(batches, current_group); + if (!_status.ok()) { + _closed = true; + return; + } + std::for_each(batches.begin(), batches.end(), insert_batch); + current_group++; + } +} + ArrowFile::ArrowFile(FileReader* file) : _file(file) {} ArrowFile::~ArrowFile() { diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h index 1389426fc9..704ca0750e 100644 --- a/be/src/exec/arrow/arrow_reader.h +++ b/be/src/exec/arrow/arrow_reader.h @@ -92,13 +92,17 @@ public: return Status::NotSupported("Not Implemented read"); } // for vec - virtual Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) = 0; + Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof); std::shared_ptr<Statistics>& statistics() { return _statistics; } void close(); virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); } + void prefetch_batch(); + protected: virtual Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs); + virtual void read_batches(arrow::RecordBatchVector& batches, int current_group) = 0; + virtual bool filter_row_group(int current_group) = 0; protected: const int64_t _batch_size; @@ -110,6 +114,16 @@ protected: std::map<std::string, int> _map_column; // column-name <---> column-index std::vector<int> _include_column_ids; // columns that need to get from file std::shared_ptr<Statistics> _statistics; + + std::atomic<bool> _closed = false; + std::atomic<bool> _batch_eof = false; + arrow::Status _status; + std::mutex _mtx; + std::condition_variable _queue_reader_cond; + std::condition_variable _queue_writer_cond; + std::list<std::shared_ptr<arrow::RecordBatch>> _queue; + const size_t _max_queue_size = config::parquet_reader_max_buffer_size; + std::thread _thread; }; } // namespace doris diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp index 78d51d8376..0db5640369 100644 --- a/be/src/exec/arrow/orc_reader.cpp +++ b/be/src/exec/arrow/orc_reader.cpp @@ -71,11 +71,7 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc, } RETURN_IF_ERROR(column_indices(tuple_slot_descs)); - bool eof = false; - RETURN_IF_ERROR(_next_stripe_reader(&eof)); - if (eof) { - return Status::EndOfFile("end of file"); - } + _thread = std::thread(&ArrowReaderWrap::prefetch_batch, this); return Status::OK(); } @@ -143,23 +139,23 @@ Status ORCReaderWrap::_next_stripe_reader(bool* eof) { return Status::OK(); } -Status ORCReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) { - *eof = false; - do { - auto st = _rb_reader->ReadNext(batch); - if (!st.ok()) { - LOG(WARNING) << "failed to get next batch, errmsg=" << st; - return Status::InternalError(st.ToString()); - } - if (*batch == nullptr) { - // try next stripe - RETURN_IF_ERROR(_next_stripe_reader(eof)); - if (*eof) { - break; - } - } - } while (*batch == nullptr); - return Status::OK(); +void ORCReaderWrap::read_batches(arrow::RecordBatchVector& batches, int current_group) { + bool eof = false; + Status status = _next_stripe_reader(&eof); + if (!status.ok()) { + _closed = true; + return; + } + if (eof) { + _closed = true; + return; + } + + _status = _rb_reader->ReadAll(&batches); +} + +bool ORCReaderWrap::filter_row_group(int current_group) { + return false; } } // namespace doris \ No newline at end of file diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h index fe8c5c54a4..1e6f0f83e6 100644 --- a/be/src/exec/arrow/orc_reader.h +++ b/be/src/exec/arrow/orc_reader.h @@ -40,11 +40,12 @@ public: const std::vector<SlotDescriptor*>& tuple_slot_descs, const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone) override; - Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override; private: Status _next_stripe_reader(bool* eof); Status _seek_start_stripe(); + void read_batches(arrow::RecordBatchVector& batches, int current_group) override; + bool filter_row_group(int current_group) override; private: // orc file reader object diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp index c0f45f52af..8d119146b4 100644 --- a/be/src/exec/arrow/parquet_reader.cpp +++ b/be/src/exec/arrow/parquet_reader.cpp @@ -46,14 +46,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size _range_start_offset(range_start_offset), _range_size(range_size) {} -ParquetReaderWrap::~ParquetReaderWrap() { - _closed = true; - _queue_writer_cond.notify_one(); - if (_thread.joinable()) { - _thread.join(); - } -} - Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc, const std::vector<SlotDescriptor*>& tuple_slot_descs, const std::vector<ExprContext*>& conjunct_ctxs, @@ -111,7 +103,7 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc, _row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids, file_size); } - _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this); + _thread = std::thread(&ArrowReaderWrap::prefetch_batch, this); return Status::OK(); } catch (parquet::ParquetException& e) { std::stringstream str_error; @@ -184,26 +176,6 @@ Status ParquetReaderWrap::read_record_batch(bool* eof) { return Status::OK(); } -Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) { - std::unique_lock<std::mutex> lock(_mtx); - while (!_closed && _queue.empty()) { - if (_batch_eof) { - _include_column_ids.clear(); - *eof = true; - _batch_eof = false; - return Status::OK(); - } - _queue_reader_cond.wait_for(lock, std::chrono::seconds(1)); - } - if (UNLIKELY(_closed)) { - return Status::InternalError(_status.message()); - } - *batch = _queue.front(); - _queue.pop_front(); - _queue_writer_cond.notify_one(); - return Status::OK(); -} - Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf, int32_t* wbytes) { const auto type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type()); @@ -546,50 +518,6 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>& return read_record_batch(eof); } -void ParquetReaderWrap::prefetch_batch() { - auto insert_batch = [this](const auto& batch) { - std::unique_lock<std::mutex> lock(_mtx); - while (!_closed && _queue.size() == _max_queue_size) { - _queue_writer_cond.wait_for(lock, std::chrono::seconds(1)); - } - if (UNLIKELY(_closed)) { - return; - } - _queue.push_back(batch); - _queue_reader_cond.notify_one(); - }; - int current_group = 0; - int total_groups = _total_groups; - while (true) { - if (_closed || current_group >= total_groups) { - _batch_eof = true; - _queue_reader_cond.notify_one(); - return; - } - if (config::parquet_predicate_push_down) { - auto filter_group_set = _row_group_reader->filter_groups(); - if (filter_group_set.end() != filter_group_set.find(current_group)) { - // find filter group, skip - current_group++; - continue; - } - } - _status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader); - if (!_status.ok()) { - _closed = true; - return; - } - arrow::RecordBatchVector batches; - _status = _rb_reader->ReadAll(&batches); - if (!_status.ok()) { - _closed = true; - return; - } - std::for_each(batches.begin(), batches.end(), insert_batch); - current_group++; - } -} - Status ParquetReaderWrap::read_next_batch() { std::unique_lock<std::mutex> lock(_mtx); while (!_closed && _queue.empty()) { @@ -609,4 +537,24 @@ Status ParquetReaderWrap::read_next_batch() { return Status::OK(); } +void ParquetReaderWrap::read_batches(arrow::RecordBatchVector& batches, int current_group) { + _status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader); + if (!_status.ok()) { + _closed = true; + return; + } + _status = _rb_reader->ReadAll(&batches); +} + +bool ParquetReaderWrap::filter_row_group(int current_group) { + if (config::parquet_predicate_push_down) { + auto filter_group_set = _row_group_reader->filter_groups(); + if (filter_group_set.end() != filter_group_set.find(current_group)) { + // find filter group, skip + return true; + } + } + return false; +} + } // namespace doris diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h index 3bf4cf4814..95774f60b0 100644 --- a/be/src/exec/arrow/parquet_reader.h +++ b/be/src/exec/arrow/parquet_reader.h @@ -64,7 +64,7 @@ public: // batch_size is not use here ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size); - ~ParquetReaderWrap() override; + ~ParquetReaderWrap() override = default; // Read Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs, @@ -75,7 +75,6 @@ public: const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone) override; Status init_parquet_type(); - Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override; private: void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, @@ -86,8 +85,9 @@ private: int32_t* wbtyes); private: - void prefetch_batch(); Status read_next_batch(); + void read_batches(arrow::RecordBatchVector& batches, int current_group) override; + bool filter_row_group(int current_group) override; private: // parquet file reader object @@ -104,16 +104,7 @@ private: int64_t _range_size; private: - std::atomic<bool> _closed = false; - std::atomic<bool> _batch_eof = false; - arrow::Status _status; - std::mutex _mtx; - std::condition_variable _queue_reader_cond; - std::condition_variable _queue_writer_cond; - std::list<std::shared_ptr<arrow::RecordBatch>> _queue; std::unique_ptr<doris::RowGroupReader> _row_group_reader; - const size_t _max_queue_size = config::parquet_reader_max_buffer_size; - std::thread _thread; }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org