zclllyybb commented on code in PR #54036:
URL: https://github.com/apache/doris/pull/54036#discussion_r2318921241


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/MapContainsEntry.java:
##########
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.scalar;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.shape.TernaryExpression;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.BooleanType;
+import org.apache.doris.nereids.types.MapType;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.nereids.types.coercion.FollowToAnyDataType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * ScalarFunction 'map_contains_entry'.
+ */
+public class MapContainsEntry extends ScalarFunction
+        implements TernaryExpression, ExplicitlyCastableSignature {
+
+    public static final List<FunctionSignature> FOLLOW_DATATYPE_SIGNATURE = 
ImmutableList.of(
+            FunctionSignature.ret(BooleanType.INSTANCE)
+                    .args(MapType.of(new AnyDataType(0), new AnyDataType(1)),
+                            new FollowToAnyDataType(0),
+                            new FollowToAnyDataType(1))
+    );
+
+    public static final List<FunctionSignature> MIN_COMMON_TYPE_SIGNATURES = 
ImmutableList.of(

Review Comment:
   为什么需要这个呢?直接把要判断contains的对象转换到map对应的类型,也就是直接用 
`FOLLOW_DATATYPE_SIGNATURE`,会有问题吗?这样BE代码也会清楚很多



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/MapContainsEntry.java:
##########
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.scalar;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.shape.TernaryExpression;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.BooleanType;
+import org.apache.doris.nereids.types.MapType;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.nereids.types.coercion.FollowToAnyDataType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * ScalarFunction 'map_contains_entry'.
+ */
+public class MapContainsEntry extends ScalarFunction
+        implements TernaryExpression, ExplicitlyCastableSignature {
+
+    public static final List<FunctionSignature> FOLLOW_DATATYPE_SIGNATURE = 
ImmutableList.of(
+            FunctionSignature.ret(BooleanType.INSTANCE)
+                    .args(MapType.of(new AnyDataType(0), new AnyDataType(1)),
+                            new FollowToAnyDataType(0),
+                            new FollowToAnyDataType(1))
+    );
+
+    public static final List<FunctionSignature> MIN_COMMON_TYPE_SIGNATURES = 
ImmutableList.of(
+            FunctionSignature.ret(BooleanType.INSTANCE)
+                    .args(MapType.of(new AnyDataType(0), new AnyDataType(1)),
+                            new AnyDataType(0),
+                            new AnyDataType(1))
+    );
+
+    /**
+     * constructor with 3 arguments.
+     */
+    public MapContainsEntry(Expression arg0, Expression arg1, Expression arg2) 
{
+        super("map_contains_entry", arg0, arg1, arg2);
+    }
+
+    /** constructor for withChildren and reuse signature */
+    public MapContainsEntry(ScalarFunctionParams functionParams) {
+        super(functionParams);
+    }
+
+    /**
+     * withChildren.
+     */
+    @Override
+    public MapContainsEntry withChildren(List<Expression> children) {
+        Preconditions.checkArgument(children.size() == 3);
+        return new MapContainsEntry(getFunctionParams(children));
+    }
+
+    @Override
+    public boolean nullable() {
+        return child(0).nullable();
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitMapContainsEntry(this, context);
+    }
+
+    @Override
+    public List<FunctionSignature> getSignatures() {
+        if (getArgument(0).getDataType().isMapType()) {

Review Comment:
   add comment to explain this



##########
be/src/vec/functions/function_map.cpp:
##########
@@ -288,6 +289,83 @@ class FunctionMapEntries : public IFunction {
     }
 };
 
+class FunctionMapToEntries : public IFunction {
+public:
+    static constexpr auto name = "map_entries";
+    static FunctionPtr create() { return 
std::make_shared<FunctionMapToEntries>(); }
+
+    /// Get function name.
+    String get_name() const override { return name; }
+
+    bool is_variadic() const override { return false; }
+
+    size_t get_number_of_arguments() const override { return 1; }
+
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const 
override {
+        DataTypePtr datatype = arguments[0];
+        if (datatype->is_nullable()) {
+            datatype = assert_cast<const 
DataTypeNullable*>(datatype.get())->get_nested_type();
+        }
+        DCHECK(datatype->get_primitive_type() == PrimitiveType::TYPE_MAP)
+                << "first argument for function: " << name << " should be 
DataTypeMap";
+        const auto* const datatype_map = static_cast<const 
DataTypeMap*>(datatype.get());
+
+        // Create struct type with named fields "key" and "value"
+        // key and value are always nullable
+        auto struct_type = std::make_shared<DataTypeStruct>(
+                DataTypes {make_nullable(datatype_map->get_key_type()),
+                           make_nullable(datatype_map->get_value_type())},
+                Strings {"key", "value"});
+
+        // Theoretically, the struct element will never be null,
+        // but FE expects the array element to be nullable
+        return std::make_shared<DataTypeArray>(make_nullable(struct_type));
+    }
+
+    Status execute_impl(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
+                        uint32_t result, size_t input_rows_count) const 
override {
+        auto left_column =
+                
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+        const ColumnMap* map_column = nullptr;
+        ColumnPtr nullmap_column = nullptr;
+
+        if (left_column->is_nullable()) {

Review Comment:
   
这个地方其实完全不用考虑最外层nullable和const的问题,我们在函数外层框架有default_implementation_for_xxx。单参数的直接在外层处理了



##########
be/src/vec/functions/function_map.cpp:
##########
@@ -440,13 +518,400 @@ class FunctionStrToMap : public IFunction {
     }
 };
 
+class FunctionMapContainsEntry : public IFunction {
+public:
+    static constexpr auto name = "map_contains_entry";
+    static FunctionPtr create() { return 
std::make_shared<FunctionMapContainsEntry>(); }
+
+    String get_name() const override { return name; }
+    size_t get_number_of_arguments() const override { return 3; }
+    bool use_default_implementation_for_nulls() const override { return false; 
}
+
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const 
override {
+        DataTypePtr datatype = arguments[0];
+        if (datatype->is_nullable()) {
+            datatype = assert_cast<const 
DataTypeNullable*>(datatype.get())->get_nested_type();
+        }
+        DCHECK(datatype->get_primitive_type() == TYPE_MAP)
+                << "first argument for function: " << name << " should be 
DataTypeMap";
+
+        if (arguments[0]->is_nullable()) {
+            return make_nullable(std::make_shared<DataTypeBool>());
+        } else {
+            return std::make_shared<DataTypeBool>();
+        }
+    }
+
+    Status execute_impl(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
+                        uint32_t result, size_t input_rows_count) const 
override {
+        return _execute_type_check_and_dispatch(block, arguments, result);
+    }
+
+private:
+    // assume result_matches is initialized to all 1s
+    template <typename ColumnType>
+    void _execute_column_comparison(const IColumn& map_entry_column, const 
UInt8* map_entry_nullmap,
+                                    const IColumn& search_column, const UInt8* 
search_nullmap,
+                                    const ColumnArray::Offsets64& map_offsets,
+                                    const UInt8* map_row_nullmap, bool 
search_is_const,
+                                    ColumnUInt8& result_matches) const {
+        auto& result_data = result_matches.get_data();
+        for (size_t row = 0; row < map_offsets.size(); ++row) {
+            if (map_row_nullmap && map_row_nullmap[row]) {
+                continue;
+            }
+            size_t map_start = row == 0 ? 0 : map_offsets[row - 1];
+            size_t map_end = map_offsets[row];
+            // const column always uses index 0
+            size_t search_idx = search_is_const ? 0 : row;
+            for (size_t i = map_start; i < map_end; ++i) {
+                result_data[i] &=
+                        compare_values<ColumnType>(map_entry_column, i, 
map_entry_nullmap,
+                                                   search_column, search_idx, 
search_nullmap)
+                                ? 1
+                                : 0;
+            }
+        }
+    }
+
+    // dispatch column comparison by type, map_entry_column is the column of 
map's key or value, search_column is the column of search key or value
+    void _dispatch_column_comparison(PrimitiveType type, const IColumn& 
map_entry_column,
+                                     const UInt8* map_entry_nullmap, const 
IColumn& search_column,
+                                     const UInt8* search_nullmap,
+                                     const ColumnArray::Offsets64& map_offsets,
+                                     const UInt8* map_row_nullmap, bool 
search_is_const,
+                                     ColumnUInt8& result_matches) const {
+        switch (type) {
+        case TYPE_BOOLEAN:
+            _execute_column_comparison<ColumnUInt8>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_TINYINT:
+            _execute_column_comparison<ColumnInt8>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_SMALLINT:
+            _execute_column_comparison<ColumnInt16>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_INT:
+            _execute_column_comparison<ColumnInt32>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_BIGINT:
+            _execute_column_comparison<ColumnInt64>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_LARGEINT:
+            _execute_column_comparison<ColumnInt128>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_FLOAT:
+            _execute_column_comparison<ColumnFloat32>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_DOUBLE:
+            _execute_column_comparison<ColumnFloat64>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_DECIMAL32:
+            _execute_column_comparison<ColumnDecimal32>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_DECIMAL64:
+            _execute_column_comparison<ColumnDecimal64>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_DECIMAL128I:
+            _execute_column_comparison<ColumnDecimal128V3>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_DECIMALV2:
+            _execute_column_comparison<ColumnDecimal128V2>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_DECIMAL256:
+            _execute_column_comparison<ColumnDecimal256>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_STRING:
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+            _execute_column_comparison<ColumnString>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_DATE:
+            _execute_column_comparison<ColumnDate>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_DATETIME:
+            _execute_column_comparison<ColumnDateTime>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_DATEV2:
+            _execute_column_comparison<ColumnDateV2>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_DATETIMEV2:
+            _execute_column_comparison<ColumnDateTimeV2>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        case TYPE_TIMEV2:
+            _execute_column_comparison<ColumnTimeV2>(
+                    map_entry_column, map_entry_nullmap, search_column, 
search_nullmap, map_offsets,
+                    map_row_nullmap, search_is_const, result_matches);
+            break;
+        default:
+            break;
+        }
+    }
+
+    // main loop function
+    ColumnPtr _execute_all_rows(const ColumnMap* map_column, const ColumnPtr& 
map_row_nullmap_col,
+                                const IColumn& key_column, const UInt8* 
key_nullmap,
+                                const IColumn& value_column, const UInt8* 
value_nullmap,
+                                PrimitiveType key_type, PrimitiveType 
value_type, bool key_is_const,
+                                bool value_is_const) const {
+        const auto& map_offsets = map_column->get_offsets();
+
+        // remove the nullable wrapper of map's key and value
+        const auto& map_keys_nullable =
+                reinterpret_cast<const 
ColumnNullable&>(map_column->get_keys());
+        const IColumn* map_keys_column = 
&map_keys_nullable.get_nested_column();
+        const auto& map_keys_nullmap = 
map_keys_nullable.get_null_map_column().get_data().data();
+
+        const auto& map_values_nullable =
+                reinterpret_cast<const 
ColumnNullable&>(map_column->get_values());
+        const IColumn* map_values_column = 
&map_values_nullable.get_nested_column();
+        const auto& map_values_nullmap =
+                map_values_nullable.get_null_map_column().get_data().data();
+
+        auto result_column = ColumnUInt8::create(map_offsets.size(), 0);
+        auto& result_data = result_column->get_data();
+
+        const UInt8* map_row_nullmap = nullptr;
+        if (map_row_nullmap_col) {
+            map_row_nullmap =
+                    assert_cast<const 
ColumnUInt8&>(*map_row_nullmap_col).get_data().data();
+        }
+
+        auto matches = ColumnUInt8::create(map_keys_column->size(), 1);
+
+        // matches &= key_compare
+        _dispatch_column_comparison(key_type, *map_keys_column, 
map_keys_nullmap, key_column,
+                                    key_nullmap, map_offsets, map_row_nullmap, 
key_is_const,
+                                    *matches);
+
+        // matches &= value_compare
+        _dispatch_column_comparison(value_type, *map_values_column, 
map_values_nullmap,
+                                    value_column, value_nullmap, map_offsets, 
map_row_nullmap,
+                                    value_is_const, *matches);
+
+        // aggregate results by map boundaries
+        auto& matches_data = matches->get_data();
+        for (size_t row = 0; row < map_offsets.size(); ++row) {
+            if (map_row_nullmap && map_row_nullmap[row]) {
+                // result is null for this row
+                continue;
+            }
+
+            size_t map_start = row == 0 ? 0 : map_offsets[row - 1];
+            size_t map_end = map_offsets[row];
+
+            bool found = false;
+            for (size_t i = map_start; i < map_end && !found; ++i) {
+                if (matches_data[i]) {
+                    found = true;
+                    break;
+                }
+            }
+            result_data[row] = found;
+        }
+
+        if (map_row_nullmap_col) {
+            return ColumnNullable::create(std::move(result_column), 
map_row_nullmap_col);
+        }
+        return result_column;
+    }
+
+    // type comparability check and dispatch
+    Status _execute_type_check_and_dispatch(Block& block, const ColumnNumbers& 
arguments,
+                                            uint32_t result) const {
+        // get type information
+        auto map_type = 
remove_nullable(block.get_by_position(arguments[0]).type);
+        const auto* map_datatype = assert_cast<const 
DataTypeMap*>(map_type.get());
+        auto map_key_type = remove_nullable(map_datatype->get_key_type());
+        auto map_value_type = remove_nullable(map_datatype->get_value_type());
+        auto search_key_type = 
remove_nullable(block.get_by_position(arguments[1]).type);
+        auto search_value_type = 
remove_nullable(block.get_by_position(arguments[2]).type);
+
+        bool key_types_comparable = 
type_comparable(map_key_type->get_primitive_type(),
+                                                    
search_key_type->get_primitive_type());
+        bool value_types_comparable = 
type_comparable(map_value_type->get_primitive_type(),
+                                                      
search_value_type->get_primitive_type());
+
+        // if types are not comparable, return error
+        if (!key_types_comparable || !value_types_comparable) {
+            return Status::RuntimeError(
+                    "Type mismatch for function {}. "
+                    "Map key type: {}, search key type: {}. "
+                    "Map value type: {}, search value type: {}.",
+                    get_name(), map_key_type->get_name(), 
search_key_type->get_name(),
+                    map_value_type->get_name(), search_value_type->get_name());
+        }
+
+        // type check passed, extract columns and execute
+        // extract map column
+        auto map_column_ptr =
+                
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+        const ColumnMap* map_column = nullptr;
+        ColumnPtr map_row_nullmap_col = nullptr;
+        if (map_column_ptr->is_nullable()) {
+            const auto* nullable_column =
+                    reinterpret_cast<const 
ColumnNullable*>(map_column_ptr.get());
+            map_column = 
check_and_get_column<ColumnMap>(nullable_column->get_nested_column());
+            map_row_nullmap_col = nullable_column->get_null_map_column_ptr();
+        } else {
+            map_column = 
check_and_get_column<ColumnMap>(*map_column_ptr.get());
+        }
+        if (!map_column) {
+            return Status::RuntimeError("unsupported types for function 
{}({})", get_name(),
+                                        
block.get_by_position(arguments[0]).type->get_name());
+        }
+
+        // extract (search) key and value columns
+        const auto& [key_column_ptr, key_is_const] =
+                unpack_if_const(block.get_by_position(arguments[1]).column);
+        const auto& [value_column_ptr, value_is_const] =
+                unpack_if_const(block.get_by_position(arguments[2]).column);
+
+        const IColumn* key_column = nullptr;
+        const IColumn* value_column = nullptr;
+        const UInt8* key_nullmap = nullptr;
+        const UInt8* value_nullmap = nullptr;
+
+        if (key_column_ptr->is_nullable()) {
+            const auto* nullable_column = assert_cast<const 
ColumnNullable*>(key_column_ptr.get());
+            key_column = &nullable_column->get_nested_column();
+            key_nullmap = 
nullable_column->get_null_map_column().get_data().data();
+        } else {
+            key_column = key_column_ptr.get();
+        }
+
+        if (value_column_ptr->is_nullable()) {
+            const auto* nullable_column =
+                    assert_cast<const ColumnNullable*>(value_column_ptr.get());
+            value_column = &nullable_column->get_nested_column();
+            value_nullmap = 
nullable_column->get_null_map_column().get_data().data();
+        } else {
+            value_column = value_column_ptr.get();
+        }
+
+        ColumnPtr return_column = _execute_all_rows(
+                map_column, map_row_nullmap_col, *key_column, key_nullmap, 
*value_column,
+                value_nullmap, map_key_type->get_primitive_type(),
+                map_value_type->get_primitive_type(), key_is_const, 
value_is_const);
+
+        if (return_column) {
+            block.replace_by_position(result, std::move(return_column));
+            return Status::OK();
+        }
+
+        return Status::RuntimeError(
+                "execute failed or unsupported types for function {}({}, {}, 
{})", get_name(),
+                block.get_by_position(arguments[0]).type->get_name(),
+                block.get_by_position(arguments[1]).type->get_name(),
+                block.get_by_position(arguments[2]).type->get_name());
+    }
+
+    // generic type-specialized comparison function
+    template <typename ColumnType>
+    bool compare_values(const IColumn& left_col, size_t left_idx, const UInt8* 
left_nullmap,
+                        const IColumn& right_col, size_t right_idx,
+                        const UInt8* right_nullmap) const {
+        // handle null values
+        bool left_is_null = left_nullmap && left_nullmap[left_idx];
+        bool right_is_null = right_nullmap && right_nullmap[right_idx];
+
+        if (left_is_null && right_is_null) {
+            return true;
+        }
+        if (left_is_null || right_is_null) {
+            return false;
+        }
+
+        // type-specialized handling
+        if constexpr (std::is_same_v<ColumnType, ColumnString>) {

Review Comment:
   为啥要分类型自己处理呢?直接assert_cast到目标类型然后compare_at是不是就行了?



##########
be/src/vec/functions/function_map.cpp:
##########
@@ -288,6 +289,83 @@ class FunctionMapEntries : public IFunction {
     }
 };
 
+class FunctionMapToEntries : public IFunction {

Review Comment:
   just call it `FunctionMapEntries` and rename the conflict symbol



##########
regression-test/suites/load_p0/stream_load/test_map_load_and_function.groovy:
##########
@@ -170,6 +170,148 @@ suite("test_map_load_and_function", "p0") {
     qt_select_map_values1 "SELECT map_values(map('k11', 1000, 'k22', 2000))"
     qt_select_map_values2 "SELECT id, m, map_values(m) FROM ${testTable} ORDER 
BY id"
 
+    // map_contains_entry: basic tests
+    qt_select_map_contains_entry1 "SELECT map_contains_entry(map('k11', 1000, 
'k22', 2000), 'k11', 1000)"
+    qt_select_map_contains_entry2 "SELECT map_contains_entry(map('k11', 1000, 
'k22', 2000), 'k22', 2000)"
+    qt_select_map_contains_entry3 "SELECT map_contains_entry(map('k11', 1000, 
'k22', 2000), 'k11', 2000)"
+    qt_select_map_contains_entry4 "SELECT map_contains_entry(map('k11', 1000, 
'k22', 2000), 'k33', 1000)"
+    qt_select_map_contains_entry5 "SELECT map_contains_entry(map('k11', 1000, 
'k22', 2000), 'nokey', 1000)"
+    qt_select_map_contains_entry6 "SELECT map_contains_entry(map('k11', 1000, 
'k22', 2000), '', 1000)"
+    qt_select_map_contains_entry7 "SELECT map_contains_entry(map('k11', 1000, 
'k22', 2000), NULL, 1000)"
+    qt_select_map_contains_entry8 "SELECT map_contains_entry(map('k11', 1000, 
'k22', 2000), 'k11', NULL)"
+    qt_select_map_contains_entry9 "SELECT map_contains_entry(map('k11', 1000, 
'k22', 2000), NULL, NULL)"
+    qt_select_map_contains_entry10 "SELECT map_contains_entry(map('', 0), '', 
0)"
+    qt_select_map_contains_entry11 "SELECT map_contains_entry(map(), 'k1', 
100)"  
+    qt_select_map_contains_entry12 "SELECT map_contains_entry(map('k1', 100), 
'', 100)"
+    qt_select_map_contains_entry13 "SELECT map_contains_entry(map(NULL, 100), 
NULL, 100)"
+    qt_select_map_contains_entry14 "SELECT map_contains_entry(map('k1', NULL), 
'k1', NULL)"
+
+    // map_contains_entry: tests with actual data from first table
+    qt_select_map_contains_entry101 "SELECT id, m, map_contains_entry(m, 'k1', 
23) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry102 "SELECT id, m, map_contains_entry(m, 'k2', 
20) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry103 "SELECT id, m, map_contains_entry(m, '  
11amory  ', 66) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry104 "SELECT id, m, map_contains_entry(m, 
'beat', 31) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry105 "SELECT id, m, map_contains_entry(m, ' 
clever ', 300) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry106 "SELECT id, m, map_contains_entry(m, '  
33,amory  ', 41) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry107 "SELECT id, m, map_contains_entry(m, ' bet 
', 400) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry108 "SELECT id, m, map_contains_entry(m, ' 
cler ', 33) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry109 "SELECT id, m, map_contains_entry(m, '  
1,amy  ', 2) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry110 "SELECT id, m, map_contains_entry(m, ' k2 
', 26) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry111 "SELECT id, m, map_contains_entry(m, 
'null', 90) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry112 "SELECT id, m, map_contains_entry(m, '', 
33) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry113 "SELECT id, m, map_contains_entry(m, NULL, 
1) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry114 "SELECT id, m, map_contains_entry(m, 
'nokey', 100) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry115 "SELECT id, m, map_contains_entry(m, 'k1', 
NULL) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry116 "SELECT id, m, map_contains_entry(m, NULL, 
NULL) FROM ${testTable} ORDER BY id"
+    qt_select_map_contains_entry_var2 "SELECT id, m, map_keys(m)[1], 
map_values(m)[1], map_contains_entry(m, map_keys(m)[1], map_values(m)[1]) FROM 
${testTable} ORDER BY id"
+
+    // map_contains_entry: tests with duplicate keys
+    qt_select_map_contains_entry_dup1 "SELECT map_contains_entry(map('k1', 
100, 'k1', 200), 'k1', 100)"
+    qt_select_map_contains_entry_dup2 "SELECT map_contains_entry(map('k1', 
100, 'k1', 200), 'k1', 200)"
+    qt_select_map_contains_entry_dup3 "SELECT map_contains_entry(map('k1', 
100, 'k1', 200), 'k1', 300)"
+    qt_select_map_contains_entry_dup4 "SELECT map_contains_entry(map('k1', 
100, 'k2', 200, 'k1', 300), 'k1', 100)"
+    qt_select_map_contains_entry_dup5 "SELECT map_contains_entry(map('k1', 
100, 'k2', 200, 'k1', 300), 'k1', 300)"
+    qt_select_map_contains_entry_dup6 "SELECT map_contains_entry(map('k1', 
100, 'k2', 200, 'k1', 300), 'k2', 200)"
+    qt_select_map_contains_entry_dup7 "SELECT map_contains_entry(map(1, 'a', 
2, 'b', 1, 'c'), 1, 'a')"
+    qt_select_map_contains_entry_dup8 "SELECT map_contains_entry(map(1, 'a', 
2, 'b', 1, 'c'), 1, 'c')"
+    qt_select_map_contains_entry_dup9 "SELECT map_contains_entry(map(1, 'a', 
2, 'b', 1, 'c'), 1, 'b')"
+    qt_select_map_contains_entry_dup10 "SELECT map_contains_entry(map('k1', 
NULL, 'k1', 100), 'k1', NULL)"
+    qt_select_map_contains_entry_dup11 "SELECT map_contains_entry(map('k1', 
NULL, 'k1', 100), 'k1', 100)"
+    qt_select_map_contains_entry_dup12 "SELECT map_contains_entry(map(NULL, 
100, NULL, 200), NULL, 100)"
+    qt_select_map_contains_entry_dup13 "SELECT map_contains_entry(map(NULL, 
100, NULL, 200), NULL, 200)"
+    qt_select_map_contains_entry_dup14 "SELECT map_contains_entry(map('a', 1, 
'b', 2, 'a', 3, 'c', 4, 'a', 5), 'a', 1)"
+    qt_select_map_contains_entry_dup15 "SELECT map_contains_entry(map('a', 1, 
'b', 2, 'a', 3, 'c', 4, 'a', 5), 'a', 3)"
+    qt_select_map_contains_entry_dup16 "SELECT map_contains_entry(map('a', 1, 
'b', 2, 'a', 3, 'c', 4, 'a', 5), 'a', 5)"
+    qt_select_map_contains_entry_dup17 "SELECT map_contains_entry(map('a', 1, 
'b', 2, 'a', 3, 'c', 4, 'a', 5), 'a', 2)"
+
+    // map_contains_entry: tests with time/date type
+    qt_select_map_contains_entry_date1 "SELECT 
map_contains_entry(map('2023-01-01', CAST('2023-01-01' AS DATE), '2023-12-31', 
CAST('2023-12-31' AS DATE)), '2023-01-01', CAST('2023-01-01' AS DATE))"
+    qt_select_map_contains_entry_date2 "SELECT 
map_contains_entry(map('2023-01-01', CAST('2023-01-01' AS DATE), '2023-12-31', 
CAST('2023-12-31' AS DATE)), '2023-01-01', CAST('2023-01-02' AS DATE))"
+    qt_select_map_contains_entry_date3 "SELECT 
map_contains_entry(map(CAST('2023-01-01' AS DATE), 'start', CAST('2023-12-31' 
AS DATE), 'end'), CAST('2023-01-01' AS DATE), 'start')"
+    qt_select_map_contains_entry_date4 "SELECT 
map_contains_entry(map(CAST('2023-01-01' AS DATE), 'start', CAST('2023-12-31' 
AS DATE), 'end'), CAST('2023-01-01' AS DATE), 'end')"
+    qt_select_map_contains_entry_datetime1 "SELECT 
map_contains_entry(map('dt1', CAST('2023-01-01 10:30:00' AS DATETIME), 'dt2', 
CAST('2023-12-31 23:59:59' AS DATETIME)), 'dt1', CAST('2023-01-01 10:30:00' AS 
DATETIME))"
+    qt_select_map_contains_entry_datetime2 "SELECT 
map_contains_entry(map('dt1', CAST('2023-01-01 10:30:00' AS DATETIME), 'dt2', 
CAST('2023-12-31 23:59:59' AS DATETIME)), 'dt1', CAST('2023-01-01 10:30:01' AS 
DATETIME))"
+    qt_select_map_contains_entry_datetime3 "SELECT 
map_contains_entry(map(CAST('2023-01-01 10:30:00' AS DATETIME), 'morning', 
CAST('2023-12-31 23:59:59' AS DATETIME), 'night'), CAST('2023-01-01 10:30:00' 
AS DATETIME), 'morning')"
+    qt_select_map_contains_entry_datetime4 "SELECT 
map_contains_entry(map(CAST('2023-01-01 10:30:00' AS DATETIME), 'morning', 
CAST('2023-12-31 23:59:59' AS DATETIME), 'night'), CAST('2023-01-01 10:30:00' 
AS DATETIME), 'night')"
+    qt_select_map_contains_entry_datev2_1 "SELECT 
map_contains_entry(map('start', datev2('2023-01-01'), 'end', 
datev2('2023-12-31')), 'start', datev2('2023-01-01'))"
+    qt_select_map_contains_entry_datev2_2 "SELECT 
map_contains_entry(map('start', datev2('2023-01-01'), 'end', 
datev2('2023-12-31')), 'start', datev2('2023-01-02'))"
+    qt_select_map_contains_entry_datev2_3 "SELECT 
map_contains_entry(map(datev2('2023-01-01'), 100, datev2('2023-12-31'), 200), 
datev2('2023-01-01'), 100)"
+    qt_select_map_contains_entry_datev2_4 "SELECT 
map_contains_entry(map(datev2('2023-01-01'), 100, datev2('2023-12-31'), 200), 
datev2('2023-01-01'), 200)"
+    qt_select_map_contains_entry_datetimev2_1 "SELECT 
map_contains_entry(map('event1', CAST('2023-01-01 10:30:00.123' AS 
DATETIMEV2(3)), 'event2', CAST('2023-12-31 23:59:59.999' AS DATETIMEV2(3))), 
'event1', CAST('2023-01-01 10:30:00.123' AS DATETIMEV2(3)))"
+    qt_select_map_contains_entry_datetimev2_2 "SELECT 
map_contains_entry(map('event1', CAST('2023-01-01 10:30:00.123' AS 
DATETIMEV2(3)), 'event2', CAST('2023-12-31 23:59:59.999' AS DATETIMEV2(3))), 
'event1', CAST('2023-01-01 10:30:00.124' AS DATETIMEV2(3)))"
+    qt_select_map_contains_entry_datetimev2_3 "SELECT 
map_contains_entry(map(CAST('2023-01-01 10:30:00.123' AS DATETIMEV2(3)), 'A', 
CAST('2023-12-31 23:59:59.999' AS DATETIMEV2(3)), 'B'), CAST('2023-01-01 
10:30:00.123' AS DATETIMEV2(3)), 'A')"
+    qt_select_map_contains_entry_datetimev2_4 "SELECT 
map_contains_entry(map(CAST('2023-01-01 10:30:00.123' AS DATETIMEV2(3)), 'A', 
CAST('2023-12-31 23:59:59.999' AS DATETIMEV2(3)), 'B'), CAST('2023-01-01 
10:30:00.123' AS DATETIMEV2(3)), 'B')"
+
+    // map_contains_entry: test all type combination (N^2 auto-generated)
+    def allTypes = [
+        [type: "STRING", testKey: "'str1'", testValue: "'str2'"],
+        [type: "BOOLEAN", testKey: "true", testValue: "false"],
+        [type: "TINYINT", testKey: "CAST(1 AS TINYINT)", testValue: "CAST(2 AS 
TINYINT)"],
+        [type: "SMALLINT", testKey: "CAST(100 AS SMALLINT)", testValue: 
"CAST(200 AS SMALLINT)"],
+        [type: "INT", testKey: "CAST(1000 AS INT)", testValue: "CAST(2000 AS 
INT)"],
+        [type: "BIGINT", testKey: "CAST(10000 AS BIGINT)", testValue: 
"CAST(20000 AS BIGINT)"],
+        [type: "LARGEINT", testKey: "CAST(100000 AS LARGEINT)", testValue: 
"CAST(200000 AS LARGEINT)"],
+        [type: "FLOAT", testKey: "CAST(1.1 AS FLOAT)", testValue: "CAST(2.2 AS 
FLOAT)"],
+        [type: "DOUBLE", testKey: "CAST(10.1 AS DOUBLE)", testValue: 
"CAST(20.2 AS DOUBLE)"],
+        [type: "DATE", testKey: "CAST('2023-01-01' AS DATE)", testValue: 
"CAST('2023-12-31' AS DATE)"],
+        [type: "DATETIME", testKey: "CAST('2023-01-01 10:30:00' AS DATETIME)", 
testValue: "CAST('2023-12-31 23:59:59' AS DATETIME)"],
+        [type: "DATEV2", testKey: "datev2('2023-01-01')", testValue: 
"datev2('2023-12-31')"],
+        [type: "DATETIMEV2(3)", testKey: "CAST('2023-01-01 10:30:00.123' AS 
DATETIMEV2(3))", testValue: "CAST('2023-12-31 23:59:59.999' AS DATETIMEV2(3))"],
+        [type: "CHAR(5)", testKey: "CAST('abc' AS CHAR(5))", testValue: 
"CAST('xyz' AS CHAR(5))"],
+        [type: "VARCHAR(10)", testKey: "CAST('hello' AS VARCHAR(10))", 
testValue: "CAST('world' AS VARCHAR(10))"]
+    ]
+
+    def testCounter = 1
+    for (int i = 0; i < allTypes.size(); i++) {
+        for (int j = 0; j < allTypes.size(); j++) {
+            def keyType = allTypes[i]
+            def valueType = allTypes[j]
+            def testName = 
"qt_select_map_contains_entry_type_combo_${testCounter}"
+            def mapExpr = "map(${keyType.testKey}, ${valueType.testValue})"
+            def sqlQuery = "SELECT map_contains_entry(${mapExpr}, 
${keyType.testKey}, ${valueType.testValue})"
+            
+            def sqlResult = sql sqlQuery
+            assertTrue(sqlResult[0][0], "MapContainsEntry: Combination 
${testCounter} failed, KeyType: ${keyType.type}, ValueType: ${valueType.type}")
+            testCounter++
+        }
+    }
+
+    // map_contains_entry: decimal-only tests
+    def decimalTypes = [
+        [type: "DECIMALV2(10,2)", testKey: "CAST(123.45 AS DECIMALV2(10,2))", 
testValue: "CAST(234.56 AS DECIMALV2(10,2))"],
+        [type: "DECIMAL(7,3)",    testKey: "CAST(123.456 AS DECIMAL(7,3))",    
testValue: "CAST(234.567 AS DECIMAL(7,3))"],
+        [type: "DECIMAL(10,2)",   testKey: "CAST(123.45 AS DECIMAL(10,2))",    
testValue: "CAST(234.56 AS DECIMAL(10,2))"],
+        [type: "DECIMAL(30,10)",  testKey: "CAST(12345.6789 AS 
DECIMAL(30,10))", testValue: "CAST(23456.7891 AS DECIMAL(30,10))"]
+    ]
+
+    for (def d : decimalTypes) {
+        def mapExpr = "map(${d.testKey}, ${d.testValue})"
+        def sqlQuery = "SELECT map_contains_entry(${mapExpr}, ${d.testKey}, 
${d.testValue})"
+        def sqlResult = sql sqlQuery
+        assertTrue(sqlResult[0][0], "MapContainsEntry: Decimal tests failed 
for ${d.type}")
+    }
+
+    // map_entries: basic tests
+    qt_select_map_entries1 "SELECT map_entries(map('k11', 1000, 'k22', 2000))"

Review Comment:
   please add testcase of `map(null, null)` and `null`.



##########
be/src/vec/functions/function_map.cpp:
##########
@@ -288,6 +289,83 @@ class FunctionMapEntries : public IFunction {
     }
 };
 
+class FunctionMapToEntries : public IFunction {
+public:
+    static constexpr auto name = "map_entries";
+    static FunctionPtr create() { return 
std::make_shared<FunctionMapToEntries>(); }
+
+    /// Get function name.
+    String get_name() const override { return name; }
+
+    bool is_variadic() const override { return false; }
+
+    size_t get_number_of_arguments() const override { return 1; }
+
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const 
override {
+        DataTypePtr datatype = arguments[0];
+        if (datatype->is_nullable()) {
+            datatype = assert_cast<const 
DataTypeNullable*>(datatype.get())->get_nested_type();
+        }
+        DCHECK(datatype->get_primitive_type() == PrimitiveType::TYPE_MAP)
+                << "first argument for function: " << name << " should be 
DataTypeMap";
+        const auto* const datatype_map = static_cast<const 
DataTypeMap*>(datatype.get());
+
+        // Create struct type with named fields "key" and "value"
+        // key and value are always nullable
+        auto struct_type = std::make_shared<DataTypeStruct>(
+                DataTypes {make_nullable(datatype_map->get_key_type()),
+                           make_nullable(datatype_map->get_value_type())},
+                Strings {"key", "value"});
+
+        // Theoretically, the struct element will never be null,
+        // but FE expects the array element to be nullable
+        return std::make_shared<DataTypeArray>(make_nullable(struct_type));
+    }
+
+    Status execute_impl(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
+                        uint32_t result, size_t input_rows_count) const 
override {
+        auto left_column =
+                
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+        const ColumnMap* map_column = nullptr;
+        ColumnPtr nullmap_column = nullptr;
+
+        if (left_column->is_nullable()) {

Review Comment:
   map_column 直接assert_cast出来就行



##########
be/src/vec/functions/function_map.cpp:
##########
@@ -440,13 +518,400 @@ class FunctionStrToMap : public IFunction {
     }
 };
 
+class FunctionMapContainsEntry : public IFunction {
+public:
+    static constexpr auto name = "map_contains_entry";
+    static FunctionPtr create() { return 
std::make_shared<FunctionMapContainsEntry>(); }
+
+    String get_name() const override { return name; }
+    size_t get_number_of_arguments() const override { return 3; }
+    bool use_default_implementation_for_nulls() const override { return false; 
}
+
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const 
override {
+        DataTypePtr datatype = arguments[0];
+        if (datatype->is_nullable()) {
+            datatype = assert_cast<const 
DataTypeNullable*>(datatype.get())->get_nested_type();
+        }
+        DCHECK(datatype->get_primitive_type() == TYPE_MAP)
+                << "first argument for function: " << name << " should be 
DataTypeMap";
+
+        if (arguments[0]->is_nullable()) {
+            return make_nullable(std::make_shared<DataTypeBool>());
+        } else {
+            return std::make_shared<DataTypeBool>();
+        }
+    }
+
+    Status execute_impl(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
+                        uint32_t result, size_t input_rows_count) const 
override {
+        return _execute_type_check_and_dispatch(block, arguments, result);
+    }
+
+private:
+    // assume result_matches is initialized to all 1s
+    template <typename ColumnType>
+    void _execute_column_comparison(const IColumn& map_entry_column, const 
UInt8* map_entry_nullmap,
+                                    const IColumn& search_column, const UInt8* 
search_nullmap,
+                                    const ColumnArray::Offsets64& map_offsets,
+                                    const UInt8* map_row_nullmap, bool 
search_is_const,
+                                    ColumnUInt8& result_matches) const {
+        auto& result_data = result_matches.get_data();
+        for (size_t row = 0; row < map_offsets.size(); ++row) {
+            if (map_row_nullmap && map_row_nullmap[row]) {
+                continue;
+            }
+            size_t map_start = row == 0 ? 0 : map_offsets[row - 1];
+            size_t map_end = map_offsets[row];
+            // const column always uses index 0
+            size_t search_idx = search_is_const ? 0 : row;
+            for (size_t i = map_start; i < map_end; ++i) {
+                result_data[i] &=
+                        compare_values<ColumnType>(map_entry_column, i, 
map_entry_nullmap,
+                                                   search_column, search_idx, 
search_nullmap)
+                                ? 1
+                                : 0;
+            }
+        }
+    }
+
+    // dispatch column comparison by type, map_entry_column is the column of 
map's key or value, search_column is the column of search key or value
+    void _dispatch_column_comparison(PrimitiveType type, const IColumn& 
map_entry_column,
+                                     const UInt8* map_entry_nullmap, const 
IColumn& search_column,
+                                     const UInt8* search_nullmap,
+                                     const ColumnArray::Offsets64& map_offsets,
+                                     const UInt8* map_row_nullmap, bool 
search_is_const,
+                                     ColumnUInt8& result_matches) const {
+        switch (type) {
+        case TYPE_BOOLEAN:

Review Comment:
   why dont have ip types?



##########
be/src/vec/functions/function_map.cpp:
##########
@@ -288,6 +289,83 @@ class FunctionMapEntries : public IFunction {
     }
 };
 
+class FunctionMapToEntries : public IFunction {
+public:
+    static constexpr auto name = "map_entries";
+    static FunctionPtr create() { return 
std::make_shared<FunctionMapToEntries>(); }
+
+    /// Get function name.
+    String get_name() const override { return name; }
+
+    bool is_variadic() const override { return false; }
+
+    size_t get_number_of_arguments() const override { return 1; }
+
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const 
override {
+        DataTypePtr datatype = arguments[0];
+        if (datatype->is_nullable()) {
+            datatype = assert_cast<const 
DataTypeNullable*>(datatype.get())->get_nested_type();
+        }
+        DCHECK(datatype->get_primitive_type() == PrimitiveType::TYPE_MAP)
+                << "first argument for function: " << name << " should be 
DataTypeMap";
+        const auto* const datatype_map = static_cast<const 
DataTypeMap*>(datatype.get());
+
+        // Create struct type with named fields "key" and "value"
+        // key and value are always nullable
+        auto struct_type = std::make_shared<DataTypeStruct>(
+                DataTypes {make_nullable(datatype_map->get_key_type()),
+                           make_nullable(datatype_map->get_value_type())},
+                Strings {"key", "value"});
+
+        // Theoretically, the struct element will never be null,
+        // but FE expects the array element to be nullable
+        return std::make_shared<DataTypeArray>(make_nullable(struct_type));
+    }
+
+    Status execute_impl(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
+                        uint32_t result, size_t input_rows_count) const 
override {
+        auto left_column =
+                
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();

Review Comment:
   no need to convert_to_full_column_if_const, we have default implementation.



##########
be/src/vec/functions/function_map.cpp:
##########
@@ -288,6 +289,83 @@ class FunctionMapEntries : public IFunction {
     }
 };
 
+class FunctionMapToEntries : public IFunction {
+public:
+    static constexpr auto name = "map_entries";
+    static FunctionPtr create() { return 
std::make_shared<FunctionMapToEntries>(); }
+
+    /// Get function name.
+    String get_name() const override { return name; }
+
+    bool is_variadic() const override { return false; }
+
+    size_t get_number_of_arguments() const override { return 1; }
+
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const 
override {
+        DataTypePtr datatype = arguments[0];
+        if (datatype->is_nullable()) {
+            datatype = assert_cast<const 
DataTypeNullable*>(datatype.get())->get_nested_type();
+        }
+        DCHECK(datatype->get_primitive_type() == PrimitiveType::TYPE_MAP)
+                << "first argument for function: " << name << " should be 
DataTypeMap";
+        const auto* const datatype_map = static_cast<const 
DataTypeMap*>(datatype.get());
+
+        // Create struct type with named fields "key" and "value"
+        // key and value are always nullable
+        auto struct_type = std::make_shared<DataTypeStruct>(
+                DataTypes {make_nullable(datatype_map->get_key_type()),
+                           make_nullable(datatype_map->get_value_type())},
+                Strings {"key", "value"});
+
+        // Theoretically, the struct element will never be null,
+        // but FE expects the array element to be nullable
+        return std::make_shared<DataTypeArray>(make_nullable(struct_type));
+    }
+
+    Status execute_impl(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
+                        uint32_t result, size_t input_rows_count) const 
override {
+        auto left_column =
+                
block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
+        const ColumnMap* map_column = nullptr;
+        ColumnPtr nullmap_column = nullptr;
+
+        if (left_column->is_nullable()) {
+            const auto* nullable_column =
+                    reinterpret_cast<const ColumnNullable*>(left_column.get());
+            map_column = 
check_and_get_column<ColumnMap>(nullable_column->get_nested_column());
+            nullmap_column = nullable_column->get_null_map_column_ptr();
+        } else {
+            map_column = check_and_get_column<ColumnMap>(*left_column.get());
+        }
+
+        if (!map_column) {
+            return Status::RuntimeError("unsupported types for function 
{}({})", get_name(),
+                                        
block.get_by_position(arguments[0]).type->get_name());
+        }
+
+        auto struct_column = ColumnStruct::create(
+                Columns {map_column->get_keys_ptr(), 
map_column->get_values_ptr()});
+
+        // all struct elements are not null
+        auto struct_null_map = ColumnUInt8::create(struct_column->size(), 0);
+        auto nullable_struct_column =
+                ColumnNullable::create(std::move(struct_column), 
std::move(struct_null_map));
+
+        auto result_array_column = 
ColumnArray::create(std::move(nullable_struct_column),
+                                                       
map_column->get_offsets_ptr());
+
+        // Handle nullable case for the whole array
+        if (nullmap_column) {

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to