HappenLee commented on code in PR #17594: URL: https://github.com/apache/doris/pull/17594#discussion_r1133435376
########## be/src/vec/exec/format/parquet/vparquet_group_reader.cpp: ########## @@ -599,6 +668,236 @@ Status RowGroupReader::_filter_block_internal(Block* block, return Status::OK(); } +Status RowGroupReader::_rewrite_dict_predicates() { + for (vector<std::string>::iterator it = _dict_filter_col_names.begin(); + it != _dict_filter_col_names.end();) { + std::string& dict_filter_col_name = *it; + int slot_id = _col_name_to_slot_id->at(dict_filter_col_name); + // 1. Get dictionary values to a string column. + MutableColumnPtr dict_value_column = ColumnString::create(); + bool has_dict = false; + RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->read_dict_values_to_column( + dict_value_column, &has_dict)); + size_t dict_value_column_size = dict_value_column->size(); + DCHECK(has_dict); + // 2. Build a temp block from the dict string column, then execute conjuncts and filter block. + // 2.1 Build a temp block from the dict string column to match the conjuncts executing. + Block temp_block; + int dict_pos = -1; + int index = 0; + for (const auto slot_desc : _tuple_descriptor->slots()) { + if (!slot_desc->need_materialize()) { + // should be ignored from reading + continue; + } + if (slot_desc->id() == slot_id) { + auto data_type = slot_desc->get_data_type_ptr(); + if (data_type->is_nullable()) { + temp_block.insert( + {ColumnNullable::create(std::move(dict_value_column), + ColumnUInt8::create(dict_value_column_size, 0)), + std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), + ""}); + } else { + temp_block.insert( + {std::move(dict_value_column), std::make_shared<DataTypeString>(), ""}); + } + dict_pos = index; + + } else { + temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + ++index; + } + + // 2.2 Execute conjuncts and filter block. 
+ const std::vector<VExprContext*>* ctxs = nullptr; + auto iter = _slot_id_to_filter_conjuncts->find(slot_id); + if (iter != _slot_id_to_filter_conjuncts->end()) { + ctxs = &(iter->second); + } else { + std::stringstream msg; + msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found"; + return Status::NotFound(msg.str()); + } + + std::vector<uint32_t> columns_to_filter(1, dict_pos); + int column_to_keep = temp_block.columns(); + if (dict_pos != 0) { + // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0 + // The following process may be tricky and time-consuming, but we have no other way. + temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size); + } + std::vector<IColumn::Filter*> filters; + RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(*ctxs, filters, &temp_block, + columns_to_filter, column_to_keep)); + if (dict_pos != 0) { + // We have to clean the first column to insert right data. + temp_block.get_by_position(0).column->assume_mutable()->clear(); + } + + // Check some conditions. + ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column; + // If dict_column->size() == 0, can filter this row group. + if (dict_column->size() == 0) { + _is_row_group_filtered = true; + return Status::OK(); + } + + // About Performance: if dict_column size is too large, it will generate a large IN filter. + if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) { + for (auto& ctx : (*ctxs)) { + _filter_conjuncts.push_back(ctx); + } + it = _dict_filter_col_names.erase(it); + continue; + } + + // 3. Get dict codes. 
+ std::vector<int32_t> dict_codes; + if (dict_column->is_nullable()) { + const ColumnNullable* nullable_column = + static_cast<const ColumnNullable*>(dict_column.get()); + const ColumnString* nested_column = static_cast<const ColumnString*>( + nullable_column->get_nested_column_ptr().get()); + RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes( + assert_cast<const ColumnString*>(nested_column), &dict_codes)); + } else { + RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes( + assert_cast<const ColumnString*>(dict_column.get()), &dict_codes)); + } + + // 4. Rewrite conjuncts. + _rewrite_dict_conjuncts(dict_codes, slot_id); + ++it; + } + return Status::OK(); +} + +Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id) { + VExpr* root; + if (dict_codes.size() == 1) { + { + TFunction fn; + TFunctionName fn_name; + fn_name.__set_db_name(""); + fn_name.__set_function_name("eq"); + fn.__set_name(fn_name); + fn.__set_binary_type(TFunctionBinaryType::BUILTIN); + std::vector<TTypeDesc> arg_types; + arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT)); + arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT)); + fn.__set_arg_types(arg_types); + fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + fn.__set_has_var_args(false); + + TExprNode texpr_node; + texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + texpr_node.__set_node_type(TExprNodeType::BINARY_PRED); + texpr_node.__set_opcode(TExprOpcode::EQ); + texpr_node.__set_vector_opcode(TExprOpcode::EQ); + texpr_node.__set_fn(fn); + texpr_node.__set_child_type(TPrimitiveType::INT); + texpr_node.__set_num_children(2); + root = _obj_pool->add(new VectorizedFnCall(texpr_node)); + } + { + SlotDescriptor* slot = nullptr; + const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots(); + for (auto each : slots) { + if (each->id() == slot_id) { + slot = each; + break; + } + } + VExpr* slot_ref_expr = 
_obj_pool->add(new VSlotRef(slot)); + root->add_child(slot_ref_expr); + } + { + TExprNode texpr_node; + texpr_node.__set_node_type(TExprNodeType::INT_LITERAL); + texpr_node.__set_type(create_type_desc(TYPE_INT)); + TIntLiteral int_literal; + int_literal.__set_value(dict_codes[0]); + texpr_node.__set_int_literal(int_literal); + VExpr* literal_expr = _obj_pool->add(new VLiteral(texpr_node)); + root->add_child(literal_expr); + } + } else { + { + TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); + TExprNode node; + node.__set_type(type_desc); + node.__set_node_type(TExprNodeType::IN_PRED); + node.in_predicate.__set_is_not_in(false); + node.__set_opcode(TExprOpcode::FILTER_IN); + node.__isset.vector_opcode = true; + node.__set_vector_opcode(TExprOpcode::FILTER_IN); + + root = _obj_pool->add(new vectorized::VDirectInPredicate(node)); + std::shared_ptr<HybridSetBase> hybrid_set(create_set(PrimitiveType::TYPE_INT)); + for (int j = 0; j < dict_codes.size(); ++j) { + hybrid_set->insert(&dict_codes[j]); + } + static_cast<vectorized::VDirectInPredicate*>(root)->set_filter(hybrid_set); + } + { + SlotDescriptor* slot = nullptr; + const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots(); + for (auto each : slots) { + if (each->id() == slot_id) { + slot = each; + break; + } + } + VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot)); + root->add_child(slot_ref_expr); + } + } + VExprContext* rewritten_conjunct_ctx = _obj_pool->add(new VExprContext(root)); + RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor)); + RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state)); + _dict_filter_conjuncts.push_back(rewritten_conjunct_ctx); + _filter_conjuncts.push_back(rewritten_conjunct_ctx); + return Status::OK(); +} + +void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { + for (auto& dict_filter_col_name : _dict_filter_col_names) { + ColumnPtr& column = block->get_by_name(dict_filter_col_name).column; + if (auto* 
nullable_column = check_and_get_column<ColumnNullable>(*column)) { + const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); + const ColumnDictI32* dict_column = + assert_cast<const ColumnDictI32*>(nested_column.get()); + DCHECK(dict_column); + + MutableColumnPtr string_column = + _column_readers[dict_filter_col_name]->convert_dict_column_to_string_column( + dict_column); + + size_t pos = block->get_position_by_name(dict_filter_col_name); Review Comment: Line 869 already searches the hash map by name (`block->get_by_name(dict_filter_col_name)`), and this line searches it a second time. It may be better to call `size_t pos = block->get_position_by_name(dict_filter_col_name);` at line 869 and then use `block->get_by_position(pos)` there, so the hash map is searched only once. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org