HappenLee commented on code in PR #17594:
URL: https://github.com/apache/doris/pull/17594#discussion_r1133435376


##########
be/src/vec/exec/format/parquet/vparquet_group_reader.cpp:
##########
@@ -599,6 +668,236 @@ Status RowGroupReader::_filter_block_internal(Block* 
block,
     return Status::OK();
 }
 
+Status RowGroupReader::_rewrite_dict_predicates() {
+    for (vector<std::string>::iterator it = _dict_filter_col_names.begin();
+         it != _dict_filter_col_names.end();) {
+        std::string& dict_filter_col_name = *it;
+        int slot_id = _col_name_to_slot_id->at(dict_filter_col_name);
+        // 1. Get dictionary values to a string column.
+        MutableColumnPtr dict_value_column = ColumnString::create();
+        bool has_dict = false;
+        
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->read_dict_values_to_column(
+                dict_value_column, &has_dict));
+        size_t dict_value_column_size = dict_value_column->size();
+        DCHECK(has_dict);
+        // 2. Build a temp block from the dict string column, then execute 
conjuncts and filter block.
+        // 2.1 Build a temp block from the dict string column to match the 
conjuncts executing.
+        Block temp_block;
+        int dict_pos = -1;
+        int index = 0;
+        for (const auto slot_desc : _tuple_descriptor->slots()) {
+            if (!slot_desc->need_materialize()) {
+                // should be ignored from reading
+                continue;
+            }
+            if (slot_desc->id() == slot_id) {
+                auto data_type = slot_desc->get_data_type_ptr();
+                if (data_type->is_nullable()) {
+                    temp_block.insert(
+                            
{ColumnNullable::create(std::move(dict_value_column),
+                                                    
ColumnUInt8::create(dict_value_column_size, 0)),
+                             
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
+                             ""});
+                } else {
+                    temp_block.insert(
+                            {std::move(dict_value_column), 
std::make_shared<DataTypeString>(), ""});
+                }
+                dict_pos = index;
+
+            } else {
+                
temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                        
slot_desc->get_data_type_ptr(),
+                                                        
slot_desc->col_name()));
+            }
+            ++index;
+        }
+
+        // 2.2 Execute conjuncts and filter block.
+        const std::vector<VExprContext*>* ctxs = nullptr;
+        auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
+        if (iter != _slot_id_to_filter_conjuncts->end()) {
+            ctxs = &(iter->second);
+        } else {
+            std::stringstream msg;
+            msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] 
not found";
+            return Status::NotFound(msg.str());
+        }
+
+        std::vector<uint32_t> columns_to_filter(1, dict_pos);
+        int column_to_keep = temp_block.columns();
+        if (dict_pos != 0) {
+            // VExprContext.execute has an optimization, the filtering is 
executed when block->rows() > 0
+            // The following process may be tricky and time-consuming, but we 
have no other way.
+            
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
+        }
+        std::vector<IColumn::Filter*> filters;
+        RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(*ctxs, filters, 
&temp_block,
+                                                            columns_to_filter, 
column_to_keep));
+        if (dict_pos != 0) {
+            // We have to clean the first column to insert right data.
+            temp_block.get_by_position(0).column->assume_mutable()->clear();
+        }
+
+        // Check some conditions.
+        ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column;
+        // If dict_column->size() == 0, can filter this row group.
+        if (dict_column->size() == 0) {
+            _is_row_group_filtered = true;
+            return Status::OK();
+        }
+
+        // About Performance: if dict_column size is too large, it will 
generate a large IN filter.
+        if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
+            for (auto& ctx : (*ctxs)) {
+                _filter_conjuncts.push_back(ctx);
+            }
+            it = _dict_filter_col_names.erase(it);
+            continue;
+        }
+
+        // 3. Get dict codes.
+        std::vector<int32_t> dict_codes;
+        if (dict_column->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    static_cast<const ColumnNullable*>(dict_column.get());
+            const ColumnString* nested_column = static_cast<const 
ColumnString*>(
+                    nullable_column->get_nested_column_ptr().get());
+            
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
+                    assert_cast<const ColumnString*>(nested_column), 
&dict_codes));
+        } else {
+            
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
+                    assert_cast<const ColumnString*>(dict_column.get()), 
&dict_codes));
+        }
+
+        // 4. Rewrite conjuncts.
+        _rewrite_dict_conjuncts(dict_codes, slot_id);
+        ++it;
+    }
+    return Status::OK();
+}
+
+Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& 
dict_codes, int slot_id) {
+    VExpr* root;
+    if (dict_codes.size() == 1) {
+        {
+            TFunction fn;
+            TFunctionName fn_name;
+            fn_name.__set_db_name("");
+            fn_name.__set_function_name("eq");
+            fn.__set_name(fn_name);
+            fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+            std::vector<TTypeDesc> arg_types;
+            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
+            fn.__set_arg_types(arg_types);
+            fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            fn.__set_has_var_args(false);
+
+            TExprNode texpr_node;
+            
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+            texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
+            texpr_node.__set_opcode(TExprOpcode::EQ);
+            texpr_node.__set_vector_opcode(TExprOpcode::EQ);
+            texpr_node.__set_fn(fn);
+            texpr_node.__set_child_type(TPrimitiveType::INT);
+            texpr_node.__set_num_children(2);
+            root = _obj_pool->add(new VectorizedFnCall(texpr_node));
+        }
+        {
+            SlotDescriptor* slot = nullptr;
+            const std::vector<SlotDescriptor*>& slots = 
_tuple_descriptor->slots();
+            for (auto each : slots) {
+                if (each->id() == slot_id) {
+                    slot = each;
+                    break;
+                }
+            }
+            VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot));
+            root->add_child(slot_ref_expr);
+        }
+        {
+            TExprNode texpr_node;
+            texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
+            texpr_node.__set_type(create_type_desc(TYPE_INT));
+            TIntLiteral int_literal;
+            int_literal.__set_value(dict_codes[0]);
+            texpr_node.__set_int_literal(int_literal);
+            VExpr* literal_expr = _obj_pool->add(new VLiteral(texpr_node));
+            root->add_child(literal_expr);
+        }
+    } else {
+        {
+            TTypeDesc type_desc = 
create_type_desc(PrimitiveType::TYPE_BOOLEAN);
+            TExprNode node;
+            node.__set_type(type_desc);
+            node.__set_node_type(TExprNodeType::IN_PRED);
+            node.in_predicate.__set_is_not_in(false);
+            node.__set_opcode(TExprOpcode::FILTER_IN);
+            node.__isset.vector_opcode = true;
+            node.__set_vector_opcode(TExprOpcode::FILTER_IN);
+
+            root = _obj_pool->add(new vectorized::VDirectInPredicate(node));
+            std::shared_ptr<HybridSetBase> 
hybrid_set(create_set(PrimitiveType::TYPE_INT));
+            for (int j = 0; j < dict_codes.size(); ++j) {
+                hybrid_set->insert(&dict_codes[j]);
+            }
+            
static_cast<vectorized::VDirectInPredicate*>(root)->set_filter(hybrid_set);
+        }
+        {
+            SlotDescriptor* slot = nullptr;
+            const std::vector<SlotDescriptor*>& slots = 
_tuple_descriptor->slots();
+            for (auto each : slots) {
+                if (each->id() == slot_id) {
+                    slot = each;
+                    break;
+                }
+            }
+            VExpr* slot_ref_expr = _obj_pool->add(new VSlotRef(slot));
+            root->add_child(slot_ref_expr);
+        }
+    }
+    VExprContext* rewritten_conjunct_ctx = _obj_pool->add(new 
VExprContext(root));
+    RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor));
+    RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state));
+    _dict_filter_conjuncts.push_back(rewritten_conjunct_ctx);
+    _filter_conjuncts.push_back(rewritten_conjunct_ctx);
+    return Status::OK();
+}
+
+void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
+    for (auto& dict_filter_col_name : _dict_filter_col_names) {
+        ColumnPtr& column = block->get_by_name(dict_filter_col_name).column;
+        if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*column)) {
+            const ColumnPtr& nested_column = 
nullable_column->get_nested_column_ptr();
+            const ColumnDictI32* dict_column =
+                    assert_cast<const ColumnDictI32*>(nested_column.get());
+            DCHECK(dict_column);
+
+            MutableColumnPtr string_column =
+                    
_column_readers[dict_filter_col_name]->convert_dict_column_to_string_column(
+                            dict_column);
+
+            size_t pos = block->get_position_by_name(dict_filter_col_name);

Review Comment:
   Line 869 already searches the hash map by name. It may be better to call
   `size_t pos = block->get_position_by_name(dict_filter_col_name);` at line 869
   and reuse `pos`, so that the hash map is searched only once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to