This is an automated email from the ASF dual-hosted git repository. ashingau pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b7a50a09fe [Opt](orc-reader) Optimize orc reader by dict filtering. (#20806) b7a50a09fe is described below commit b7a50a09febc5c24cca38cd02bede48252938837 Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Fri Jun 16 13:11:37 2023 +0800 [Opt](orc-reader) Optimize orc reader by dict filtering. (#20806) Optimize orc reader by dict filtering. It is similar to #17594. Test results for **ssb-flat-100** (3 nodes): | Query | before opt | after opt | | ------------- |:-------------:| ---------:| Q1.1 | 1.239 | 1.145 Q1.2 | 1.254 | 1.128 Q1.3 | 1.931 | 1.644 Q2.1 | 1.359 | 1.006 Q2.2 | 1.229 | 0.674 Q2.3 | 0.934 | 0.427 Q3.1 | 2.226 | 1.712 Q3.2 | 2.042 | 1.562 Q3.3 | 1.631 | 1.021 Q3.4 | 1.618 | 0.732 Q4.1 | 2.294 | 1.858 Q4.2 | 2.511 | 1.961 Q4.3 | 1.736 | 1.446 total | 22.004 | 16.316 --- be/src/apache-orc | 2 +- be/src/vec/exec/format/orc/vorc_reader.cpp | 698 ++++++++++++++++++++- be/src/vec/exec/format/orc/vorc_reader.h | 81 ++- .../format/table/transactional_hive_reader.cpp | 14 +- .../exec/format/table/transactional_hive_reader.h | 5 +- be/src/vec/exec/scan/vfile_scanner.cpp | 12 +- 6 files changed, 773 insertions(+), 39 deletions(-) diff --git a/be/src/apache-orc b/be/src/apache-orc index 380df03331..a4e67d732e 160000 --- a/be/src/apache-orc +++ b/be/src/apache-orc @@ -1 +1 @@ -Subproject commit 380df03331c12fa4095dd2613eb5f08ad541eb3e +Subproject commit a4e67d732e9acf3acb45e85c4cfe84d630e71ec1 diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 9fd4bc57d9..f1d6fbc7dd 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -38,6 +38,8 @@ #include "cctz/time_zone.h" #include "common/exception.h" #include "exec/olap_utils.h" +#include "exprs/create_predicate_function.h" +#include "exprs/hybrid_set.h" #include "gutil/casts.h" #include "gutil/strings/substitute.h" #include 
"io/fs/buffered_reader.h" @@ -71,7 +73,10 @@ #include "vec/data_types/data_type_struct.h" #include "vec/exec/format/table/transactional_hive_common.h" #include "vec/exprs/vbloom_predicate.h" +#include "vec/exprs/vdirect_in_predicate.h" +#include "vec/exprs/vectorized_fn_call.h" #include "vec/exprs/vin_predicate.h" +#include "vec/exprs/vliteral.h" #include "vec/exprs/vruntimefilter_wrapper.h" #include "vec/exprs/vslot_ref.h" #include "vec/runtime/vdatetime_value.h" @@ -87,12 +92,14 @@ enum class FileCachePolicy : uint8_t; namespace doris::vectorized { +// TODO: we need to determine it by test. +static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max(); + #define FOR_FLAT_ORC_COLUMNS(M) \ M(TypeIndex::Int8, Int8, orc::LongVectorBatch) \ M(TypeIndex::UInt8, UInt8, orc::LongVectorBatch) \ M(TypeIndex::Int16, Int16, orc::LongVectorBatch) \ M(TypeIndex::UInt16, UInt16, orc::LongVectorBatch) \ - M(TypeIndex::Int32, Int32, orc::LongVectorBatch) \ M(TypeIndex::UInt32, UInt32, orc::LongVectorBatch) \ M(TypeIndex::Int64, Int64, orc::LongVectorBatch) \ M(TypeIndex::UInt64, UInt64, orc::LongVectorBatch) \ @@ -160,13 +167,9 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r } OrcReader::~OrcReader() { - close(); -} - -void OrcReader::close() { - if (!_closed) { - _collect_profile_on_close(); - _closed = true; + _collect_profile_on_close(); + if (_obj_pool && _obj_pool.get()) { + _obj_pool->clear(); } } @@ -232,12 +235,24 @@ Status OrcReader::_create_file_reader() { Status OrcReader::init_reader( const std::vector<std::string>* column_names, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, - const VExprContextSPtrs& conjuncts, bool is_acid) { + const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const VExprContextSPtrs* not_single_slot_filter_conjuncts, + const std::unordered_map<int, 
VExprContextSPtrs>* slot_id_to_filter_conjuncts) { _column_names = column_names; _colname_to_value_range = colname_to_value_range; _text_converter.reset(new TextConverter('\\')); _lazy_read_ctx.conjuncts = conjuncts; _is_acid = is_acid; + _tuple_descriptor = tuple_descriptor; + _row_descriptor = row_descriptor; + _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; + _text_converter.reset(new TextConverter('\\')); + if (not_single_slot_filter_conjuncts) { + _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(), + not_single_slot_filter_conjuncts->end()); + } + _obj_pool = std::make_shared<ObjectPool>(); SCOPED_RAW_TIMER(&_statistics.parse_meta_time); RETURN_IF_ERROR(_create_file_reader()); RETURN_IF_ERROR(_init_read_columns()); @@ -735,7 +750,12 @@ Status OrcReader::set_fill_columns( _orc_filter = std::unique_ptr<ORCFilterImpl>(new ORCFilterImpl(this)); } try { - _row_reader = _reader->createRowReader(_row_reader_options, _orc_filter.get()); + _row_reader_options.setEnableLazyDecoding(true); + if (!_lazy_read_ctx.conjuncts.empty()) { + _string_dict_filter = std::make_unique<StringDictFilterImpl>(this); + } + _row_reader = _reader->createRowReader(_row_reader_options, _orc_filter.get(), + _string_dict_filter.get()); _batch = _row_reader->createRowBatch(_batch_size); } catch (std::exception& e) { return Status::InternalError("Failed to create orc row reader. reason = {}", e.what()); @@ -743,6 +763,22 @@ Status OrcReader::set_fill_columns( auto& selected_type = _row_reader->getSelectedType(); int idx = 0; _init_select_types(selected_type, idx); + + if (!_slot_id_to_filter_conjuncts) { + return Status::OK(); + } + + // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts) + // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts. 
+ for (auto& kv : _lazy_read_ctx.predicate_partition_columns) { + auto& [value, slot_desc] = kv.second; + auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id()); + if (iter != _slot_id_to_filter_conjuncts->end()) { + for (auto& ctx : iter->second) { + _filter_conjuncts.push_back(ctx); + } + } + } return Status::OK(); } @@ -937,11 +973,26 @@ Status OrcReader::_decode_string_column(const std::string& col_name, const orc::TypeKind& type_kind, orc::ColumnVectorBatch* cvb, size_t num_values) { SCOPED_RAW_TIMER(&_statistics.decode_value_time); - const static std::string empty_string; - auto* data = down_cast<orc::StringVectorBatch*>(cvb); + auto* data = down_cast<orc::EncodedStringVectorBatch*>(cvb); if (data == nullptr) { return Status::InternalError("Wrong data type for colum '{}'", col_name); } + if (data->isEncoded) { + return _decode_string_dict_encoded_column<is_filter>(col_name, data_column, type_kind, data, + num_values); + } else { + return _decode_string_non_dict_encoded_column<is_filter>(col_name, data_column, type_kind, + data, num_values); + } +} + +template <bool is_filter> +Status OrcReader::_decode_string_non_dict_encoded_column(const std::string& col_name, + const MutableColumnPtr& data_column, + const orc::TypeKind& type_kind, + orc::EncodedStringVectorBatch* cvb, + size_t num_values) { + const static std::string empty_string; std::vector<StringRef> string_values; string_values.reserve(num_values); if (type_kind == orc::TypeKind::CHAR) { @@ -949,8 +1000,8 @@ Status OrcReader::_decode_string_column(const std::string& col_name, if (cvb->hasNulls) { for (int i = 0; i < num_values; ++i) { if (cvb->notNull[i]) { - string_values.emplace_back(data->data[i], - trim_right(data->data[i], data->length[i])); + string_values.emplace_back(cvb->data[i], + trim_right(cvb->data[i], cvb->length[i])); } else { // Orc doesn't fill null values in new batch, but the former batch has been release. // Other types like int/long/timestamp... 
are flat types without pointer in them, @@ -960,22 +1011,21 @@ Status OrcReader::_decode_string_column(const std::string& col_name, } } else { for (int i = 0; i < num_values; ++i) { - string_values.emplace_back(data->data[i], - trim_right(data->data[i], data->length[i])); + string_values.emplace_back(cvb->data[i], trim_right(cvb->data[i], cvb->length[i])); } } } else { if (cvb->hasNulls) { for (int i = 0; i < num_values; ++i) { if (cvb->notNull[i]) { - string_values.emplace_back(data->data[i], data->length[i]); + string_values.emplace_back(cvb->data[i], cvb->length[i]); } else { string_values.emplace_back(empty_string.data(), 0); } } } else { for (int i = 0; i < num_values; ++i) { - string_values.emplace_back(data->data[i], data->length[i]); + string_values.emplace_back(cvb->data[i], cvb->length[i]); } } } @@ -983,6 +1033,128 @@ Status OrcReader::_decode_string_column(const std::string& col_name, return Status::OK(); } +template <bool is_filter> +Status OrcReader::_decode_string_dict_encoded_column(const std::string& col_name, + const MutableColumnPtr& data_column, + const orc::TypeKind& type_kind, + orc::EncodedStringVectorBatch* cvb, + size_t num_values) { + const static std::string empty_string; + std::vector<StringRef> string_values; + size_t max_value_length = 0; + string_values.reserve(num_values); + UInt8* __restrict filter_data; + if constexpr (is_filter) { + filter_data = _filter->data(); + } + if (type_kind == orc::TypeKind::CHAR) { + // Possibly there are some zero padding characters in CHAR type, we have to strip them off. 
+ if (cvb->hasNulls) { + for (int i = 0; i < num_values; ++i) { + if (cvb->notNull[i]) { + if constexpr (is_filter) { + if (!filter_data[i]) { + continue; + } + } + char* val_ptr; + int64_t length; + cvb->dictionary->getValueByIndex(cvb->index.data()[i], val_ptr, length); + length = trim_right(val_ptr, length); + if (length > max_value_length) { + max_value_length = length; + } + string_values.emplace_back(val_ptr, length); + } else { + // Orc doesn't fill null values in new batch, but the former batch has been release. + // Other types like int/long/timestamp... are flat types without pointer in them, + // so other types do not need to be handled separately like string. + string_values.emplace_back(empty_string.data(), 0); + } + } + } else { + for (int i = 0; i < num_values; ++i) { + if constexpr (is_filter) { + if (!filter_data[i]) { + continue; + } + } + char* val_ptr; + int64_t length; + cvb->dictionary->getValueByIndex(cvb->index.data()[i], val_ptr, length); + length = trim_right(val_ptr, length); + if (length > max_value_length) { + max_value_length = length; + } + string_values.emplace_back(val_ptr, length); + } + } + } else { + if (cvb->hasNulls) { + for (int i = 0; i < num_values; ++i) { + if (cvb->notNull[i]) { + if constexpr (is_filter) { + if (!filter_data[i]) { + continue; + } + } + char* val_ptr; + int64_t length; + cvb->dictionary->getValueByIndex(cvb->index.data()[i], val_ptr, length); + string_values.emplace_back(val_ptr, length); + } else { + string_values.emplace_back(empty_string.data(), 0); + } + } + } else { + for (int i = 0; i < num_values; ++i) { + if constexpr (is_filter) { + if (!filter_data[i]) { + continue; + } + } + char* val_ptr; + int64_t length; + cvb->dictionary->getValueByIndex(cvb->index.data()[i], val_ptr, length); + if (length > max_value_length) { + max_value_length = length; + } + string_values.emplace_back(val_ptr, length); + } + } + } + data_column->insert_many_strings_overflow(&string_values[0], string_values.size(), + 
max_value_length); + return Status::OK(); +} + +template <bool is_filter> +Status OrcReader::_decode_int32_column(const std::string& col_name, + const MutableColumnPtr& data_column, + orc::ColumnVectorBatch* cvb, size_t num_values) { + SCOPED_RAW_TIMER(&_statistics.decode_value_time); + if (dynamic_cast<orc::LongVectorBatch*>(cvb) != nullptr) { + return _decode_flat_column<Int32, orc::LongVectorBatch>(col_name, data_column, cvb, + num_values); + } else if (dynamic_cast<orc::EncodedStringVectorBatch*>(cvb) != nullptr) { + auto* data = static_cast<orc::EncodedStringVectorBatch*>(cvb); + if (data == nullptr) { + return Status::InternalError("Wrong data type for colum '{}'", col_name); + } + auto* cvb_data = data->index.data(); + auto& column_data = static_cast<ColumnVector<Int32>&>(*data_column).get_data(); + auto origin_size = column_data.size(); + column_data.resize(origin_size + num_values); + for (int i = 0; i < num_values; ++i) { + column_data[origin_size + i] = (Int32)cvb_data[i]; + } + return Status::OK(); + } else { + DCHECK(false) << "Bad ColumnVectorBatch type."; + return Status::InternalError("Bad ColumnVectorBatch type."); + } +} + Status OrcReader::_fill_doris_array_offsets(const std::string& col_name, ColumnArray::Offsets64& doris_offsets, orc::DataBuffer<int64_t>& orc_offsets, @@ -1045,6 +1217,8 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name, return _decode_flat_column<CppType, OrcColumnType>(col_name, data_column, cvb, num_values); FOR_FLAT_ORC_COLUMNS(DISPATCH) #undef DISPATCH + case TypeIndex::Int32: + return _decode_int32_column<is_filter>(col_name, data_column, cvb, num_values); case TypeIndex::Decimal32: return _decode_decimal_column<Int32, is_filter>(col_name, data_column, data_type, cvb, num_values); @@ -1227,6 +1401,23 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { } } + for (auto& dict_filter_cols : _dict_filter_cols) { + MutableColumnPtr dict_col_ptr = 
ColumnVector<Int32>::create(); + size_t pos = block->get_position_by_name(dict_filter_cols.first); + auto& column_with_type_and_name = block->get_by_position(pos); + auto& column_type = column_with_type_and_name.type; + if (column_type->is_nullable()) { + block->get_by_position(pos).type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()); + block->replace_by_position( + pos, ColumnNullable::create(std::move(dict_col_ptr), + ColumnUInt8::create(dict_col_ptr->size(), 0))); + } else { + block->get_by_position(pos).type = std::make_shared<DataTypeInt32>(); + block->replace_by_position(pos, std::move(dict_col_ptr)); + } + } + std::vector<orc::ColumnVectorBatch*> batch_vec; _fill_batch_vec(batch_vec, _batch.get(), 0); @@ -1257,8 +1448,13 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { } if (!_lazy_read_ctx.conjuncts.empty()) { VExprContextSPtrs filter_conjuncts; - for (auto& conjunct : _lazy_read_ctx.conjuncts) { - filter_conjuncts.push_back(conjunct); + filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(), + _filter_conjuncts.end()); + for (auto& conjunct : _dict_filter_conjuncts) { + filter_conjuncts.emplace_back(conjunct); + } + for (auto& conjunct : _non_dict_filter_conjuncts) { + filter_conjuncts.emplace_back(conjunct); } std::vector<IColumn::Filter*> filters; if (_delete_rows_filter_ptr) { @@ -1274,6 +1470,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { } Block::erase_useless_column(block, column_to_keep); } + _convert_dict_cols_to_string_cols(block, &batch_vec); } return Status::OK(); } @@ -1319,6 +1516,22 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s Block* block = (Block*)arg; size_t origin_column_num = block->columns(); + for (auto& dict_filter_cols : _dict_filter_cols) { + MutableColumnPtr dict_col_ptr = ColumnVector<Int32>::create(); + size_t pos = block->get_position_by_name(dict_filter_cols.first); + auto& 
column_with_type_and_name = block->get_by_position(pos); + auto& column_type = column_with_type_and_name.type; + if (column_type->is_nullable()) { + block->get_by_position(pos).type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()); + block->replace_by_position( + pos, ColumnNullable::create(std::move(dict_col_ptr), + ColumnUInt8::create(dict_col_ptr->size(), 0))); + } else { + block->get_by_position(pos).type = std::make_shared<DataTypeInt32>(); + block->replace_by_position(pos, std::move(dict_col_ptr)); + } + } std::vector<orc::ColumnVectorBatch*> batch_vec; _fill_batch_vec(batch_vec, &data, 0); std::vector<string> col_names; @@ -1356,11 +1569,20 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s auto* __restrict result_filter_data = _filter->data(); bool can_filter_all = false; VExprContextSPtrs filter_conjuncts; - for (auto& conjunct : _lazy_read_ctx.conjuncts) { - filter_conjuncts.push_back(conjunct); + filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(), + _filter_conjuncts.end()); + for (auto& conjunct : _dict_filter_conjuncts) { + filter_conjuncts.emplace_back(conjunct); + } + for (auto& conjunct : _non_dict_filter_conjuncts) { + filter_conjuncts.emplace_back(conjunct); + } + std::vector<IColumn::Filter*> filters; + if (_delete_rows_filter_ptr) { + filters.push_back(_delete_rows_filter_ptr.get()); } RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts( - filter_conjuncts, nullptr, block, _filter.get(), &can_filter_all)); + filter_conjuncts, &filters, block, _filter.get(), &can_filter_all)); if (_lazy_read_ctx.resize_first_column) { block->get_by_position(0).column->assume_mutable()->clear(); @@ -1386,9 +1608,439 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s new_size += result_filter_data[i] ? 
1 : 0; } data.numElements = new_size; + if (data.numElements > 0) { + _convert_dict_cols_to_string_cols(block, &batch_vec); + } else { + _convert_dict_cols_to_string_cols(block, nullptr); + } + return Status::OK(); +} + +Status OrcReader::fill_dict_filter_column_names( + std::unique_ptr<orc::StripeInformation> current_strip_information, + std::list<std::string>& column_names) { + // Check if single slot can be filtered by dict. + if (!_slot_id_to_filter_conjuncts) { + return Status::OK(); + } + _obj_pool->clear(); + _dict_filter_cols.clear(); + _dict_filter_conjuncts.clear(); + _non_dict_filter_conjuncts.clear(); + + const std::list<string>& predicate_col_names = _lazy_read_ctx.predicate_columns.first; + const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second; + int i = 0; + for (auto& predicate_col_name : predicate_col_names) { + int slot_id = predicate_col_slot_ids[i]; + if (_can_filter_by_dict(slot_id)) { + _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id)); + column_names.emplace_back(_col_name_to_file_col_name[predicate_col_name]); + } else { + if (_slot_id_to_filter_conjuncts->find(slot_id) != + _slot_id_to_filter_conjuncts->end()) { + for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) { + _non_dict_filter_conjuncts.push_back(ctx); + } + } + } + ++i; + } + return Status::OK(); +} + +bool OrcReader::_can_filter_by_dict(int slot_id) { + SlotDescriptor* slot = nullptr; + const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots(); + for (auto each : slots) { + if (each->id() == slot_id) { + slot = each; + break; + } + } + if (!slot->type().is_string_type()) { + return false; + } + + if (_slot_id_to_filter_conjuncts->find(slot_id) == _slot_id_to_filter_conjuncts->end()) { + return false; + } + + // TODO: check expr like 'a > 10 is null', 'a > 10' should can be filter by dict. 
+ for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) { + const auto& root_expr = ctx->root(); + if (root_expr->node_type() == TExprNodeType::FUNCTION_CALL) { + std::string is_null_str; + std::string function_name = root_expr->fn().name.function_name; + if (function_name.compare("is_null_pred") == 0 || + function_name.compare("is_not_null_pred") == 0) { + return false; + } + } + } + return true; +} + +Status OrcReader::on_string_dicts_loaded( + std::unordered_map<std::string, orc::StringDictionary*>& file_column_name_to_dict_map, + bool* is_stripe_filtered) { + *is_stripe_filtered = false; + for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) { + std::string& dict_filter_col_name = it->first; + int slot_id = it->second; + + // Can not dict filter col find because stripe is not dict encoded, then remove it. + VExprContextSPtrs ctxs; + auto iter = _slot_id_to_filter_conjuncts->find(slot_id); + if (iter != _slot_id_to_filter_conjuncts->end()) { + for (auto& ctx : iter->second) { + ctxs.push_back(ctx); + } + } else { + std::stringstream msg; + msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found"; + return Status::NotFound(msg.str()); + } + auto file_column_name_to_dict_map_iter = + file_column_name_to_dict_map.find(_col_name_to_file_col_name[dict_filter_col_name]); + if (file_column_name_to_dict_map_iter == file_column_name_to_dict_map.end()) { + it = _dict_filter_cols.erase(it); + for (auto& ctx : ctxs) { + _non_dict_filter_conjuncts.emplace_back(ctx); + } + continue; + } + + // 1. Get dictionary values to a string column. 
+ MutableColumnPtr dict_value_column = ColumnString::create(); + orc::StringDictionary* dict = file_column_name_to_dict_map_iter->second; + + std::vector<StringRef> dict_values; + std::unordered_map<StringRef, int64_t> dict_value_to_code; + size_t max_value_length = 0; + uint64_t dictionaryCount = dict->dictionaryOffset.size() - 1; + dict_values.reserve(dictionaryCount); + for (int i = 0; i < dictionaryCount; ++i) { + char* val_ptr; + int64_t length; + dict->getValueByIndex(i, val_ptr, length); + StringRef dict_value(val_ptr, length); + if (length > max_value_length) { + max_value_length = length; + } + dict_values.emplace_back(dict_value); + dict_value_to_code[dict_value] = i; + } + dict_value_column->insert_many_strings_overflow(&dict_values[0], dict_values.size(), + max_value_length); + size_t dict_value_column_size = dict_value_column->size(); + // 2. Build a temp block from the dict string column, then execute conjuncts and filter block. + // 2.1 Build a temp block from the dict string column to match the conjuncts executing. + Block temp_block; + int dict_pos = -1; + int index = 0; + for (const auto slot_desc : _tuple_descriptor->slots()) { + if (!slot_desc->need_materialize()) { + // should be ignored from reading + continue; + } + if (slot_desc->id() == slot_id) { + auto data_type = slot_desc->get_data_type_ptr(); + if (data_type->is_nullable()) { + temp_block.insert( + {ColumnNullable::create(std::move(dict_value_column), + ColumnUInt8::create(dict_value_column_size, 0)), + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), + ""}); + } else { + temp_block.insert( + {std::move(dict_value_column), std::make_shared<DataTypeString>(), ""}); + } + dict_pos = index; + + } else { + temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + ++index; + } + + // 2.2 Execute conjuncts and filter block. 
+ std::vector<uint32_t> columns_to_filter(1, dict_pos); + int column_to_keep = temp_block.columns(); + if (dict_pos != 0) { + // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0 + // The following process may be tricky and time-consuming, but we have no other way. + temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size); + } + RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep))); + if (dict_pos != 0) { + // We have to clean the first column to insert right data. + temp_block.get_by_position(0).column->assume_mutable()->clear(); + } + + // Check some conditions. + ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column; + // If dict_column->size() == 0, can filter this stripe. + if (dict_column->size() == 0) { + *is_stripe_filtered = true; + return Status::OK(); + } + + // About Performance: if dict_column size is too large, it will generate a large IN filter. + if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) { + it = _dict_filter_cols.erase(it); + for (auto& ctx : ctxs) { + _non_dict_filter_conjuncts.emplace_back(ctx); + } + continue; + } + + // 3. Get dict codes. + std::vector<int32_t> dict_codes; + if (dict_column->is_nullable()) { + const ColumnNullable* nullable_column = + static_cast<const ColumnNullable*>(dict_column.get()); + const ColumnString* nested_column = static_cast<const ColumnString*>( + nullable_column->get_nested_column_ptr().get()); + for (int i = 0; i < nested_column->size(); ++i) { + StringRef dict_value = nested_column->get_data_at(i); + dict_codes.emplace_back(dict_value_to_code[dict_value]); + } + } else { + for (int i = 0; i < dict_column->size(); ++i) { + StringRef dict_value = dict_column->get_data_at(i); + dict_codes.emplace_back(dict_value_to_code[dict_value]); + } + } + + // 4. Rewrite conjuncts. 
+ _rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable()); + ++it; + } + return Status::OK(); +} + +Status OrcReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id, + bool is_nullable) { + VExprSPtr root; + if (dict_codes.size() == 1) { + { + TFunction fn; + TFunctionName fn_name; + fn_name.__set_db_name(""); + fn_name.__set_function_name("eq"); + fn.__set_name(fn_name); + fn.__set_binary_type(TFunctionBinaryType::BUILTIN); + std::vector<TTypeDesc> arg_types; + arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT)); + arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT)); + fn.__set_arg_types(arg_types); + fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + fn.__set_has_var_args(false); + + TExprNode texpr_node; + texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + texpr_node.__set_node_type(TExprNodeType::BINARY_PRED); + texpr_node.__set_opcode(TExprOpcode::EQ); + texpr_node.__set_vector_opcode(TExprOpcode::EQ); + texpr_node.__set_fn(fn); + texpr_node.__set_child_type(TPrimitiveType::INT); + texpr_node.__set_num_children(2); + texpr_node.__set_is_nullable(is_nullable); + root = VectorizedFnCall::create_shared(texpr_node); + } + { + SlotDescriptor* slot = nullptr; + const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots(); + for (auto each : slots) { + if (each->id() == slot_id) { + slot = each; + break; + } + } + root->add_child(VSlotRef::create_shared(slot)); + } + { + TExprNode texpr_node; + texpr_node.__set_node_type(TExprNodeType::INT_LITERAL); + texpr_node.__set_type(create_type_desc(TYPE_INT)); + TIntLiteral int_literal; + int_literal.__set_value(dict_codes[0]); + texpr_node.__set_int_literal(int_literal); + texpr_node.__set_is_nullable(is_nullable); + root->add_child(VLiteral::create_shared(texpr_node)); + } + } else { + { + TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); + TExprNode node; + node.__set_type(type_desc); + 
node.__set_node_type(TExprNodeType::IN_PRED); + node.in_predicate.__set_is_not_in(false); + node.__set_opcode(TExprOpcode::FILTER_IN); + node.__isset.vector_opcode = true; + node.__set_vector_opcode(TExprOpcode::FILTER_IN); + // VdirectInPredicate assume is_nullable = false. + node.__set_is_nullable(false); + + root = vectorized::VDirectInPredicate::create_shared(node); + std::shared_ptr<HybridSetBase> hybrid_set( + create_set(PrimitiveType::TYPE_INT, dict_codes.size())); + for (int j = 0; j < dict_codes.size(); ++j) { + hybrid_set->insert(&dict_codes[j]); + } + static_cast<vectorized::VDirectInPredicate*>(root.get())->set_filter(hybrid_set); + } + { + SlotDescriptor* slot = nullptr; + const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots(); + for (auto each : slots) { + if (each->id() == slot_id) { + slot = each; + break; + } + } + root->add_child(VSlotRef::create_shared(slot)); + } + } + VExprContextSPtr rewritten_conjunct_ctx = VExprContext::create_shared(root); + RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor)); + RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state)); + _dict_filter_conjuncts.emplace_back(rewritten_conjunct_ctx); return Status::OK(); } +Status OrcReader::_convert_dict_cols_to_string_cols( + Block* block, const std::vector<orc::ColumnVectorBatch*>* batch_vec) { + for (auto& dict_filter_cols : _dict_filter_cols) { + size_t pos = block->get_position_by_name(dict_filter_cols.first); + ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos); + const ColumnPtr& column = column_with_type_and_name.column; + auto orc_col_idx = _colname_to_idx.find(dict_filter_cols.first); + if (orc_col_idx == _colname_to_idx.end()) { + return Status::InternalError("Wrong read column '{}' in orc file", + dict_filter_cols.first); + } + if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) { + const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); + const ColumnInt32* 
dict_column = assert_cast<const ColumnInt32*>(nested_column.get()); + DCHECK(dict_column); + + MutableColumnPtr string_column; + if (batch_vec != nullptr) { + string_column = _convert_dict_column_to_string_column( + dict_column, (*batch_vec)[orc_col_idx->second], + _col_orc_type[orc_col_idx->second]); + } else { + string_column = ColumnString::create(); + } + + column_with_type_and_name.type = + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()); + block->replace_by_position( + pos, ColumnNullable::create(std::move(string_column), + nullable_column->get_null_map_column_ptr())); + } else { + const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(column.get()); + MutableColumnPtr string_column; + if (batch_vec != nullptr) { + string_column = _convert_dict_column_to_string_column( + dict_column, (*batch_vec)[orc_col_idx->second], + _col_orc_type[orc_col_idx->second]); + } else { + string_column = ColumnString::create(); + } + + column_with_type_and_name.type = std::make_shared<DataTypeString>(); + block->replace_by_position(pos, std::move(string_column)); + } + } + return Status::OK(); +} + +MutableColumnPtr OrcReader::_convert_dict_column_to_string_column( + const ColumnInt32* dict_column, orc::ColumnVectorBatch* cvb, + const orc::Type* orc_column_type) { + SCOPED_RAW_TIMER(&_statistics.decode_value_time); + auto res = ColumnString::create(); + const static std::string empty_string; + auto* encoded_string_vector_batch = static_cast<orc::EncodedStringVectorBatch*>(cvb); + DCHECK(encoded_string_vector_batch); + std::vector<StringRef> string_values; + size_t num_values = dict_column->size(); + const int* dict_data = dict_column->get_data().data(); + string_values.reserve(num_values); + size_t max_value_length = 0; + if (orc_column_type->getKind() == orc::TypeKind::CHAR) { + // Possibly there are some zero padding characters in CHAR type, we have to strip them off. 
+ if (cvb->hasNulls) { + for (int i = 0; i < num_values; ++i) { + if (cvb->notNull[i]) { + char* val_ptr; + int64_t length; + encoded_string_vector_batch->dictionary->getValueByIndex(dict_data[i], val_ptr, + length); + length = trim_right(val_ptr, length); + if (length > max_value_length) { + max_value_length = length; + } + string_values.emplace_back(val_ptr, length); + } else { + // Orc doesn't fill null values in new batch, but the former batch has been release. + // Other types like int/long/timestamp... are flat types without pointer in them, + // so other types do not need to be handled separately like string. + string_values.emplace_back(empty_string.data(), 0); + } + } + } else { + for (int i = 0; i < num_values; ++i) { + char* val_ptr; + int64_t length; + encoded_string_vector_batch->dictionary->getValueByIndex(dict_data[i], val_ptr, + length); + length = trim_right(val_ptr, length); + if (length > max_value_length) { + max_value_length = length; + } + string_values.emplace_back(val_ptr, length); + } + } + } else { + if (cvb->hasNulls) { + for (int i = 0; i < num_values; ++i) { + if (cvb->notNull[i]) { + char* val_ptr; + int64_t length; + encoded_string_vector_batch->dictionary->getValueByIndex(dict_data[i], val_ptr, + length); + if (length > max_value_length) { + max_value_length = length; + } + string_values.emplace_back(val_ptr, length); + } else { + string_values.emplace_back(empty_string.data(), 0); + } + } + } else { + for (int i = 0; i < num_values; ++i) { + char* val_ptr; + int64_t length; + encoded_string_vector_batch->dictionary->getValueByIndex(dict_data[i], val_ptr, + length); + if (length > max_value_length) { + max_value_length = length; + } + string_values.emplace_back(val_ptr, length); + } + } + } + res->insert_many_strings_overflow(&string_values[0], num_values, max_value_length); + return res; +} + void ORCFileInputStream::beforeReadStripe( std::unique_ptr<orc::StripeInformation> current_strip_information, std::vector<bool> 
selected_columns) { diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 2a62bfb93c..de7a8d182f 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -143,7 +143,10 @@ public: Status init_reader( const std::vector<std::string>* column_names, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, - const VExprContextSPtrs& conjuncts, bool is_acid); + const VExprContextSPtrs& conjuncts, bool is_acid, + const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, + const VExprContextSPtrs* not_single_slot_filter_conjuncts, + const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts); Status set_fill_columns( const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& @@ -167,8 +170,6 @@ public: void _build_delete_row_filter(const Block* block, size_t rows); - void close(); - int64_t size() const; std::unordered_map<std::string, TypeDescriptor> get_name_to_type() override; @@ -178,12 +179,20 @@ public: Status get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) override; - Status filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg); - void set_delete_rows(const TransactionalHiveReader::AcidRowIDSet* delete_rows) { _delete_rows = delete_rows; } + Status filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg); + + Status fill_dict_filter_column_names( + std::unique_ptr<orc::StripeInformation> current_strip_information, + std::list<std::string>& column_names); + + Status on_string_dicts_loaded( + std::unordered_map<std::string, orc::StringDictionary*>& column_name_to_dict_map, + bool* is_stripe_filtered); + private: struct OrcProfile { RuntimeProfile::Counter* read_time; @@ -209,6 +218,27 @@ private: OrcReader* orcReader; };
+    // Adapter that forwards the ORC string-dictionary filter callbacks to an OrcReader.
+    //
+    // The orc::StringDictFilter callbacks return void, so a Status coming back from the
+    // reader cannot be handed to the caller directly. Instead of dropping it, the first
+    // non-OK Status is stored and can be read back through status().
+    //
+    // NOTE(review): nothing in this class acts on a stored error; whoever drives the ORC
+    // reader is expected to check status() after the callbacks ran - TODO wire that up.
+    class StringDictFilterImpl : public orc::StringDictFilter {
+    public:
+        explicit StringDictFilterImpl(OrcReader* orc_reader) : _orc_reader(orc_reader) {}
+        ~StringDictFilterImpl() override = default;
+
+        // Forwards to OrcReader::fill_dict_filter_column_names() and records its result.
+        void fillDictFilterColumnNames(
+                std::unique_ptr<orc::StripeInformation> current_strip_information,
+                std::list<std::string>& column_names) const override {
+            _record(_orc_reader->fill_dict_filter_column_names(
+                    std::move(current_strip_information), column_names));
+        }
+
+        // Forwards to OrcReader::on_string_dicts_loaded() and records its result.
+        void onStringDictsLoaded(
+                std::unordered_map<std::string, orc::StringDictionary*>& column_name_to_dict_map,
+                bool* is_stripe_filtered) const override {
+            _record(_orc_reader->on_string_dicts_loaded(column_name_to_dict_map,
+                                                        is_stripe_filtered));
+        }
+
+        // First error returned by a forwarded call, or OK if none has failed so far.
+        const Status& status() const { return _status; }
+
+    private:
+        // Keeps the first non-OK status; later results never overwrite it.
+        void _record(Status st) const {
+            if (_status.ok() && !st.ok()) {
+                _status = std::move(st);
+            }
+        }
+
+        OrcReader* _orc_reader = nullptr;
+        // mutable because the orc::StringDictFilter callbacks are declared const.
+        mutable Status _status = Status::OK();
+    };
+
 // Create inner orc file, // return EOF if file is empty // return EROOR if encounter error. @@ -344,6 +374,10 @@ private: return Status::OK(); } + template <bool is_filter> + Status _decode_int32_column(const std::string& col_name, const MutableColumnPtr& data_column, + orc::ColumnVectorBatch* cvb, size_t num_values); + template <typename DecimalPrimitiveType, bool is_filter> Status _decode_decimal_column(const std::string& col_name, const MutableColumnPtr& data_column, const DataTypePtr& data_type, orc::ColumnVectorBatch* cvb, @@ -410,6 +444,20 @@ private: const orc::TypeKind& type_kind, orc::ColumnVectorBatch* cvb, size_t num_values); + template <bool is_filter> + Status _decode_string_non_dict_encoded_column(const std::string& col_name, + const MutableColumnPtr& data_column, + const orc::TypeKind& type_kind, + orc::EncodedStringVectorBatch* cvb, + size_t num_values); + + template <bool is_filter> + Status _decode_string_dict_encoded_column(const std::string& col_name, + const MutableColumnPtr& data_column, + const orc::TypeKind& type_kind, + orc::EncodedStringVectorBatch* cvb, + size_t num_values); + Status _fill_doris_array_offsets(const std::string& col_name, ColumnArray::Offsets64& doris_offsets, orc::DataBuffer<int64_t>& orc_offsets, size_t num_values, @@ -419,6 +467,17 @@
private: void _collect_profile_on_close(); + bool _can_filter_by_dict(int slot_id); + + Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id, bool is_nullable); + + Status _convert_dict_cols_to_string_cols(Block* block, + const std::vector<orc::ColumnVectorBatch*>* batch_vec); + + MutableColumnPtr _convert_dict_column_to_string_column(const ColumnInt32* dict_column, + orc::ColumnVectorBatch* cvb, + const orc::Type* orc_column_typ); + private: RuntimeProfile* _profile = nullptr; RuntimeState* _state = nullptr; @@ -449,7 +508,6 @@ private: std::unique_ptr<ORCFileInputStream> _file_input_stream; Statistics _statistics; OrcProfile _orc_profile; - bool _closed = false; std::unique_ptr<orc::ColumnVectorBatch> _batch; std::unique_ptr<orc::Reader> _reader; @@ -473,6 +531,17 @@ private: std::unique_ptr<TextConverter> _text_converter = nullptr; const TransactionalHiveReader::AcidRowIDSet* _delete_rows = nullptr; std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr = nullptr; + + const TupleDescriptor* _tuple_descriptor; + const RowDescriptor* _row_descriptor; + const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts; + VExprContextSPtrs _dict_filter_conjuncts; + VExprContextSPtrs _non_dict_filter_conjuncts; + VExprContextSPtrs _filter_conjuncts; + // std::pair<col_name, slot_id> + std::vector<std::pair<std::string, int>> _dict_filter_cols; + std::shared_ptr<ObjectPool> _obj_pool; + std::unique_ptr<orc::StringDictFilter> _string_dict_filter; }; class ORCFileInputStream : public orc::InputStream { diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index a2ae767d56..79341f40aa 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -57,12 +57,17 @@ TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader> Status 
TransactionalHiveReader::init_reader( const std::vector<std::string>& column_names, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, - const VExprContextSPtrs& conjuncts) { + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const VExprContextSPtrs* not_single_slot_filter_conjuncts, + const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) { OrcReader* orc_reader = static_cast<OrcReader*>(_file_format_reader.get()); _col_names.insert(_col_names.end(), column_names.begin(), column_names.end()); _col_names.insert(_col_names.end(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end()); - Status status = orc_reader->init_reader(&_col_names, colname_to_value_range, conjuncts, true); + Status status = orc_reader->init_reader( + &_col_names, colname_to_value_range, conjuncts, true, tuple_descriptor, row_descriptor, + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); return status; } @@ -132,8 +137,9 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range) { OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE, _state->timezone(), _io_ctx, false); - RETURN_IF_ERROR(delete_reader.init_reader( - &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, nullptr, {}, false)); + RETURN_IF_ERROR( + delete_reader.init_reader(&TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, + nullptr, {}, false, nullptr, nullptr, nullptr, nullptr)); std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>> partition_columns; diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h index b19102a366..5b3609e47f 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.h +++ b/be/src/vec/exec/format/table/transactional_hive_reader.h @@ -106,7 
+106,10 @@ public: Status init_reader( const std::vector<std::string>& column_names, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, - const VExprContextSPtrs& conjuncts); + const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor, + const RowDescriptor* row_descriptor, + const VExprContextSPtrs* not_single_slot_filter_conjuncts, + const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts); private: struct TransactionalHiveProfile { diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 535dac3a68..b0a2d2e327 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -673,13 +673,17 @@ Status VFileScanner::_get_next_reader() { TransactionalHiveReader::create_unique(std::move(orc_reader), _profile, _state, _params, range, _io_ctx.get()); - init_status = tran_orc_reader->init_reader(_file_col_names, _colname_to_value_range, - _push_down_conjuncts); + init_status = tran_orc_reader->init_reader( + _file_col_names, _colname_to_value_range, _push_down_conjuncts, + _real_tuple_desc, _default_val_row_desc.get(), + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range)); _cur_reader = std::move(tran_orc_reader); } else { - init_status = orc_reader->init_reader(&_file_col_names, _colname_to_value_range, - _push_down_conjuncts, false); + init_status = orc_reader->init_reader( + &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, + _real_tuple_desc, _default_val_row_desc.get(), + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); _cur_reader = std::move(orc_reader); } break; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org