This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8a366c9ba2 [feature](multi-catalog) read parquet file by start/offset (#10843) 8a366c9ba2 is described below commit 8a366c9ba2fc56c523708b3ced9e65e69f17f9ae Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Mon Jul 18 20:51:08 2022 +0800 [feature](multi-catalog) read parquet file by start/offset (#10843) To avoid reading the repeat row group, we should align offsets --- be/src/common/config.h | 2 +- be/src/exec/arrow/parquet_reader.cpp | 15 ++- be/src/exec/arrow/parquet_reader.h | 6 +- be/src/exec/arrow/parquet_row_group_reader.cpp | 144 ++++++++++++++++++------- be/src/exec/arrow/parquet_row_group_reader.h | 17 ++- be/src/exec/parquet_scanner.cpp | 2 +- be/src/vec/exec/file_arrow_scanner.cpp | 16 ++- be/src/vec/exec/file_arrow_scanner.h | 9 +- be/src/vec/exec/varrow_scanner.cpp | 5 +- be/src/vec/exec/varrow_scanner.h | 3 +- be/src/vec/exec/vorc_scanner.cpp | 3 +- be/src/vec/exec/vorc_scanner.h | 3 +- be/src/vec/exec/vparquet_scanner.cpp | 7 +- be/src/vec/exec/vparquet_scanner.h | 3 +- 14 files changed, 169 insertions(+), 66 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index ff4184eb62..f0fb39800e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -769,7 +769,7 @@ CONF_Int32(object_pool_buffer_size, "100"); // ParquetReaderWrap prefetch buffer size CONF_Int32(parquet_reader_max_buffer_size, "50"); -CONF_Bool(parquet_predicate_push_down, "false"); +CONF_Bool(parquet_predicate_push_down, "true"); // When the rows number reached this limit, will check the filter rate the of bloomfilter // if it is lower than a specific threshold, the predicate will be disabled. 
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp index c14022bc53..c0f45f52af 100644 --- a/be/src/exec/arrow/parquet_reader.cpp +++ b/be/src/exec/arrow/parquet_reader.cpp @@ -37,11 +37,14 @@ namespace doris { // Broker ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) + int32_t num_of_columns_from_file, int64_t range_start_offset, + int64_t range_size) : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file), _rows_of_group(0), _current_line_of_group(0), - _current_line_of_batch(0) {} + _current_line_of_batch(0), + _range_start_offset(range_start_offset), + _range_size(range_size) {} ParquetReaderWrap::~ParquetReaderWrap() { _closed = true; @@ -101,8 +104,12 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc, RETURN_IF_ERROR(column_indices(tuple_slot_descs)); if (config::parquet_predicate_push_down) { - _row_group_reader.reset(new RowGroupReader(conjunct_ctxs, _file_metadata, this)); - _row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids); + int64_t file_size = 0; + size(&file_size); + _row_group_reader.reset(new RowGroupReader(_range_start_offset, _range_size, + conjunct_ctxs, _file_metadata, this)); + _row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids, + file_size); } _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this); return Status::OK(); diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h index 6aaab5d24d..3bf4cf4814 100644 --- a/be/src/exec/arrow/parquet_reader.h +++ b/be/src/exec/arrow/parquet_reader.h @@ -62,8 +62,8 @@ class RowGroupReader; class ParquetReaderWrap final : public ArrowReaderWrap { public: // batch_size is not use here - ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file); + ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t 
num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size); ~ParquetReaderWrap() override; // Read @@ -100,6 +100,8 @@ private: int _current_line_of_group; int _current_line_of_batch; std::string _timezone; + int64_t _range_start_offset; + int64_t _range_size; private: std::atomic<bool> _closed = false; diff --git a/be/src/exec/arrow/parquet_row_group_reader.cpp b/be/src/exec/arrow/parquet_row_group_reader.cpp index bbc607f02b..517309db76 100644 --- a/be/src/exec/arrow/parquet_row_group_reader.cpp +++ b/be/src/exec/arrow/parquet_row_group_reader.cpp @@ -57,30 +57,37 @@ return true; \ } -#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \ - std::vector<T> in_values; \ - for (auto val : in_pred_values) { \ - T value = reinterpret_cast<T*>(val)[0]; \ - in_values.emplace_back(value); \ - } \ - if (in_values.empty()) { \ - return false; \ - } \ - std::sort(in_values.begin(), in_values.end()); \ - T in_min = in_values.front(); \ - T in_max = in_values.back(); \ - const T group_min = reinterpret_cast<const T*>(min_bytes)[0]; \ - const T group_max = reinterpret_cast<const T*>(max_bytes)[0]; \ - if (in_max < group_min || in_min > group_max) { \ - return true; \ +#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \ + std::vector<T> in_values; \ + for (auto val : in_pred_values) { \ + T value = reinterpret_cast<T*>(val)[0]; \ + in_values.emplace_back(value); \ + } \ + if (in_values.empty()) { \ + return false; \ + } \ + auto result = std::minmax_element(in_values.begin(), in_values.end()); \ + T in_min = *result.first; \ + T in_max = *result.second; \ + const T group_min = reinterpret_cast<const T*>(min_bytes)[0]; \ + const T group_max = reinterpret_cast<const T*>(max_bytes)[0]; \ + if (in_max < group_min || in_min > group_max) { \ + return true; \ } +#define PARQUET_HEAD 4 + namespace doris { -RowGroupReader::RowGroupReader(const std::vector<ExprContext*>& conjunct_ctxs, +RowGroupReader::RowGroupReader(int64_t 
range_start_offset, int64_t range_size, + const std::vector<ExprContext*>& conjunct_ctxs, std::shared_ptr<parquet::FileMetaData>& file_metadata, ParquetReaderWrap* parent) - : _conjunct_ctxs(conjunct_ctxs), _file_metadata(file_metadata), _parent(parent) {} + : _range_start_offset(range_start_offset), + _range_size(range_size), + _conjunct_ctxs(conjunct_ctxs), + _file_metadata(file_metadata), + _parent(parent) {} RowGroupReader::~RowGroupReader() { _slot_conjuncts.clear(); @@ -89,20 +96,67 @@ RowGroupReader::~RowGroupReader() { Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc, const std::map<std::string, int>& map_column, - const std::vector<int>& include_column_ids) { + const std::vector<int>& include_column_ids, + int64_t file_size) { + int total_group = _file_metadata->num_row_groups(); + // It will not filter if head_group_offset equals tail_group_offset + int64_t head_group_offset = _range_start_offset; + int64_t tail_group_offset = _range_start_offset; + int64_t range_end_offset = _range_start_offset + _range_size; + if (_range_size > 0 && file_size > 0) { + // todo: extract to function + for (int row_group_id = 0; row_group_id < total_group; row_group_id++) { + int64_t cur_group_offset = _get_group_offset(row_group_id); + // when a whole file is in a split, range_end_offset is the EOF offset + if (row_group_id == total_group - 1) { + if (cur_group_offset < _range_start_offset) { + head_group_offset = cur_group_offset; + } + if (range_end_offset >= file_size) { + tail_group_offset = file_size; + } else { + tail_group_offset = cur_group_offset; + } + break; + } + int64_t next_group_offset = _get_group_offset(row_group_id + 1); + if (_range_start_offset >= cur_group_offset && + _range_start_offset < next_group_offset) { + // Enter the branch only the first time to find head group + head_group_offset = cur_group_offset; + } + if (range_end_offset < next_group_offset) { + tail_group_offset = cur_group_offset; + // find tail, break + 
break; + } + } + if (tail_group_offset < head_group_offset) { + tail_group_offset = head_group_offset; + } + } + std::unordered_set<int> parquet_column_ids(include_column_ids.begin(), include_column_ids.end()); _init_conjuncts(tuple_desc, map_column, parquet_column_ids); - int total_group = _file_metadata->num_row_groups(); _parent->statistics()->total_groups = total_group; _parent->statistics()->total_rows = _file_metadata->num_rows(); - int32_t filtered_num_row_groups = 0; - int64_t filtered_num_rows = 0; - int64_t filtered_total_byte_size = 0; bool update_statistics = false; for (int row_group_id = 0; row_group_id < total_group; row_group_id++) { auto row_group_meta = _file_metadata->RowGroup(row_group_id); + if (_range_size > 0 && file_size > 0) { + int64_t start_offset = _get_group_offset(row_group_id); + int64_t end_offset = row_group_id == total_group - 1 + ? file_size + : _get_group_offset(row_group_id + 1); + if (start_offset >= tail_group_offset || end_offset <= head_group_offset) { + _filter_group.emplace(row_group_id); + VLOG_DEBUG << "Filter extra row group id: " << row_group_id; + continue; + } + } + // if head_read_offset <= start_offset < end_offset <= tail_read_offset for (SlotId slot_id = 0; slot_id < tuple_desc->slots().size(); slot_id++) { const std::string& col_name = tuple_desc->slots()[slot_id]->col_name(); auto col_iter = map_column.find(col_name); @@ -129,26 +183,36 @@ Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc, bool group_need_filter = _determine_filter_row_group(slot_iter->second, min, max); if (group_need_filter) { update_statistics = true; - filtered_num_row_groups++; - filtered_num_rows += row_group_meta->num_rows(); - filtered_total_byte_size += row_group_meta->total_byte_size(); + _add_filter_group(row_group_id, row_group_meta); VLOG_DEBUG << "Filter row group id: " << row_group_id; - _filter_group.emplace(row_group_id); break; } } } + if (update_statistics) { - 
_parent->statistics()->filtered_row_groups = filtered_num_row_groups; - _parent->statistics()->filtered_rows = filtered_num_rows; - _parent->statistics()->filtered_total_bytes = filtered_total_byte_size; + _parent->statistics()->filtered_row_groups = _filtered_num_row_groups; + _parent->statistics()->filtered_rows = _filtered_num_rows; + _parent->statistics()->filtered_total_bytes = _filtered_total_byte_size; VLOG_DEBUG << "Parquet file: " << _file_metadata->schema()->name() << ", Num of read row group: " << total_group - << ", and num of skip row group: " << filtered_num_row_groups; + << ", and num of skip row group: " << _filtered_num_row_groups; } return Status::OK(); } +int64_t RowGroupReader::_get_group_offset(int row_group_id) { + return _file_metadata->RowGroup(row_group_id)->ColumnChunk(0)->file_offset() - PARQUET_HEAD; +} + +void RowGroupReader::_add_filter_group(int row_group_id, + std::unique_ptr<parquet::RowGroupMetaData>& row_group_meta) { + _filtered_num_row_groups++; + _filtered_num_rows += row_group_meta->num_rows(); + _filtered_total_byte_size += row_group_meta->total_byte_size(); + _filter_group.emplace(row_group_id); +} + void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc, const std::map<std::string, int>& map_column, const std::unordered_set<int>& include_column_ids) { @@ -292,15 +356,15 @@ bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void* case TYPE_DATETIME: { std::vector<const char*> in_values; for (auto val : in_pred_values) { - const char* value = ((std::string*)val)->c_str(); + const char* value = ((std::string*)val)->data(); in_values.emplace_back(value); } if (in_values.empty()) { return false; } - std::sort(in_values.begin(), in_values.end()); - const char* in_min = in_values.front(); - const char* in_max = in_values.back(); + auto result = std::minmax_element(in_values.begin(), in_values.end()); + const char* in_min = *result.first; + const char* in_max = *result.second; if 
(strcmp(in_max, min_bytes) < 0 || strcmp(in_min, max_bytes) > 0) { return true; } @@ -350,7 +414,7 @@ bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const ch case TYPE_CHAR: case TYPE_DATE: case TYPE_DATETIME: { - const char* conjunct_value = ((std::string*)value)->c_str(); + const char* conjunct_value = ((std::string*)value)->data(); if (strcmp(conjunct_value, min_bytes) < 0 || strcmp(conjunct_value, max_bytes) > 0) { return true; } @@ -400,7 +464,7 @@ bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const ch case TYPE_DATE: case TYPE_DATETIME: { // case TYPE_TIME: - const char* conjunct_value = ((std::string*)value)->c_str(); + const char* conjunct_value = ((std::string*)value)->data(); if (strcmp(max_bytes, conjunct_value) <= 0) { return true; } @@ -450,7 +514,7 @@ bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const ch case TYPE_DATE: case TYPE_DATETIME: { // case TYPE_TIME: - const char* conjunct_value = ((std::string*)value)->c_str(); + const char* conjunct_value = ((std::string*)value)->data(); if (strcmp(max_bytes, conjunct_value) < 0) { return true; } @@ -500,7 +564,7 @@ bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const ch case TYPE_DATE: case TYPE_DATETIME: { // case TYPE_TIME: - const char* conjunct_value = ((std::string*)value)->c_str(); + const char* conjunct_value = ((std::string*)value)->data(); if (strcmp(min_bytes, conjunct_value) >= 0) { return true; } @@ -550,7 +614,7 @@ bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const ch case TYPE_DATE: case TYPE_DATETIME: { // case TYPE_TIME: - const char* conjunct_value = ((std::string*)value)->c_str(); + const char* conjunct_value = ((std::string*)value)->data(); if (strcmp(min_bytes, conjunct_value) > 0) { return true; } diff --git a/be/src/exec/arrow/parquet_row_group_reader.h b/be/src/exec/arrow/parquet_row_group_reader.h index 98a6b9ff95..6cd5724828 100644 --- 
a/be/src/exec/arrow/parquet_row_group_reader.h +++ b/be/src/exec/arrow/parquet_row_group_reader.h @@ -37,18 +37,24 @@ class ParquetReaderWrap; class RowGroupReader { public: - RowGroupReader(const std::vector<ExprContext*>& conjunct_ctxs, + RowGroupReader(int64_t range_start_offset, int64_t range_size, + const std::vector<ExprContext*>& conjunct_ctxs, std::shared_ptr<parquet::FileMetaData>& file_metadata, ParquetReaderWrap* parent); ~RowGroupReader(); Status init_filter_groups(const TupleDescriptor* tuple_desc, const std::map<std::string, int>& map_column, - const std::vector<int>& include_column_ids); + const std::vector<int>& include_column_ids, int64_t file_size); std::unordered_set<int> filter_groups() { return _filter_group; }; private: + void _add_filter_group(int row_group_id, + std::unique_ptr<parquet::RowGroupMetaData>& row_group_meta); + + int64_t _get_group_offset(int row_group_id); + void _init_conjuncts(const TupleDescriptor* tuple_desc, const std::map<std::string, int>& _map_column, const std::unordered_set<int>& include_column_ids); @@ -78,11 +84,18 @@ private: bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes); private: + int64_t _range_start_offset; + int64_t _range_size; + int64_t _file_size; std::map<int, std::vector<ExprContext*>> _slot_conjuncts; std::unordered_set<int> _filter_group; std::vector<ExprContext*> _conjunct_ctxs; std::shared_ptr<parquet::FileMetaData> _file_metadata; ParquetReaderWrap* _parent; + + int32_t _filtered_num_row_groups = 0; + int64_t _filtered_num_rows = 0; + int64_t _filtered_total_byte_size = 0; }; } // namespace doris diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 76caf48e21..d7aa4041e2 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -107,7 +107,7 @@ Status ParquetScanner::open_next_reader() { num_of_columns_from_file = range.num_of_columns_from_file; } _cur_file_reader = new 
ParquetReaderWrap(file_reader.release(), _state->batch_size(), - num_of_columns_from_file); + num_of_columns_from_file, 0, 0); auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs, _state->timezone()); diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp index 79c6037c36..55199335bd 100644 --- a/be/src/vec/exec/file_arrow_scanner.cpp +++ b/be/src/vec/exec/file_arrow_scanner.cpp @@ -66,8 +66,9 @@ Status FileArrowScanner::_open_next_reader() { int32_t num_of_columns_from_file = _file_slot_descs.size(); - _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(), - num_of_columns_from_file); + _cur_file_reader = + _new_arrow_reader(file_reader.release(), _state->batch_size(), + num_of_columns_from_file, range.start_offset, range.size); auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, @@ -217,8 +218,11 @@ VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* pr } ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) { - return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file); + int32_t num_of_columns_from_file, + int64_t range_start_offset, + int64_t range_size) { + return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file, + range_start_offset, range_size); } void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) { @@ -237,7 +241,9 @@ VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile, : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {} ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) { + 
int32_t num_of_columns_from_file, + int64_t range_start_offset, + int64_t range_size) { return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file); } diff --git a/be/src/vec/exec/file_arrow_scanner.h b/be/src/vec/exec/file_arrow_scanner.h index 8172c9b83c..3f7537c134 100644 --- a/be/src/vec/exec/file_arrow_scanner.h +++ b/be/src/vec/exec/file_arrow_scanner.h @@ -53,7 +53,8 @@ public: protected: virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) = 0; + int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size) = 0; virtual void _update_profile(std::shared_ptr<Statistics>& statistics) {} private: @@ -82,7 +83,8 @@ public: protected: ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) override; + int32_t num_of_columns_from_file, int64_t range_start_offset, + int64_t range_size) override; void _init_profiles(RuntimeProfile* profile) override; void _update_profile(std::shared_ptr<Statistics>& statistics) override; @@ -105,7 +107,8 @@ public: protected: ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) override; + int32_t num_of_columns_from_file, int64_t range_start_offset, + int64_t range_size) override; void _init_profiles(RuntimeProfile* profile) override {}; }; diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp index 3244de5b90..200e467810 100644 --- a/be/src/vec/exec/varrow_scanner.cpp +++ b/be/src/vec/exec/varrow_scanner.cpp @@ -76,8 +76,9 @@ Status VArrowScanner::_open_next_reader() { if (range.__isset.num_of_columns_from_file) { num_of_columns_from_file = range.num_of_columns_from_file; } - _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(), - num_of_columns_from_file); + _cur_file_reader = + _new_arrow_reader(file_reader.release(), _state->batch_size(), + 
num_of_columns_from_file, range.start_offset, range.size); auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs, _state->timezone()); diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h index 109ce53177..7eff7ab329 100644 --- a/be/src/vec/exec/varrow_scanner.h +++ b/be/src/vec/exec/varrow_scanner.h @@ -64,7 +64,8 @@ public: protected: virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) = 0; + int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size) = 0; private: // Read next buffer from reader diff --git a/be/src/vec/exec/vorc_scanner.cpp b/be/src/vec/exec/vorc_scanner.cpp index ca5c7c2aef..9f71d01cfb 100644 --- a/be/src/vec/exec/vorc_scanner.cpp +++ b/be/src/vec/exec/vorc_scanner.cpp @@ -30,7 +30,8 @@ VORCScanner::VORCScanner(RuntimeState* state, RuntimeProfile* profile, counter) {} ArrowReaderWrap* VORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) { + int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size) { return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file); } diff --git a/be/src/vec/exec/vorc_scanner.h b/be/src/vec/exec/vorc_scanner.h index 12510e9731..b7bd1fdf67 100644 --- a/be/src/vec/exec/vorc_scanner.h +++ b/be/src/vec/exec/vorc_scanner.h @@ -47,7 +47,8 @@ public: protected: ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) override; + int32_t num_of_columns_from_file, int64_t range_start_offset, + int64_t range_size) override; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp index cb59ae60bc..f4d74a6207 100644 --- a/be/src/vec/exec/vparquet_scanner.cpp +++ 
b/be/src/vec/exec/vparquet_scanner.cpp @@ -31,8 +31,11 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile, counter) {} ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) { - return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file); + int32_t num_of_columns_from_file, + int64_t range_start_offset, + int64_t range_size) { + return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file, + range_start_offset, range_size); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h index 367e2e7472..d8cf597dbe 100644 --- a/be/src/vec/exec/vparquet_scanner.h +++ b/be/src/vec/exec/vparquet_scanner.h @@ -48,7 +48,8 @@ public: protected: ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) override; + int32_t num_of_columns_from_file, int64_t range_start_offset, + int64_t range_size) override; }; } // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org