This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c358a43f35 [feature-wip] support parquet predicate push down (#10512) c358a43f35 is described below commit c358a43f3571059c9b641f3851ec84d8390fa95f Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Fri Jul 8 23:11:25 2022 +0800 [feature-wip] support parquet predicate push down (#10512) --- be/src/common/config.h | 1 + be/src/exec/CMakeLists.txt | 1 + be/src/exec/arrow/arrow_reader.cpp | 1 + be/src/exec/arrow/arrow_reader.h | 15 +- be/src/exec/arrow/orc_reader.cpp | 4 +- be/src/exec/arrow/orc_reader.h | 4 +- be/src/exec/arrow/parquet_reader.cpp | 93 ++-- be/src/exec/arrow/parquet_reader.h | 14 +- be/src/exec/arrow/parquet_row_group_reader.cpp | 568 +++++++++++++++++++++++++ be/src/exec/arrow/parquet_row_group_reader.h | 89 ++++ be/src/exec/base_scanner.cpp | 6 + be/src/exec/base_scanner.h | 8 + be/src/exec/broker_scan_node.cpp | 1 + be/src/exec/parquet_scanner.cpp | 11 +- be/src/exprs/expr_context.h | 1 + be/src/vec/exec/file_arrow_scanner.cpp | 7 +- be/src/vec/exec/file_scan_node.cpp | 1 + be/src/vec/exec/file_scanner.cpp | 6 + be/src/vec/exec/file_scanner.h | 8 + be/src/vec/exec/varrow_scanner.cpp | 22 +- be/src/vec/exec/varrow_scanner.h | 10 + be/src/vec/exec/vbroker_scan_node.cpp | 1 + be/src/vec/exec/vparquet_scanner.cpp | 2 +- be/test/exec/parquet_scanner_test.cpp | 64 ++- 24 files changed, 889 insertions(+), 49 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 2f5ee5cb71..ca0c66654d 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -752,6 +752,7 @@ CONF_Int32(object_pool_buffer_size, "100"); // ParquetReaderWrap prefetch buffer size CONF_Int32(parquet_reader_max_buffer_size, "50"); +CONF_Bool(parquet_predicate_push_down, "false"); // When the rows number reached this limit, will check the filter rate the of bloomfilter // if it is lower than a specific threshold, the predicate will be disabled. diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index cc94f5fe9a..9a25789dd9 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -25,6 +25,7 @@ set(EXEC_FILES arrow/arrow_reader.cpp arrow/orc_reader.cpp arrow/parquet_reader.cpp + arrow/parquet_row_group_reader.cpp analytic_eval_node.cpp blocking_join_node.cpp broker_scan_node.cpp diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp index 94289a990b..9d20697148 100644 --- a/be/src/exec/arrow/arrow_reader.cpp +++ b/be/src/exec/arrow/arrow_reader.cpp @@ -43,6 +43,7 @@ ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, _rb_reader = nullptr; _total_groups = 0; _current_group = 0; + _statistics = std::make_shared<Statistics>(); } ArrowReaderWrap::~ArrowReaderWrap() { diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h index 5a1e00f022..ad888061be 100644 --- a/be/src/exec/arrow/arrow_reader.h +++ b/be/src/exec/arrow/arrow_reader.h @@ -33,6 +33,7 @@ #include <string> #include "common/status.h" +#include "exprs/expr_context.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/Types_types.h" @@ -48,6 +49,14 @@ class SlotDescriptor; class MemPool; class FileReader; +struct Statistics { + int32_t filtered_row_groups = 0; + int32_t total_groups = 0; + int64_t filtered_rows = 0; + int64_t total_rows = 0; + int64_t filtered_total_bytes = 0; +}; + class ArrowFile : public arrow::io::RandomAccessFile { public: ArrowFile(FileReader* file); @@ -72,7 +81,9 @@ public: ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file); virtual ~ArrowReaderWrap(); - virtual Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, + virtual Status init_reader(const TupleDescriptor* tuple_desc, + const std::vector<SlotDescriptor*>& tuple_slot_descs, + const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone) = 0; // for row virtual Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs, @@ -81,6 +92,7 @@ public: } // for vec virtual Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) = 0; + std::shared_ptr<Statistics>& statistics() { return _statistics; } void close(); virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); } @@ -96,6 +108,7 @@ protected: int _current_group; // current group(stripe) std::map<std::string, int> _map_column; // column-name <---> column-index std::vector<int> _include_column_ids; // columns that need to get from file + std::shared_ptr<Statistics> _statistics; }; } // namespace doris diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp index 536b852ad6..f94d24610c 100644 --- a/be/src/exec/arrow/orc_reader.cpp +++ b/be/src/exec/arrow/orc_reader.cpp @@ -34,7 +34,9 @@ ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size, _cur_file_eof = false; } -Status ORCReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, +Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc, + const std::vector<SlotDescriptor*>& tuple_slot_descs, + const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone) { // Open ORC file reader auto maybe_reader = diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h index dd7853efe7..3b46cc6073 100644 --- a/be/src/exec/arrow/orc_reader.h +++ b/be/src/exec/arrow/orc_reader.h @@ -35,7 +35,9 @@ public: ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file); ~ORCReaderWrap() override = default; - Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, + Status init_reader(const TupleDescriptor* tuple_desc, + const std::vector<SlotDescriptor*>& tuple_slot_descs, + const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone) override; Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override; diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp index 6885ebb781..f24768369e 100644 --- a/be/src/exec/arrow/parquet_reader.cpp +++ b/be/src/exec/arrow/parquet_reader.cpp @@ -36,13 +36,13 @@ namespace doris { // Broker - -ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file) +ParquetReaderWrap::ParquetReaderWrap(RuntimeProfile* profile, FileReader* file_reader, + int64_t batch_size, int32_t num_of_columns_from_file) : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file), _rows_of_group(0), _current_line_of_group(0), - _current_line_of_batch(0) {} + _current_line_of_batch(0), + _profile(profile) {} ParquetReaderWrap::~ParquetReaderWrap() { _closed = true; @@ -52,7 +52,9 @@ ParquetReaderWrap::~ParquetReaderWrap() { } } -Status ParquetReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, +Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc, + const std::vector<SlotDescriptor*>& tuple_slot_descs, + const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone) { try { parquet::ArrowReaderProperties arrow_reader_properties = @@ -99,22 +101,12 @@ Status ParquetReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_ _timezone = timezone; RETURN_IF_ERROR(column_indices(tuple_slot_descs)); - - _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this); - - // read batch - RETURN_IF_ERROR(read_next_batch()); - _current_line_of_batch = 0; - //save column type - std::shared_ptr<arrow::Schema> field_schema = _batch->schema(); - for (int i = 0; i < _include_column_ids.size(); i++) { - std::shared_ptr<arrow::Field> field = field_schema->field(i); - if (!field) { - LOG(WARNING) << "Get field schema failed. Column order:" << i; - return Status::InternalError(_status.ToString()); - } - _parquet_column_type.emplace_back(field->type()->id()); + if (config::parquet_predicate_push_down) { + _row_group_reader.reset( + new RowGroupReader(_profile, conjunct_ctxs, _file_metadata, this)); + _row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids); } + _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this); return Status::OK(); } catch (parquet::ParquetException& e) { std::stringstream str_error; @@ -188,19 +180,25 @@ Status ParquetReaderWrap::read_record_batch(bool* eof) { } Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) { - if (_batch->num_rows() == 0 || _current_line_of_batch != 0 || _current_line_of_group != 0) { - RETURN_IF_ERROR(read_record_batch(eof)); + std::unique_lock<std::mutex> lock(_mtx); + while (!_closed && _queue.empty()) { + if (_batch_eof) { + _include_column_ids.clear(); + *eof = true; + _batch_eof = false; + return Status::OK(); + } + _queue_reader_cond.wait_for(lock, std::chrono::seconds(1)); } - *batch = get_batch(); + if (UNLIKELY(_closed)) { + return Status::InternalError(_status.message()); + } + *batch = _queue.front(); + _queue.pop_front(); + _queue_writer_cond.notify_one(); return Status::OK(); } -const std::shared_ptr<arrow::RecordBatch>& ParquetReaderWrap::get_batch() { - _current_line_of_batch += _batch->num_rows(); - _current_line_of_group += _batch->num_rows(); - return _batch; -} - Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf, int32_t* wbytes) { const auto type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type()); @@ -240,8 +238,32 @@ Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::Timestam return Status::OK(); } +Status ParquetReaderWrap::init_parquet_type() { + // read batch + RETURN_IF_ERROR(read_next_batch()); + _current_line_of_batch = 0; + if (_batch == nullptr) { + return Status::OK(); + } + //save column type + std::shared_ptr<arrow::Schema> field_schema = _batch->schema(); + for (int i = 0; i < _include_column_ids.size(); i++) { + std::shared_ptr<arrow::Field> field = field_schema->field(i); + if (!field) { + LOG(WARNING) << "Get field schema failed. Column order:" << i; + return Status::InternalError(_status.ToString()); + } + _parquet_column_type.emplace_back(field->type()->id()); + } + return Status::OK(); +} + Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs, MemPool* mem_pool, bool* eof) { + if (_batch == nullptr) { + _current_line_of_group += _rows_of_group; + return read_record_batch(eof); + } uint8_t tmp_buf[128] = {0}; int32_t wbytes = 0; const uint8_t* value = nullptr; @@ -535,8 +557,18 @@ void ParquetReaderWrap::prefetch_batch() { int total_groups = _total_groups; while (true) { if (_closed || current_group >= total_groups) { + _batch_eof = true; + _queue_reader_cond.notify_one(); return; } + if (config::parquet_predicate_push_down) { + auto filter_group_set = _row_group_reader->filter_groups(); + if (filter_group_set.end() != filter_group_set.find(current_group)) { + // find filter group, skip + current_group++; + continue; + } + } _status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader); if (!_status.ok()) { _closed = true; @@ -556,6 +588,9 @@ void ParquetReaderWrap::prefetch_batch() { Status ParquetReaderWrap::read_next_batch() { std::unique_lock<std::mutex> lock(_mtx); while (!_closed && _queue.empty()) { + if (_batch_eof) { + return Status::OK(); + } _queue_reader_cond.wait_for(lock, std::chrono::seconds(1)); } diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h index 870c560fe4..8d9a420f55 100644 --- a/be/src/exec/arrow/parquet_reader.h +++ b/be/src/exec/arrow/parquet_reader.h @@ -41,6 +41,7 @@ #include "common/config.h" #include "common/status.h" #include "exec/arrow/arrow_reader.h" +#include "exec/arrow/parquet_row_group_reader.h" #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/Types_types.h" @@ -55,12 +56,13 @@ class Tuple; class SlotDescriptor; class MemPool; class FileReader; +class RowGroupReader; // Reader of parquet file class ParquetReaderWrap final : public ArrowReaderWrap { public: // batch_size is not use here - ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, + ParquetReaderWrap(RuntimeProfile* profile, FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file); ~ParquetReaderWrap() override; @@ -68,8 +70,11 @@ public: Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs, MemPool* mem_pool, bool* eof) override; Status size(int64_t* size) override; - Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs, + Status init_reader(const TupleDescriptor* tuple_desc, + const std::vector<SlotDescriptor*>& tuple_slot_descs, + const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone) override; + Status init_parquet_type(); Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override; private: @@ -77,7 +82,6 @@ private: int32_t len); Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc); Status read_record_batch(bool* eof); - const std::shared_ptr<arrow::RecordBatch>& get_batch(); Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf, int32_t* wbtyes); @@ -95,16 +99,18 @@ private: int _rows_of_group; // rows in a group. int _current_line_of_group; int _current_line_of_batch; - + RuntimeProfile* _profile; std::string _timezone; private: std::atomic<bool> _closed = false; + std::atomic<bool> _batch_eof = false; arrow::Status _status; std::mutex _mtx; std::condition_variable _queue_reader_cond; std::condition_variable _queue_writer_cond; std::list<std::shared_ptr<arrow::RecordBatch>> _queue; + std::unique_ptr<doris::RowGroupReader> _row_group_reader; const size_t _max_queue_size = config::parquet_reader_max_buffer_size; std::thread _thread; }; diff --git a/be/src/exec/arrow/parquet_row_group_reader.cpp b/be/src/exec/arrow/parquet_row_group_reader.cpp new file mode 100644 index 0000000000..48e382e88c --- /dev/null +++ b/be/src/exec/arrow/parquet_row_group_reader.cpp @@ -0,0 +1,568 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/arrow/parquet_row_group_reader.h" + +#include <exprs/expr_context.h> +#include <exprs/in_predicate.h> +#include <parquet/encoding.h> + +#include <cstring> + +#define _PLAIN_DECODE(T, value, min_bytes, max_bytes, out_value, out_min, out_max) \ + const T out_min = reinterpret_cast<const T*>(min_bytes)[0]; \ + const T out_max = reinterpret_cast<const T*>(max_bytes)[0]; \ + T out_value = *((T*)value); + +#define _PLAIN_DECODE_SINGLE(T, value, bytes, conjunct_value, out) \ + const T out = reinterpret_cast<const T*>(bytes)[0]; \ + T conjunct_value = *((T*)value); + +#define _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) \ + if (conjunct_value < min || conjunct_value > max) { \ + return true; \ + } + +#define _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) \ + if (max <= conjunct_value) { \ + return true; \ + } + +#define _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) \ + if (max < conjunct_value) { \ + return true; \ + } + +#define _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) \ + if (min >= conjunct_value) { \ + return true; \ + } + +#define _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) \ + if (min > conjunct_value) { \ + return true; \ + } + +#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \ + std::vector<T> in_values; \ + for (auto val : in_pred_values) { \ + T value = reinterpret_cast<T*>(val)[0]; \ + in_values.emplace_back(value); \ + } \ + if (in_values.empty()) { \ + return false; \ + } \ + std::sort(in_values.begin(), in_values.end()); \ + T in_min = in_values.front(); \ + T in_max = in_values.back(); \ + const T group_min = reinterpret_cast<const T*>(min_bytes)[0]; \ + const T group_max = reinterpret_cast<const T*>(max_bytes)[0]; \ + if (in_max < group_min || in_min > group_max) { \ + return true; \ + } + +namespace doris { + +RowGroupReader::RowGroupReader(RuntimeProfile* profile, + const std::vector<ExprContext*>& conjunct_ctxs, + std::shared_ptr<parquet::FileMetaData>& file_metadata, + ParquetReaderWrap* parent) + : _conjunct_ctxs(conjunct_ctxs), + _file_metadata(file_metadata), + _profile(profile), + _parent(parent) {} + +RowGroupReader::~RowGroupReader() { + _slot_conjuncts.clear(); + _filter_group.clear(); +} + +Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc, + const std::map<std::string, int>& map_column, + const std::vector<int>& include_column_ids) { + std::unordered_set<int> parquet_column_ids(include_column_ids.begin(), + include_column_ids.end()); + _init_conjuncts(tuple_desc, map_column, parquet_column_ids); + int total_group = _file_metadata->num_row_groups(); + _parent->statistics()->total_groups = total_group; + _parent->statistics()->total_rows = _file_metadata->num_rows(); + + int32_t filtered_num_row_groups = 0; + int64_t filtered_num_rows = 0; + int64_t filtered_total_byte_size = 0; + bool update_statistics = false; + for (int row_group_id = 0; row_group_id < total_group; row_group_id++) { + auto row_group_meta = _file_metadata->RowGroup(row_group_id); + for (SlotId slot_id = 0; slot_id < tuple_desc->slots().size(); slot_id++) { + const std::string& col_name = tuple_desc->slots()[slot_id]->col_name(); + auto col_iter = map_column.find(col_name); + if (col_iter == map_column.end()) { + continue; + } + int parquet_col_id = col_iter->second; + if (parquet_column_ids.end() == parquet_column_ids.find(parquet_col_id)) { + // Column not exist in parquet file + continue; + } + auto slot_iter = _slot_conjuncts.find(slot_id); + if (slot_iter == _slot_conjuncts.end()) { + continue; + } + auto statistic = row_group_meta->ColumnChunk(parquet_col_id)->statistics(); + if (!statistic->HasMinMax()) { + continue; + } + // Min-max of statistic is plain-encoded value + const std::string& min = statistic->EncodeMin(); + const std::string& max = statistic->EncodeMax(); + + bool group_need_filter = _determine_filter_row_group(slot_iter->second, min, max); + if (group_need_filter) { + update_statistics = true; + filtered_num_row_groups++; + filtered_num_rows += row_group_meta->num_rows(); + filtered_total_byte_size += row_group_meta->total_byte_size(); + VLOG_DEBUG << "Filter row group id: " << row_group_id; + _filter_group.emplace(row_group_id); + break; + } + } + } + if (update_statistics) { + _parent->statistics()->filtered_row_groups = filtered_num_row_groups; + _parent->statistics()->filtered_rows = filtered_num_rows; + _parent->statistics()->filtered_total_bytes = filtered_total_byte_size; + VLOG_DEBUG << "Parquet file: " << _file_metadata->schema()->name() + << ", Num of read row group: " << total_group + << ", and num of skip row group: " << filtered_num_row_groups; + } + return Status::OK(); +} + +void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc, + const std::map<std::string, int>& map_column, + const std::unordered_set<int>& include_column_ids) { + if (tuple_desc->slots().empty()) { + return; + } + for (int i = 0; i < tuple_desc->slots().size(); i++) { + auto col_iter = map_column.find(tuple_desc->slots()[i]->col_name()); + if (col_iter == map_column.end()) { + continue; + } + int parquet_col_id = col_iter->second; + if (include_column_ids.end() == include_column_ids.find(parquet_col_id)) { + continue; + } + for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); conj_idx++) { + Expr* conjunct = _conjunct_ctxs[conj_idx]->root(); + if (conjunct->get_num_children() == 0) { + continue; + } + Expr* raw_slot = conjunct->get_child(0); + if (TExprNodeType::SLOT_REF != raw_slot->node_type()) { + continue; + } + SlotRef* slot_ref = (SlotRef*)raw_slot; + SlotId conjunct_slot_id = slot_ref->slot_id(); + if (conjunct_slot_id == tuple_desc->slots()[i]->id()) { + // Get conjuncts by conjunct_slot_id + auto iter = _slot_conjuncts.find(conjunct_slot_id); + if (_slot_conjuncts.end() == iter) { + std::vector<ExprContext*> conjuncts; + conjuncts.emplace_back(_conjunct_ctxs[conj_idx]); + _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts)); + } else { + std::vector<ExprContext*> conjuncts = iter->second; + conjuncts.emplace_back(_conjunct_ctxs[conj_idx]); + } + } + } + } +} + +bool RowGroupReader::_determine_filter_row_group(const std::vector<ExprContext*>& conjuncts, + const std::string& encoded_min, + const std::string& encoded_max) { + const char* min_bytes = encoded_min.data(); + const char* max_bytes = encoded_max.data(); + bool need_filter = false; + for (int i = 0; i < conjuncts.size(); i++) { + Expr* conjunct = conjuncts[i]->root(); + if (TExprNodeType::BINARY_PRED == conjunct->node_type()) { + _eval_binary_predicate(conjuncts[i], min_bytes, max_bytes, need_filter); + } else if (TExprNodeType::IN_PRED == conjunct->node_type()) { + _eval_in_predicate(conjuncts[i], min_bytes, max_bytes, need_filter); + } + } + return need_filter; +} + +void RowGroupReader::_eval_binary_predicate(ExprContext* ctx, const char* min_bytes, + const char* max_bytes, bool& need_filter) { + Expr* conjunct = ctx->root(); + Expr* expr = conjunct->get_child(1); + if (expr == nullptr) { + return; + } + // supported conjunct example: slot_ref < 123, slot_ref > func(123), .. + auto conjunct_type = expr->type().type; + void* conjunct_value = ctx->get_value(expr, nullptr); + switch (conjunct->op()) { + case TExprOpcode::EQ: + need_filter = _eval_eq(conjunct_type, conjunct_value, min_bytes, max_bytes); + break; + case TExprOpcode::NE: + break; + case TExprOpcode::GT: + need_filter = _eval_gt(conjunct_type, conjunct_value, max_bytes); + break; + case TExprOpcode::GE: + need_filter = _eval_ge(conjunct_type, conjunct_value, max_bytes); + break; + case TExprOpcode::LT: + need_filter = _eval_lt(conjunct_type, conjunct_value, min_bytes); + break; + case TExprOpcode::LE: + need_filter = _eval_le(conjunct_type, conjunct_value, min_bytes); + break; + default: + break; + } +} + +void RowGroupReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes, + const char* max_bytes, bool& need_filter) { + Expr* conjunct = ctx->root(); + std::vector<void*> in_pred_values; + const InPredicate* pred = static_cast<const InPredicate*>(conjunct); + HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin(); + // TODO: process expr: in(func(123),123) + while (iter->has_next()) { + if (nullptr == iter->get_value()) { + return; + } + in_pred_values.emplace_back(const_cast<void*>(iter->get_value())); + iter->next(); + } + auto conjunct_type = conjunct->get_child(1)->type().type; + switch (conjunct->op()) { + case TExprOpcode::FILTER_IN: + need_filter = _eval_in_val(conjunct_type, in_pred_values, min_bytes, max_bytes); + break; + // case TExprOpcode::FILTER_NOT_IN: + default: + need_filter = false; + } +} + +bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values, + const char* min_bytes, const char* max_bytes) { + switch (conjunct_type) { + case TYPE_TINYINT: { + _FILTER_GROUP_BY_IN(int8_t, in_pred_values, min_bytes, max_bytes) + break; + } + case TYPE_SMALLINT: { + _FILTER_GROUP_BY_IN(int16_t, in_pred_values, min_bytes, max_bytes) + break; + } + case TYPE_INT: { + _FILTER_GROUP_BY_IN(int32_t, in_pred_values, min_bytes, max_bytes) + break; + } + case TYPE_BIGINT: { + _FILTER_GROUP_BY_IN(int64_t, in_pred_values, min_bytes, max_bytes) + break; + } + case TYPE_STRING: + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_DATE: + case TYPE_DATETIME: { + std::vector<const char*> in_values; + for (auto val : in_pred_values) { + const char* value = ((std::string*)val)->c_str(); + in_values.emplace_back(value); + } + if (in_values.empty()) { + return false; + } + std::sort(in_values.begin(), in_values.end()); + const char* in_min = in_values.front(); + const char* in_max = in_values.back(); + if (strcmp(in_max, min_bytes) < 0 || strcmp(in_min, max_bytes) > 0) { + return true; + } + break; + } + default: + return false; + } + return false; +} + +bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes, + const char* max_bytes) { + switch (conjunct_type) { + case TYPE_TINYINT: { + _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max) + _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) + break; + } + case TYPE_SMALLINT: { + _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max) + _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) + break; + } + case TYPE_INT: { + _PLAIN_DECODE(int32_t, value, min_bytes, max_bytes, conjunct_value, min, max) + _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) + break; + } + case TYPE_BIGINT: { + _PLAIN_DECODE(int64_t, value, min_bytes, max_bytes, conjunct_value, min, max) + _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) + break; + } + case TYPE_DOUBLE: { + _PLAIN_DECODE(double, value, min_bytes, max_bytes, conjunct_value, min, max) + _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) + break; + } + case TYPE_FLOAT: { + _PLAIN_DECODE(float, value, min_bytes, max_bytes, conjunct_value, min, max) + _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) + break; + } + case TYPE_STRING: + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_DATE: + case TYPE_DATETIME: { + const char* conjunct_value = ((std::string*)value)->c_str(); + if (strcmp(conjunct_value, min_bytes) < 0 || strcmp(conjunct_value, max_bytes) > 0) { + return true; + } + break; + } + default: + return false; + } + return false; +} + +bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) { + switch (conjunct_type) { + case TYPE_TINYINT: { + _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) + break; + } + case TYPE_SMALLINT: { + _PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) + break; + } + case TYPE_INT: { + _PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) + break; + } + case TYPE_BIGINT: { + _PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) + break; + } + case TYPE_DOUBLE: { + _PLAIN_DECODE_SINGLE(double, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) + break; + } + case TYPE_FLOAT: { + _PLAIN_DECODE_SINGLE(float, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) + break; + } + case TYPE_STRING: + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_DATE: + case TYPE_DATETIME: { + // case TYPE_TIME: + const char* conjunct_value = ((std::string*)value)->c_str(); + if (strcmp(max_bytes, conjunct_value) <= 0) { + return true; + } + break; + } + default: + return false; + } + return false; +} + +bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) { + switch (conjunct_type) { + case TYPE_TINYINT: { + _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) + break; + } + case TYPE_SMALLINT: { + _PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) + break; + } + case TYPE_INT: { + _PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) + break; + } + case TYPE_BIGINT: { + _PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) + break; + } + case TYPE_DOUBLE: { + _PLAIN_DECODE_SINGLE(double, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) + break; + } + case TYPE_FLOAT: { + _PLAIN_DECODE_SINGLE(float, value, max_bytes, conjunct_value, max) + _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) + break; + } + case TYPE_STRING: + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_DATE: + case TYPE_DATETIME: { + // case TYPE_TIME: + const char* conjunct_value = ((std::string*)value)->c_str(); + if (strcmp(max_bytes, conjunct_value) < 0) { + return true; + } + break; + } + default: + return false; + } + return false; +} + +bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) { + switch (conjunct_type) { + case TYPE_TINYINT: { + _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) + break; + } + case TYPE_SMALLINT: { + _PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) + break; + } + case TYPE_INT: { + _PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) + break; + } + case TYPE_BIGINT: { + _PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) + break; + } + case TYPE_DOUBLE: { + _PLAIN_DECODE_SINGLE(double, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) + break; + } + case TYPE_FLOAT: { + _PLAIN_DECODE_SINGLE(float, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) + break; + } + case TYPE_STRING: + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_DATE: + case TYPE_DATETIME: { + // case TYPE_TIME: + const char* conjunct_value = ((std::string*)value)->c_str(); + if (strcmp(min_bytes, conjunct_value) >= 0) { + return true; + } + break; + } + default: + return false; + } + return false; +} + +bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) { + switch (conjunct_type) { + case TYPE_TINYINT: { + _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) + break; + } + case TYPE_SMALLINT: { + _PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) + break; + } + case TYPE_INT: { + _PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) + break; + } + case TYPE_BIGINT: { + _PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) + break; + } + case TYPE_DOUBLE: { + _PLAIN_DECODE_SINGLE(double, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) + break; + } + case TYPE_FLOAT: { + _PLAIN_DECODE_SINGLE(float, value, min_bytes, conjunct_value, min) + _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) + break; + } + case TYPE_STRING: + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_DATE: + case TYPE_DATETIME: { + // case TYPE_TIME: + const char* conjunct_value = ((std::string*)value)->c_str(); + if (strcmp(min_bytes, conjunct_value) > 0) { + return true; + } + break; + } + default: + return false; + } + return false; +} +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/arrow/parquet_row_group_reader.h b/be/src/exec/arrow/parquet_row_group_reader.h new file mode 100644 index 0000000000..d29a6c2e05 --- /dev/null +++ b/be/src/exec/arrow/parquet_row_group_reader.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <arrow/type_fwd.h> +#include <exprs/expr.h> +#include <parquet/arrow/reader.h> +#include <parquet/encoding.h> +#include <parquet/file_reader.h> +#include <parquet/metadata.h> +#include <parquet/statistics.h> +#include <parquet/types.h> + +#include <unordered_set> + +#include "common/status.h" +#include "exec/arrow/parquet_reader.h" + +namespace doris { + +class ParquetReaderWrap; + +class RowGroupReader { +public: + RowGroupReader(RuntimeProfile* profile, const std::vector<ExprContext*>& conjunct_ctxs, + std::shared_ptr<parquet::FileMetaData>& file_metadata, + ParquetReaderWrap* parent); + ~RowGroupReader(); + + Status init_filter_groups(const TupleDescriptor* tuple_desc, + const std::map<std::string, int>& map_column, + const std::vector<int>& include_column_ids); + + std::unordered_set<int> filter_groups() { return _filter_group; }; + +private: + void _init_conjuncts(const TupleDescriptor* tuple_desc, + const std::map<std::string, int>& _map_column, + const std::unordered_set<int>& include_column_ids); + + bool _determine_filter_row_group(const std::vector<ExprContext*>& conjuncts, + const std::string& encoded_min, + const std::string& encoded_max); + + void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes, + bool& need_filter); + + void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes, + bool& need_filter); + + bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values, + const char* min_bytes, const char* max_bytes); + + bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes, + const char* max_bytes); + + bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes); + + bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes); + + bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes); + + bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes); + +private: + std::map<int, std::vector<ExprContext*>> _slot_conjuncts; + std::unordered_set<int> _filter_group; + + std::vector<ExprContext*> _conjunct_ctxs; + std::shared_ptr<parquet::FileMetaData> _file_metadata; + RuntimeProfile* _profile; + ParquetReaderWrap* _parent; +}; +} // namespace doris diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 3ec5aba1cd..ef85e56dd3 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -95,6 +95,12 @@ Status BaseScanner::open() { return Status::OK(); } +void BaseScanner::reg_conjunct_ctxs(const TupleId& tupleId, + const std::vector<ExprContext*>& conjunct_ctxs) { + _conjunct_ctxs = conjunct_ctxs; + _tupleId = tupleId; +} + Status BaseScanner::init_expr_ctxes() { // Construct _src_slot_descs const TupleDescriptor* src_tuple_desc = diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index fe3e088d4e..7efdee0f2f 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -64,6 +64,10 @@ public: } } + // Register conjuncts for push down + virtual void reg_conjunct_ctxs(const TupleId& tupleId, + const std::vector<ExprContext*>& conjunct_ctxs); + virtual Status init_expr_ctxes(); // Open this scanner, will initialize information need to virtual Status open(); @@ -142,6 +146,10 @@ protected: vectorized::Block _src_block; int _num_of_columns_from_file; + // slot_ids for parquet predicate push down are in tuple desc + TupleId _tupleId; + std::vector<ExprContext*> _conjunct_ctxs; + private: Status _filter_src_block(); void _fill_columns_from_path(); diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index 532a9d7d06..49fed4c3ce 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -263,6 +263,7 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan _pre_filter_texprs, counter); } } + scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs); std::unique_ptr<BaseScanner> scanner(scan); return scanner; } diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 776115427e..2e9ea1f3eb 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -106,11 +106,11 @@ Status ParquetScanner::open_next_reader() { if (range.__isset.num_of_columns_from_file) { num_of_columns_from_file = range.num_of_columns_from_file; } - _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _state->batch_size(), - num_of_columns_from_file); - - Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone()); - + _cur_file_reader = new ParquetReaderWrap(_profile, file_reader.release(), + _state->batch_size(), num_of_columns_from_file); + auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); + Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs, + _state->timezone()); if (status.is_end_of_file()) { continue; } else { @@ -118,6 +118,7 @@ Status ParquetScanner::open_next_reader() { return Status::InternalError("file: {}, error:{}", range.path, status.get_error_msg()); } else { + RETURN_IF_ERROR(_cur_file_reader->init_parquet_type()); return status; } } diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h index 7fcb277bb0..6543ff1b9c 100644 --- a/be/src/exprs/expr_context.h +++ b/be/src/exprs/expr_context.h @@ -164,6 +164,7 @@ private: friend class BloomFilterPredicate; friend class OlapScanNode; friend class EsPredicate; + friend class RowGroupReader; friend class vectorized::VOlapScanNode; /// FunctionContexts for each registered expression. The FunctionContexts are created diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp index 57defdb8ef..9adad2d71a 100644 --- a/be/src/vec/exec/file_arrow_scanner.cpp +++ b/be/src/vec/exec/file_arrow_scanner.cpp @@ -70,8 +70,9 @@ Status FileArrowScanner::_open_next_reader() { _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(), num_of_columns_from_file); - Status status = _cur_file_reader->init_reader(_file_slot_descs, _state->timezone()); - + auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); + Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, + _state->timezone()); if (status.is_end_of_file()) { continue; } else { @@ -207,7 +208,7 @@ VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* pr ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file) { - return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file); + return new ParquetReaderWrap(_profile, file_reader, batch_size, num_of_columns_from_file); } VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile, diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp index 4105f8dc01..741b66dd81 100644 --- a/be/src/vec/exec/file_scan_node.cpp +++ b/be/src/vec/exec/file_scan_node.cpp @@ -315,6 +315,7 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan = new FileTextScanner(_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, _pre_filter_texprs, counter); } + scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs); std::unique_ptr<FileScanner> scanner(scan); return scanner; } diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp index d9780f8dbe..3168e220d5 100644 --- a/be/src/vec/exec/file_scanner.cpp +++ b/be/src/vec/exec/file_scanner.cpp @@ -69,6 +69,12 @@ Status FileScanner::open() { return Status::OK(); } +void FileScanner::reg_conjunct_ctxs(const TupleId& tupleId, + const std::vector<ExprContext*>& conjunct_ctxs) { + _conjunct_ctxs = conjunct_ctxs; + _tupleId = tupleId; +} + Status FileScanner::_init_expr_ctxes() { const TupleDescriptor* src_tuple_desc = _state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id); diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h index cb21b876d5..abdc2eaa4e 100644 --- a/be/src/vec/exec/file_scanner.h +++ b/be/src/vec/exec/file_scanner.h @@ -34,6 +34,9 @@ public: virtual ~FileScanner() = default; + virtual void reg_conjunct_ctxs(const TupleId& tupleId, + const std::vector<ExprContext*>& conjunct_ctxs); + // Open this scanner, will initialize information need to virtual Status open(); @@ -85,6 +88,11 @@ protected: std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr; int _num_of_columns_from_file; + // File formats based push down predicate + std::vector<ExprContext*> _conjunct_ctxs; + // slot_ids for parquet predicate push down are in tuple desc + TupleId _tupleId; + private: Status _init_expr_ctxes(); Status _filter_block(vectorized::Block* output_block); diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp index 05329f1d04..9d2bfb8a52 100644 --- a/be/src/vec/exec/varrow_scanner.cpp +++ b/be/src/vec/exec/varrow_scanner.cpp @@ -37,7 +37,13 @@ VArrowScanner::VArrowScanner(RuntimeState* state, RuntimeProfile* profile, _cur_file_reader(nullptr), _cur_file_eof(false), _batch(nullptr), - _arrow_batch_cur_idx(0) {} + _arrow_batch_cur_idx(0) { + _filtered_row_groups_counter = ADD_COUNTER(_profile, "FileFilteredRowGroups", TUnit::UNIT); + _filtered_rows_counter = ADD_COUNTER(_profile, "FileFilteredRows", TUnit::UNIT); + _filtered_bytes_counter = ADD_COUNTER(_profile, "FileFilteredBytes", TUnit::BYTES); + _total_rows_counter = ADD_COUNTER(_profile, "FileTotalRows", TUnit::UNIT); + _total_groups_counter = ADD_COUNTER(_profile, "FileTotalRowGroups", TUnit::UNIT); +} VArrowScanner::~VArrowScanner() { close(); @@ -72,8 +78,9 @@ Status VArrowScanner::_open_next_reader() { } _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(), num_of_columns_from_file); - - Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone()); + auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); + Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs, + _state->timezone()); if (status.is_end_of_file()) { continue; @@ -82,12 +89,21 @@ Status VArrowScanner::_open_next_reader() { return Status::InternalError(" file: {} error:{}", range.path, status.get_error_msg()); } else { + update_profile(_cur_file_reader->statistics()); return status; } } } } +void VArrowScanner::update_profile(std::shared_ptr<Statistics>& statistics) { + COUNTER_UPDATE(_total_groups_counter, statistics->total_groups); + COUNTER_UPDATE(_filtered_row_groups_counter, statistics->filtered_row_groups); + COUNTER_UPDATE(_total_rows_counter, statistics->total_rows); + COUNTER_UPDATE(_filtered_rows_counter, statistics->filtered_rows); + COUNTER_UPDATE(_filtered_bytes_counter, statistics->filtered_total_bytes); +} + Status VArrowScanner::open() { RETURN_IF_ERROR(BaseScanner::open()); if (_ranges.empty()) { diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h index 176ddb58f2..109ce53177 100644 --- a/be/src/vec/exec/varrow_scanner.h +++ b/be/src/vec/exec/varrow_scanner.h @@ -29,6 +29,7 @@ #include <vector> #include "common/status.h" +#include "exec/arrow/parquet_reader.h" #include "exec/base_scanner.h" #include "gen_cpp/Types_types.h" #include "runtime/mem_pool.h" @@ -56,6 +57,9 @@ public: virtual Status get_next(Block* block, bool* eof) override; + // Update file predicate filter profile + void update_profile(std::shared_ptr<Statistics>& statistics); + virtual void close() override; protected: @@ -77,6 +81,12 @@ private: bool _cur_file_eof; // is read over? std::shared_ptr<arrow::RecordBatch> _batch; size_t _arrow_batch_cur_idx; + + RuntimeProfile::Counter* _filtered_row_groups_counter; + RuntimeProfile::Counter* _filtered_rows_counter; + RuntimeProfile::Counter* _filtered_bytes_counter; + RuntimeProfile::Counter* _total_rows_counter; + RuntimeProfile::Counter* _total_groups_counter; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp index 6f63324e6d..dc738c5a09 100644 --- a/be/src/vec/exec/vbroker_scan_node.cpp +++ b/be/src/vec/exec/vbroker_scan_node.cpp @@ -321,6 +321,7 @@ std::unique_ptr<BaseScanner> VBrokerScanNode::create_scanner(const TBrokerScanRa scan_range.ranges, scan_range.broker_addresses, _pre_filter_texprs, counter); } + scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs); std::unique_ptr<BaseScanner> scanner(scan); return scanner; } diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp index cb59ae60bc..319d8a3ccb 100644 --- a/be/src/vec/exec/vparquet_scanner.cpp +++ b/be/src/vec/exec/vparquet_scanner.cpp @@ -32,7 +32,7 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile, ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file) { - return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file); + return new ParquetReaderWrap(_profile, file_reader, batch_size, num_of_columns_from_file); } } // namespace doris::vectorized diff --git a/be/test/exec/parquet_scanner_test.cpp b/be/test/exec/parquet_scanner_test.cpp index cbd673d4dc..5b1020224e 100644 --- a/be/test/exec/parquet_scanner_test.cpp +++ b/be/test/exec/parquet_scanner_test.cpp @@ -58,6 +58,7 @@ private: int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id); void create_expr_info(); void init_desc_table(); + void init_filter_expr(); RuntimeState _runtime_state; ObjectPool _obj_pool; std::map<std::string, SlotDescriptor*> _slots_map; @@ -406,10 +407,70 @@ void ParquetScannerTest::create_expr_info() { _params.__set_src_tuple_id(TUPLE_ID_SRC); } +void ParquetScannerTest::init_filter_expr() { + TTypeDesc bool_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BOOLEAN); + scalar_type.__set_len(5000); + node.__set_scalar_type(scalar_type); + bool_type.types.push_back(node); + } + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::BIGINT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + + // create predicate + ::doris::TExpr expr; + + // create predicate elements: LeftExpr op RightExpr + // expr: log_time > 1 + ::doris::TExprNode op; + op.node_type = TExprNodeType::BINARY_PRED; + op.opcode = TExprOpcode::GT; + op.type = bool_type; + op.num_children = 2; + op.child_type = TPrimitiveType::BIGINT; + op.__isset.opcode = true; + expr.nodes.push_back(op); + + // log_time + ::doris::TExprNode slot_ref; + slot_ref.node_type = TExprNodeType::SLOT_REF; + slot_ref.type = int_type; + slot_ref.slot_ref.slot_id = 1; + slot_ref.slot_ref.tuple_id = 0; + slot_ref.num_children = 0; + slot_ref.__isset.slot_ref = true; + expr.nodes.push_back(slot_ref); + + ::doris::TExprNode int_expr; + int_expr.node_type = TExprNodeType::INT_LITERAL; + int_expr.type = int_type; + int_expr.int_literal.value = 1; + int_expr.num_children = 0; + int_expr.__isset.int_literal = true; + + expr.nodes.push_back(int_expr); + + std::vector<::doris::TExpr> conjuncts; + conjuncts.push_back(expr); + // push down conjuncts; + _tnode.__set_conjuncts(conjuncts); +} + void ParquetScannerTest::init() { create_expr_info(); init_desc_table(); - + init_filter_expr(); // Node Id _tnode.node_id = 0; _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE; @@ -419,6 +480,7 @@ void ParquetScannerTest::init() { _tnode.nullable_tuples.push_back(false); _tnode.broker_scan_node.tuple_id = 0; _tnode.__isset.broker_scan_node = true; + _tnode.__isset.conjuncts = true; } TEST_F(ParquetScannerTest, normal) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org