This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 096aa25ca6 [improvement](orc-reader) Implements ORC lazy materialization (#18615) 096aa25ca6 is described below commit 096aa25ca68af5af738732bdbd67207855c32fc3 Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Tue May 9 23:33:33 2023 +0800 [improvement](orc-reader) Implements ORC lazy materialization (#18615) - Implements ORC lazy materialization, integrated with the implementation of https://github.com/apache/doris-thirdparty/pull/56 and https://github.com/apache/doris-thirdparty/pull/62. - Refactor code: Move `execute_conjuncts()` and `execute_conjuncts_and_filter_block()` in `parquet_group_reader` to `VExprContext`, used by parquet reader and orc reader. - Add session variables `enable_parquet_lazy_materialization` and `enable_orc_lazy_materialization` to control whether to enable lazy materialization. - Modify `build.sh` to update apache-orc submodule or download package every time. --- be/src/apache-orc | 2 +- be/src/vec/exec/format/orc/vorc_reader.cpp | 454 +++++++++++++++++---- be/src/vec/exec/format/orc/vorc_reader.h | 163 ++++++-- .../exec/format/parquet/vparquet_group_reader.cpp | 135 +----- .../exec/format/parquet/vparquet_group_reader.h | 8 - be/src/vec/exec/format/parquet/vparquet_reader.cpp | 14 +- be/src/vec/exec/format/parquet/vparquet_reader.h | 14 +- be/src/vec/exec/scan/vfile_scanner.cpp | 12 +- be/src/vec/exprs/vexpr_context.cpp | 87 ++++ be/src/vec/exprs/vexpr_context.h | 7 + build.sh | 23 +- docs/en/docs/advanced/variables.md | 8 + docs/zh-CN/docs/advanced/variables.md | 9 + .../java/org/apache/doris/qe/SessionVariable.java | 40 ++ gensrc/thrift/PaloInternalService.thrift | 4 + 15 files changed, 716 insertions(+), 264 deletions(-) diff --git a/be/src/apache-orc b/be/src/apache-orc index baf9e30475..0e53506146 160000 --- a/be/src/apache-orc +++ b/be/src/apache-orc @@ -1 +1 @@ -Subproject commit baf9e30475baa924bde1e4a135e6517845a08d53 +Subproject commit 
0e53506146c965a5a71f0582691ab2ea148dae7c diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 7d85b12902..ea36eec962 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -50,11 +50,14 @@ #include "orc/sargs/SearchArgument.hh" #include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" +#include "runtime/descriptors.h" #include "runtime/primitive_type.h" +#include "runtime/thread_context.h" #include "util/slice.h" #include "util/timezone_utils.h" #include "vec/columns/column.h" #include "vec/columns/column_array.h" +#include "vec/columns/column_const.h" #include "vec/columns/column_map.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_struct.h" @@ -66,6 +69,10 @@ #include "vec/data_types/data_type_map.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_struct.h" +#include "vec/exprs/vbloom_predicate.h" +#include "vec/exprs/vin_predicate.h" +#include "vec/exprs/vruntimefilter_wrapper.h" +#include "vec/exprs/vslot_ref.h" #include "vec/runtime/vdatetime_value.h" namespace doris { @@ -119,7 +126,7 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) { OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector<std::string>& column_names, size_t batch_size, - const std::string& ctz, io::IOContext* io_ctx) + const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat) : _profile(profile), _state(state), _scan_params(params), @@ -130,7 +137,8 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state, _ctz(ctz), _column_names(column_names), _is_hive(params.__isset.slot_name_to_schema_pos), - _io_ctx(io_ctx) { + _io_ctx(io_ctx), + _enable_lazy_mat(enable_lazy_mat) { TimezoneUtils::find_cctz_time_zone(ctz, _time_zone); _init_profile(); _init_system_properties(); 
@@ -139,7 +147,7 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state, OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector<std::string>& column_names, const std::string& ctz, - io::IOContext* io_ctx) + io::IOContext* io_ctx, bool enable_lazy_mat) : _profile(nullptr), _scan_params(params), _scan_range(range), @@ -147,7 +155,8 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r _column_names(column_names), _is_hive(params.__isset.slot_name_to_schema_pos), _file_system(nullptr), - _io_ctx(io_ctx) { + _io_ctx(io_ctx), + _enable_lazy_mat(enable_lazy_mat) { _init_system_properties(); _init_file_description(); } @@ -221,37 +230,13 @@ Status OrcReader::_create_file_reader() { } Status OrcReader::init_reader( - std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, + VExprContext* vconjunct_ctx) { + _colname_to_value_range = colname_to_value_range; + _lazy_read_ctx.vconjunct_ctx = vconjunct_ctx; SCOPED_RAW_TIMER(&_statistics.parse_meta_time); RETURN_IF_ERROR(_create_file_reader()); - // _init_bloom_filter(colname_to_value_range); - - // create orc row reader - _row_reader_options.range(_range_start_offset, _range_size); - _row_reader_options.setTimezoneName(_ctz); RETURN_IF_ERROR(_init_read_columns()); - _init_search_argument(colname_to_value_range); - _row_reader_options.include(_read_cols); - try { - _row_reader = _reader->createRowReader(_row_reader_options); - _batch = _row_reader->createRowBatch(_batch_size); - } catch (std::exception& e) { - return Status::InternalError("Failed to create orc row reader. 
reason = {}", e.what()); - } - auto& selected_type = _row_reader->getSelectedType(); - _col_orc_type.resize(selected_type.getSubtypeCount()); - for (int i = 0; i < selected_type.getSubtypeCount(); ++i) { - std::string name; - // For hive engine, translate the column name in orc file to schema column name. - // This is for Hive 1.x which use internal column name _col0, _col1... - if (_is_hive) { - name = _file_col_to_schema_col[selected_type.getFieldName(i)]; - } else { - name = _get_field_name_lower_case(&selected_type, i); - } - _colname_to_idx[name] = i; - _col_orc_type[i] = selected_type.getSubtype(i); - } return Status::OK(); } @@ -293,18 +278,12 @@ Status OrcReader::_init_read_columns() { if (_is_hive) { _file_col_to_schema_col[orc_cols[pos]] = col_name; } + _col_name_to_file_col_name[col_name] = orc_cols[pos]; } } return Status::OK(); } -struct OrcPredicate { - std::string col_name; - orc::PredicateDataType data_type; - std::vector<orc::Literal> literals; - SQLFilterOp op; -}; - // orc only support LONG, FLOAT, STRING, DATE, DECIMAL, TIMESTAMP, BOOLEAN to push down predicates static std::unordered_map<orc::TypeKind, orc::PredicateDataType> TYPEKIND_TO_PREDICATE_TYPE = { {orc::TypeKind::BYTE, orc::PredicateDataType::LONG}, @@ -544,10 +523,10 @@ bool static build_search_argument(std::vector<OrcPredicate>& predicates, int ind return true; } -void OrcReader::_init_search_argument( +bool OrcReader::_init_search_argument( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { if (colname_to_value_range->empty()) { - return; + return false; } std::vector<OrcPredicate> predicates; auto& root_type = _reader->getType(); @@ -570,15 +549,203 @@ void OrcReader::_init_search_argument( } } if (predicates.empty()) { - return; + return false; } std::unique_ptr<orc::SearchArgumentBuilder> builder = orc::SearchArgumentFactory::newBuilder(); if (build_search_argument(predicates, 0, builder)) { std::unique_ptr<orc::SearchArgument> sargs = 
builder->build(); _row_reader_options.searchArgument(std::move(sargs)); + return true; + } else { + return false; } } +Status OrcReader::set_fill_columns( + const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& + partition_columns, + const std::unordered_map<std::string, VExprContext*>& missing_columns) { + SCOPED_RAW_TIMER(&_statistics.parse_meta_time); + + // std::unordered_map<column_name, std::pair<col_id, slot_id>> + std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns; + std::function<void(VExpr * expr)> visit_slot = [&](VExpr* expr) { + if (VSlotRef* slot_ref = typeid_cast<VSlotRef*>(expr)) { + auto expr_name = slot_ref->expr_name(); + auto iter = _col_name_to_file_col_name.find(expr_name); + if (iter != _col_name_to_file_col_name.end()) { + expr_name = iter->second; + } + predicate_columns.emplace(expr_name, + std::make_pair(slot_ref->column_id(), slot_ref->slot_id())); + if (slot_ref->column_id() == 0) { + _lazy_read_ctx.resize_first_column = false; + } + return; + } else if (VRuntimeFilterWrapper* runtime_filter = + typeid_cast<VRuntimeFilterWrapper*>(expr)) { + VExpr* filter_impl = const_cast<VExpr*>(runtime_filter->get_impl()); + if (VBloomPredicate* bloom_predicate = typeid_cast<VBloomPredicate*>(filter_impl)) { + for (VExpr* child : bloom_predicate->children()) { + visit_slot(child); + } + } else if (VInPredicate* in_predicate = typeid_cast<VInPredicate*>(filter_impl)) { + if (in_predicate->children().size() > 0) { + visit_slot(in_predicate->children()[0]); + } + } else { + for (VExpr* child : filter_impl->children()) { + visit_slot(child); + } + } + } else { + for (VExpr* child : expr->children()) { + visit_slot(child); + } + } + }; + if (_lazy_read_ctx.vconjunct_ctx != nullptr) { + visit_slot(_lazy_read_ctx.vconjunct_ctx->root()); + } + + for (auto& read_col : _read_cols_lower_case) { + _lazy_read_ctx.all_read_columns.emplace_back(read_col); + if (predicate_columns.size() > 0) { + auto iter = 
predicate_columns.find(read_col); + if (iter == predicate_columns.end()) { + _lazy_read_ctx.lazy_read_columns.emplace_back(read_col); + } else { + _lazy_read_ctx.predicate_columns.first.emplace_back(iter->first); + _lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second); + _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first); + } + } + } + + for (auto& kv : partition_columns) { + auto iter = predicate_columns.find(kv.first); + if (iter == predicate_columns.end()) { + _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second); + } else { + _lazy_read_ctx.predicate_partition_columns.emplace(kv.first, kv.second); + _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first); + } + } + + for (auto& kv : missing_columns) { + auto iter = predicate_columns.find(kv.first); + if (iter == predicate_columns.end()) { + _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second); + } else { + _lazy_read_ctx.predicate_missing_columns.emplace(kv.first, kv.second); + _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first); + } + } + + if (_enable_lazy_mat && _lazy_read_ctx.predicate_columns.first.size() > 0 && + _lazy_read_ctx.lazy_read_columns.size() > 0) { + _lazy_read_ctx.can_lazy_read = true; + } + + if (!_lazy_read_ctx.can_lazy_read) { + for (auto& kv : _lazy_read_ctx.predicate_partition_columns) { + _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second); + } + for (auto& kv : _lazy_read_ctx.predicate_missing_columns) { + _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second); + } + } + + _fill_all_columns = true; + + // create orc row reader + _row_reader_options.range(_range_start_offset, _range_size); + _row_reader_options.setTimezoneName(_ctz); + if (!_init_search_argument(_colname_to_value_range)) { + _lazy_read_ctx.can_lazy_read = false; + } + _row_reader_options.include(_read_cols); + if (_lazy_read_ctx.can_lazy_read) { + _row_reader_options.filter(_lazy_read_ctx.predicate_columns.first); + 
_orc_filter = std::unique_ptr<ORCFilterImpl>(new ORCFilterImpl(this)); + } + try { + _row_reader = _reader->createRowReader(_row_reader_options, _orc_filter.get()); + _batch = _row_reader->createRowBatch(_batch_size); + } catch (std::exception& e) { + return Status::InternalError("Failed to create orc row reader. reason = {}", e.what()); + } + auto& selected_type = _row_reader->getSelectedType(); + _col_orc_type.resize(selected_type.getSubtypeCount()); + for (int i = 0; i < selected_type.getSubtypeCount(); ++i) { + std::string name; + // For hive engine, translate the column name in orc file to schema column name. + // This is for Hive 1.x which use internal column name _col0, _col1... + if (_is_hive) { + name = _file_col_to_schema_col[selected_type.getFieldName(i)]; + } else { + name = _get_field_name_lower_case(&selected_type, i); + } + _colname_to_idx[name] = i; + _col_orc_type[i] = selected_type.getSubtype(i); + } + return Status::OK(); +} + +Status OrcReader::_fill_partition_columns( + Block* block, size_t rows, + const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& + partition_columns) { + for (auto& kv : partition_columns) { + auto doris_column = block->get_by_name(kv.first).column; + IColumn* col_ptr = const_cast<IColumn*>(doris_column.get()); + auto& [value, slot_desc] = kv.second; + if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()), + value.size(), true, false, rows)) { + return Status::InternalError("Failed to fill partition column: {}={}", + slot_desc->col_name(), value); + } + } + return Status::OK(); +} + +Status OrcReader::_fill_missing_columns( + Block* block, size_t rows, + const std::unordered_map<std::string, VExprContext*>& missing_columns) { + for (auto& kv : missing_columns) { + if (kv.second == nullptr) { + // no default column, fill with null + auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + 
(*std::move(block->get_by_name(kv.first).column)).mutate().get()); + nullable_column->insert_many_defaults(rows); + } else { + // fill with default value + auto* ctx = kv.second; + auto origin_column_num = block->columns(); + int result_column_id = -1; + // PT1 => dest primitive type + RETURN_IF_ERROR(ctx->execute(block, &result_column_id)); + bool is_origin_column = result_column_id < origin_column_num; + if (!is_origin_column) { + // call resize because the first column of _src_block_ptr may not be filled by reader, + // so _src_block_ptr->rows() may return wrong result, cause the column created by `ctx->execute()` + // has only one row. + std::move(*block->get_by_position(result_column_id).column).mutate()->resize(rows); + auto result_column_ptr = block->get_by_position(result_column_id).column; + // result_column_ptr maybe a ColumnConst, convert it to a normal column + result_column_ptr = result_column_ptr->convert_to_full_column_if_const(); + auto origin_column_type = block->get_by_name(kv.first).type; + bool is_nullable = origin_column_type->is_nullable(); + block->replace_by_position( + block->get_position_by_name(kv.first), + is_nullable ? 
make_nullable(result_column_ptr) : result_column_ptr); + block->erase(result_column_id); + } + } + } + return Status::OK(); +} + void OrcReader::_init_bloom_filter( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { // generate bloom filter @@ -690,6 +857,7 @@ static inline size_t trim_right(const char* s, size_t size) { return size; } +template <bool is_filter> Status OrcReader::_decode_string_column(const std::string& col_name, const MutableColumnPtr& data_column, const orc::TypeKind& type_kind, orc::ColumnVectorBatch* cvb, @@ -763,6 +931,7 @@ Status OrcReader::_fill_doris_array_offsets(const std::string& col_name, return Status::OK(); } +template <bool is_filter> Status OrcReader::_orc_column_to_doris_column(const std::string& col_name, const ColumnPtr& doris_column, const DataTypePtr& data_type, @@ -803,30 +972,34 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name, FOR_FLAT_ORC_COLUMNS(DISPATCH) #undef DISPATCH case TypeIndex::Decimal32: - return _decode_decimal_column<Int32>(col_name, data_column, data_type, cvb, num_values); + return _decode_decimal_column<Int32, is_filter>(col_name, data_column, data_type, cvb, + num_values); case TypeIndex::Decimal64: - return _decode_decimal_column<Int64>(col_name, data_column, data_type, cvb, num_values); + return _decode_decimal_column<Int64, is_filter>(col_name, data_column, data_type, cvb, + num_values); case TypeIndex::Decimal128: - return _decode_decimal_column<Int128>(col_name, data_column, data_type, cvb, num_values); + return _decode_decimal_column<Int128, is_filter>(col_name, data_column, data_type, cvb, + num_values); case TypeIndex::Decimal128I: - return _decode_decimal_column<Int128>(col_name, data_column, data_type, cvb, num_values); + return _decode_decimal_column<Int128, is_filter>(col_name, data_column, data_type, cvb, + num_values); case TypeIndex::Date: - return _decode_time_column<VecDateTimeValue, Int64, orc::LongVectorBatch>( + return 
_decode_time_column<VecDateTimeValue, Int64, orc::LongVectorBatch, is_filter>( col_name, data_column, cvb, num_values); case TypeIndex::DateV2: - return _decode_time_column<DateV2Value<DateV2ValueType>, UInt32, orc::LongVectorBatch>( - col_name, data_column, cvb, num_values); + return _decode_time_column<DateV2Value<DateV2ValueType>, UInt32, orc::LongVectorBatch, + is_filter>(col_name, data_column, cvb, num_values); case TypeIndex::DateTime: - return _decode_time_column<VecDateTimeValue, Int64, orc::TimestampVectorBatch>( + return _decode_time_column<VecDateTimeValue, Int64, orc::TimestampVectorBatch, is_filter>( col_name, data_column, cvb, num_values); case TypeIndex::DateTimeV2: return _decode_time_column<DateV2Value<DateTimeV2ValueType>, UInt64, - orc::TimestampVectorBatch>(col_name, data_column, cvb, - num_values); + orc::TimestampVectorBatch, is_filter>(col_name, data_column, cvb, + num_values); case TypeIndex::String: case TypeIndex::FixedString: - return _decode_string_column(col_name, data_column, orc_column_type->getKind(), cvb, - num_values); + return _decode_string_column<is_filter>(col_name, data_column, orc_column_type->getKind(), + cvb, num_values); case TypeIndex::Array: { if (orc_column_type->getKind() != orc::TypeKind::LIST) { return Status::InternalError("Wrong data type for colum '{}'", col_name); @@ -841,7 +1014,7 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name, reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get()) ->get_nested_type()); const orc::Type* nested_orc_type = orc_column_type->getSubtype(0); - return _orc_column_to_doris_column( + return _orc_column_to_doris_column<is_filter>( col_name, static_cast<ColumnArray&>(*data_column).get_data_ptr(), nested_type, nested_orc_type, orc_list->elements.get(), element_size); } @@ -864,11 +1037,12 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name, const orc::Type* orc_value_type = orc_column_type->getSubtype(1); const 
ColumnPtr& doris_key_column = doris_map.get_keys_ptr(); const ColumnPtr& doris_value_column = doris_map.get_values_ptr(); - RETURN_IF_ERROR(_orc_column_to_doris_column(col_name, doris_key_column, doris_key_type, - orc_key_type, orc_map->keys.get(), - element_size)); - return _orc_column_to_doris_column(col_name, doris_value_column, doris_value_type, - orc_value_type, orc_map->elements.get(), element_size); + RETURN_IF_ERROR(_orc_column_to_doris_column<is_filter>(col_name, doris_key_column, + doris_key_type, orc_key_type, + orc_map->keys.get(), element_size)); + return _orc_column_to_doris_column<is_filter>(col_name, doris_value_column, + doris_value_type, orc_value_type, + orc_map->elements.get(), element_size); } case TypeIndex::Struct: { if (orc_column_type->getKind() != orc::TypeKind::STRUCT) { @@ -886,8 +1060,8 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name, const orc::Type* orc_type = orc_column_type->getSubtype(i); const ColumnPtr& doris_field = doris_struct.get_column_ptr(i); const DataTypePtr& doris_type = doris_struct_type->get_element(i); - RETURN_IF_ERROR(_orc_column_to_doris_column(col_name, doris_field, doris_type, orc_type, - orc_field, num_values)); + RETURN_IF_ERROR(_orc_column_to_doris_column<is_filter>( + col_name, doris_field, doris_type, orc_type, orc_field, num_values)); } return Status::OK(); } @@ -904,32 +1078,148 @@ std::string OrcReader::_get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { - SCOPED_RAW_TIMER(&_statistics.column_read_time); - { - SCOPED_RAW_TIMER(&_statistics.get_batch_time); - // reset decimal_scale_params_index - _decimal_scale_params_index = 0; - if (!_row_reader->next(*_batch)) { - *eof = true; - *read_rows = 0; - return Status::OK(); - } - } - const auto& batch_vec = down_cast<orc::StructVectorBatch*>(_batch.get())->fields; - for (auto& col : _read_cols_lower_case) { - auto& column_with_type_and_name = 
block->get_by_name(col); + if (_lazy_read_ctx.can_lazy_read) { + std::vector<uint32_t> columns_to_filter; + int column_to_keep = block->columns(); + columns_to_filter.resize(column_to_keep); + for (uint32_t i = 0; i < column_to_keep; ++i) { + columns_to_filter[i] = i; + } + uint64_t rr; + SCOPED_RAW_TIMER(&_statistics.column_read_time); + { + SCOPED_RAW_TIMER(&_statistics.get_batch_time); + // reset decimal_scale_params_index; + _decimal_scale_params_index = 0; + rr = _row_reader->nextBatch(*_batch, block); + if (rr == 0) { + *eof = true; + *read_rows = 0; + return Status::OK(); + } + } + const auto& batch_vec = down_cast<orc::StructVectorBatch*>(_batch.get())->fields; + for (auto& col_name : _lazy_read_ctx.lazy_read_columns) { + auto& column_with_type_and_name = block->get_by_name(col_name); + auto& column_ptr = column_with_type_and_name.column; + auto& column_type = column_with_type_and_name.type; + auto orc_col_idx = _colname_to_idx.find(col_name); + if (orc_col_idx == _colname_to_idx.end()) { + return Status::InternalError("Wrong read column '{}' in orc file", col_name); + } + RETURN_IF_ERROR(_orc_column_to_doris_column<true>( + col_name, column_ptr, column_type, _col_orc_type[orc_col_idx->second], + batch_vec[orc_col_idx->second], _batch->numElements)); + } + *read_rows = rr; + + RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, *_filter)); + Block::erase_useless_column(block, column_to_keep); + } else { + uint64_t rr; + SCOPED_RAW_TIMER(&_statistics.column_read_time); + { + SCOPED_RAW_TIMER(&_statistics.get_batch_time); + // reset decimal_scale_params_index; + _decimal_scale_params_index = 0; + rr = _row_reader->nextBatch(*_batch, block); + if (rr == 0) { + *eof = true; + *read_rows = 0; + return Status::OK(); + } + } + const auto& batch_vec = down_cast<orc::StructVectorBatch*>(_batch.get())->fields; + for (auto& col_name : _lazy_read_ctx.all_read_columns) { + auto& column_with_type_and_name = block->get_by_name(col_name); + 
auto& column_ptr = column_with_type_and_name.column; + auto& column_type = column_with_type_and_name.type; + auto orc_col_idx = _colname_to_idx.find(col_name); + if (orc_col_idx == _colname_to_idx.end()) { + return Status::InternalError("Wrong read column '{}' in orc file", col_name); + } + RETURN_IF_ERROR(_orc_column_to_doris_column( + col_name, column_ptr, column_type, _col_orc_type[orc_col_idx->second], + batch_vec[orc_col_idx->second], _batch->numElements)); + } + *read_rows = rr; + RETURN_IF_ERROR( + _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns)); + RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns)); + + if (_lazy_read_ctx.vconjunct_ctx != nullptr) { + std::vector<uint32_t> columns_to_filter; + int column_to_keep = block->columns(); + columns_to_filter.resize(column_to_keep); + for (uint32_t i = 0; i < column_to_keep; ++i) { + columns_to_filter[i] = i; + } + std::vector<VExprContext*> filter_conjuncts; + filter_conjuncts.push_back(_lazy_read_ctx.vconjunct_ctx); + RETURN_IF_CATCH_EXCEPTION( + RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + filter_conjuncts, nullptr, block, columns_to_filter, column_to_keep))); + } + } + return Status::OK(); +} + +Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) { + Block* block = (Block*)arg; + + const auto& batch_vec = down_cast<orc::StructVectorBatch*>(&data)->fields; + for (auto& col_name : _lazy_read_ctx.predicate_columns.first) { + auto& column_with_type_and_name = block->get_by_name(col_name); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; - auto orc_col_idx = _colname_to_idx.find(col); + auto orc_col_idx = _colname_to_idx.find(col_name); if (orc_col_idx == _colname_to_idx.end()) { - return Status::InternalError("Wrong read column '{}' in orc file", col); + return Status::InternalError("Wrong read column '{}' in orc file", 
col_name); } RETURN_IF_ERROR(_orc_column_to_doris_column( - col, column_ptr, column_type, _col_orc_type[orc_col_idx->second], - batch_vec[orc_col_idx->second], _batch->numElements)); + col_name, column_ptr, column_type, _col_orc_type[orc_col_idx->second], + batch_vec[orc_col_idx->second], data.numElements)); + } + RETURN_IF_ERROR( + _fill_partition_columns(block, size, _lazy_read_ctx.predicate_partition_columns)); + RETURN_IF_ERROR(_fill_missing_columns(block, size, _lazy_read_ctx.predicate_missing_columns)); + if (_lazy_read_ctx.resize_first_column) { + block->get_by_position(0).column->assume_mutable()->resize(size); + _lazy_read_ctx.resize_first_column = true; + } + + _filter.reset(new IColumn::Filter(size, 1)); + auto* __restrict result_filter_data = _filter->data(); + bool can_filter_all = false; + std::vector<VExprContext*> filter_conjuncts; + filter_conjuncts.push_back(_lazy_read_ctx.vconjunct_ctx); + RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts( + filter_conjuncts, nullptr, block, _filter.get(), &can_filter_all))); + + if (_lazy_read_ctx.resize_first_column) { + block->get_by_position(0).column->assume_mutable()->clear(); + } + + if (can_filter_all) { + for (auto& col : _lazy_read_ctx.predicate_columns.first) { + // clean block to read predicate columns + block->get_by_name(col).column->assume_mutable()->clear(); + } + for (auto& col : _lazy_read_ctx.predicate_partition_columns) { + block->get_by_name(col.first).column->assume_mutable()->clear(); + } + for (auto& col : _lazy_read_ctx.predicate_missing_columns) { + block->get_by_name(col.first).column->assume_mutable()->clear(); + } + } + + uint16_t new_size = 0; + for (uint16_t i = 0; i < size; i++) { + sel[new_size] = i; + new_size += result_filter_data[i] ? 
1 : 0; } - *read_rows = _batch->numElements; + data.numElements = new_size; return Status::OK(); } -} // namespace doris::vectorized +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 5c451e76d4..690a8932d0 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -33,6 +33,7 @@ #include "common/config.h" #include "common/status.h" #include "exec/olap_common.h" +#include "exec/text_converter.h" #include "io/file_factory.h" #include "io/fs/file_reader.h" #include "io/fs/file_reader_writer_fwd.h" @@ -81,6 +82,38 @@ namespace doris::vectorized { class ORCFileInputStream; +struct OrcPredicate { + std::string col_name; + orc::PredicateDataType data_type; + std::vector<orc::Literal> literals; + SQLFilterOp op; +}; + +struct LazyReadContext { + VExprContext* vconjunct_ctx = nullptr; + bool can_lazy_read = false; + // block->rows() returns the number of rows of the first column, + // so we should check and resize the first column + bool resize_first_column = true; + std::list<std::string> all_read_columns; + // include predicate_partition_columns & predicate_missing_columns + std::vector<uint32_t> all_predicate_col_ids; + // save slot_id to find dict filter column name, because expr column name may + // be different with orc column name + // std::pair<std::list<col_name>, std::vector<slot_id>> + std::pair<std::list<std::string>, std::vector<int>> predicate_columns; + std::list<std::string> predicate_orc_columns; + std::vector<std::string> lazy_read_columns; + std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> + predicate_partition_columns; + // lazy read partition columns or all partition columns + std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> + partition_columns; + std::unordered_map<std::string, VExprContext*> predicate_missing_columns; + // lazy read 
missing columns or all missing columns + std::unordered_map<std::string, VExprContext*> missing_columns; +}; + class OrcReader : public GenericReader { ENABLE_FACTORY_CREATOR(OrcReader); @@ -98,16 +131,31 @@ public: OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector<std::string>& column_names, - size_t batch_size, const std::string& ctz, io::IOContext* io_ctx); + size_t batch_size, const std::string& ctz, io::IOContext* io_ctx, + bool enable_lazy_mat = true); OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range, const std::vector<std::string>& column_names, const std::string& ctz, - io::IOContext* io_ctx); + io::IOContext* io_ctx, bool enable_lazy_mat = true); ~OrcReader() override; Status init_reader( - std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, + VExprContext* vconjunct_ctx); + + Status set_fill_columns( + const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& + partition_columns, + const std::unordered_map<std::string, VExprContext*>& missing_columns) override; + + Status _fill_partition_columns( + Block* block, size_t rows, + const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& + partition_columns); + Status _fill_missing_columns( + Block* block, size_t rows, + const std::unordered_map<std::string, VExprContext*>& missing_columns); Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; @@ -122,6 +170,8 @@ public: Status get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) override; + Status filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg); + private: struct OrcProfile { RuntimeProfile::Counter* read_time; @@ -134,6 +184,19 @@ private: RuntimeProfile::Counter* decode_null_map_time; }; 
+ class ORCFilterImpl : public orc::ORCFilter { + public: + ORCFilterImpl(OrcReader* orcReader) : orcReader(orcReader) {} + ~ORCFilterImpl() override = default; + void filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, + void* arg) const override { + orcReader->filter(data, sel, size, arg); + } + + private: + OrcReader* orcReader; + }; + // Create inner orc file, // return EOF if file is empty // return EROOR if encounter error. @@ -142,12 +205,13 @@ private: void _init_profile(); Status _init_read_columns(); TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type); - void _init_search_argument( + bool _init_search_argument( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); void _init_bloom_filter( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); void _init_system_properties(); void _init_file_description(); + template <bool is_filter = false> Status _orc_column_to_doris_column(const std::string& col_name, const ColumnPtr& doris_column, const DataTypePtr& data_type, const orc::Type* orc_column_type, @@ -194,7 +258,7 @@ private: } } - template <typename DecimalPrimitiveType, typename OrcColumnType> + template <typename DecimalPrimitiveType, typename OrcColumnType, bool is_filter> Status _decode_explicit_decimal_column(const std::string& col_name, const MutableColumnPtr& data_column, const DataTypePtr& data_type, @@ -218,42 +282,68 @@ private: auto origin_size = column_data.size(); column_data.resize(origin_size + num_values); - for (int i = 0; i < num_values; ++i) { - int128_t value; - if constexpr (std::is_same_v<OrcColumnType, orc::Decimal64VectorBatch>) { - value = static_cast<int128_t>(cvb_data[i]); - } else { - uint64_t hi = data->values[i].getHighBits(); - uint64_t lo = data->values[i].getLowBits(); - value = (((int128_t)hi) << 64) | (int128_t)lo; - } - if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { + if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) 
{ + for (int i = 0; i < num_values; ++i) { + int128_t value; + if constexpr (std::is_same_v<OrcColumnType, orc::Decimal64VectorBatch>) { + value = static_cast<int128_t>(cvb_data[i]); + } else { + uint64_t hi = data->values[i].getHighBits(); + uint64_t lo = data->values[i].getLowBits(); + value = (((int128_t)hi) << 64) | (int128_t)lo; + } value *= scale_params.scale_factor; - } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { + auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[origin_size + i]); + v = (DecimalPrimitiveType)value; + } + } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { + for (int i = 0; i < num_values; ++i) { + int128_t value; + if constexpr (std::is_same_v<OrcColumnType, orc::Decimal64VectorBatch>) { + value = static_cast<int128_t>(cvb_data[i]); + } else { + uint64_t hi = data->values[i].getHighBits(); + uint64_t lo = data->values[i].getLowBits(); + value = (((int128_t)hi) << 64) | (int128_t)lo; + } value /= scale_params.scale_factor; + auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[origin_size + i]); + v = (DecimalPrimitiveType)value; + } + } else { + for (int i = 0; i < num_values; ++i) { + int128_t value; + if constexpr (std::is_same_v<OrcColumnType, orc::Decimal64VectorBatch>) { + value = static_cast<int128_t>(cvb_data[i]); + } else { + uint64_t hi = data->values[i].getHighBits(); + uint64_t lo = data->values[i].getLowBits(); + value = (((int128_t)hi) << 64) | (int128_t)lo; + } + auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[origin_size + i]); + v = (DecimalPrimitiveType)value; } - auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[origin_size + i]); - v = (DecimalPrimitiveType)value; } return Status::OK(); } - template <typename DecimalPrimitiveType> + template <typename DecimalPrimitiveType, bool is_filter> Status _decode_decimal_column(const std::string& col_name, const MutableColumnPtr& data_column, const DataTypePtr& data_type, 
orc::ColumnVectorBatch* cvb, size_t num_values) { SCOPED_RAW_TIMER(&_statistics.decode_value_time); if (dynamic_cast<orc::Decimal64VectorBatch*>(cvb) != nullptr) { - return _decode_explicit_decimal_column<DecimalPrimitiveType, orc::Decimal64VectorBatch>( - col_name, data_column, data_type, cvb, num_values); + return _decode_explicit_decimal_column<DecimalPrimitiveType, orc::Decimal64VectorBatch, + is_filter>(col_name, data_column, data_type, cvb, + num_values); } else { - return _decode_explicit_decimal_column<DecimalPrimitiveType, - orc::Decimal128VectorBatch>( - col_name, data_column, data_type, cvb, num_values); + return _decode_explicit_decimal_column<DecimalPrimitiveType, orc::Decimal128VectorBatch, + is_filter>(col_name, data_column, data_type, cvb, + num_values); } } - template <typename CppType, typename DorisColumnType, typename OrcColumnType> + template <typename CppType, typename DorisColumnType, typename OrcColumnType, bool is_filter> Status _decode_time_column(const std::string& col_name, const MutableColumnPtr& data_column, orc::ColumnVectorBatch* cvb, size_t num_values) { SCOPED_RAW_TIMER(&_statistics.decode_value_time); @@ -264,9 +354,18 @@ private: auto& column_data = static_cast<ColumnVector<DorisColumnType>&>(*data_column).get_data(); auto origin_size = column_data.size(); column_data.resize(origin_size + num_values); + UInt8* __restrict filter_data; + if constexpr (is_filter) { + filter_data = _filter->data(); + } for (int i = 0; i < num_values; ++i) { auto& v = reinterpret_cast<CppType&>(column_data[origin_size + i]); if constexpr (std::is_same_v<OrcColumnType, orc::LongVectorBatch>) { // date + if constexpr (is_filter) { + if (!filter_data[i]) { + continue; + } + } int64_t& date_value = data->data[i]; v.from_unixtime(date_value * 24 * 60 * 60, _time_zone); // day to seconds if constexpr (std::is_same_v<CppType, VecDateTimeValue>) { @@ -274,6 +373,11 @@ private: v.cast_to_date(); } } else { // timestamp + if constexpr (is_filter) { + if 
(!filter_data[i]) { + continue; + } + } v.from_unixtime(data->data[i], _time_zone); if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { // nanoseconds will lose precision. only keep microseconds. @@ -284,6 +388,7 @@ private: return Status::OK(); } + template <bool is_filter> Status _decode_string_column(const std::string& col_name, const MutableColumnPtr& data_column, const orc::TypeKind& type_kind, orc::ColumnVectorBatch* cvb, size_t num_values); @@ -321,6 +426,7 @@ private: std::unordered_map<std::string, std::string> _file_col_to_schema_col; // Flag for hive engine. True if the external table engine is Hive. bool _is_hive = false; + std::unordered_map<std::string, std::string> _col_name_to_file_col_name; std::vector<const orc::Type*> _col_orc_type; std::unique_ptr<ORCFileInputStream> _file_input_stream; Statistics _statistics; @@ -330,15 +436,22 @@ private: std::unique_ptr<orc::ColumnVectorBatch> _batch; std::unique_ptr<orc::Reader> _reader; std::unique_ptr<orc::RowReader> _row_reader; + std::unique_ptr<ORCFilterImpl> _orc_filter; orc::ReaderOptions _reader_options; orc::RowReaderOptions _row_reader_options; std::shared_ptr<io::FileSystem> _file_system; io::IOContext* _io_ctx; + bool _enable_lazy_mat = true; std::vector<DecimalScaleParams> _decimal_scale_params; size_t _decimal_scale_params_index; + + std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; + std::unique_ptr<IColumn::Filter> _filter = nullptr; + LazyReadContext _lazy_read_ctx; + std::unique_ptr<TextConverter> _text_converter = nullptr; }; class ORCFileInputStream : public orc::InputStream { diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index cb0f6ddca0..1ffc901ee6 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -319,8 +319,10 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_ if (_position_delete_ctx.has_filter) { filters.push_back(_pos_delete_filter_ptr.get()); } - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_execute_conjuncts_and_filter_block( - _filter_conjuncts, filters, block, columns_to_filter, column_to_keep))); + RETURN_IF_CATCH_EXCEPTION( + RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + _filter_conjuncts, &filters, block, columns_to_filter, + column_to_keep))); _convert_dict_cols_to_string_cols(block); } else { RETURN_IF_CATCH_EXCEPTION( @@ -427,8 +429,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re if (_position_delete_ctx.has_filter) { filters.push_back(_pos_delete_filter_ptr.get()); } - RETURN_IF_ERROR(_execute_conjuncts(_filter_conjuncts, filters, block, &result_filter, - &can_filter_all)); + RETURN_IF_ERROR(VExprContext::execute_conjuncts(_filter_conjuncts, &filters, block, + &result_filter, &can_filter_all)); if (_lazy_read_ctx.resize_first_column) { // We have to clean the first column to insert right data. @@ -498,8 +500,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re // generated from next batch, so the filter column is removed ahead. 
DCHECK_EQ(block->rows(), 0); } else { - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_filter_block_internal( - block, _lazy_read_ctx.all_predicate_col_ids, result_filter))); + RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal( + block, _lazy_read_ctx.all_predicate_col_ids, result_filter)); Block::erase_useless_column(block, origin_column_num); } } else { @@ -695,43 +697,14 @@ Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) { Status RowGroupReader::_filter_block(Block* block, int column_to_keep, const std::vector<uint32_t>& columns_to_filter) { if (_pos_delete_filter_ptr) { - RETURN_IF_ERROR( - _filter_block_internal(block, columns_to_filter, (*_pos_delete_filter_ptr))); + RETURN_IF_CATCH_EXCEPTION( + Block::filter_block_internal(block, columns_to_filter, (*_pos_delete_filter_ptr))); } Block::erase_useless_column(block, column_to_keep); return Status::OK(); } -// need exception safety -Status RowGroupReader::_filter_block_internal(Block* block, - const std::vector<uint32_t>& columns_to_filter, - const IColumn::Filter& filter) { - size_t filter_size = filter.size(); - size_t count = filter_size - simd::count_zero_num((int8_t*)filter.data(), filter_size); - if (count == 0) { - for (auto& col : columns_to_filter) { - std::move(*block->get_by_position(col).column).assume_mutable()->clear(); - } - } else { - for (auto& col : columns_to_filter) { - size_t size = block->get_by_position(col).column->size(); - if (size != count) { - auto& column = block->get_by_position(col).column; - if (column->size() != count) { - if (column->use_count() == 1) { - const auto result_size = column->assume_mutable()->filter(filter); - CHECK_EQ(result_size, count); - } else { - column = column->filter(filter, count); - } - } - } - } - } - return Status::OK(); -} - Status RowGroupReader::_rewrite_dict_predicates() { for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) { std::string& dict_filter_col_name = it->first; @@ -793,9 +766,8 @@ Status 
RowGroupReader::_rewrite_dict_predicates() { // The following process may be tricky and time-consuming, but we have no other way. temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size); } - std::vector<IColumn::Filter*> filters; - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_execute_conjuncts_and_filter_block( - *ctxs, filters, &temp_block, columns_to_filter, column_to_keep))); + RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + *ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep))); if (dict_pos != 0) { // We have to clean the first column to insert right data. temp_block.get_by_position(0).column->assume_mutable()->clear(); @@ -974,87 +946,4 @@ ParquetColumnReader::Statistics RowGroupReader::statistics() { return st; } -// TODO Performance Optimization -Status RowGroupReader::_execute_conjuncts(const std::vector<VExprContext*>& ctxs, - const std::vector<IColumn::Filter*>& filters, - Block* block, IColumn::Filter* result_filter, - bool* can_filter_all) { - *can_filter_all = false; - auto* __restrict result_filter_data = result_filter->data(); - for (auto* ctx : ctxs) { - int result_column_id = -1; - RETURN_IF_ERROR(ctx->execute(block, &result_column_id)); - ColumnPtr& filter_column = block->get_by_position(result_column_id).column; - if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { - size_t column_size = nullable_column->size(); - if (column_size == 0) { - *can_filter_all = true; - return Status::OK(); - } else { - const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); - const IColumn::Filter& filter = - assert_cast<const ColumnUInt8&>(*nested_column).get_data(); - auto* __restrict filter_data = filter.data(); - const size_t size = filter.size(); - auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); - - for (size_t i = 0; i < size; ++i) { - result_filter_data[i] &= (!null_map_data[i]) & 
filter_data[i]; - } - if (memchr(filter_data, 0x1, size) == nullptr) { - *can_filter_all = true; - return Status::OK(); - } - } - } else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { - // filter all - if (!const_column->get_bool(0)) { - *can_filter_all = true; - return Status::OK(); - } - } else { - const IColumn::Filter& filter = - assert_cast<const ColumnUInt8&>(*filter_column).get_data(); - auto* __restrict filter_data = filter.data(); - - const size_t size = filter.size(); - for (size_t i = 0; i < size; ++i) { - result_filter_data[i] &= filter_data[i]; - } - - if (memchr(filter_data, 0x1, size) == nullptr) { - *can_filter_all = true; - return Status::OK(); - } - } - } - for (auto* filter : filters) { - auto* __restrict filter_data = filter->data(); - const size_t size = filter->size(); - for (size_t i = 0; i < size; ++i) { - result_filter_data[i] &= filter_data[i]; - } - } - return Status::OK(); -} - -// TODO Performance Optimization -// need exception safety -Status RowGroupReader::_execute_conjuncts_and_filter_block( - const std::vector<VExprContext*>& ctxs, const std::vector<IColumn::Filter*>& filters, - Block* block, std::vector<uint32_t>& columns_to_filter, int column_to_keep) { - IColumn::Filter result_filter(block->rows(), 1); - bool can_filter_all; - RETURN_IF_ERROR(_execute_conjuncts(ctxs, filters, block, &result_filter, &can_filter_all)); - if (can_filter_all) { - for (auto& col : columns_to_filter) { - std::move(*block->get_by_position(col).column).assume_mutable()->clear(); - } - } else { - RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, result_filter)); - } - Block::erase_useless_column(block, column_to_keep); - return Status::OK(); -} - } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 910ab5ce1c..a6af66d7a3 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -187,14 +187,6 @@ private: Status _rewrite_dict_predicates(); Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id, bool is_nullable); void _convert_dict_cols_to_string_cols(Block* block); - Status _execute_conjuncts(const std::vector<VExprContext*>& ctxs, - const std::vector<IColumn::Filter*>& filters, Block* block, - IColumn::Filter* result_filter, bool* can_filter_all); - Status _execute_conjuncts_and_filter_block(const std::vector<VExprContext*>& ctxs, - const std::vector<IColumn::Filter*>& filters, - Block* block, - std::vector<uint32_t>& columns_to_filter, - int column_to_keep); io::FileReaderSPtr _file_reader; std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 48332dfff0..40d3492229 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -69,7 +69,8 @@ namespace doris::vectorized { ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, const TFileRangeDesc& range, size_t batch_size, cctz::time_zone* ctz, - io::IOContext* io_ctx, RuntimeState* state, ShardedKVCache* kv_cache) + io::IOContext* io_ctx, RuntimeState* state, ShardedKVCache* kv_cache, + bool enable_lazy_mat) : _profile(profile), _scan_params(params), _scan_range(range), @@ -79,19 +80,21 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams _ctz(ctz), _io_ctx(io_ctx), _state(state), - _kv_cache(kv_cache) { + _kv_cache(kv_cache), + _enable_lazy_mat(enable_lazy_mat) { _init_profile(); _init_system_properties(); _init_file_description(); } ParquetReader::ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range, - io::IOContext* io_ctx, RuntimeState* state) + io::IOContext* io_ctx, RuntimeState* state, bool 
enable_lazy_mat) : _profile(nullptr), _scan_params(params), _scan_range(range), _io_ctx(io_ctx), - _state(state) { + _state(state), + _enable_lazy_mat(enable_lazy_mat) { _init_system_properties(); _init_file_description(); } @@ -410,7 +413,8 @@ Status ParquetReader::set_fill_columns( } } - if (!_has_complex_type && _lazy_read_ctx.predicate_columns.first.size() > 0 && + if (!_has_complex_type && _enable_lazy_mat && + _lazy_read_ctx.predicate_columns.first.size() > 0 && _lazy_read_ctx.lazy_read_columns.size() > 0) { _lazy_read_ctx.can_lazy_read = true; } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 89dbc832a6..4f45e6979f 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -92,10 +92,11 @@ public: ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, const TFileRangeDesc& range, size_t batch_size, cctz::time_zone* ctz, - io::IOContext* io_ctx, RuntimeState* state, ShardedKVCache* kv_cache = nullptr); + io::IOContext* io_ctx, RuntimeState* state, ShardedKVCache* kv_cache = nullptr, + bool enable_lazy_mat = true); ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range, - io::IOContext* io_ctx, RuntimeState* state); + io::IOContext* io_ctx, RuntimeState* state, bool enable_lazy_mat = true); ~ParquetReader() override; // for test @@ -252,14 +253,15 @@ private: bool _closed = false; io::IOContext* _io_ctx; RuntimeState* _state; + // Cache to save some common part such as file footer. + // Owned by scan node and shared by all parquet readers of this scan node. 
+ // Maybe null if not used + ShardedKVCache* _kv_cache = nullptr; + bool _enable_lazy_mat = true; const TupleDescriptor* _tuple_descriptor; const RowDescriptor* _row_descriptor; const std::unordered_map<std::string, int>* _colname_to_slot_id; const std::vector<VExprContext*>* _not_single_slot_filter_conjuncts; const std::unordered_map<int, std::vector<VExprContext*>>* _slot_id_to_filter_conjuncts; - // Cache to save some common part such as file footer. - // Owned by scan node and shared by all parquet readers of this scan node. - // Maybe null if not used - ShardedKVCache* _kv_cache = nullptr; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index d8db35adac..f4a9265d62 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -581,7 +581,7 @@ Status VFileScanner::_get_next_reader() { std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique( _profile, _params, range, _state->query_options().batch_size, const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state, - _kv_cache); + _kv_cache, _state->query_options().enable_parquet_lazy_mat); RETURN_IF_ERROR(parquet_reader->open()); if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx != nullptr) { RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr)); @@ -610,10 +610,16 @@ Status VFileScanner::_get_next_reader() { break; } case TFileFormatType::FORMAT_ORC: { + if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx != nullptr) { + RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr)); + _discard_conjuncts(); + } _cur_reader = OrcReader::create_unique( _profile, _state, _params, range, _file_col_names, - _state->query_options().batch_size, _state->timezone(), _io_ctx.get()); - init_status = ((OrcReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range); + _state->query_options().batch_size, _state->timezone(), 
_io_ctx.get(), + _state->query_options().enable_orc_lazy_mat); + init_status = ((OrcReader*)(_cur_reader.get())) + ->init_reader(_colname_to_value_range, _push_down_expr); break; } case TFileFormatType::FORMAT_CSV_PLAIN: diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index 6fc72fb03a..f34aabbfcc 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -28,6 +28,7 @@ #include "runtime/thread_context.h" #include "udf/udf.h" #include "util/stack_util.h" +#include "vec/columns/column_const.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/columns_with_type_and_name.h" #include "vec/exprs/vexpr.h" @@ -127,6 +128,92 @@ Status VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block, int col return Block::filter_block(block, result_column_id, column_to_keep); } +// TODO Performance Optimization +Status VExprContext::execute_conjuncts(const std::vector<VExprContext*>& ctxs, + const std::vector<IColumn::Filter*>* filters, Block* block, + IColumn::Filter* result_filter, bool* can_filter_all) { + DCHECK(result_filter->size() == block->rows()); + *can_filter_all = false; + auto* __restrict result_filter_data = result_filter->data(); + for (auto* ctx : ctxs) { + int result_column_id = -1; + RETURN_IF_ERROR(ctx->execute(block, &result_column_id)); + ColumnPtr& filter_column = block->get_by_position(result_column_id).column; + if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) { + size_t column_size = nullable_column->size(); + if (column_size == 0) { + *can_filter_all = true; + return Status::OK(); + } else { + const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); + const IColumn::Filter& filter = + assert_cast<const ColumnUInt8&>(*nested_column).get_data(); + auto* __restrict filter_data = filter.data(); + const size_t size = filter.size(); + auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); + + for 
(size_t i = 0; i < size; ++i) { + result_filter_data[i] &= (!null_map_data[i]) & filter_data[i]; + } + if (memchr(result_filter_data, 0x1, size) == nullptr) { + *can_filter_all = true; + return Status::OK(); + } + } + } else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) { + // filter all + if (!const_column->get_bool(0)) { + *can_filter_all = true; + return Status::OK(); + } + } else { + const IColumn::Filter& filter = + assert_cast<const ColumnUInt8&>(*filter_column).get_data(); + auto* __restrict filter_data = filter.data(); + + const size_t size = filter.size(); + for (size_t i = 0; i < size; ++i) { + result_filter_data[i] &= filter_data[i]; + } + + if (memchr(result_filter_data, 0x1, size) == nullptr) { + *can_filter_all = true; + return Status::OK(); + } + } + } + if (filters != nullptr) { + for (auto* filter : *filters) { + auto* __restrict filter_data = filter->data(); + const size_t size = filter->size(); + for (size_t i = 0; i < size; ++i) { + result_filter_data[i] &= filter_data[i]; + } + } + } + return Status::OK(); +} + +// TODO Performance Optimization +// need exception safety +Status VExprContext::execute_conjuncts_and_filter_block( + const std::vector<VExprContext*>& ctxs, const std::vector<IColumn::Filter*>* filters, + Block* block, std::vector<uint32_t>& columns_to_filter, int column_to_keep) { + IColumn::Filter result_filter(block->rows(), 1); + bool can_filter_all; + RETURN_IF_ERROR(execute_conjuncts(ctxs, filters, block, &result_filter, &can_filter_all)); + if (can_filter_all) { + for (auto& col : columns_to_filter) { + std::move(*block->get_by_position(col).column).assume_mutable()->clear(); + } + } else { + RETURN_IF_CATCH_EXCEPTION( + Block::filter_block_internal(block, columns_to_filter, result_filter)); + } + Block::erase_useless_column(block, column_to_keep); + return Status::OK(); +} + Status VExprContext::get_output_block_after_execute_exprs( const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs, 
const Block& input_block, Block* output_block) { diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index 91410fa02a..5069a5a615 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -68,6 +68,13 @@ public: [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block, int column_to_keep); + [[nodiscard]] static Status execute_conjuncts(const std::vector<VExprContext*>& ctxs, + const std::vector<IColumn::Filter*>* filters, + Block* block, IColumn::Filter* result_filter, + bool* can_filter_all); + [[nodiscard]] static Status execute_conjuncts_and_filter_block( + const std::vector<VExprContext*>& ctxs, const std::vector<IColumn::Filter*>* filters, + Block* block, std::vector<uint32_t>& columns_to_filter, int column_to_keep); [[nodiscard]] static Status get_output_block_after_execute_exprs( const std::vector<vectorized::VExprContext*>&, const Block&, Block*); diff --git a/build.sh b/build.sh index e1ef0a9ae4..b2d96878ac 100755 --- a/build.sh +++ b/build.sh @@ -246,17 +246,18 @@ if [[ ! -f "${DORIS_THIRDPARTY}/installed/lib/libbacktrace.a" ]]; then fi fi -if [[ ! -f "${DORIS_HOME}/be/src/apache-orc/README.md" ]]; then - echo "apache-orc not exists, need to update submodules ..." - set +e - cd "${DORIS_HOME}" - git submodule update --init --recursive be/src/apache-orc - exit_code=$? - set -e - if [[ "${exit_code}" -ne 0 ]]; then - mkdir -p "${DORIS_HOME}/be/src/apache-orc" - curl -L https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz | tar -xz -C "${DORIS_HOME}/be/src/apache-orc" --strip-components=1 - fi +echo "Update apache-orc ..." +set +e +cd "${DORIS_HOME}" +echo "Update apache-orc submodule ..." +git submodule update --init --recursive be/src/apache-orc +exit_code=$? +set -e +if [[ "${exit_code}" -ne 0 ]]; then + echo "Update apache-orc submodule failed, start to download and extract apache-orc package ..." 
+ rm -rf "${DORIS_HOME}/be/src/apache-orc" + mkdir -p "${DORIS_HOME}/be/src/apache-orc" + curl -L https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz | tar -xz -C "${DORIS_HOME}/be/src/apache-orc" --strip-components=1 fi if [[ "${CLEAN}" -eq 1 && "${BUILD_BE}" -eq 0 && "${BUILD_FE}" -eq 0 && "${BUILD_SPARK_DPP}" -eq 0 ]]; then diff --git a/docs/en/docs/advanced/variables.md b/docs/en/docs/advanced/variables.md index 6ca725d29b..315c3fbf57 100644 --- a/docs/en/docs/advanced/variables.md +++ b/docs/en/docs/advanced/variables.md @@ -646,6 +646,14 @@ Translated with www.DeepL.com/Translator (free version) | 10000000 | +--------------+ ``` + +* `enable_parquet_lazy_materialization` + + Controls whether to use lazy materialization technology in parquet reader. The default value is true. + +* `enable_orc_lazy_materialization` + + Controls whether to use lazy materialization technology in orc reader. The default value is true. *** diff --git a/docs/zh-CN/docs/advanced/variables.md b/docs/zh-CN/docs/advanced/variables.md index 1980790f5e..eb34066cda 100644 --- a/docs/zh-CN/docs/advanced/variables.md +++ b/docs/zh-CN/docs/advanced/variables.md @@ -632,6 +632,15 @@ try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/ | 10000000 | +--------------+ ``` + +* `enable_parquet_lazy_materialization` + + 控制 parquet reader 是否启用延迟物化技术。默认为 true。 + +* `enable_orc_lazy_materialization` + + 控制 orc reader 是否启用延迟物化技术。默认为 true。 + *** #### 关于语句执行超时控制的补充说明 diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 09e8abd396..ec5c59ebee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -309,6 +309,10 @@ public class SessionVariable implements Serializable, Writable { // Split size for ExternalFileScanNode. 
Default value 0 means use the block size of HDFS/S3. public static final String FILE_SPLIT_SIZE = "file_split_size"; + public static final String ENABLE_PARQUET_LAZY_MAT = "enable_parquet_lazy_materialization"; + + public static final String ENABLE_ORC_LAZY_MAT = "enable_orc_lazy_materialization"; + public static final List<String> DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -841,6 +845,22 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true) public long fileSplitSize = 0; + @VariableMgr.VarAttr( + name = ENABLE_PARQUET_LAZY_MAT, + description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。", + "Controls whether to use lazy materialization technology in parquet reader. " + + "The default value is true."}, + needForward = true) + public boolean enableParquetLazyMat = true; + + @VariableMgr.VarAttr( + name = ENABLE_ORC_LAZY_MAT, + description = {"控制 orc reader 是否启用延迟物化技术。默认为 true。", + "Controls whether to use lazy materialization technology in orc reader. " + + "The default value is true."}, + needForward = true) + public boolean enableOrcLazyMat = true; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { @@ -1439,6 +1459,23 @@ public class SessionVariable implements Serializable, Writable { this.fileSplitSize = fileSplitSize; } + public boolean isEnableParquetLazyMat() { + return enableParquetLazyMat; + } + + public void setEnableParquetLazyMat(boolean enableParquetLazyMat) { + this.enableParquetLazyMat = enableParquetLazyMat; + } + + public boolean isEnableOrcLazyMat() { + return enableOrcLazyMat; + } + + public void setEnableOrcLazyMat(boolean enableOrcLazyMat) { + this.enableOrcLazyMat = enableOrcLazyMat; + } + + /** * getInsertVisibleTimeoutMs. 
**/ @@ -1771,6 +1808,9 @@ public class SessionVariable implements Serializable, Writable { tResult.setDryRunQuery(true); } + tResult.setEnableParquetLazyMat(enableParquetLazyMat); + tResult.setEnableOrcLazyMat(enableOrcLazyMat); + return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 6afe288ee8..4b052c044f 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -215,6 +215,10 @@ struct TQueryOptions { // Specify base path for file cache 70: optional string file_cache_base_path + + 71: optional bool enable_parquet_lazy_mat = true + + 72: optional bool enable_orc_lazy_mat = true } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org