This is an automated email from the ASF dual-hosted git repository.

xuyang pushed a commit to branch struct-type
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/struct-type by this push:
     new af9234b3b6 [Feature](map) add map type to doris (#15966)
af9234b3b6 is described below

commit af9234b3b6398573425b9cd43e23cfada53d07b6
Author: amory <wangqian...@selectdb.com>
AuthorDate: Fri Feb 3 13:26:29 2023 +0800

    [Feature](map) add map type to doris (#15966)
    
    Add complex type map to doris on vectorized engine
---
 be/src/exprs/anyval_util.cpp                       |   6 +
 be/src/olap/field.h                                |  33 +-
 be/src/olap/page_cache.cpp                         |   1 -
 be/src/olap/rowset/segment_v2/column_reader.cpp    | 105 +++++++
 be/src/olap/rowset/segment_v2/column_reader.h      |  35 +++
 be/src/olap/rowset/segment_v2/column_writer.cpp    | 188 +++++++++++-
 be/src/olap/rowset/segment_v2/column_writer.h      |  55 ++++
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  10 +
 be/src/olap/tablet_meta.cpp                        |   6 +
 be/src/olap/tablet_schema.cpp                      |  18 ++
 be/src/olap/types.cpp                              |  48 ++-
 be/src/olap/types.h                                |  92 ++++++
 be/src/runtime/CMakeLists.txt                      |   1 +
 be/src/runtime/map_value.cpp                       |  29 ++
 be/src/runtime/map_value.h                         |  61 ++++
 be/src/runtime/primitive_type.cpp                  |  14 +
 be/src/runtime/types.cpp                           | 144 +++++----
 be/src/udf/udf.h                                   |   2 +
 be/src/vec/CMakeLists.txt                          |   3 +
 be/src/vec/columns/column.h                        |   2 +
 be/src/vec/columns/column_map.cpp                  | 186 +++++++++++
 be/src/vec/columns/column_map.h                    | 153 ++++++++++
 be/src/vec/core/field.h                            |  32 ++
 be/src/vec/core/types.h                            |   3 +
 be/src/vec/data_types/data_type.cpp                |   2 +
 be/src/vec/data_types/data_type.h                  |   5 +-
 be/src/vec/data_types/data_type_factory.cpp        |  29 ++
 be/src/vec/data_types/data_type_factory.hpp        |   1 +
 be/src/vec/data_types/data_type_map.cpp            | 202 ++++++++++++
 be/src/vec/data_types/data_type_map.h              |  81 +++++
 be/src/vec/exprs/vexpr.cpp                         | 339 +++++++++++----------
 be/src/vec/exprs/vmap_literal.cpp                  |  52 ++++
 be/src/vec/exprs/vmap_literal.h                    |  33 ++
 .../vec/functions/array/function_array_element.h   | 104 ++++++-
 be/src/vec/olap/olap_data_convertor.cpp            |  62 +++-
 be/src/vec/olap/olap_data_convertor.h              |  25 ++
 be/src/vec/sink/vmysql_result_writer.cpp           |  29 ++
 fe/fe-core/src/main/cup/sql_parser.cup             |  32 +-
 .../java/org/apache/doris/analysis/CastExpr.java   |   5 +
 .../main/java/org/apache/doris/analysis/Expr.java  |   9 +-
 .../java/org/apache/doris/analysis/MapLiteral.java | 177 +++++++++++
 .../main/java/org/apache/doris/catalog/Column.java |  14 +
 .../java/org/apache/doris/catalog/FunctionSet.java |   3 +-
 .../java/org/apache/doris/catalog/MapType.java     |  48 +++
 .../org/apache/doris/catalog/PrimitiveType.java    |   6 +
 .../main/java/org/apache/doris/catalog/Type.java   |   2 +
 .../java/org/apache/doris/common/util/Util.java    |   1 +
 .../java/org/apache/doris/mysql/MysqlColType.java  |   3 +-
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   2 +
 gensrc/script/doris_builtins_functions.py          |   4 +
 gensrc/script/gen_builtins_functions.py            |   6 +
 gensrc/thrift/Exprs.thrift                         |   3 +
 52 files changed, 2235 insertions(+), 271 deletions(-)

diff --git a/be/src/exprs/anyval_util.cpp b/be/src/exprs/anyval_util.cpp
index b83e04e9ce..26aa171bb1 100644
--- a/be/src/exprs/anyval_util.cpp
+++ b/be/src/exprs/anyval_util.cpp
@@ -212,6 +212,12 @@ FunctionContext::TypeDesc 
AnyValUtil::column_type_to_type_desc(const TypeDescrip
             out.children.push_back(column_type_to_type_desc(t));
         }
         break;
+    case TYPE_MAP:
+        out.type = FunctionContext::TYPE_MAP;
+        for (const auto& t : type.children) {
+            out.children.push_back(column_type_to_type_desc(t));
+        }
+        break;
     case TYPE_STRING:
         out.type = FunctionContext::TYPE_STRING;
         out.len = type.len;
diff --git a/be/src/olap/field.h b/be/src/olap/field.h
index 97a516f32c..56f26c33aa 100644
--- a/be/src/olap/field.h
+++ b/be/src/olap/field.h
@@ -29,6 +29,7 @@
 #include "olap/types.h"
 #include "olap/utils.h"
 #include "runtime/collection_value.h"
+#include "runtime/map_value.h"
 #include "runtime/mem_pool.h"
 #include "util/hash_util.hpp"
 #include "util/mem_util.hpp"
@@ -455,10 +456,9 @@ uint32_t Field::hash_code(const CellType& cell, uint32_t 
seed) const {
     return _type_info->hash_code(cell.cell_ptr(), seed);
 }
 
-class StructField : public Field {
+class MapField : public Field {
 public:
-    explicit StructField(const TabletColumn& column) : Field(column) {}
-
+    explicit MapField(const TabletColumn& column) : Field(column) {}
     void consume(RowCursorCell* dst, const char* src, bool src_null, MemPool* 
mem_pool,
                  ObjectPool* agg_pool) const override {
         dst->set_is_null(src_null);
@@ -467,7 +467,17 @@ public:
         }
         _type_info->deep_copy(dst->mutable_cell_ptr(), src, mem_pool);
     }
+    // make variable_ptr memory allocate to cell_ptr as MapValue
+    char* allocate_memory(char* cell_ptr, char* variable_ptr) const override {
+        return variable_ptr + _length;
+    }
+
+    size_t get_variable_len() const override { return _length; }
+};
 
+class StructField : public Field {
+public:
+    explicit StructField(const TabletColumn& column) : Field(column) {}
     char* allocate_memory(char* cell_ptr, char* variable_ptr) const override {
         auto struct_v = (StructValue*)cell_ptr;
         struct_v->set_values(reinterpret_cast<void**>(variable_ptr));
@@ -798,6 +808,14 @@ public:
                 local->add_sub_field(std::move(item_field));
                 return local;
             }
+            case OLAP_FIELD_TYPE_MAP: {
+                std::unique_ptr<Field> 
key_field(FieldFactory::create(column.get_sub_column(0)));
+                std::unique_ptr<Field> 
val_field(FieldFactory::create(column.get_sub_column(1)));
+                auto* local = new MapField(column);
+                local->add_sub_field(std::move(key_field));
+                local->add_sub_field(std::move(val_field));
+                return local;
+            }
             case OLAP_FIELD_TYPE_DECIMAL:
                 [[fallthrough]];
             case OLAP_FIELD_TYPE_DECIMAL32:
@@ -847,6 +865,15 @@ public:
                 local->add_sub_field(std::move(item_field));
                 return local;
             }
+            case OLAP_FIELD_TYPE_MAP: {
+                DCHECK(column.get_subtype_count() == 2);
+                auto* local = new MapField(column);
+                std::unique_ptr<Field> 
key_field(FieldFactory::create(column.get_sub_column(0)));
+                std::unique_ptr<Field> 
value_field(FieldFactory::create(column.get_sub_column(1)));
+                local->add_sub_field(std::move(key_field));
+                local->add_sub_field(std::move(value_field));
+                return local;
+            }
             case OLAP_FIELD_TYPE_DECIMAL:
                 [[fallthrough]];
             case OLAP_FIELD_TYPE_DECIMAL32:
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index 2813f85dd3..a49043aebb 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -65,7 +65,6 @@ bool StoragePageCache::lookup(const CacheKey& key, 
PageCacheHandle* handle,
 void StoragePageCache::insert(const CacheKey& key, const Slice& data, 
PageCacheHandle* handle,
                               segment_v2::PageTypePB page_type, bool 
in_memory) {
     auto deleter = [](const doris::CacheKey& key, void* value) { delete[] 
(uint8_t*)value; };
-
     CachePriority priority = CachePriority::NORMAL;
     if (in_memory) {
         priority = CachePriority::DURABLE;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp 
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 968caae9c5..64145eea4e 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -31,6 +31,7 @@
 #include "util/rle_encoding.h" // for RleDecoder
 #include "vec/columns/column.h"
 #include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
 #include "vec/columns/column_struct.h"
 #include "vec/core/types.h"
 #include "vec/runtime/vdatetime_value.h" //for VecDateTime
@@ -101,6 +102,32 @@ Status ColumnReader::create(const ColumnReaderOptions& 
opts, const ColumnMetaPB&
             *reader = std::move(array_reader);
             return Status::OK();
         }
+        case FieldType::OLAP_FIELD_TYPE_MAP: {
+            // map reader now has 3 sub readers for key(arr), value(arr), null(scalar)
+            std::unique_ptr<ColumnReader> map_reader(
+                    new ColumnReader(opts, meta, num_rows, file_reader));
+            std::unique_ptr<ColumnReader> key_reader;
+            RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(0), num_rows,
+                                                 file_reader, &key_reader));
+            std::unique_ptr<ColumnReader> val_reader;
+            RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(1), num_rows,
+                                                 file_reader, &val_reader));
+            std::unique_ptr<ColumnReader> null_reader;
+            if (meta.is_nullable()) {
+                RETURN_IF_ERROR(ColumnReader::create(opts, 
meta.children_columns(2),
+                                                     
meta.children_columns(2).num_rows(),
+                                                     file_reader, 
&null_reader));
+            }
+            map_reader->_sub_readers.resize(meta.children_columns_size());
+
+            map_reader->_sub_readers[0] = std::move(key_reader);
+            map_reader->_sub_readers[1] = std::move(val_reader);
+            if (meta.is_nullable()) {
+                map_reader->_sub_readers[2] = std::move(null_reader);
+            }
+            *reader = std::move(map_reader);
+            return Status::OK();
+        }
         default:
             return Status::NotSupported("unsupported type for ColumnReader: 
{}",
                                         std::to_string(type));
@@ -485,6 +512,18 @@ Status ColumnReader::new_iterator(ColumnIterator** 
iterator) {
                     null_iterator);
             return Status::OK();
         }
+        case FieldType::OLAP_FIELD_TYPE_MAP: {
+            ColumnIterator* key_iterator = nullptr;
+            RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&key_iterator));
+            ColumnIterator* val_iterator = nullptr;
+            RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&val_iterator));
+            ColumnIterator* null_iterator = nullptr;
+            if (is_nullable()) {
+                RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&null_iterator));
+            }
+            *iterator = new MapFileColumnIterator(this, null_iterator, 
key_iterator, val_iterator);
+            return Status::OK();
+        }
         default:
             return Status::NotSupported("unsupported type to create iterator: 
{}",
                                         std::to_string(type));
@@ -492,6 +531,72 @@ Status ColumnReader::new_iterator(ColumnIterator** 
iterator) {
     }
 }
 
+///====================== MapFileColumnIterator ============================////
+// Adopts the key and value iterators unconditionally. The null iterator is
+// adopted only when `reader` reports the map column as nullable; otherwise the
+// member stays empty and the passed pointer is ignored.
+MapFileColumnIterator::MapFileColumnIterator(ColumnReader* reader, ColumnIterator* null_iterator,
+                                             ColumnIterator* key_iterator,
+                                             ColumnIterator* val_iterator)
+        : _map_reader(reader),
+          _null_iterator(reader->is_nullable() ? null_iterator : nullptr),
+          _key_iterator(key_iterator),
+          _val_iterator(val_iterator) {}
+
+// Initializes the key iterator, then the value iterator, then (for a nullable
+// map reader) the null iterator, all with the same options. The first failing
+// status is returned as-is.
+Status MapFileColumnIterator::init(const ColumnIteratorOptions& opts) {
+    RETURN_IF_ERROR(_key_iterator->init(opts));
+    RETURN_IF_ERROR(_val_iterator->init(opts));
+    if (!_map_reader->is_nullable()) {
+        return Status::OK();
+    }
+    return _null_iterator->init(opts);
+}
+
+// The ColumnBlockView overload is not implemented for map columns; it always
+// reports NotSupported and touches none of its arguments.
+Status MapFileColumnIterator::next_batch(size_t* /*n*/, ColumnBlockView* /*dst*/,
+                                         bool* /*has_null*/) {
+    return Status::NotSupported("Not support next_batch");
+}
+
+// Positions the key and value iterators at `ord`, and the null iterator as well
+// when the map reader is nullable. Returns the first failing status.
+Status MapFileColumnIterator::seek_to_ordinal(ordinal_t ord) {
+    RETURN_IF_ERROR(_key_iterator->seek_to_ordinal(ord));
+    RETURN_IF_ERROR(_val_iterator->seek_to_ordinal(ord));
+    if (!_map_reader->is_nullable()) {
+        return Status::OK();
+    }
+    return _null_iterator->seek_to_ordinal(ord);
+}
+
+// Reads up to *n rows into `dst`, which must be a ColumnMap or a ColumnNullable
+// wrapping one. Keys and values are appended through their own iterators; for a
+// nullable destination the per-row null signs are appended to its null map.
+// `has_null` is handed unchanged to the key and value iterators. *n itself is
+// not modified.
+Status MapFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
+                                         bool* has_null) {
+    const bool dst_is_nullable = dst->is_nullable();
+    const auto* column_map = vectorized::check_and_get_column<vectorized::ColumnMap>(
+            dst_is_nullable
+                    ? static_cast<vectorized::ColumnNullable&>(*dst).get_nested_column()
+                    : *dst);
+    if (column_map == nullptr) {
+        // check_and_get_column presumably yields nullptr on a type mismatch;
+        // report it instead of dereferencing a null pointer below.
+        return Status::InternalError("destination column of map iterator is not a map column");
+    }
+    if (dst_is_nullable && _null_iterator == nullptr) {
+        // The constructor keeps no null iterator for a non-nullable map reader.
+        // Fail before anything is appended rather than crash afterwards.
+        return Status::InternalError("nullable destination but map column has no null iterator");
+    }
+
+    size_t num_read = *n;
+    auto column_key_ptr = column_map->get_keys().assume_mutable();
+    auto column_val_ptr = column_map->get_values().assume_mutable();
+    RETURN_IF_ERROR(_key_iterator->next_batch(&num_read, column_key_ptr, has_null));
+    RETURN_IF_ERROR(_val_iterator->next_batch(&num_read, column_val_ptr, has_null));
+
+    if (dst_is_nullable) {
+        auto null_map_ptr =
+                static_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column_ptr();
+        bool null_signs_has_null = false;
+        RETURN_IF_ERROR(_null_iterator->next_batch(&num_read, null_map_ptr, &null_signs_has_null));
+        DCHECK(num_read == *n);
+    }
+    return Status::OK();
+}
+
+// Reads the rows named by `rowids` one at a time: seek to each row id, then
+// append a single row to `dst` through the vectorized next_batch overload
+// (called with a null `has_null`).
+Status MapFileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count,
+                                             vectorized::MutableColumnPtr& dst) {
+    const rowid_t* const rowids_end = rowids + count;
+    for (const rowid_t* rowid = rowids; rowid != rowids_end; ++rowid) {
+        RETURN_IF_ERROR(seek_to_ordinal(*rowid));
+        size_t rows_read = 1;
+        RETURN_IF_ERROR(next_batch(&rows_read, dst, nullptr));
+        DCHECK(rows_read == 1);
+    }
+    return Status::OK();
+}
+
 
////////////////////////////////////////////////////////////////////////////////
 
 StructFileColumnIterator::StructFileColumnIterator(
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h 
b/be/src/olap/rowset/segment_v2/column_reader.h
index b10e46ea98..59a5b4ef1a 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -393,6 +393,41 @@ public:
     ordinal_t get_current_ordinal() const override { return 0; }
 };
 
+// Iterator used to read a map value column. It drives one sub-iterator for the
+// keys, one for the values and an optional one for the per-row null signs.
+class MapFileColumnIterator final : public ColumnIterator {
+public:
+    // The iterators are handed over as raw pointers and stored in the
+    // unique_ptr members below.
+    explicit MapFileColumnIterator(ColumnReader* reader, ColumnIterator* null_iterator,
+                                   ColumnIterator* key_iterator, ColumnIterator* val_iterator);
+
+    ~MapFileColumnIterator() override = default;
+
+    Status init(const ColumnIteratorOptions& opts) override;
+
+    Status next_batch(size_t* n, ColumnBlockView* dst, bool* has_null) override;
+
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;
+
+    Status read_by_rowids(const rowid_t* rowids, const size_t count,
+                          vectorized::MutableColumnPtr& dst) override;
+
+    // Rewinds every sub-iterator that exists to its first row.
+    Status seek_to_first() override {
+        RETURN_IF_ERROR(_key_iterator->seek_to_first());
+        RETURN_IF_ERROR(_val_iterator->seek_to_first());
+        // _null_iterator may be empty (no null-sign column), so it must not be
+        // dereferenced unconditionally.
+        if (_null_iterator != nullptr) {
+            RETURN_IF_ERROR(_null_iterator->seek_to_first());
+        }
+        return Status::OK();
+    }
+
+    Status seek_to_ordinal(ordinal_t ord) override;
+
+    // The current position is taken from the key iterator.
+    ordinal_t get_current_ordinal() const override { return _key_iterator->get_current_ordinal(); }
+
+private:
+    ColumnReader* _map_reader;
+    std::unique_ptr<ColumnIterator> _null_iterator; // may be empty
+    std::unique_ptr<ColumnIterator> _key_iterator;  // ArrayFileColumnIterator
+    std::unique_ptr<ColumnIterator> _val_iterator;  // ArrayFileColumnIterator
+};
+
 class StructFileColumnIterator final : public ColumnIterator {
 public:
     explicit StructFileColumnIterator(ColumnReader* reader, ColumnIterator* 
null_iterator,
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp 
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 5e2de337aa..8d89f2a763 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -245,6 +245,79 @@ Status ColumnWriter::create(const ColumnWriterOptions& 
opts, const TabletColumn*
             *writer = std::move(writer_local);
             return Status::OK();
         }
+        case FieldType::OLAP_FIELD_TYPE_MAP: {
+            DCHECK(column->get_subtype_count() == 2);
+            ScalarColumnWriter* null_writer = nullptr;
+            // create null writer
+            if (opts.meta->is_nullable()) {
+                FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;
+                ColumnWriterOptions null_options;
+                null_options.meta = opts.meta->add_children_columns();
+                null_options.meta->set_column_id(3);
+                null_options.meta->set_unique_id(3);
+                null_options.meta->set_type(null_type);
+                null_options.meta->set_is_nullable(false);
+                null_options.meta->set_length(
+                        
get_scalar_type_info<OLAP_FIELD_TYPE_TINYINT>()->size());
+                null_options.meta->set_encoding(DEFAULT_ENCODING);
+                null_options.meta->set_compression(opts.meta->compression());
+
+                null_options.need_zone_map = false;
+                null_options.need_bloom_filter = false;
+                null_options.need_bitmap_index = false;
+
+                TabletColumn null_column =
+                        TabletColumn(OLAP_FIELD_AGGREGATION_NONE, null_type, 
false,
+                                     null_options.meta->unique_id(), 
null_options.meta->length());
+                null_column.set_name("nullable");
+                null_column.set_index_length(-1); // no short key index
+                std::unique_ptr<Field> 
null_field(FieldFactory::create(null_column));
+                null_writer =
+                        new ScalarColumnWriter(null_options, 
std::move(null_field), file_writer);
+            }
+
+            // create key & value writer
+            std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
+            for (int i = 0; i < 2; ++i) {
+                std::unique_ptr<ColumnWriter> inner_array_writer;
+                ColumnWriterOptions arr_opts;
+                TabletColumn array_column(OLAP_FIELD_AGGREGATION_NONE, 
OLAP_FIELD_TYPE_ARRAY);
+
+                array_column.set_index_length(-1);
+                arr_opts.meta = opts.meta->mutable_children_columns(i);
+                ColumnMetaPB* child_meta = 
arr_opts.meta->add_children_columns();
+                // inner column meta from actual opts meta
+                const TabletColumn& inner_column =
+                        column->get_sub_column(i); // field_type is true key 
and value
+                
array_column.add_sub_column(const_cast<TabletColumn&>(inner_column));
+                array_column.set_name("map.arr");
+                child_meta->set_type(inner_column.type());
+                child_meta->set_length(inner_column.length());
+                child_meta->set_column_id(arr_opts.meta->column_id() + 1);
+                child_meta->set_unique_id(arr_opts.meta->unique_id() + 1);
+                child_meta->set_compression(arr_opts.meta->compression());
+                child_meta->set_encoding(arr_opts.meta->encoding());
+                child_meta->set_is_nullable(true);
+
+                // set array column meta
+                arr_opts.meta->set_type(OLAP_FIELD_TYPE_ARRAY);
+                arr_opts.meta->set_encoding(opts.meta->encoding());
+                arr_opts.meta->set_compression(opts.meta->compression());
+                arr_opts.need_zone_map = false;
+                // no need inner array's null map
+                arr_opts.meta->set_is_nullable(false);
+                RETURN_IF_ERROR(ColumnWriter::create(arr_opts, &array_column, 
file_writer,
+                                                     &inner_array_writer));
+                inner_writer_list.push_back(std::move(inner_array_writer));
+            }
+            // create map writer
+            std::unique_ptr<ColumnWriter> sub_column_writer;
+            std::unique_ptr<ColumnWriter> writer_local = 
std::unique_ptr<ColumnWriter>(
+                    new MapColumnWriter(opts, std::move(field), null_writer, 
inner_writer_list));
+
+            *writer = std::move(writer_local);
+            return Status::OK();
+        }
         default:
             return Status::NotSupported("unsupported type for ColumnWriter: 
{}",
                                         std::to_string(field->type()));
@@ -869,7 +942,7 @@ Status ArrayColumnWriter::append_nulls(size_t num_rows) {
 
 Status ArrayColumnWriter::write_null_column(size_t num_rows, bool is_null) {
     uint8_t null_sign = is_null ? 1 : 0;
-    while (num_rows > 0) {
+    while (is_nullable() && num_rows > 0) {
         // TODO llj bulk write
         const uint8_t* null_sign_ptr = &null_sign;
         RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, 1));
@@ -882,5 +955,118 @@ Status ArrayColumnWriter::finish_current_page() {
     return Status::NotSupported("array writer has no data, can not 
finish_current_page");
 }
 
+/// ============================= MapColumnWriter =====================////
+// Builds a map writer from exactly two sub-writers, which are moved out of
+// `kv_writers`. `null_writer` is adopted only when the column meta is nullable.
+MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
+                                 ScalarColumnWriter* null_writer,
+                                 std::vector<std::unique_ptr<ColumnWriter>>& kv_writers)
+        : ColumnWriter(std::move(field), opts.meta->is_nullable()), _opts(opts) {
+    CHECK_EQ(kv_writers.size(), 2);
+    // _null_writer starts out empty, so resetting it to nullptr is a no-op.
+    _null_writer.reset(is_nullable() ? null_writer : nullptr);
+    for (size_t i = 0; i < kv_writers.size(); ++i) {
+        _kv_writers.push_back(std::move(kv_writers[i]));
+    }
+}
+
+// Initializes the null writer (nullable columns only) and then each of the
+// key/value sub-writers, returning the first failing status.
+Status MapColumnWriter::init() {
+    if (is_nullable()) {
+        RETURN_IF_ERROR(_null_writer->init());
+    }
+    for (size_t i = 0; i < _kv_writers.size(); ++i) {
+        RETURN_IF_ERROR(_kv_writers[i]->init());
+    }
+    return Status::OK();
+}
+
+// Sums the buffer-size estimates of the key/value sub-writers and, for a
+// nullable column, of the null writer.
+uint64_t MapColumnWriter::estimate_buffer_size() {
+    size_t total_bytes = 0;
+    for (const auto& kv_writer : _kv_writers) {
+        total_bytes += kv_writer->estimate_buffer_size();
+    }
+    if (!is_nullable()) {
+        return total_bytes;
+    }
+    total_bytes += _null_writer->estimate_buffer_size();
+    return total_bytes;
+}
+
+// Finishes the null writer (nullable columns only) and then each key/value
+// sub-writer, returning the first failing status.
+Status MapColumnWriter::finish() {
+    if (is_nullable()) {
+        RETURN_IF_ERROR(_null_writer->finish());
+    }
+    for (size_t i = 0; i < _kv_writers.size(); ++i) {
+        RETURN_IF_ERROR(_kv_writers[i]->finish());
+    }
+    return Status::OK();
+}
+
+// TODO: make keys and values write
+// `*ptr` is read as two consecutive 64-bit words. Word i is reinterpreted as
+// the data address handed to sub-writer i (index 0 is the key writer). For a
+// nullable column, `num_rows` "not null" signs are appended afterwards.
+Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
+    const auto* kv_words = reinterpret_cast<const uint64_t*>(*ptr);
+    for (size_t idx = 0; idx < 2; ++idx) {
+        const uint8_t* sub_data = reinterpret_cast<const uint8_t*>(kv_words[idx]);
+        RETURN_IF_ERROR(_kv_writers[idx]->append_data(&sub_data, num_rows));
+    }
+    if (!is_nullable()) {
+        return Status::OK();
+    }
+    return write_null_column(num_rows, false);
+}
+
+// Writes out the null writer's data (nullable columns only) and then the data
+// of each key/value sub-writer, returning the first failing status.
+Status MapColumnWriter::write_data() {
+    if (is_nullable()) {
+        RETURN_IF_ERROR(_null_writer->write_data());
+    }
+    for (size_t i = 0; i < _kv_writers.size(); ++i) {
+        RETURN_IF_ERROR(_kv_writers[i]->write_data());
+    }
+    return Status::OK();
+}
+
+// Writes the ordinal index of the null writer (nullable columns only) and then
+// of each key/value sub-writer, returning the first failing status.
+Status MapColumnWriter::write_ordinal_index() {
+    if (is_nullable()) {
+        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
+    }
+    for (size_t i = 0; i < _kv_writers.size(); ++i) {
+        RETURN_IF_ERROR(_kv_writers[i]->write_ordinal_index());
+    }
+    return Status::OK();
+}
+
+// Appends `num_rows` nulls to each key/value sub-writer, then records
+// `num_rows` "is null" signs through write_null_column.
+Status MapColumnWriter::append_nulls(size_t num_rows) {
+    for (size_t i = 0; i < _kv_writers.size(); ++i) {
+        RETURN_IF_ERROR(_kv_writers[i]->append_nulls(num_rows));
+    }
+    return write_null_column(num_rows, true);
+}
+
+// Appends `num_rows` identical null signs (1 when `is_null`, otherwise 0) to
+// the null writer. Does nothing for a non-nullable column.
+Status MapColumnWriter::write_null_column(size_t num_rows, bool is_null) {
+    if (!is_nullable()) {
+        return Status::OK();
+    }
+    const uint8_t sign = is_null ? 1 : 0;
+    std::vector<vectorized::UInt8> signs(num_rows, sign);
+    const uint8_t* signs_ptr = signs.data();
+    return _null_writer->append_data(&signs_ptr, num_rows);
+}
+
+// Finishes the current page of the null writer (nullable columns only) and then
+// of each key/value sub-writer, returning the first failing status.
+Status MapColumnWriter::finish_current_page() {
+    if (is_nullable()) {
+        RETURN_IF_ERROR(_null_writer->finish_current_page());
+    }
+    for (size_t i = 0; i < _kv_writers.size(); ++i) {
+        RETURN_IF_ERROR(_kv_writers[i]->finish_current_page());
+    }
+    return Status::OK();
+}
+
+// Finishes the inverted index when one was requested and a builder exists;
+// otherwise this is a no-op that returns OK.
+Status MapColumnWriter::write_inverted_index() {
+    // The builder pointer may be empty even when the option is set, so test it
+    // before dereferencing rather than relying on _opts.inverted_index alone.
+    if (_opts.inverted_index && _inverted_index_builder != nullptr) {
+        return _inverted_index_builder->finish();
+    }
+    return Status::OK();
+}
+
 } // namespace segment_v2
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h 
b/be/src/olap/rowset/segment_v2/column_writer.h
index 274c9be6f3..7d140324dd 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -371,5 +371,60 @@ private:
     ColumnWriterOptions _opts;
 };
 
+// Column writer for MAP columns. It holds a list of key/value sub-writers
+// (get_next_rowid treats index 0 as the key writer) and a ScalarColumnWriter
+// for per-row null signs.
+class MapColumnWriter final : public ColumnWriter, public FlushPageCallback {
+public:
+    // `kv_writers` supplies the key/value sub-writers; `null_writer` is handed
+    // over as a raw pointer and stored in the unique_ptr member below.
+    explicit MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
+                             ScalarColumnWriter* null_writer,
+                             std::vector<std::unique_ptr<ColumnWriter>>& kv_writers);
+
+    ~MapColumnWriter() override = default;
+
+    Status init() override;
+
+    Status append_data(const uint8_t** ptr, size_t num_rows) override;
+
+    uint64_t estimate_buffer_size() override;
+
+    Status finish() override;
+    Status write_data() override;
+    Status write_ordinal_index() override;
+    Status write_inverted_index() override;
+    Status append_nulls(size_t num_rows) override;
+
+    Status finish_current_page() override;
+
+    // Zone maps are rejected for map columns when requested in the options.
+    Status write_zone_map() override {
+        if (_opts.need_zone_map) {
+            return Status::NotSupported("map not support zone map");
+        }
+        return Status::OK();
+    }
+
+    // Bitmap indexes are rejected for map columns when requested in the options.
+    Status write_bitmap_index() override {
+        if (_opts.need_bitmap_index) {
+            return Status::NotSupported("map not support bitmap index");
+        }
+        return Status::OK();
+    }
+
+    // Bloom filter indexes are rejected for map columns when requested in the options.
+    Status write_bloom_filter_index() override {
+        if (_opts.need_bloom_filter) {
+            return Status::NotSupported("map not support bloom filter index");
+        }
+        return Status::OK();
+    }
+
+    // The next row id is taken from the key writer.
+    ordinal_t get_next_rowid() const override { return _kv_writers[0]->get_next_rowid(); }
+
+private:
+    // Appends `num_rows` identical null signs.
+    Status write_null_column(size_t num_rows, bool is_null);
+
+    std::vector<std::unique_ptr<ColumnWriter>> _kv_writers;
+    // we need null writer to make sure a row is null or not
+    std::unique_ptr<ScalarColumnWriter> _null_writer;
+    std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
+    ColumnWriterOptions _opts;
+};
+
 } // namespace segment_v2
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 28ef07d40d..51d4dfdb45 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -163,6 +163,16 @@ Status SegmentWriter::init(const std::vector<uint32_t>& 
col_ids, bool has_key) {
             }
         }
 
+        if (column.type() == FieldType::OLAP_FIELD_TYPE_MAP) {
+            opts.need_zone_map = false;
+            if (opts.need_bloom_filter) {
+                return Status::NotSupported("Do not support bloom filter for 
map type");
+            }
+            if (opts.need_bitmap_index) {
+                return Status::NotSupported("Do not support bitmap index for 
map type");
+            }
+        }
+
         std::unique_ptr<ColumnWriter> writer;
         RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, 
&writer));
         RETURN_IF_ERROR(writer->init());
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 24b5460549..a0e2a61bbf 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -304,6 +304,12 @@ void TabletMeta::init_column_from_tcolumn(uint32_t 
unique_id, const TColumn& tco
         ColumnPB* children_column = column->add_children_columns();
         init_column_from_tcolumn(0, tcolumn.children_column[0], 
children_column);
     }
+    if (tcolumn.column_type.type == TPrimitiveType::MAP) {
+        ColumnPB* key_column = column->add_children_columns();
+        init_column_from_tcolumn(0, tcolumn.children_column[0], key_column);
+        ColumnPB* val_column = column->add_children_columns();
+        init_column_from_tcolumn(0, tcolumn.children_column[1], val_column);
+    }
 }
 
 Status TabletMeta::create_from_file(const string& file_path) {
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 9fe92eb6b4..cc130f0a36 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -316,6 +316,8 @@ uint32_t 
TabletColumn::get_field_length_by_type(TPrimitiveType::type type, uint3
         return OLAP_STRUCT_MAX_LENGTH;
     case TPrimitiveType::ARRAY:
         return OLAP_ARRAY_MAX_LENGTH;
+    case TPrimitiveType::MAP:
+        return OLAP_ARRAY_MAX_LENGTH;
     case TPrimitiveType::DECIMAL32:
         return 4;
     case TPrimitiveType::DECIMAL64:
@@ -418,6 +420,15 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
         child_column.init_from_pb(column.children_columns(0));
         add_sub_column(child_column);
     }
+    if (_type == FieldType::OLAP_FIELD_TYPE_MAP) {
+        DCHECK(column.children_columns_size() == 2) << "MAP type has more than 
2 children types.";
+        TabletColumn key_column;
+        TabletColumn value_column;
+        key_column.init_from_pb(column.children_columns(0));
+        value_column.init_from_pb(column.children_columns(1));
+        add_sub_column(key_column);
+        add_sub_column(value_column);
+    }
 }
 
 void TabletColumn::to_schema_pb(ColumnPB* column) const {
@@ -454,6 +465,13 @@ void TabletColumn::to_schema_pb(ColumnPB* column) const {
         ColumnPB* child = column->add_children_columns();
         _sub_columns[0].to_schema_pb(child);
     }
+    if (_type == OLAP_FIELD_TYPE_MAP) {
+        DCHECK(_sub_columns.size() == 2) << "MAP type has more than 2 children 
types.";
+        ColumnPB* child_key = column->add_children_columns();
+        _sub_columns[0].to_schema_pb(child_key);
+        ColumnPB* child_val = column->add_children_columns();
+        _sub_columns[1].to_schema_pb(child_val);
+    }
 }
 
 void TabletColumn::add_sub_column(TabletColumn& sub_column) {
diff --git a/be/src/olap/types.cpp b/be/src/olap/types.cpp
index 916f7768af..bee790e066 100644
--- a/be/src/olap/types.cpp
+++ b/be/src/olap/types.cpp
@@ -201,6 +201,15 @@ TypeInfoPtr get_type_info(segment_v2::ColumnMetaPB* 
column_meta_pb) {
         }
         return create_static_type_info_ptr(
                 get_array_type_info((FieldType)child_column->type(), 
iterations));
+    } else if (UNLIKELY(type == OLAP_FIELD_TYPE_MAP)) {
+        // children_columns(0) describes the key type and children_columns(1) the value type.
+        // Each is copied into a local because get_type_info() takes a non-const pointer.
+        segment_v2::ColumnMetaPB key_meta = column_meta_pb->children_columns(0);
+        TypeInfoPtr key_type_info = get_type_info(&key_meta);
+        segment_v2::ColumnMetaPB value_meta = column_meta_pb->children_columns(1);
+        TypeInfoPtr value_type_info = get_type_info(&value_meta);
+
+        MapTypeInfo* map_type_info =
+                new MapTypeInfo(std::move(key_type_info), std::move(value_type_info));
+        // The MapTypeInfo is heap-allocated here, so it is handed over with
+        // create_dynamic_type_info_ptr, the same way clone_type_info() wraps the MapTypeInfo
+        // it allocates.
+        // NOTE(review): assumes the "static" variant does not free its pointee - confirm.
+        return create_dynamic_type_info_ptr(map_type_info);
     } else {
         return create_static_type_info_ptr(get_scalar_type_info(type));
     }
@@ -240,6 +249,13 @@ TypeInfoPtr get_type_info(const TabletColumn* col) {
             child_column = &child_column->get_sub_column(0);
         }
         return 
create_static_type_info_ptr(get_array_type_info(child_column->type(), 
iterations));
+    } else if (UNLIKELY(type == OLAP_FIELD_TYPE_MAP)) {
+        // Sub-column 0 supplies the key type and sub-column 1 the value type.
+        const auto* key_column = &col->get_sub_column(0);
+        TypeInfoPtr key_type = get_type_info(key_column);
+        const auto* val_column = &col->get_sub_column(1);
+        TypeInfoPtr value_type = get_type_info(val_column);
+        MapTypeInfo* map_type_info = new MapTypeInfo(std::move(key_type), std::move(value_type));
+        // The MapTypeInfo is heap-allocated here, so it is handed over with
+        // create_dynamic_type_info_ptr, the same way clone_type_info() wraps the MapTypeInfo
+        // it allocates.
+        // NOTE(review): assumes the "static" variant does not free its pointee - confirm.
+        return create_dynamic_type_info_ptr(map_type_info);
     } else {
         return create_static_type_info_ptr(get_scalar_type_info(type));
     }
@@ -248,22 +264,24 @@ TypeInfoPtr get_type_info(const TabletColumn* col) {
 TypeInfoPtr clone_type_info(const TypeInfo* type_info) {
     if (is_scalar_type(type_info->type())) {
         return create_static_type_info_ptr(type_info);
-    } else {
-        auto type = type_info->type();
-        if (type == OLAP_FIELD_TYPE_STRUCT) {
-            const auto struct_type_info = dynamic_cast<const 
StructTypeInfo*>(type_info);
-            std::vector<TypeInfoPtr> clone_type_infos;
-            const std::vector<TypeInfoPtr>* sub_type_infos = 
struct_type_info->type_infos();
-            clone_type_infos.reserve(sub_type_infos->size());
-            for (size_t i = 0; i < sub_type_infos->size(); i++) {
-                
clone_type_infos.push_back(clone_type_info((*sub_type_infos)[i].get()));
-            }
-            return create_dynamic_type_info_ptr(new 
StructTypeInfo(clone_type_infos));
-        } else {
-            const auto array_type_info = dynamic_cast<const 
ArrayTypeInfo*>(type_info);
-            return create_dynamic_type_info_ptr(
-                    new 
ArrayTypeInfo(clone_type_info(array_type_info->item_type_info())));
+    } else if (type_info->type() == OLAP_FIELD_TYPE_MAP) {
+        const auto map_type_info = dynamic_cast<const MapTypeInfo*>(type_info);
+        return create_dynamic_type_info_ptr(
+                new 
MapTypeInfo(clone_type_info(map_type_info->get_key_type_info()),
+                                
clone_type_info(map_type_info->get_value_type_info())));
+    } else if (type_info->type() == OLAP_FIELD_TYPE_STRUCT) {
+        const auto struct_type_info = dynamic_cast<const 
StructTypeInfo*>(type_info);
+        std::vector<TypeInfoPtr> clone_type_infos;
+        const std::vector<TypeInfoPtr>* sub_type_infos = 
struct_type_info->type_infos();
+        clone_type_infos.reserve(sub_type_infos->size());
+        for (size_t i = 0; i < sub_type_infos->size(); i++) {
+            
clone_type_infos.push_back(clone_type_info((*sub_type_infos)[i].get()));
         }
+        return create_dynamic_type_info_ptr(new 
StructTypeInfo(clone_type_infos));
+    } else if (type_info->type() == OLAP_FIELD_TYPE_ARRAY) {
+        const auto array_type_info = dynamic_cast<const 
ArrayTypeInfo*>(type_info);
+        return create_dynamic_type_info_ptr(
+                new 
ArrayTypeInfo(clone_type_info(array_type_info->item_type_info())));
     }
 }
 
diff --git a/be/src/olap/types.h b/be/src/olap/types.h
index a8f21b66ae..b759795b2d 100644
--- a/be/src/olap/types.h
+++ b/be/src/olap/types.h
@@ -31,6 +31,7 @@
 #include "olap/olap_define.h"
 #include "runtime/collection_value.h"
 #include "runtime/jsonb_value.h"
+#include "runtime/map_value.h"
 #include "runtime/mem_pool.h"
 #include "runtime/struct_value.h"
 #include "util/jsonb_document.h"
@@ -431,6 +432,93 @@ private:
     TypeInfoPtr _item_type_info;
     const size_t _item_size;
 };
+///====================== MapType Info ==========================///
+/// TypeInfo for OLAP_FIELD_TYPE_MAP. A value of this type is a MapValue; the TypeInfoPtr
+/// objects describing the key type and the value type are held by this object.
+///
+/// NOTE: equal() and cmp() look only at the number of entries, never at the keys or values
+/// themselves. deep_copy/copy_object, direct_copy, set_to_max and set_to_min are not
+/// implemented and only trip an assertion; convert_from and from_string return
+/// NOT_IMPLEMENTED_ERROR.
+class MapTypeInfo : public TypeInfo {
+public:
+    explicit MapTypeInfo(TypeInfoPtr key_type_info, TypeInfoPtr value_type_info)
+            : _key_type_info(std::move(key_type_info)),
+              _value_type_info(std::move(value_type_info)) {}
+    ~MapTypeInfo() override = default;
+
+    // True when both MapValues report the same size(); contents are not compared.
+    inline bool equal(const void* left, const void* right) const override {
+        auto l_value = reinterpret_cast<const MapValue*>(left);
+        auto r_value = reinterpret_cast<const MapValue*>(right);
+        return l_value->size() == r_value->size();
+    }
+
+    // Orders two MapValues by size() only: -1, 0 or 1.
+    int cmp(const void* left, const void* right) const override {
+        auto l_value = reinterpret_cast<const MapValue*>(left);
+        auto r_value = reinterpret_cast<const MapValue*>(right);
+        uint32_t l_size = l_value->size();
+        uint32_t r_size = r_value->size();
+        if (l_size < r_size) {
+            return -1;
+        } else if (l_size > r_size) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    // Copies the MapValue header (length and the two data pointers); the pointed-to data is
+    // shared between `dest` and `src` afterwards.
+    void shallow_copy(void* dest, const void* src) const override {
+        auto dest_value = reinterpret_cast<MapValue*>(dest);
+        auto src_value = reinterpret_cast<const MapValue*>(src);
+        dest_value->shallow_copy(src_value);
+    }
+
+    // Not implemented: asserts in debug builds and leaves `dest` untouched.
+    void deep_copy(void* dest, const void* src, MemPool* mem_pool) const override { DCHECK(false); }
+
+    void copy_object(void* dest, const void* src, MemPool* mem_pool) const override {
+        deep_copy(dest, src, mem_pool);
+    }
+
+    // Not implemented: always fails the CHECK.
+    void direct_copy(void* dest, const void* src) const override { CHECK(false); }
+
+    // Not implemented: always fails the CHECK.
+    void direct_copy(uint8_t** base, void* dest, const void* src) const { CHECK(false); }
+
+    void direct_copy_may_cut(void* dest, const void* src) const override { direct_copy(dest, src); }
+
+    Status convert_from(void* dest, const void* src, const TypeInfo* src_type, MemPool* mem_pool,
+                        size_t variable_len = 0) const override {
+        return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
+    }
+
+    Status from_string(void* buf, const std::string& scan_key, const int precision = 0,
+                       const int scale = 0) const override {
+        return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
+    }
+
+    // Placeholder: every map is rendered as "{}" regardless of its contents.
+    std::string to_string(const void* src) const override { return "{}"; }
+
+    void set_to_max(void* buf) const override {
+        DCHECK(false) << "set_to_max of map is not implemented.";
+    }
+
+    void set_to_min(void* buf) const override {
+        DCHECK(false) << "set_to_min of map is not implemented.";
+    }
+
+    // Hashes the entry count with `seed`, then mixes in the key type's hash of key_data() and
+    // the value type's hash of value_data().
+    uint32_t hash_code(const void* data, uint32_t seed) const override {
+        auto map_value = reinterpret_cast<const MapValue*>(data);
+        auto size = map_value->size();
+        uint32_t result = HashUtil::hash(&size, sizeof(size), seed);
+        result = seed * result + _key_type_info->hash_code(map_value->key_data(), seed) +
+                 _value_type_info->hash_code(map_value->value_data(), seed);
+        return result;
+    }
+
+    // Size of the in-memory MapValue object itself, not of the data it points to.
+    const size_t size() const override { return sizeof(MapValue); }
+
+    FieldType type() const override { return OLAP_FIELD_TYPE_MAP; }
+
+    inline const TypeInfo* get_key_type_info() const { return _key_type_info.get(); }
+    inline const TypeInfo* get_value_type_info() const { return _value_type_info.get(); }
+
+private:
+    TypeInfoPtr _key_type_info;
+    TypeInfoPtr _value_type_info;
+};
 
 class StructTypeInfo : public TypeInfo {
 public:
@@ -818,6 +906,10 @@ template <>
 struct CppTypeTraits<OLAP_FIELD_TYPE_ARRAY> {
     using CppType = CollectionValue;
 };
+// Selects MapValue as the in-memory C++ type for OLAP_FIELD_TYPE_MAP.
+template <>
+struct CppTypeTraits<OLAP_FIELD_TYPE_MAP> {
+    using CppType = MapValue;
+};
 template <FieldType field_type>
 struct BaseFieldtypeTraits : public CppTypeTraits<field_type> {
     using CppType = typename CppTypeTraits<field_type>::CppType;
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 92ffc2071d..fc6eb82978 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -50,6 +50,7 @@ set(RUNTIME_FILES
     large_int_value.cpp
     struct_value.cpp
     collection_value.cpp
+    map_value.cpp
     tuple.cpp
     tuple_row.cpp
     fragment_mgr.cpp
diff --git a/be/src/runtime/map_value.cpp b/be/src/runtime/map_value.cpp
new file mode 100644
index 0000000000..1828fd27b3
--- /dev/null
+++ b/be/src/runtime/map_value.cpp
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "map_value.h"
+
+namespace doris {
+
+///====================== map-value funcs ======================///
+// Copies the length and both child-data pointers from `value`. The buffers behind those
+// pointers are not duplicated, so both objects refer to the same data afterwards.
+void MapValue::shallow_copy(const MapValue* value) {
+    const MapValue& other = *value;
+    _key_data = other._key_data;
+    _value_data = other._value_data;
+    _length = other._length;
+}
+
+} // namespace doris
diff --git a/be/src/runtime/map_value.h b/be/src/runtime/map_value.h
new file mode 100644
index 0000000000..d275316cf6
--- /dev/null
+++ b/be/src/runtime/map_value.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <type_traits>
+
+#include "runtime/primitive_type.h"
+
+namespace doris {
+
+/**
+ * MapValue is the in-memory representation of a single map value: an entry count plus two
+ * untyped pointers, one to the key data and one to the value data. It stores the pointers it
+ * is given and never allocates or frees the buffers behind them.
+ */
+class MapValue {
+public:
+    // Default-constructed values are empty: zero length and null data pointers (see the
+    // default member initializers below), so size()/key_data() are safe to read right away.
+    MapValue() = default;
+
+    explicit MapValue(int32_t length) : _length(length) {}
+
+    MapValue(void* k_data, void* v_data, int32_t length)
+            : _key_data(k_data), _value_data(v_data), _length(length) {}
+
+    // Number of entries in the map; size() and length() return the same field.
+    int32_t size() const { return _length; }
+
+    int32_t length() const { return _length; }
+
+    // Copies length and data pointers from `other` without copying the pointed-to data.
+    void shallow_copy(const MapValue* other);
+
+    const void* key_data() const { return _key_data; }
+    void* mutable_key_data() const { return _key_data; }
+    const void* value_data() const { return _value_data; }
+    void* mutable_value_data() const { return _value_data; }
+
+    void set_length(int32_t length) { _length = length; }
+    void set_key(void* data) { _key_data = data; }
+    void set_value(void* data) { _value_data = data; }
+
+private:
+    // child column data pointer
+    void* _key_data = nullptr;
+    void* _value_data = nullptr;
+    // length for map size
+    int32_t _length = 0;
+
+}; //map-value
+} // namespace doris
diff --git a/be/src/runtime/primitive_type.cpp 
b/be/src/runtime/primitive_type.cpp
index 27bc9f5b6e..2dd7c438fc 100644
--- a/be/src/runtime/primitive_type.cpp
+++ b/be/src/runtime/primitive_type.cpp
@@ -21,6 +21,7 @@
 #include "runtime/collection_value.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/jsonb_value.h"
+#include "runtime/map_value.h"
 #include "runtime/string_value.h"
 #include "runtime/struct_value.h"
 
@@ -54,6 +55,8 @@ PrimitiveType convert_type_to_primitive(FunctionContext::Type 
type) {
         return PrimitiveType::TYPE_BOOLEAN;
     case FunctionContext::Type::TYPE_ARRAY:
         return PrimitiveType::TYPE_ARRAY;
+    case FunctionContext::Type::TYPE_MAP:
+        return PrimitiveType::TYPE_MAP;
     case FunctionContext::Type::TYPE_STRUCT:
         return PrimitiveType::TYPE_STRUCT;
     case FunctionContext::Type::TYPE_OBJECT:
@@ -266,6 +269,9 @@ PrimitiveType thrift_to_type(TPrimitiveType::type ttype) {
     case TPrimitiveType::ARRAY:
         return TYPE_ARRAY;
 
+    case TPrimitiveType::MAP:
+        return TYPE_MAP;
+
     case TPrimitiveType::STRUCT:
         return TYPE_STRUCT;
 
@@ -363,6 +369,9 @@ TPrimitiveType::type to_thrift(PrimitiveType ptype) {
     case TYPE_ARRAY:
         return TPrimitiveType::ARRAY;
 
+    case TYPE_MAP:
+        return TPrimitiveType::MAP;
+
     case TYPE_STRUCT:
         return TPrimitiveType::STRUCT;
 
@@ -460,6 +469,9 @@ std::string type_to_string(PrimitiveType t) {
     case TYPE_ARRAY:
         return "ARRAY";
 
+    case TYPE_MAP:
+        return "MAP";
+
     case TYPE_STRUCT:
         return "STRUCT";
 
@@ -602,6 +614,8 @@ int get_slot_size(PrimitiveType type) {
         return sizeof(JsonBinaryValue);
     case TYPE_ARRAY:
         return sizeof(CollectionValue);
+    case TYPE_MAP:
+        return sizeof(MapValue);
     case TYPE_STRUCT:
         return sizeof(StructValue);
 
diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp
index 45b8f8e5f5..386f3b57f0 100644
--- a/be/src/runtime/types.cpp
+++ b/be/src/runtime/types.cpp
@@ -95,15 +95,17 @@ TypeDescriptor::TypeDescriptor(const 
std::vector<TTypeNode>& types, int* idx)
     //     ++(*idx);
     //     children.push_back(TypeDescriptor(types, idx));
     //     break;
-    // case TTypeNodeType::MAP:
-    //     DCHECK(!node.__isset.scalar_type);
-    //     DCHECK_LT(*idx, types.size() - 2);
-    //     type = TYPE_MAP;
-    //     ++(*idx);
-    //     children.push_back(TypeDescriptor(types, idx));
-    //     ++(*idx);
-    //     children.push_back(TypeDescriptor(types, idx));
-    //     break;
+    case TTypeNodeType::MAP: {
+        DCHECK(!node.__isset.scalar_type);
+        DCHECK_LT(*idx, types.size() - 2);
+        DCHECK(!node.__isset.contains_null);
+        type = TYPE_MAP;
+        ++(*idx);
+        children.push_back(TypeDescriptor(types, idx));
+        ++(*idx);
+        children.push_back(TypeDescriptor(types, idx));
+        break;
+    }
     default:
         DCHECK(false) << node.type;
     }
@@ -150,8 +152,6 @@ void TypeDescriptor::to_thrift(TTypeDesc* thrift_type) 
const {
 }
 
 void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const {
-    DCHECK(!is_complex_type() || type == TYPE_ARRAY || type == TYPE_STRUCT)
-            << "Don't support complex type now, type=" << type;
     auto node = ptype->add_types();
     node->set_type(TTypeNodeType::SCALAR);
     auto scalar_type = node->mutable_scalar_type();
@@ -181,6 +181,11 @@ void TypeDescriptor::to_protobuf(PTypeDesc* ptype) const {
         for (const TypeDescriptor& child : children) {
             child.to_protobuf(ptype);
         }
+    } else if (type == TYPE_MAP) {
+        node->set_type(TTypeNodeType::MAP);
+        for (const TypeDescriptor& child : children) {
+            child.to_protobuf(ptype);
+        }
     }
 }
 
@@ -223,6 +228,12 @@ TypeDescriptor::TypeDescriptor(const 
google::protobuf::RepeatedPtrField<PTypeNod
         children.push_back(TypeDescriptor(types, idx));
         break;
     }
+    case TTypeNodeType::MAP: {
+        // A MAP node is followed by two type subtrees, parsed into children[0] and
+        // children[1] in that order.
+        type = TYPE_MAP;
+        ++(*idx);
+        children.push_back(TypeDescriptor(types, idx));
+        ++(*idx);
+        children.push_back(TypeDescriptor(types, idx));
+        // Terminate the case explicitly: without the break and the closing brace, control
+        // falls through into the STRUCT case and the braces of the enclosing switch and
+        // function no longer balance.
+        break;
+    }
     case TTypeNodeType::STRUCT: {
         type = TYPE_STRUCT;
         size_t children_size = node.struct_fields_size();
@@ -240,64 +251,67 @@ TypeDescriptor::TypeDescriptor(const 
google::protobuf::RepeatedPtrField<PTypeNod
     default:
         DCHECK(false) << node.type();
     }
-}
-
-std::string TypeDescriptor::debug_string() const {
-    std::stringstream ss;
-    switch (type) {
-    case TYPE_CHAR:
-        ss << "CHAR(" << len << ")";
-        return ss.str();
-    case TYPE_DECIMALV2:
-        ss << "DECIMALV2(" << precision << ", " << scale << ")";
-        return ss.str();
-    case TYPE_DECIMAL32:
-        ss << "DECIMAL32(" << precision << ", " << scale << ")";
-        return ss.str();
-    case TYPE_DECIMAL64:
-        ss << "DECIMAL64(" << precision << ", " << scale << ")";
-        return ss.str();
-    case TYPE_DECIMAL128I:
-        ss << "DECIMAL128(" << precision << ", " << scale << ")";
-        return ss.str();
-    case TYPE_ARRAY: {
-        ss << "ARRAY<" << children[0].debug_string() << ">";
-        return ss.str();
     }
-    case TYPE_STRUCT: {
-        ss << "STRUCT<";
-        for (size_t i = 0; i < children.size(); i++) {
-            ss << field_names[i];
-            ss << ":";
-            ss << children[i].debug_string();
-            if (i != children.size() - 1) {
-                ss << ",";
+
+    // Returns a human-readable description of this type. CHAR includes its length, the
+    // decimal types their precision and scale, and ARRAY/MAP/STRUCT recurse into their
+    // children; every other type falls back to type_to_string(type).
+    std::string TypeDescriptor::debug_string() const {
+        std::stringstream ss;
+        switch (type) {
+        case TYPE_CHAR:
+            ss << "CHAR(" << len << ")";
+            return ss.str();
+        case TYPE_DECIMALV2:
+            ss << "DECIMALV2(" << precision << ", " << scale << ")";
+            return ss.str();
+        case TYPE_DECIMAL32:
+            ss << "DECIMAL32(" << precision << ", " << scale << ")";
+            return ss.str();
+        case TYPE_DECIMAL64:
+            ss << "DECIMAL64(" << precision << ", " << scale << ")";
+            return ss.str();
+        case TYPE_DECIMAL128I:
+            ss << "DECIMAL128(" << precision << ", " << scale << ")";
+            return ss.str();
+        case TYPE_ARRAY: {
+            ss << "ARRAY<" << children[0].debug_string() << ">";
+            return ss.str();
+        }
+        case TYPE_MAP:
+            ss << "MAP<" << children[0].debug_string() << ", " << 
children[1].debug_string() << ">";
+            return ss.str();
+        case TYPE_STRUCT: {
+            ss << "STRUCT<";
+            for (size_t i = 0; i < children.size(); i++) {
+                ss << field_names[i];
+                ss << ":";
+                ss << children[i].debug_string();
+                if (i != children.size() - 1) {
+                    ss << ",";
+                }
             }
+            ss << ">";
+            return ss.str();
+        }
+        default:
+            return type_to_string(type);
         }
-        ss << ">";
-        return ss.str();
-    }
-    default:
-        return type_to_string(type);
     }
-}
 
-std::ostream& operator<<(std::ostream& os, const TypeDescriptor& type) {
-    os << type.debug_string();
-    return os;
-}
+    // Streams the debug_string() form of `type` and returns the stream for chaining.
+    std::ostream& operator<<(std::ostream& os, const TypeDescriptor& type) {
+        return os << type.debug_string();
+    }
 
-TTypeDesc create_type_desc(PrimitiveType type, int precision, int scale) {
-    TTypeDesc type_desc;
-    std::vector<TTypeNode> node_type;
-    node_type.emplace_back();
-    TScalarType scalarType;
-    scalarType.__set_type(to_thrift(type));
-    scalarType.__set_len(-1);
-    scalarType.__set_precision(precision);
-    scalarType.__set_scale(scale);
-    node_type.back().__set_scalar_type(scalarType);
-    type_desc.__set_types(node_type);
-    return type_desc;
-}
+    // Builds a TTypeDesc containing a single type node whose scalar type carries the thrift
+    // form of `type`, a length of -1, and the given precision and scale.
+    TTypeDesc create_type_desc(PrimitiveType type, int precision, int scale) {
+        TScalarType scalar;
+        scalar.__set_type(to_thrift(type));
+        scalar.__set_len(-1);
+        scalar.__set_precision(precision);
+        scalar.__set_scale(scale);
+
+        std::vector<TTypeNode> nodes(1);
+        nodes.front().__set_scalar_type(scalar);
+
+        TTypeDesc desc;
+        desc.__set_types(nodes);
+        return desc;
+    }
 } // namespace doris
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index ec60fae072..8e99302cc3 100644
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -89,6 +89,7 @@ public:
         TYPE_DECIMALV2,
         TYPE_OBJECT,
         TYPE_ARRAY,
+        TYPE_MAP,
         TYPE_STRUCT,
         TYPE_QUANTILE_STATE,
         TYPE_DATEV2,
@@ -911,6 +912,7 @@ struct CollectionVal : public AnyVal {
         return val;
     }
 };
+
 typedef uint8_t* BufferVal;
 } // namespace doris_udf
 
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 7a569ded2e..6d0b535315 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -55,6 +55,7 @@ set(VEC_FILES
   columns/column_nullable.cpp
   columns/column_string.cpp
   columns/column_vector.cpp
+  columns/column_map.cpp
   columns/columns_common.cpp
   common/demangle.cpp
   common/exception.cpp
@@ -88,6 +89,7 @@ set(VEC_FILES
   data_types/data_type_number_base.cpp
   data_types/data_type_string.cpp
   data_types/data_type_decimal.cpp
+  data_types/data_type_map.cpp
   data_types/get_least_supertype.cpp
   data_types/nested_utils.cpp
   data_types/data_type_date.cpp
@@ -136,6 +138,7 @@ set(VEC_FILES
   exprs/vexpr_context.cpp
   exprs/vliteral.cpp
   exprs/varray_literal.cpp
+  exprs/vmap_literal.cpp
   exprs/vstruct_literal.cpp
   exprs/vin_predicate.cpp
   exprs/vbloom_predicate.cpp
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 6bd5ac7855..354ff42768 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -590,6 +590,8 @@ public:
 
     virtual bool is_column_array() const { return false; }
 
+    virtual bool is_column_map() const { return false; }
+
     /// If the only value column can contain is NULL.
     /// Does not imply type of object, because it can be 
ColumnNullable(ColumnNothing) or ColumnConst(ColumnNullable(ColumnNothing))
     virtual bool only_null() const { return false; }
diff --git a/be/src/vec/columns/column_map.cpp 
b/be/src/vec/columns/column_map.cpp
new file mode 100644
index 0000000000..f477b9f336
--- /dev/null
+++ b/be/src/vec/columns/column_map.cpp
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Columns/ColumnMap.cpp
+// and modified by Doris
+
+#include "vec/columns/column_map.h"
+
+namespace doris::vectorized {
+
+/** A column of map values.
+  */
+// Builds the display name "Map(<keys name>, <values name>)" from the two nested columns.
+std::string ColumnMap::get_name() const {
+    std::string name = "Map(";
+    name += keys->get_name();
+    name += ", ";
+    name += values->get_name();
+    name += ")";
+    return name;
+}
+
+// Takes ownership of the two nested columns and validates them with check_size().
+// The parameters share their names with the members they initialize; inside the initializer
+// list the member is the one being initialized and the parameter is the one moved from.
+ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values)
+        : keys(std::move(keys)), values(std::move(values)) {
+    check_size();
+}
+
+// Returns the offsets of the keys column, which is cast to ColumnArray. The result is a
+// mutable reference even though this accessor is const (constness is cast away).
+ColumnArray::Offsets64& ColumnMap::get_offsets() const {
+    // todo . did here check size ?
+    const auto& keys_as_array = assert_cast<const ColumnArray&>(get_keys());
+    const auto& key_offsets = keys_as_array.get_offsets();
+    return const_cast<Offsets64&>(key_offsets);
+}
+
+// Validates the nested columns: both must be ColumnArray instances and both must report the
+// same size(). Any violation aborts through CHECK.
+void ColumnMap::check_size() const {
+    const auto* key_array = typeid_cast<const ColumnArray*>(keys.get());
+    CHECK(key_array) << "ColumnMap keys can be created only from array";
+
+    const auto* value_array = typeid_cast<const ColumnArray*>(values.get());
+    CHECK(value_array) << "ColumnMap values can be created only from array";
+
+    CHECK_EQ(get_keys_ptr()->size(), get_values_ptr()->size());
+}
+
+// todo. here to resize every row map
+// Creates a new ColumnMap from the keys and values columns, each cloned and resized to
+// `to_size`.
+MutableColumnPtr ColumnMap::clone_resized(size_t to_size) const {
+    return ColumnMap::create(keys->clone_resized(to_size), values->clone_resized(to_size));
+}
+
+// to support field functions
+// Returns row `n` as a two-element Map: element 0 is the keys column's field for that row,
+// element 1 the values column's field.
+Field ColumnMap::operator[](size_t n) const {
+    // Map is FieldVector , see in field.h
+    Map row(2);
+    Field& key_field = row[0];
+    Field& value_field = row[1];
+    keys->get(n, key_field);
+    values->get(n, value_field);
+    return row;
+}
+
+// here to compare to below
+// Stores row `n` into `res` as a two-element Map: element 0 comes from the keys column,
+// element 1 from the values column.
+void ColumnMap::get(size_t n, Field& res) const {
+    Map row(2);
+    Field& key_field = row[0];
+    Field& value_field = row[1];
+    keys->get(n, key_field);
+    values->get(n, value_field);
+    res = row;
+}
+
+// Not supported for map columns: logs a FATAL message that names this column and does not
+// produce a value.
+StringRef ColumnMap::get_data_at(size_t n) const {
+    LOG(FATAL) << "Method get_data_at is not supported for " << get_name();
+}
+
+// Not supported for map columns: logs a FATAL message that names this column; both arguments
+// are ignored.
+void ColumnMap::insert_data(const char*, size_t) {
+    LOG(FATAL) << "Method insert_data is not supported for " << get_name();
+}
+
+// Appends one row from `x`, which must hold a Map of exactly two elements: element 0 is
+// inserted into the keys column and element 1 into the values column.
+void ColumnMap::insert(const Field& x) {
+    // Fully qualified so the free function template is used rather than the member get().
+    const auto& map = doris::vectorized::get<const Map&>(x);
+    CHECK_EQ(map.size(), 2);
+    keys->insert(map[0]);
+    values->insert(map[1]);
+}
+
+// Appends one default row by inserting a default value into both nested columns.
+void ColumnMap::insert_default() {
+    get_keys().insert_default();
+    get_values().insert_default();
+}
+
+// Removes the last `n` rows from both nested columns.
+void ColumnMap::pop_back(size_t n) {
+    get_keys().pop_back(n);
+    get_values().pop_back(n);
+}
+
+// Serializes row `n` of the keys column and then of the values column into `arena`. The
+// returned StringRef has the combined size of both parts and starts key-part-size bytes
+// before the start of the values part.
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const {
+    const StringRef key_part = keys->serialize_value_into_arena(n, arena, begin);
+    const StringRef value_part = values->serialize_value_into_arena(n, arena, begin);
+
+    // The start is derived from the values part because that is the pointer obtained last.
+    StringRef res(begin, 0);
+    res.data = value_part.data - key_part.size;
+    res.size = key_part.size + value_part.size;
+    return res;
+}
+
+// Appends row `n` of `src_` (which must be a ColumnMap) to this column. When the nullability
+// of the keys or of the values differs between the two columns, nothing is inserted and a
+// DCHECK fires.
+void ColumnMap::insert_from(const IColumn& src_, size_t n) {
+    const ColumnMap& src = assert_cast<const ColumnMap&>(src_);
+
+    const bool keys_nullability_differs =
+            get_keys().is_nullable() != src.get_keys().is_nullable();
+    const bool values_nullability_differs =
+            get_values().is_nullable() != src.get_values().is_nullable();
+    if (keys_nullability_differs || values_nullability_differs) {
+        DCHECK(false);
+        return;
+    }
+
+    keys->insert_from(*src.keys, n);
+    values->insert_from(*src.values, n);
+}
+
+// Appends one row per index in [indices_begin, indices_end): index -1 appends a default row,
+// any other index appends that row of `src`.
+void ColumnMap::insert_indices_from(const IColumn& src, const int* indices_begin,
+                                    const int* indices_end) {
+    for (const int* cursor = indices_begin; cursor != indices_end; ++cursor) {
+        const int row = *cursor;
+        if (row != -1) {
+            ColumnMap::insert_from(src, row);
+            continue;
+        }
+        ColumnMap::insert_default();
+    }
+}
+
+// Reads one serialized row starting at `pos`: the keys column consumes its part first, the
+// values column continues from where the keys stopped. Returns the position after both.
+const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
+    const char* after_keys = keys->deserialize_and_insert_from_arena(pos);
+    return values->deserialize_and_insert_from_arena(after_keys);
+}
+
+// Feeds row `n` of the keys column and then of the values column into `hash`.
+void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
+    get_keys().update_hash_with_value(n, hash);
+    get_values().update_hash_with_value(n, hash);
+}
+
+// Appends `length` rows of `src` (which must be a ColumnMap), beginning at row `start`, to
+// both nested columns.
+void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) {
+    const auto& other = assert_cast<const ColumnMap&>(src);
+    keys->insert_range_from(*other.keys, start, length);
+    values->insert_range_from(*other.values, start, length);
+}
+
+// Returns a new ColumnMap built from the keys and values columns, each filtered with `filt`
+// and the same size hint.
+ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) const {
+    ColumnPtr filtered_keys = keys->filter(filt, result_size_hint);
+    ColumnPtr filtered_values = values->filter(filt, result_size_hint);
+    return ColumnMap::create(filtered_keys, filtered_values);
+}
+
+// Returns a new ColumnMap built from the keys and values columns, each permuted with `perm`
+// and the same `limit`.
+ColumnPtr ColumnMap::permute(const Permutation& perm, size_t limit) const {
+    ColumnPtr permuted_keys = keys->permute(perm, limit);
+    ColumnPtr permuted_values = values->permute(perm, limit);
+    return ColumnMap::create(permuted_keys, permuted_values);
+}
+
+// Returns a new ColumnMap built from the keys and values columns, each replicated according
+// to `offsets`.
+ColumnPtr ColumnMap::replicate(const Offsets& offsets) const {
+    ColumnPtr replicated_keys = keys->replicate(offsets);
+    ColumnPtr replicated_values = values->replicate(offsets);
+    return ColumnMap::create(replicated_keys, replicated_values);
+}
+
+// Forwards the reservation request for `n` to both nested columns.
+void ColumnMap::reserve(size_t n) {
+    keys->reserve(n);
+    values->reserve(n);
+}
+
+// Sum of the byte sizes reported by the keys and values columns.
+size_t ColumnMap::byte_size() const {
+    const size_t keys_bytes = get_keys().byte_size();
+    const size_t values_bytes = get_values().byte_size();
+    return keys_bytes + values_bytes;
+}
+
+// Sum of the allocated bytes reported by the keys and values columns.
+size_t ColumnMap::allocated_bytes() const {
+    const size_t keys_bytes = get_keys().allocated_bytes();
+    const size_t values_bytes = get_values().allocated_bytes();
+    return keys_bytes + values_bytes;
+}
+
+// Forwards protect() to both nested columns.
+void ColumnMap::protect() {
+    keys->protect();
+    values->protect();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h
new file mode 100644
index 0000000000..4840ee4f69
--- /dev/null
+++ b/be/src/vec/columns/column_map.h
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Columns/ColumnMap.h
+// and modified by Doris
+
+#pragma once
+
+#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_impl.h"
+#include "vec/common/arena.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+/** A column of map values.
+  */
+class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
+public:
+    /** Create immutable column using immutable arguments. This arguments may 
be shared with other columns.
+      * Use IColumn::mutate in order to make mutable column and mutate shared 
nested columns.
+      */
+    using Base = COWHelper<IColumn, ColumnMap>;
+
+    static Ptr create(const ColumnPtr& keys, const ColumnPtr& values) {
+        return ColumnMap::create(keys->assume_mutable(), 
values->assume_mutable());
+    }
+
+    template <typename... Args,
+              typename = typename 
std::enable_if<IsMutableColumns<Args...>::value>::type>
+    static MutablePtr create(Args&&... args) {
+        return Base::create(std::forward<Args>(args)...);
+    }
+
+    std::string get_name() const override;
+    const char* get_family_name() const override { return "Map"; }
+    TypeIndex get_data_type() const { return TypeIndex::Map; }
+
+    void for_each_subcolumn(ColumnCallback callback) override {
+        callback(keys);
+        callback(values);
+    }
+
+    void clear() override {
+        keys->clear();
+        values->clear();
+    }
+
+    MutableColumnPtr clone_resized(size_t size) const override;
+
+    bool can_be_inside_nullable() const override { return true; }
+    size_t size() const override { return keys->size(); }
+    Field operator[](size_t n) const override;
+    void get(size_t n, Field& res) const override;
+    StringRef get_data_at(size_t n) const override;
+
+    void insert_data(const char* pos, size_t length) override;
+    void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
+    void insert_from(const IColumn& src_, size_t n) override;
+    void insert(const Field& x) override;
+    void insert_default() override;
+
+    void pop_back(size_t n) override;
+    bool is_column_map() const override { return true; }
+    StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& 
begin) const override;
+    const char* deserialize_and_insert_from_arena(const char* pos) override;
+
+    void update_hash_with_value(size_t n, SipHash& hash) const override;
+
+    ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const 
override;
+    ColumnPtr permute(const Permutation& perm, size_t limit) const override;
+    ColumnPtr replicate(const Offsets& offsets) const override;
+    // Delegates to the generic scatter_impl helper instantiated for ColumnMap.
+    MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) 
const override {
+        return scatter_impl<ColumnMap>(num_columns, selector);
+    }
+    // Not supported for map columns: emits a FATAL log entry instead of
+    // computing anything. `min` and `max` are left untouched.
+    void get_extremes(Field& min, Field& max) const override {
+        LOG(FATAL) << "get_extremes not implemented";
+    }
+    // Not supported for map columns: emits a FATAL log entry. Declared
+    // [[noreturn]], so it relies on LOG(FATAL) never returning control.
+    [[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs_,
+                                int nan_direction_hint) const override {
+        LOG(FATAL) << "compare_at not implemented";
+    }
+    // Not supported for map columns: emits a FATAL log entry and leaves `res` untouched.
+    void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
+                         Permutation& res) const override {
+        LOG(FATAL) << "get_permutation not implemented";
+    }
+    void insert_indices_from(const IColumn& src, const int* indices_begin,
+                             const int* indices_end) override;
+
+    // Delegates to the generic append_data_by_selector_impl helper
+    // instantiated for ColumnMap.
+    void append_data_by_selector(MutableColumnPtr& res,
+                                 const IColumn::Selector& selector) const 
override {
+        return append_data_by_selector_impl<ColumnMap>(res, selector);
+    }
+
+    // Not supported for map columns: emits a FATAL log entry; no data is replaced.
+    void replace_column_data(const IColumn&, size_t row, size_t self_row = 0) 
override {
+        LOG(FATAL) << "replace_column_data not implemented";
+    }
+    // Not supported for map columns: emits a FATAL log entry; no data is replaced.
+    void replace_column_data_default(size_t self_row = 0) override {
+        LOG(FATAL) << "replace_column_data_default not implemented";
+    }
+    void check_size() const;
+    ColumnArray::Offsets64& get_offsets() const;
+    void reserve(size_t n) override;
+    size_t byte_size() const override;
+    size_t allocated_bytes() const override;
+    void protect() override;
+
+    /******************** keys and values ***************/
+    const ColumnPtr& get_keys_ptr() const { return keys; }
+    ColumnPtr& get_keys_ptr() { return keys; }
+
+    const IColumn& get_keys() const { return *keys; }
+    IColumn& get_keys() { return *keys; }
+
+    const ColumnPtr& get_values_ptr() const { return values; }
+    ColumnPtr& get_values_ptr() { return values; }
+
+    const IColumn& get_values() const { return *values; }
+    IColumn& get_values() { return *values; }
+
+private:
+    friend class COWHelper<IColumn, ColumnMap>;
+
+    WrappedPtr keys;   // nullable
+    WrappedPtr values; // nullable
+
+    // Returns the offsets entry that precedes row `i`, i.e. get_offsets()[i - 1].
+    // NOTE(review): for i == 0 this reads index -1; presumably the offsets
+    // container is padded so that this yields 0 - confirm.
+    size_t ALWAYS_INLINE offset_at(ssize_t i) const { return get_offsets()[i - 
1]; }
+    // Returns the difference between the offsets entries of row `i` and row
+    // `i - 1`, i.e. the number of nested entries belonging to row `i`.
+    // NOTE(review): for i == 0 this reads index -1 of the offsets - confirm
+    // that the container permits it.
+    size_t ALWAYS_INLINE size_at(ssize_t i) const {
+        return get_offsets()[i] - get_offsets()[i - 1];
+    }
+
+    explicit ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values);
+
+    ColumnMap(const ColumnMap&) = default;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/core/field.h b/be/src/vec/core/field.h
index 7dd913264f..6d6ea071fc 100644
--- a/be/src/vec/core/field.h
+++ b/be/src/vec/core/field.h
@@ -85,6 +85,7 @@ using FieldVector = std::vector<Field>;
 
 DEFINE_FIELD_VECTOR(Array);
 DEFINE_FIELD_VECTOR(Tuple);
+DEFINE_FIELD_VECTOR(Map);
 
 #undef DEFINE_FIELD_VECTOR
 
@@ -308,6 +309,7 @@ public:
             AggregateFunctionState = 22,
             JSONB = 23,
             Decimal128I = 24,
+            Map = 25,
         };
 
         static const int MIN_NON_POD = 16;
@@ -334,6 +336,8 @@ public:
                 return "Array";
             case Tuple:
                 return "Tuple";
+            case Map:
+                return "Map";
             case Decimal32:
                 return "Decimal32";
             case Decimal64:
@@ -508,6 +512,8 @@ public:
             return get<Array>() < rhs.get<Array>();
         case Types::Tuple:
             return get<Tuple>() < rhs.get<Tuple>();
+        case Types::Map:
+            return get<Map>() < rhs.get<Map>();
         case Types::Decimal32:
             return get<DecimalField<Decimal32>>() < 
rhs.get<DecimalField<Decimal32>>();
         case Types::Decimal64:
@@ -553,6 +559,8 @@ public:
             return get<Array>() <= rhs.get<Array>();
         case Types::Tuple:
             return get<Tuple>() <= rhs.get<Tuple>();
+        case Types::Map:
+            // Must be '<=' like every other case of this operator.
+            return get<Map>() <= rhs.get<Map>();
         case Types::Decimal32:
             return get<DecimalField<Decimal32>>() <= 
rhs.get<DecimalField<Decimal32>>();
         case Types::Decimal64:
@@ -590,6 +598,8 @@ public:
             return get<Array>() == rhs.get<Array>();
         case Types::Tuple:
             return get<Tuple>() == rhs.get<Tuple>();
+        case Types::Map:
+            // Must be '==' like every other case of this operator.
+            return get<Map>() == rhs.get<Map>();
         case Types::UInt128:
             return get<UInt128>() == rhs.get<UInt128>();
         case Types::Int128:
@@ -680,6 +690,9 @@ private:
         case Types::Tuple:
             f(field.template get<Tuple>());
             return;
+        case Types::Map:
+            f(field.template get<Map>());
+            return;
         case Types::Decimal32:
             f(field.template get<DecimalField<Decimal32>>());
             return;
@@ -752,6 +765,9 @@ private:
         case Types::Tuple:
             destroy<Tuple>();
             break;
+        case Types::Map:
+            destroy<Map>();
+            break;
         case Types::AggregateFunctionState:
             destroy<AggregateFunctionStateData>();
             break;
@@ -813,6 +829,10 @@ struct Field::TypeToEnum<Tuple> {
     static const Types::Which value = Types::Tuple;
 };
 template <>
+struct Field::TypeToEnum<Map> {
+    static const Types::Which value = Types::Map;
+};
+template <>
 struct Field::TypeToEnum<DecimalField<Decimal32>> {
     static const Types::Which value = Types::Decimal32;
 };
@@ -874,6 +894,10 @@ struct Field::EnumToType<Field::Types::Tuple> {
     using Type = Tuple;
 };
 template <>
+struct Field::EnumToType<Field::Types::Map> {
+    using Type = Map;
+};
+template <>
 struct Field::EnumToType<Field::Types::Decimal32> {
     using Type = DecimalField<Decimal32>;
 };
@@ -923,6 +947,10 @@ struct TypeName<Tuple> {
     static std::string get() { return "Tuple"; }
 };
 template <>
+struct TypeName<Map> {
+    static std::string get() { return "Map"; }
+};
+template <>
 struct TypeName<AggregateFunctionStateData> {
     static std::string get() { return "AggregateFunctionState"; }
 };
@@ -1050,6 +1078,10 @@ struct NearestFieldTypeImpl<Tuple> {
     using Type = Tuple;
 };
 template <>
+struct NearestFieldTypeImpl<Map> {
+    using Type = Map;
+};
+template <>
 struct NearestFieldTypeImpl<bool> {
     using Type = UInt64;
 };
diff --git a/be/src/vec/core/types.h b/be/src/vec/core/types.h
index aea08e11be..c9ee56c099 100644
--- a/be/src/vec/core/types.h
+++ b/be/src/vec/core/types.h
@@ -80,6 +80,7 @@ enum class TypeIndex {
     FixedLengthObject,
     JSONB,
     Decimal128I,
+    Map,
     Struct,
 };
 
@@ -506,6 +507,8 @@ inline const char* getTypeName(TypeIndex idx) {
         return "Array";
     case TypeIndex::Tuple:
         return "Tuple";
+    case TypeIndex::Map:
+        return "Map";
     case TypeIndex::Set:
         return "Set";
     case TypeIndex::Interval:
diff --git a/be/src/vec/data_types/data_type.cpp 
b/be/src/vec/data_types/data_type.cpp
index 360fa48a98..3fa53aa49c 100644
--- a/be/src/vec/data_types/data_type.cpp
+++ b/be/src/vec/data_types/data_type.cpp
@@ -151,6 +151,8 @@ PGenericType_TypeId IDataType::get_pdata_type(const 
IDataType* data_type) {
         return PGenericType::FIXEDLENGTHOBJECT;
     case TypeIndex::JSONB:
         return PGenericType::JSONB;
+    case TypeIndex::Map:
+        return PGenericType::MAP;
     default:
         return PGenericType::UNKNOWN;
     }
diff --git a/be/src/vec/data_types/data_type.h 
b/be/src/vec/data_types/data_type.h
index e5f0e6aa4c..5e338b9658 100644
--- a/be/src/vec/data_types/data_type.h
+++ b/be/src/vec/data_types/data_type.h
@@ -314,6 +314,7 @@ struct WhichDataType {
     bool is_uuid() const { return idx == TypeIndex::UUID; }
     bool is_array() const { return idx == TypeIndex::Array; }
     bool is_tuple() const { return idx == TypeIndex::Tuple; }
+    bool is_map() const { return idx == TypeIndex::Map; }
     bool is_set() const { return idx == TypeIndex::Set; }
     bool is_interval() const { return idx == TypeIndex::Interval; }
 
@@ -355,7 +356,9 @@ inline bool is_tuple(const DataTypePtr& data_type) {
 inline bool is_array(const DataTypePtr& data_type) {
     return WhichDataType(data_type).is_array();
 }
-
+inline bool is_map(const DataTypePtr& data_type) {
+    return WhichDataType(data_type).is_map();
+}
 inline bool is_nothing(const DataTypePtr& data_type) {
     return WhichDataType(data_type).is_nothing();
 }
diff --git a/be/src/vec/data_types/data_type_factory.cpp 
b/be/src/vec/data_types/data_type_factory.cpp
index 78c5fae6b6..f1cc28003e 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -29,6 +29,11 @@ DataTypePtr DataTypeFactory::create_data_type(const 
doris::Field& col_desc) {
     if (col_desc.type() == OLAP_FIELD_TYPE_ARRAY) {
         DCHECK(col_desc.get_sub_field_count() == 1);
         nested = 
std::make_shared<DataTypeArray>(create_data_type(*col_desc.get_sub_field(0)));
+    } else if (col_desc.type() == OLAP_FIELD_TYPE_MAP) {
+        DCHECK(col_desc.get_sub_field_count() == 2);
+        nested = std::make_shared<vectorized::DataTypeMap>(
+                create_data_type(*col_desc.get_sub_field(0)),
+                create_data_type(*col_desc.get_sub_field(1)));
     } else if (col_desc.type() == OLAP_FIELD_TYPE_STRUCT) {
         DCHECK(col_desc.get_sub_field_count() >= 1);
         size_t field_size = col_desc.get_sub_field_count();
@@ -57,6 +62,11 @@ DataTypePtr DataTypeFactory::create_data_type(const 
TabletColumn& col_desc, bool
     if (col_desc.type() == OLAP_FIELD_TYPE_ARRAY) {
         DCHECK(col_desc.get_subtype_count() == 1);
         nested = 
std::make_shared<DataTypeArray>(create_data_type(col_desc.get_sub_column(0)));
+    } else if (col_desc.type() == OLAP_FIELD_TYPE_MAP) {
+        DCHECK(col_desc.get_subtype_count() == 2);
+        nested = std::make_shared<vectorized::DataTypeMap>(
+                create_data_type(col_desc.get_sub_column(0)),
+                create_data_type(col_desc.get_sub_column(1)));
     } else if (col_desc.type() == OLAP_FIELD_TYPE_STRUCT) {
         DCHECK(col_desc.get_subtype_count() >= 1);
         size_t col_size = col_desc.get_subtype_count();
@@ -155,6 +165,12 @@ DataTypePtr DataTypeFactory::create_data_type(const 
TypeDescriptor& col_desc, bo
         nested = std::make_shared<vectorized::DataTypeArray>(
                 create_data_type(col_desc.children[0], 
col_desc.contains_nulls[0]));
         break;
+    case TYPE_MAP:
+        DCHECK(col_desc.children.size() == 2);
+        nested = std::make_shared<vectorized::DataTypeMap>(
+                create_data_type(col_desc.children[0], 
col_desc.contains_nulls[0]),
+                create_data_type(col_desc.children[1], 
col_desc.contains_nulls[1]));
+        break;
     case TYPE_STRUCT: {
         DCHECK(col_desc.children.size() >= 1);
         size_t child_size = col_desc.children.size();
@@ -338,6 +354,13 @@ DataTypePtr DataTypeFactory::create_data_type(const 
PColumnMeta& pcolumn) {
     case PGenericType::FIXEDLENGTHOBJECT:
         nested = std::make_shared<DataTypeFixedLengthObject>();
         break;
+    case PGenericType::MAP:
+        DCHECK(pcolumn.children_size() == 2);
+        // here to check pcolumn is list?
+        nested = std::make_shared<vectorized::DataTypeMap>(
+                create_data_type(pcolumn.children(0).children(0)),
+                create_data_type(pcolumn.children(1).children(0)));
+        break;
     case PGenericType::STRUCT: {
         size_t col_size = pcolumn.children_size();
         DCHECK(col_size >= 1);
@@ -421,6 +444,12 @@ DataTypePtr DataTypeFactory::create_data_type(const 
arrow::DataType* type, bool
         nested = std::make_shared<vectorized::DataTypeArray>(
                 create_data_type(type->field(0)->type().get(), true));
         break;
+    case ::arrow::Type::MAP:
+        DCHECK(type->num_fields() == 2);
+        nested = std::make_shared<vectorized::DataTypeMap>(
+                create_data_type(type->field(0)->type().get(), true),
+                create_data_type(type->field(1)->type().get(), true));
+        break;
     default:
         DCHECK(false) << "invalid arrow type:" << (int)(type->id());
         break;
diff --git a/be/src/vec/data_types/data_type_factory.hpp 
b/be/src/vec/data_types/data_type_factory.hpp
index 9bc5e20d5c..879418a326 100644
--- a/be/src/vec/data_types/data_type_factory.hpp
+++ b/be/src/vec/data_types/data_type_factory.hpp
@@ -37,6 +37,7 @@
 #include "vec/data_types/data_type_fixed_length_object.h"
 #include "vec/data_types/data_type_hll.h"
 #include "vec/data_types/data_type_jsonb.h"
+#include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_nothing.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_number.h"
diff --git a/be/src/vec/data_types/data_type_map.cpp 
b/be/src/vec/data_types/data_type_map.cpp
new file mode 100644
index 0000000000..c40e0362c5
--- /dev/null
+++ b/be/src/vec/data_types/data_type_map.cpp
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "data_type_map.h"
+
+#include "gen_cpp/data.pb.h"
+#include "vec/columns/column_array.h"
+#include "vec/columns/column_map.h"
+#include "vec/common/assert_cast.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris::vectorized {
+
+// Stores the element types of the map and wraps each of them in a
+// DataTypeArray, which is the type used for the nested keys/values columns.
+DataTypeMap::DataTypeMap(const DataTypePtr& keys_, const DataTypePtr& values_)
+        : key_type(keys_),
+          value_type(values_),
+          keys(std::make_shared<DataTypeArray>(keys_)),
+          values(std::make_shared<DataTypeArray>(values_)) {}
+
+// Renders row `row_num` of a ColumnMap as "{k1:v1, k2:v2}". NULL cells are
+// printed as NULL; string-like cells are wrapped in single quotes.
+std::string DataTypeMap::to_string(const IColumn& column, size_t row_num) const {
+    const auto& map_col = assert_cast<const ColumnMap&>(column);
+    const ColumnArray::Offsets64& row_offsets = map_col.get_offsets();
+
+    // The entries of this row occupy [first, last) in the nested columns.
+    const size_t first = row_offsets[row_num - 1];
+    const size_t last = row_offsets[row_num];
+
+    const auto& keys_array = assert_cast<const ColumnArray&>(map_col.get_keys());
+    const auto& values_array = assert_cast<const ColumnArray&>(map_col.get_values());
+    const IColumn& key_cells = keys_array.get_data();
+    const IColumn& value_cells = values_array.get_data();
+
+    std::stringstream out;
+
+    // Writes one key or value cell, applying the NULL / quoting rules.
+    auto write_cell = [&out](const DataTypePtr& type, const IColumn& cells, size_t idx) {
+        if (cells.is_null_at(idx)) {
+            out << "NULL";
+            return;
+        }
+        if (WhichDataType(remove_nullable(type)).is_string_or_fixed_string()) {
+            out << "'" << type->to_string(cells, idx) << "'";
+        } else {
+            out << type->to_string(cells, idx);
+        }
+    };
+
+    out << "{";
+    for (size_t idx = first; idx < last; ++idx) {
+        if (idx != first) {
+            out << ", ";
+        }
+        write_cell(key_type, key_cells, idx);
+        out << ":";
+        write_cell(value_type, value_cells, idx);
+    }
+    out << "}";
+    return out.str();
+}
+
+// Writes the textual form of row `row_num` (see the std::string overload)
+// into `ostr`.
+void DataTypeMap::to_string(const class doris::vectorized::IColumn& column, size_t row_num,
+                            class doris::vectorized::BufferWritable& ostr) const {
+    const std::string text = to_string(column, row_num);
+    // Use the string's own length: strlen() would rescan the buffer and stop
+    // at the first embedded NUL byte, silently truncating the output.
+    ostr.write(text.data(), text.size());
+}
+
+// Parses one map literal such as {k1:v1, k2:v2} from `rb` and appends it as a
+// new row to `column` (which must be a ColumnMap).
+//
+// The text is split into a "[k1,k2,...]" and a "[v1,v2,...]" array literal,
+// which are then handed to the nested array types for the keys and values.
+// Returns InvalidArgument when the text is not enclosed in braces or an entry
+// has no ':' separator, and propagates any error of the nested parsers.
+Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* column) const {
+    DCHECK(!rb.eof());
+    auto* map_column = assert_cast<ColumnMap*>(column);
+
+    // Literal braces are escaped ("{{" / "}}") because the message is a
+    // format string; an unescaped brace is a format error.
+    if (*rb.position() != '{') {
+        return Status::InvalidArgument("map does not start with '{{' character, found '{}'",
+                                       *rb.position());
+    }
+    if (*(rb.end() - 1) != '}') {
+        return Status::InvalidArgument("map does not end with '}}' character, found '{}'",
+                                       *(rb.end() - 1));
+    }
+
+    std::stringstream keyCharset;
+    std::stringstream valCharset;
+
+    if (rb.count() == 2) {
+        // empty map {} , need to make empty array to add offset
+        keyCharset << "[]";
+        valCharset << "[]";
+    } else {
+        // {"aaa": 1, "bbb": 20}: collect the keys and the values into two
+        // separate array literals.
+        // skip "{"
+        ++rb.position();
+        keyCharset << "[";
+        valCharset << "[";
+        while (!rb.eof()) {
+            auto* entry = rb.position();
+            auto* buf_end = rb.end();
+
+            // Length of the current "key:value" entry, i.e. the distance to
+            // the next ',' or '}' (never scanning past the buffer end).
+            size_t kv_len = 0;
+            while (entry + kv_len != buf_end && entry[kv_len] != ',' && entry[kv_len] != '}') {
+                ++kv_len;
+            }
+            if (kv_len >= rb.count()) {
+                return Status::InvalidArgument("Invalid Length");
+            }
+
+            // Locate the ':' separator inside this entry only. Without the
+            // bound an entry lacking ':' (or an empty entry) would be scanned
+            // past its end and the value length below would underflow.
+            size_t k_len = 0;
+            while (k_len < kv_len && entry[k_len] != ':') {
+                ++k_len;
+            }
+            if (k_len == kv_len) {
+                return Status::InvalidArgument("map entry '{}' does not contain ':' separator",
+                                               std::string(entry, kv_len));
+            }
+            ReadBuffer key_rb(entry, k_len);
+            ReadBuffer val_rb(entry + k_len + 1, kv_len - k_len - 1);
+
+            // NOTE(review): every entry is followed by ',', so the generated
+            // literals end with ",]" - presumably accepted by the array
+            // parser; confirm.
+            // handle key
+            keyCharset << key_rb.to_string();
+            keyCharset << ",";
+
+            // handle value
+            valCharset << val_rb.to_string();
+            valCharset << ",";
+
+            // skip the entry and its terminating ',' or '}'
+            rb.position() += kv_len + 1;
+        }
+        keyCharset << ']';
+        valCharset << ']';
+    }
+
+    // str() returns a temporary; keep named copies alive for as long as the
+    // ReadBuffers point into them.
+    std::string keys_text = keyCharset.str();
+    std::string values_text = valCharset.str();
+    ReadBuffer kb(keys_text.data(), keys_text.length());
+    ReadBuffer vb(values_text.data(), values_text.length());
+
+    Status st = keys->from_string(kb, &map_column->get_keys());
+    if (!st.ok()) {
+        return st;
+    }
+    st = values->from_string(vb, &map_column->get_values());
+    if (!st.ok()) {
+        // Undo the key row appended above so keys and values stay aligned.
+        map_column->get_keys().pop_back(1);
+        return st;
+    }
+    return Status::OK();
+}
+
+// Creates an empty ColumnMap whose nested columns are created by the keys and
+// values array types.
+MutableColumnPtr DataTypeMap::create_column() const {
+    return ColumnMap::create(keys->create_column(), values->create_column());
+}
+
+// Fills `col_meta` for this type and appends two children: the metadata of
+// the keys array type first, then that of the values array type.
+void DataTypeMap::to_pb_column_meta(PColumnMeta* col_meta) const {
+    IDataType::to_pb_column_meta(col_meta);
+    auto key_children = col_meta->add_children();
+    auto value_children = col_meta->add_children();
+    keys->to_pb_column_meta(key_children);
+    values->to_pb_column_meta(value_children);
+}
+
+// Two map types are equal when `rhs` is also a DataTypeMap and both the keys
+// and the values array types compare equal.
+bool DataTypeMap::equals(const IDataType& rhs) const {
+    if (typeid(rhs) != typeid(*this)) {
+        return false;
+    }
+
+    const auto& other = static_cast<const DataTypeMap&>(rhs);
+    return keys->equals(*other.keys) && values->equals(*other.values);
+}
+
+// Returns the sum of the serialized sizes reported by the keys and the values
+// array types for the nested columns of `column` (materialized first if it
+// is a const column).
+int64_t DataTypeMap::get_uncompressed_serialized_bytes(const IColumn& column,
+                                                       int data_version) const 
{
+    auto ptr = column.convert_to_full_column_if_const();
+    const auto& data_column = assert_cast<const ColumnMap&>(*ptr.get());
+    return 
get_keys()->get_uncompressed_serialized_bytes(data_column.get_keys(), 
data_version) +
+           
get_values()->get_uncompressed_serialized_bytes(data_column.get_values(), 
data_version);
+}
+
+// Serializes `column` to binary: the keys column is written to `buf` first,
+// immediately followed by the values column. Returns the pointer produced by
+// the values serializer.
+char* DataTypeMap::serialize(const IColumn& column, char* buf, int 
data_version) const {
+    auto ptr = column.convert_to_full_column_if_const();
+    const auto& map_column = assert_cast<const ColumnMap&>(*ptr.get());
+
+    buf = get_keys()->serialize(map_column.get_keys(), buf, data_version);
+    return get_values()->serialize(map_column.get_values(), buf, data_version);
+}
+
+// Reads the keys column and then the values column from `buf`, in the same
+// order in which serialize() writes them, and returns the pointer produced by
+// the values deserializer.
+// NOTE(review): `column` is cast to a const ColumnMap* although its nested
+// columns are filled here through assume_mutable().
+const char* DataTypeMap::deserialize(const char* buf, IColumn* column, int 
data_version) const {
+    const auto* map_column = assert_cast<const ColumnMap*>(column);
+    buf = get_keys()->deserialize(buf, 
map_column->get_keys_ptr()->assume_mutable(), data_version);
+    return get_values()->deserialize(buf, 
map_column->get_values_ptr()->assume_mutable(),
+                                     data_version);
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_map.h 
b/be/src/vec/data_types/data_type_map.h
new file mode 100644
index 0000000000..58261b0b3d
--- /dev/null
+++ b/be/src/vec/data_types/data_type_map.h
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/DataTypes/DataTypeMap.h
+// and modified by Doris
+
+#pragma once
+
+#include "vec/data_types/data_type.h"
+
+namespace doris::vectorized {
+/** Map data type.
+  *
+  * Parameterized by a key type and a value type, both of which must be given
+  * to the constructor. Besides the two element types, the class keeps a
+  * `keys` and a `values` data type (commented as arrays below) that are
+  * exposed through get_keys() / get_values().
+  */
+class DataTypeMap final : public IDataType {
+private:
+    // Element type of the map keys.
+    DataTypePtr key_type;
+    // Element type of the map values.
+    DataTypePtr value_type;
+    DataTypePtr keys;   // array
+    DataTypePtr values; // array
+
+public:
+    static constexpr bool is_parametric = true;
+
+    // keys_ / values_ are the element types of the map's keys and values.
+    DataTypeMap(const DataTypePtr& keys_, const DataTypePtr& values_);
+
+    TypeIndex get_type_id() const override { return TypeIndex::Map; }
+    // Name of the form "Map(<key type name>, <value type name>)".
+    std::string do_get_name() const override {
+        return "Map(" + key_type->get_name() + ", " + value_type->get_name() + 
")";
+    }
+    const char* get_family_name() const override { return "Map"; }
+
+    bool can_be_inside_nullable() const override { return true; }
+    MutableColumnPtr create_column() const override;
+    // The default value is an empty Map field.
+    Field get_default() const override { return Map(); };
+    bool equals(const IDataType& rhs) const override;
+    bool get_is_parametric() const override { return true; }
+    bool have_subtypes() const override { return true; }
+    // Comparable only when both the key type and the value type are comparable.
+    bool is_comparable() const override {
+        return key_type->is_comparable() && value_type->is_comparable();
+    }
+    bool can_be_compared_with_collation() const override { return false; }
+    bool is_value_unambiguously_represented_in_contiguous_memory_region() 
const override {
+        return true;
+    }
+
+    // The keys / values data types (see the `// array` members above).
+    const DataTypePtr& get_keys() const { return keys; }
+    const DataTypePtr& get_values() const { return values; }
+
+    // The element types of the keys / values.
+    const DataTypePtr& get_key_type() const { return key_type; }
+    const DataTypePtr& get_value_type() const { return value_type; }
+
+    // Binary (de)serialization of a map column.
+    int64_t get_uncompressed_serialized_bytes(const IColumn& column,
+                                              int be_exec_version) const 
override;
+    char* serialize(const IColumn& column, char* buf, int be_exec_version) 
const override;
+    const char* deserialize(const char* buf, IColumn* column, int 
be_exec_version) const override;
+
+    void to_pb_column_meta(PColumnMeta* col_meta) const override;
+
+    // Text conversion of a single row to and from a string representation.
+    std::string to_string(const IColumn& column, size_t row_num) const 
override;
+    void to_string(const IColumn& column, size_t row_num, BufferWritable& 
ostr) const override;
+    Status from_string(ReadBuffer& rb, IColumn* column) const override;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 923aee07d6..ac6b4e53c6 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -33,6 +33,7 @@
 #include "vec/exprs/vin_predicate.h"
 #include "vec/exprs/vinfo_func.h"
 #include "vec/exprs/vliteral.h"
+#include "vec/exprs/vmap_literal.h"
 #include "vec/exprs/vruntimefilter_wrapper.h"
 #include "vec/exprs/vslot_ref.h"
 #include "vec/exprs/vstruct_literal.h"
@@ -125,6 +126,8 @@ Status VExpr::create_expr(doris::ObjectPool* pool, const 
doris::TExprNode& texpr
         *expr = pool->add(new VArrayLiteral(texpr_node));
         return Status::OK();
     }
+    case TExprNodeType::MAP_LITERAL: {
+        *expr = pool->add(new VMapLiteral(texpr_node));
+        // Return and close the case block like the neighbouring literal cases;
+        // otherwise the braces are unbalanced and control would run on into
+        // the STRUCT_LITERAL case, overwriting *expr.
+        return Status::OK();
+    }
     case TExprNodeType::STRUCT_LITERAL: {
         *expr = pool->add(new VStructLiteral(texpr_node));
         return Status::OK();
@@ -171,218 +174,220 @@ Status VExpr::create_expr(doris::ObjectPool* pool, 
const doris::TExprNode& texpr
     default:
         return Status::InternalError("Unknown expr node type: {}", 
texpr_node.node_type);
     }
-    return Status::OK();
-}
+        return Status::OK();
+    }
 
-Status VExpr::create_tree_from_thrift(doris::ObjectPool* pool,
-                                      const std::vector<doris::TExprNode>& 
nodes, VExpr* parent,
-                                      int* node_idx, VExpr** root_expr, 
VExprContext** ctx) {
-    // propagate error case
-    if (*node_idx >= nodes.size()) {
-        return Status::InternalError("Failed to reconstruct expression tree 
from thrift.");
-    }
-    int num_children = nodes[*node_idx].num_children;
-    VExpr* expr = nullptr;
-    RETURN_IF_ERROR(create_expr(pool, nodes[*node_idx], &expr));
-    DCHECK(expr != nullptr);
-    if (parent != nullptr) {
-        parent->add_child(expr);
-    } else {
-        DCHECK(root_expr != nullptr);
-        DCHECK(ctx != nullptr);
-        *root_expr = expr;
-        *ctx = pool->add(new VExprContext(expr));
-    }
-    for (int i = 0; i < num_children; i++) {
-        *node_idx += 1;
-        RETURN_IF_ERROR(create_tree_from_thrift(pool, nodes, expr, node_idx, 
nullptr, nullptr));
-        // we are expecting a child, but have used all nodes
-        // this means we have been given a bad tree and must fail
+    Status VExpr::create_tree_from_thrift(doris::ObjectPool * pool,
+                                          const std::vector<doris::TExprNode>& 
nodes, VExpr* parent,
+                                          int* node_idx, VExpr** root_expr, 
VExprContext** ctx) {
+        // propagate error case
         if (*node_idx >= nodes.size()) {
             return Status::InternalError("Failed to reconstruct expression 
tree from thrift.");
         }
-    }
-    return Status::OK();
-}
-
-Status VExpr::create_expr_tree(doris::ObjectPool* pool, const doris::TExpr& 
texpr,
-                               VExprContext** ctx) {
-    if (texpr.nodes.size() == 0) {
-        *ctx = nullptr;
+        int num_children = nodes[*node_idx].num_children;
+        VExpr* expr = nullptr;
+        RETURN_IF_ERROR(create_expr(pool, nodes[*node_idx], &expr));
+        DCHECK(expr != nullptr);
+        if (parent != nullptr) {
+            parent->add_child(expr);
+        } else {
+            DCHECK(root_expr != nullptr);
+            DCHECK(ctx != nullptr);
+            *root_expr = expr;
+            *ctx = pool->add(new VExprContext(expr));
+        }
+        for (int i = 0; i < num_children; i++) {
+            *node_idx += 1;
+            RETURN_IF_ERROR(create_tree_from_thrift(pool, nodes, expr, 
node_idx, nullptr, nullptr));
+            // we are expecting a child, but have used all nodes
+            // this means we have been given a bad tree and must fail
+            if (*node_idx >= nodes.size()) {
+                return Status::InternalError("Failed to reconstruct expression 
tree from thrift.");
+            }
+        }
         return Status::OK();
     }
-    int node_idx = 0;
-    VExpr* e = nullptr;
-    Status status = create_tree_from_thrift(pool, texpr.nodes, nullptr, 
&node_idx, &e, ctx);
-    if (status.ok() && node_idx + 1 != texpr.nodes.size()) {
-        status = Status::InternalError(
-                "Expression tree only partially reconstructed. Not all thrift 
nodes were used.");
-    }
-    if (!status.ok()) {
-        LOG(ERROR) << "Could not construct expr tree.\n"
-                   << status << "\n"
-                   << apache::thrift::ThriftDebugString(texpr);
-    }
-    return status;
-}
 
-Status VExpr::create_expr_trees(ObjectPool* pool, const 
std::vector<doris::TExpr>& texprs,
-                                std::vector<VExprContext*>* ctxs) {
-    ctxs->clear();
-    for (int i = 0; i < texprs.size(); ++i) {
-        VExprContext* ctx = nullptr;
-        RETURN_IF_ERROR(create_expr_tree(pool, texprs[i], &ctx));
-        ctxs->push_back(ctx);
+    Status VExpr::create_expr_tree(doris::ObjectPool * pool, const 
doris::TExpr& texpr,
+                                   VExprContext** ctx) {
+        if (texpr.nodes.size() == 0) {
+            *ctx = nullptr;
+            return Status::OK();
+        }
+        int node_idx = 0;
+        VExpr* e = nullptr;
+        Status status = create_tree_from_thrift(pool, texpr.nodes, nullptr, 
&node_idx, &e, ctx);
+        if (status.ok() && node_idx + 1 != texpr.nodes.size()) {
+            status = Status::InternalError(
+                    "Expression tree only partially reconstructed. Not all 
thrift nodes were "
+                    "used.");
+        }
+        if (!status.ok()) {
+            LOG(ERROR) << "Could not construct expr tree.\n"
+                       << status << "\n"
+                       << apache::thrift::ThriftDebugString(texpr);
+        }
+        return status;
     }
-    return Status::OK();
-}
 
-Status VExpr::prepare(const std::vector<VExprContext*>& ctxs, RuntimeState* 
state,
-                      const RowDescriptor& row_desc) {
-    for (auto ctx : ctxs) {
-        RETURN_IF_ERROR(ctx->prepare(state, row_desc));
+    Status VExpr::create_expr_trees(ObjectPool * pool, const 
std::vector<doris::TExpr>& texprs,
+                                    std::vector<VExprContext*>* ctxs) {
+        ctxs->clear();
+        for (int i = 0; i < texprs.size(); ++i) {
+            VExprContext* ctx = nullptr;
+            RETURN_IF_ERROR(create_expr_tree(pool, texprs[i], &ctx));
+            ctxs->push_back(ctx);
+        }
+        return Status::OK();
     }
-    return Status::OK();
-}
 
-void VExpr::close(const std::vector<VExprContext*>& ctxs, RuntimeState* state) 
{
-    for (auto ctx : ctxs) {
-        ctx->close(state);
+    Status VExpr::prepare(const std::vector<VExprContext*>& ctxs, 
RuntimeState* state,
+                          const RowDescriptor& row_desc) {
+        for (auto ctx : ctxs) {
+            RETURN_IF_ERROR(ctx->prepare(state, row_desc));
+        }
+        return Status::OK();
     }
-}
 
-Status VExpr::open(const std::vector<VExprContext*>& ctxs, RuntimeState* 
state) {
-    for (int i = 0; i < ctxs.size(); ++i) {
-        RETURN_IF_ERROR(ctxs[i]->open(state));
+    void VExpr::close(const std::vector<VExprContext*>& ctxs, RuntimeState* 
state) {
+        for (auto ctx : ctxs) {
+            ctx->close(state);
+        }
     }
-    return Status::OK();
-}
 
-Status VExpr::clone_if_not_exists(const std::vector<VExprContext*>& ctxs, 
RuntimeState* state,
-                                  std::vector<VExprContext*>* new_ctxs) {
-    DCHECK(new_ctxs != nullptr);
-    if (!new_ctxs->empty()) {
-        // 'ctxs' was already cloned into '*new_ctxs', nothing to do.
-        DCHECK_EQ(new_ctxs->size(), ctxs.size());
-        for (int i = 0; i < new_ctxs->size(); ++i) {
-            DCHECK((*new_ctxs)[i]->_is_clone);
+    Status VExpr::open(const std::vector<VExprContext*>& ctxs, RuntimeState* 
state) {
+        for (int i = 0; i < ctxs.size(); ++i) {
+            RETURN_IF_ERROR(ctxs[i]->open(state));
         }
         return Status::OK();
     }
-    new_ctxs->resize(ctxs.size());
-    for (int i = 0; i < ctxs.size(); ++i) {
-        RETURN_IF_ERROR(ctxs[i]->clone(state, &(*new_ctxs)[i]));
-    }
-    return Status::OK();
-}
-std::string VExpr::debug_string() const {
-    // TODO: implement partial debug string for member vars
-    std::stringstream out;
-    out << " type=" << _type.debug_string();
-    out << " codegen="
-        << "false";
 
-    if (!_children.empty()) {
-        out << " children=" << debug_string(_children);
+    Status VExpr::clone_if_not_exists(const std::vector<VExprContext*>& ctxs, 
RuntimeState* state,
+                                      std::vector<VExprContext*>* new_ctxs) {
+        DCHECK(new_ctxs != nullptr);
+        if (!new_ctxs->empty()) {
+            // 'ctxs' was already cloned into '*new_ctxs', nothing to do.
+            DCHECK_EQ(new_ctxs->size(), ctxs.size());
+            for (int i = 0; i < new_ctxs->size(); ++i) {
+                DCHECK((*new_ctxs)[i]->_is_clone);
+            }
+            return Status::OK();
+        }
+        new_ctxs->resize(ctxs.size());
+        for (int i = 0; i < ctxs.size(); ++i) {
+            RETURN_IF_ERROR(ctxs[i]->clone(state, &(*new_ctxs)[i]));
+        }
+        return Status::OK();
     }
+    std::string VExpr::debug_string() const {
+        // TODO: implement partial debug string for member vars
+        std::stringstream out;
+        out << " type=" << _type.debug_string();
+        out << " codegen="
+            << "false";
 
-    return out.str();
-}
-
-std::string VExpr::debug_string(const std::vector<VExpr*>& exprs) {
-    std::stringstream out;
-    out << "[";
+        if (!_children.empty()) {
+            out << " children=" << debug_string(_children);
+        }
 
-    for (int i = 0; i < exprs.size(); ++i) {
-        out << (i == 0 ? "" : " ") << exprs[i]->debug_string();
+        return out.str();
     }
 
-    out << "]";
-    return out.str();
-}
+    std::string VExpr::debug_string(const std::vector<VExpr*>& exprs) {
+        std::stringstream out;
+        out << "[";
 
-std::string VExpr::debug_string(const std::vector<VExprContext*>& ctxs) {
-    std::vector<VExpr*> exprs;
-    for (int i = 0; i < ctxs.size(); ++i) {
-        exprs.push_back(ctxs[i]->root());
+        for (int i = 0; i < exprs.size(); ++i) {
+            out << (i == 0 ? "" : " ") << exprs[i]->debug_string();
+        }
+
+        out << "]";
+        return out.str();
     }
-    return debug_string(exprs);
-}
 
-bool VExpr::is_constant() const {
-    for (int i = 0; i < _children.size(); ++i) {
-        if (!_children[i]->is_constant()) {
-            return false;
+    std::string VExpr::debug_string(const std::vector<VExprContext*>& ctxs) {
+        std::vector<VExpr*> exprs;
+        for (int i = 0; i < ctxs.size(); ++i) {
+            exprs.push_back(ctxs[i]->root());
         }
+        return debug_string(exprs);
     }
 
-    return true;
-}
+    bool VExpr::is_constant() const {
+        for (int i = 0; i < _children.size(); ++i) {
+            if (!_children[i]->is_constant()) {
+                return false;
+            }
+        }
 
-Status VExpr::get_const_col(VExprContext* context, ColumnPtrWrapper** output) {
-    *output = nullptr;
-    if (!is_constant()) {
-        return Status::OK();
+        return true;
     }
 
-    if (_constant_col != nullptr) {
+    Status VExpr::get_const_col(VExprContext * context, ColumnPtrWrapper * 
*output) {
+        *output = nullptr;
+        if (!is_constant()) {
+            return Status::OK();
+        }
+
+        if (_constant_col != nullptr) {
+            *output = _constant_col.get();
+            return Status::OK();
+        }
+
+        int result = -1;
+        Block block;
+        // If block is empty, some functions will produce no result. So we 
insert a column with
+        // single value here.
+        block.insert({ColumnUInt8::create(1), 
std::make_shared<DataTypeUInt8>(), ""});
+        RETURN_IF_ERROR(execute(context, &block, &result));
+        DCHECK(result != -1);
+        const auto& column = block.get_by_position(result).column;
+        _constant_col = std::make_shared<ColumnPtrWrapper>(column);
         *output = _constant_col.get();
         return Status::OK();
     }
 
-    int result = -1;
-    Block block;
-    // If block is empty, some functions will produce no result. So we insert 
a column with
-    // single value here.
-    block.insert({ColumnUInt8::create(1), std::make_shared<DataTypeUInt8>(), 
""});
-    RETURN_IF_ERROR(execute(context, &block, &result));
-    DCHECK(result != -1);
-    const auto& column = block.get_by_position(result).column;
-    _constant_col = std::make_shared<ColumnPtrWrapper>(column);
-    *output = _constant_col.get();
-    return Status::OK();
-}
+    void VExpr::register_function_context(doris::RuntimeState * state, 
VExprContext * context) {
+        FunctionContext::TypeDesc return_type = 
AnyValUtil::column_type_to_type_desc(_type);
+        std::vector<FunctionContext::TypeDesc> arg_types;
+        for (int i = 0; i < _children.size(); ++i) {
+            
arg_types.push_back(AnyValUtil::column_type_to_type_desc(_children[i]->type()));
+        }
 
-void VExpr::register_function_context(doris::RuntimeState* state, 
VExprContext* context) {
-    FunctionContext::TypeDesc return_type = 
AnyValUtil::column_type_to_type_desc(_type);
-    std::vector<FunctionContext::TypeDesc> arg_types;
-    for (int i = 0; i < _children.size(); ++i) {
-        
arg_types.push_back(AnyValUtil::column_type_to_type_desc(_children[i]->type()));
+        _fn_context_index = context->register_func(state, return_type, 
arg_types, 0);
     }
 
-    _fn_context_index = context->register_func(state, return_type, arg_types, 
0);
-}
-
-Status VExpr::init_function_context(VExprContext* context,
-                                    FunctionContext::FunctionStateScope scope,
-                                    const FunctionBasePtr& function) const {
-    FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
-    if (scope == FunctionContext::FRAGMENT_LOCAL) {
-        std::vector<ColumnPtrWrapper*> constant_cols;
-        for (auto c : _children) {
-            ColumnPtrWrapper* const_col_wrapper = nullptr;
-            RETURN_IF_ERROR(c->get_const_col(context, &const_col_wrapper));
-            constant_cols.push_back(const_col_wrapper);
+    Status VExpr::init_function_context(VExprContext * context,
+                                        FunctionContext::FunctionStateScope 
scope,
+                                        const FunctionBasePtr& function) const 
{
+        FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+        if (scope == FunctionContext::FRAGMENT_LOCAL) {
+            std::vector<ColumnPtrWrapper*> constant_cols;
+            for (auto c : _children) {
+                ColumnPtrWrapper* const_col_wrapper = nullptr;
+                RETURN_IF_ERROR(c->get_const_col(context, &const_col_wrapper));
+                constant_cols.push_back(const_col_wrapper);
+            }
+            fn_ctx->impl()->set_constant_cols(constant_cols);
         }
-        fn_ctx->impl()->set_constant_cols(constant_cols);
-    }
 
-    if (scope == FunctionContext::FRAGMENT_LOCAL) {
-        RETURN_IF_ERROR(function->prepare(fn_ctx, 
FunctionContext::FRAGMENT_LOCAL));
+        if (scope == FunctionContext::FRAGMENT_LOCAL) {
+            RETURN_IF_ERROR(function->prepare(fn_ctx, 
FunctionContext::FRAGMENT_LOCAL));
+        }
+        RETURN_IF_ERROR(function->prepare(fn_ctx, 
FunctionContext::THREAD_LOCAL));
+        return Status::OK();
     }
-    RETURN_IF_ERROR(function->prepare(fn_ctx, FunctionContext::THREAD_LOCAL));
-    return Status::OK();
-}
 
-void VExpr::close_function_context(VExprContext* context, 
FunctionContext::FunctionStateScope scope,
-                                   const FunctionBasePtr& function) const {
-    if (_fn_context_index != -1 && !context->_stale) {
-        FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
-        function->close(fn_ctx, FunctionContext::THREAD_LOCAL);
-        if (scope == FunctionContext::FRAGMENT_LOCAL) {
-            function->close(fn_ctx, FunctionContext::FRAGMENT_LOCAL);
+    void VExpr::close_function_context(VExprContext * context,
+                                       FunctionContext::FunctionStateScope 
scope,
+                                       const FunctionBasePtr& function) const {
+        if (_fn_context_index != -1 && !context->_stale) {
+            FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+            function->close(fn_ctx, FunctionContext::THREAD_LOCAL);
+            if (scope == FunctionContext::FRAGMENT_LOCAL) {
+                function->close(fn_ctx, FunctionContext::FRAGMENT_LOCAL);
+            }
         }
     }
-}
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vmap_literal.cpp 
b/be/src/vec/exprs/vmap_literal.cpp
new file mode 100644
index 0000000000..954142f04d
--- /dev/null
+++ b/be/src/vec/exprs/vmap_literal.cpp
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exprs/vmap_literal.h"
+
+// Example of SQL that produces map literals:
+//   insert into table_map values ({'name':'zhangsan', 'gender':'male'}), ({'name':'lisi', 'gender':'female'});
+namespace doris::vectorized {
+
+// Builds the constant column that represents this map literal.
+//
+// The children are laid out as key1, value1, key2, value2, ...: even-indexed
+// children are collected as keys and odd-indexed children as values. The
+// resulting Map field holds exactly two Array fields (keys first, then
+// values) and is handed to create_column_const(1, map).
+//
+// Returns an error if the base-class prepare fails, if the children do not
+// form complete key/value pairs, or if a child has no constant column.
+Status VMapLiteral::prepare(RuntimeState* state, const RowDescriptor& row_desc,
+                            VExprContext* context) {
+    DCHECK_EQ(type().children.size(), 2) << "map children type not 2";
+
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, row_desc, context));
+
+    // An odd child count would leave the last key without a value and
+    // produce key/value arrays of different lengths.
+    if (_children.size() % 2 != 0) {
+        return Status::InternalError("map literal expects key/value pairs, got {} children",
+                                     _children.size());
+    }
+
+    // map-field should contain two vector field for keys and values
+    Field map = Map();
+    Field keys = Array();
+    Field values = Array();
+    // each child is slot with key1, value1, key2, value2...
+    for (size_t idx = 0; idx < _children.size(); ++idx) {
+        ColumnPtrWrapper* const_col_wrapper = nullptr;
+        RETURN_IF_ERROR(_children[idx]->get_const_col(context, &const_col_wrapper));
+        // get_const_col() reports OK with a null wrapper when the child is not
+        // a constant expression; dereferencing it would crash.
+        if (const_col_wrapper == nullptr) {
+            return Status::InternalError("map literal child {} is not a constant expression",
+                                         idx);
+        }
+        Field item;
+        const_col_wrapper->column_ptr->get(0, item);
+
+        if ((idx & 1) == 0) {
+            keys.get<Array>().push_back(item);
+        } else {
+            values.get<Array>().push_back(item);
+        }
+    }
+    map.get<Map>().push_back(keys);
+    map.get<Map>().push_back(values);
+
+    _column_ptr = _data_type->create_column_const(1, map);
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vmap_literal.h b/be/src/vec/exprs/vmap_literal.h
new file mode 100644
index 0000000000..6206d4c58f
--- /dev/null
+++ b/be/src/vec/exprs/vmap_literal.h
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "vec/exprs/vliteral.h"
+
+namespace doris {
+
+namespace vectorized {
+// Literal expression node for MAP values. prepare() assembles the constant
+// map column from the node's children (key1, value1, key2, value2, ...).
+class VMapLiteral : public VLiteral {
+public:
+    // `false` is forwarded as VLiteral's second constructor argument.
+    // NOTE(review): presumably this disables VLiteral's own value
+    // initialisation because the column is built in prepare() -- confirm
+    // against VLiteral's constructor.
+    // Marked explicit so a TExprNode is never implicitly converted into an
+    // expression node.
+    explicit VMapLiteral(const TExprNode& node) : VLiteral(node, false) {}
+    ~VMapLiteral() override = default;
+    Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
+                   VExprContext* context) override;
+};
+} // namespace vectorized
+
+} // namespace doris
diff --git a/be/src/vec/functions/array/function_array_element.h 
b/be/src/vec/functions/array/function_array_element.h
index 6722e09e9c..ac685b172b 100644
--- a/be/src/vec/functions/array/function_array_element.h
+++ b/be/src/vec/functions/array/function_array_element.h
@@ -22,9 +22,10 @@
 #include <string_view>
 
 #include "vec/columns/column_array.h"
-#include "vec/columns/column_const.h"
+#include "vec/columns/column_map.h"
 #include "vec/columns/column_string.h"
 #include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_number.h"
 #include "vec/functions/function.h"
 #include "vec/functions/function_helpers.h"
@@ -44,12 +45,21 @@ public:
     size_t get_number_of_arguments() const override { return 2; }
 
     DataTypePtr get_return_type_impl(const DataTypes& arguments) const 
override {
-        DCHECK(is_array(arguments[0]))
-                << "first argument for function: " << name << " should be 
DataTypeArray";
-        DCHECK(is_integer(arguments[1]))
-                << "second argument for function: " << name << " should be 
Integer";
-        return make_nullable(
-                
check_and_get_data_type<DataTypeArray>(arguments[0].get())->get_nested_type());
+        DCHECK(is_array(arguments[0]) || is_map(arguments[0]))
+                << "first argument for function: " << name
+                << " should be DataTypeArray or DataTypeMap";
+        if (is_array(arguments[0])) {
+            DCHECK(is_integer(arguments[1])) << "second argument for function: 
" << name
+                                             << " should be Integer for array 
element";
+            return make_nullable(
+                    
check_and_get_data_type<DataTypeArray>(arguments[0].get())->get_nested_type());
+        } else if (is_map(arguments[0])) {
+            return make_nullable(
+                    
check_and_get_data_type<DataTypeMap>(arguments[0].get())->get_value_type());
+        } else {
+            LOG(ERROR) << "element_at only support array and map so far.";
+            return nullptr;
+        }
     }
 
     Status execute_impl(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
@@ -68,19 +78,64 @@ public:
         } else {
             args = {col_left, block.get_by_position(arguments[1])};
         }
-
-        auto res_column = _execute_non_nullable(args, input_rows_count, 
src_null_map, dst_null_map);
+        ColumnPtr res_column = nullptr;
+        if (args[0].column->is_column_map()) {
+            res_column = _execute_map(args, input_rows_count, src_null_map, 
dst_null_map);
+        } else {
+            res_column = _execute_non_nullable(args, input_rows_count, 
src_null_map, dst_null_map);
+        }
         if (!res_column) {
             return Status::RuntimeError("unsupported types for function {}({}, 
{})", get_name(),
                                         
block.get_by_position(arguments[0]).type->get_name(),
                                         
block.get_by_position(arguments[1]).type->get_name());
         }
-        block.replace_by_position(
-                result, ColumnNullable::create(std::move(res_column), 
std::move(dst_null_column)));
+        block.replace_by_position(result,
+                                  ColumnNullable::create(res_column, 
std::move(dst_null_column)));
         return Status::OK();
     }
 
 private:
+    //=========================== map element===========================//
+    // Forwards to _mapped_key(): `key_column` is the array column holding each
+    // row's map keys and `argument` carries the key to look up. Returns
+    // whatever _mapped_key() returns (a column of per-row positions).
+    ColumnPtr _get_mapped_idx(const ColumnArray& key_column,
+                              const ColumnWithTypeAndName& argument) {
+        return _mapped_key(key_column, argument);
+    }
+
+    // For every row i, scans that row's keys (elements [offsets[i - 1],
+    // offsets[i]) of the nested key data) for the first one that compares
+    // equal to row i of `argument`, and records its 1-based position within
+    // the row. When no key matches, records (row size + 1), i.e. a position
+    // one past the row's last element.
+    //
+    // NOTE(review): positions are stored as Int8, so values above 127 do not
+    // fit. Widening the column also requires the caller to describe the
+    // indices with a matching data type.
+    ColumnPtr _mapped_key(const ColumnArray& column, const ColumnWithTypeAndName& argument) {
+        auto right_column = argument.column->convert_to_full_column_if_const();
+        const ColumnArray::Offsets64& offsets = column.get_offsets();
+        // Compare against the raw key data: strip the nullable layer if present.
+        // The nullability was just checked, so a static downcast is the right
+        // tool; reinterpret_cast performs no hierarchy-aware conversion.
+        ColumnPtr nested_ptr = nullptr;
+        if (is_column_nullable(column.get_data())) {
+            nested_ptr = static_cast<const ColumnNullable&>(column.get_data())
+                                 .get_nested_column_ptr();
+        } else {
+            nested_ptr = column.get_data_ptr();
+        }
+        const size_t rows = offsets.size();
+        // prepare return data
+        auto matched_indices = ColumnVector<Int8>::create();
+        matched_indices->reserve(rows);
+
+        for (size_t i = 0; i < rows; i++) {
+            bool matched = false;
+            // NOTE(review): for i == 0 this reads offsets[-1]; presumably the
+            // offsets container guarantees a zero there -- confirm.
+            const size_t begin = offsets[i - 1];
+            const size_t end = offsets[i];
+            for (size_t j = begin; j < end; j++) {
+                if (nested_ptr->compare_at(j, i, *right_column, -1) == 0) {
+                    matched_indices->insert_value(j - begin + 1);
+                    matched = true;
+                    break;
+                }
+            }
+
+            if (!matched) {
+                matched_indices->insert_value(end - begin + 1); // make indices for null
+            }
+        }
+
+        return matched_indices;
+    }
+
     template <typename ColumnType>
     ColumnPtr _execute_number(const ColumnArray::Offsets64& offsets, const 
IColumn& nested_column,
                               const UInt8* arr_null_map, const IColumn& 
indices,
@@ -176,6 +231,33 @@ private:
         return dst_column;
     }
 
+    // element_at for maps: arguments[0] is the map column and arguments[1] the
+    // key to look up. The key is first translated into a per-row position
+    // inside the map's key array (see _get_mapped_idx); the lookup is then
+    // delegated to _execute_non_nullable() on the map's values array with
+    // those positions as indices.
+    // Returns nullptr when the map column has no rows or no positions could be
+    // computed.
+    // NOTE(review): the caller turns a nullptr result into an "unsupported
+    // types" error, so an empty input currently surfaces as an error -- confirm
+    // that this is intended.
+    ColumnPtr _execute_map(const ColumnsWithTypeAndName& arguments, size_t input_rows_count,
+                           const UInt8* src_null_map, UInt8* dst_null_map) {
+        auto left_column = arguments[0].column->convert_to_full_column_if_const();
+        // Use assert_cast (as for the key column below) rather than
+        // reinterpret_cast for these class-hierarchy downcasts.
+        DataTypePtr val_type = assert_cast<const DataTypeMap&>(*arguments[0].type).get_values();
+        const auto& map_column = assert_cast<const ColumnMap&>(*left_column);
+
+        const ColumnArray& column_keys = assert_cast<const ColumnArray&>(map_column.get_keys());
+
+        const auto& offsets = column_keys.get_offsets();
+        const size_t rows = offsets.size();
+
+        // `rows` is unsigned, so "no rows" is simply zero.
+        if (rows == 0) {
+            return nullptr;
+        }
+
+        ColumnPtr matched_indices = _get_mapped_idx(column_keys, arguments[1]);
+        if (!matched_indices) {
+            return nullptr;
+        }
+        DataTypePtr indices_type(std::make_shared<vectorized::DataTypeInt8>());
+        ColumnWithTypeAndName indices(matched_indices, indices_type, "indices");
+        ColumnWithTypeAndName data(map_column.get_values_ptr(), val_type, "value");
+        ColumnsWithTypeAndName args = {data, indices};
+        return _execute_non_nullable(args, input_rows_count, src_null_map, dst_null_map);
+    }
+
     ColumnPtr _execute_non_nullable(const ColumnsWithTypeAndName& arguments,
                                     size_t input_rows_count, const UInt8* 
src_null_map,
                                     UInt8* dst_null_map) {
diff --git a/be/src/vec/olap/olap_data_convertor.cpp 
b/be/src/vec/olap/olap_data_convertor.cpp
index c96726d5fe..75aae96146 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -56,7 +56,6 @@ 
OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
     case FieldType::OLAP_FIELD_TYPE_CHAR: {
         return std::make_unique<OlapColumnDataConvertorChar>(column.length());
     }
-    case FieldType::OLAP_FIELD_TYPE_MAP:
     case FieldType::OLAP_FIELD_TYPE_VARCHAR: {
         return std::make_unique<OlapColumnDataConvertorVarChar>(false);
     }
@@ -129,6 +128,15 @@ 
OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
         return std::make_unique<OlapColumnDataConvertorArray>(
                 create_olap_column_data_convertor(sub_column));
     }
+    case FieldType::OLAP_FIELD_TYPE_MAP: {
+        const auto& key_column = column.get_sub_column(0);
+        const auto& value_column = column.get_sub_column(1);
+        return std::make_unique<OlapColumnDataConvertorMap>(
+                std::make_unique<OlapColumnDataConvertorArray>(
+                        create_olap_column_data_convertor(key_column)),
+                std::make_unique<OlapColumnDataConvertorArray>(
+                        create_olap_column_data_convertor(value_column)));
+    }
     default: {
         DCHECK(false) << "Invalid type in RowBlockV2:" << column.type();
         return nullptr;
@@ -774,4 +782,56 @@ Status 
OlapBlockDataConvertor::OlapColumnDataConvertorArray::convert_to_olap(
     return Status::OK();
 }
 
+// Entry point of the map convertor: resolves the concrete ColumnMap and
+// DataTypeMap behind _typed_column and forwards them to the two-argument
+// overload, whose Status is returned unchanged.
+Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap() {
+    const ColumnMap* column_map = nullptr;
+    const DataTypeMap* data_type_map = nullptr;
+    // A non-null _nullmap is treated as "the source column and its type are
+    // Nullable wrappers", so both are unwrapped to reach the map underneath.
+    if (_nullmap) {
+        const auto* nullable_column =
+                assert_cast<const ColumnNullable*>(_typed_column.column.get());
+        column_map = assert_cast<const 
ColumnMap*>(nullable_column->get_nested_column_ptr().get());
+        data_type_map = assert_cast<const DataTypeMap*>(
+                (assert_cast<const 
DataTypeNullable*>(_typed_column.type.get())->get_nested_type())
+                        .get());
+    } else {
+        // Otherwise the column and type are used as the map directly.
+        column_map = assert_cast<const ColumnMap*>(_typed_column.column.get());
+        data_type_map = assert_cast<const 
DataTypeMap*>(_typed_column.type.get());
+    }
+    assert(column_map);
+    assert(data_type_map);
+
+    return convert_to_olap(column_map, data_type_map);
+}
+
+// Converts the keys and the values of `column_map` with the key/value
+// sub-convertors and publishes their outputs as _results[0] (keys) and
+// _results[1] (values).
+// A Nullable wrapper on the keys or values column is stripped first, and the
+// matching data types are passed through remove_nullable().
+// Returns the first error reported by a sub-convertor, otherwise OK.
+Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
+        const ColumnMap* column_map, const DataTypeMap* data_type_map) {
+    ColumnPtr key_data = column_map->get_keys_ptr();
+    ColumnPtr value_data = column_map->get_values_ptr();
+    if (column_map->get_keys().is_nullable()) {
+        const auto& key_nullable_column =
+                assert_cast<const ColumnNullable&>(column_map->get_keys());
+        key_data = key_nullable_column.get_nested_column_ptr();
+    }
+
+    if (column_map->get_values().is_nullable()) {
+        const auto& val_nullable_column =
+                assert_cast<const ColumnNullable&>(column_map->get_values());
+        value_data = val_nullable_column.get_nested_column_ptr();
+    }
+
+    ColumnWithTypeAndName key_typed_column = {key_data, remove_nullable(data_type_map->get_keys()),
+                                              "map.key"};
+    _key_convertor->set_source_column(key_typed_column, _row_pos, _num_rows);
+    // Propagate sub-convertor failures instead of silently dropping them.
+    RETURN_IF_ERROR(_key_convertor->convert_to_olap());
+
+    ColumnWithTypeAndName value_typed_column = {
+            value_data, remove_nullable(data_type_map->get_values()), "map.value"};
+    _value_convertor->set_source_column(value_typed_column, _row_pos, _num_rows);
+    RETURN_IF_ERROR(_value_convertor->convert_to_olap());
+
+    // Only publish the data pointers once both conversions have succeeded.
+    _results[0] = _key_convertor->get_data();
+    _results[1] = _value_convertor->get_data();
+
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/olap/olap_data_convertor.h 
b/be/src/vec/olap/olap_data_convertor.h
index aafbe8d3e3..1192838a6c 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -19,9 +19,11 @@
 
 #include "olap/types.h"
 #include "runtime/mem_pool.h"
+#include "vec/columns/column_map.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/types.h"
+#include "vec/data_types/data_type_map.h"
 
 namespace doris {
 
@@ -394,6 +396,29 @@ private:
         OlapColumnDataConvertorBaseUPtr _item_convertor;
     };
 
+    // Convertor for MAP columns. Keys and values are delegated to two
+    // sub-convertors; their outputs are exposed through a two-element array
+    // of data pointers.
+    class OlapColumnDataConvertorMap : public OlapColumnDataConvertorBase {
+    public:
+        // Takes ownership of both sub-convertors and reserves the two result
+        // slots up front.
+        OlapColumnDataConvertorMap(OlapColumnDataConvertorBaseUPtr key_convertor,
+                                   OlapColumnDataConvertorBaseUPtr value_convertor)
+                : _key_convertor(std::move(key_convertor)),
+                  _value_convertor(std::move(value_convertor)) {
+            _results.resize(2);
+        }
+
+        Status convert_to_olap() override;
+        // Returns a pointer to the two-element array of sub-convertor outputs.
+        const void* get_data() const override { return _results.data(); }
+
+        // Per-offset access is not supported for maps; reaching this logs a
+        // fatal error.
+        const void* get_data_at(size_t offset) const override {
+            LOG(FATAL) << "now not support get_data_at for OlapColumnDataConvertorMap";
+            // Keeps every path of this non-void function returning a value.
+            return nullptr;
+        }
+
+    private:
+        Status convert_to_olap(const ColumnMap* column_map, const DataTypeMap* data_type_map);
+        OlapColumnDataConvertorBaseUPtr _key_convertor;
+        OlapColumnDataConvertorBaseUPtr _value_convertor;
+        std::vector<const void*> _results;
+    }; //OlapColumnDataConvertorMap
+
+
 private:
     std::vector<OlapColumnDataConvertorBaseUPtr> _convertors;
 };
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp 
b/be/src/vec/sink/vmysql_result_writer.cpp
index 3eb131b29b..fdc0b57353 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -29,6 +29,7 @@
 #include "vec/common/assert_cast.h"
 #include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_struct.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
@@ -185,6 +186,22 @@ Status VMysqlResultWriter::_add_one_column(const 
ColumnPtr& column_ptr,
                 begin = false;
             }
             buf_ret = _buffer.push_string("]", 1);
+            _buffer.close_dynamic_mode();
+            result->result_batch.rows[i].append(_buffer.buf(), 
_buffer.length());
+        }
+    } else if constexpr (type == TYPE_MAP) {
+        DCHECK_GE(sub_types.size(), 1);
+        auto& map_type = assert_cast<const DataTypeMap&>(*sub_types[0]);
+        for (ssize_t i = 0; i < row_size; ++i) {
+            if (0 != buf_ret) {
+                return Status::InternalError("pack mysql buffer failed.");
+            }
+            _buffer.reset();
+
+            _buffer.open_dynamic_mode();
+            std::string cell_str = map_type.to_string(*column, i);
+            buf_ret = _buffer.push_string(cell_str.c_str(), 
strlen(cell_str.c_str()));
+
             _buffer.close_dynamic_mode();
             result->result_batch.rows[i].append(_buffer.buf(), 
_buffer.length());
         }
@@ -715,6 +732,18 @@ Status VMysqlResultWriter::append_block(Block& 
input_block) {
             }
             break;
         }
+        case TYPE_MAP: {
+            if (type_ptr->is_nullable()) {
+                auto& nested_type =
+                        assert_cast<const 
DataTypeNullable&>(*type_ptr).get_nested_type(); //for map
+                status = _add_one_column<PrimitiveType::TYPE_MAP, 
true>(column_ptr, result, scale,
+                                                                        
{nested_type});
+            } else {
+                status = _add_one_column<PrimitiveType::TYPE_MAP, 
false>(column_ptr, result, scale,
+                                                                         
{type_ptr});
+            }
+            break;
+        }
         default: {
             LOG(WARNING) << "can't convert this type to mysql type. type = "
                          << _output_vexpr_ctxs[i]->root()->type();
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 35be33e918..776fb91f17 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -689,7 +689,7 @@ nonterminal SelectList select_clause, select_list, 
select_sublist;
 nonterminal SelectListItem select_list_item, star_expr;
 nonterminal Expr expr, non_pred_expr, arithmetic_expr, 
timestamp_arithmetic_expr, expr_or_default;
 nonterminal Expr set_expr_or_default;
-nonterminal ArrayList<Expr> expr_list, values, row_value, opt_values;
+nonterminal ArrayList<Expr> expr_list, values, row_value, opt_values, kv_list;
 nonterminal ArrayList<Expr> func_arg_list;
 nonterminal ArrayList<Expr> expr_pipe_list;
 nonterminal String select_alias, opt_table_alias, lock_alias, opt_alias;
@@ -727,6 +727,7 @@ nonterminal ArrayList<CaseWhenClause> case_when_clause_list;
 nonterminal FunctionParams function_params;
 nonterminal Expr function_call_expr, array_expr;
 nonterminal ArrayLiteral array_literal;
+nonterminal MapLiteral map_literal;
 nonterminal StructField struct_field;
 nonterminal ArrayList<StructField> struct_field_list;
 nonterminal StructLiteral struct_literal;
@@ -5831,6 +5832,33 @@ array_expr ::=
   :}
   ;
 
+// Comma-separated list of `key : value` pairs. The result is a single flat list in
+// which every key is immediately followed by its value: [k1, v1, k2, v2, ...].
+kv_list ::=
+  expr:k COLON expr:v
+  {:
+     // First pair: start a new list.
+     ArrayList<Expr> list = new ArrayList<Expr>();
+     list.add(k);
+     list.add(v);
+     RESULT = list ;
+  :}
+  |kv_list:list COMMA expr:k COLON expr:v
+  {:
+       // Subsequent pair: append to the list built so far and pass it on.
+       list.add(k);
+       list.add(v);
+       RESULT = list;
+  :}
+  ;
+
+// Map literal: `{}` yields an empty MapLiteral, `{k1:v1, ...}` hands the flattened
+// kv_list to the MapLiteral(LiteralExpr...) constructor.
+map_literal ::=
+  LBRACE RBRACE
+  {:
+    RESULT = new MapLiteral();
+  :}
+  | LBRACE kv_list:list RBRACE
+  {:
+    // NOTE(review): kv_list is an ArrayList<Expr>; toArray(new LiteralExpr[0]) throws
+    // ArrayStoreException if any entry is not a LiteralExpr -- confirm that non-literal
+    // keys/values cannot reach this point or are reported with a proper error.
+    RESULT = new MapLiteral(list.toArray(new LiteralExpr[0]));
+  :}
+  ;
+
 struct_field ::=
   ident:name COLON type:type
   {: RESULT = new StructField(name, type); :}
@@ -5877,6 +5905,8 @@ non_pred_expr ::=
   {: RESULT = a; :}
   | array_literal:a
   {: RESULT = a; :}
+  | map_literal:a
+  {: RESULT = a; :}
   | struct_literal:s
   {: RESULT = s; :}
   | function_call_expr:e
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
index 77975c1eb9..e0be9ffdf5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
@@ -318,6 +318,11 @@ public class CastExpr extends Expr {
                     type, Function.NullableMode.ALWAYS_NULLABLE,
                     Lists.newArrayList(Type.VARCHAR), false,
                     "doris::CastFunctions::cast_to_array_val", null, null, 
true);
+        } else if (type.isMapType()) {
+            fn = ScalarFunction.createBuiltin(getFnName(Type.MAP),
+                type, Function.NullableMode.ALWAYS_NULLABLE,
+                Lists.newArrayList(Type.VARCHAR), false,
+                "doris::CastFunctions::cast_to_map_val", null, null, true);
         } else if (type.isStructType()) {
             fn = ScalarFunction.createBuiltin(getFnName(Type.STRUCT),
                     type, Function.NullableMode.ALWAYS_NULLABLE,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
old mode 100755
new mode 100644
index 31b2382f0a..ee2c09ed4e
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -1325,7 +1325,7 @@ public abstract class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
     }
 
     public Expr checkTypeCompatibility(Type targetType) throws 
AnalysisException {
-        if (targetType.getPrimitiveType() != PrimitiveType.ARRAY
+        if (targetType.getPrimitiveType() != PrimitiveType.ARRAY && 
targetType.getPrimitiveType() != PrimitiveType.MAP
                 && targetType.getPrimitiveType() == type.getPrimitiveType()) {
             if (targetType.isDecimalV2() && type.isDecimalV2()) {
                 return this;
@@ -1791,7 +1791,8 @@ public abstract class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
         CAST_EXPR(14),
         JSON_LITERAL(15),
         ARITHMETIC_EXPR(16),
-        STRUCT_LITERAL(17);
+        STRUCT_LITERAL(17),
+        MAP_LITERAL(18);
 
         private static Map<Integer, ExprSerCode> codeMap = Maps.newHashMap();
 
@@ -1843,6 +1844,8 @@ public abstract class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
             output.writeInt(ExprSerCode.FUNCTION_CALL.getCode());
         } else if (expr instanceof ArrayLiteral) {
             output.writeInt(ExprSerCode.ARRAY_LITERAL.getCode());
+        } else if (expr instanceof MapLiteral) {
+            output.writeInt(ExprSerCode.MAP_LITERAL.getCode());
         } else if (expr instanceof StructLiteral) {
             output.writeInt(ExprSerCode.STRUCT_LITERAL.getCode());
         } else if (expr instanceof CastExpr) {
@@ -1894,6 +1897,8 @@ public abstract class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
                 return FunctionCallExpr.read(in);
             case ARRAY_LITERAL:
                 return ArrayLiteral.read(in);
+            case MAP_LITERAL:
+                return MapLiteral.read(in);
             case STRUCT_LITERAL:
                 return StructLiteral.read(in);
             case CAST_EXPR:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/MapLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/MapLiteral.java
new file mode 100644
index 0000000000..92f558641a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MapLiteral.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.thrift.TExprNode;
+import org.apache.doris.thrift.TExprNodeType;
+import org.apache.doris.thrift.TTypeDesc;
+import org.apache.doris.thrift.TTypeNode;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Literal value of a MAP column. Example:
+ * <pre>
+ * INSERT INTO table_map VALUES ({'key1':1, 'key2':10, 'k3':100}),
+ *     ({'key1':2,'key2':20}), ({'key1':3,'key2':30});
+ * </pre>
+ * One MapLiteral is one row-based literal. Its children store the entries flattened as
+ * [key0, value0, key1, value1, ...]: even indexes are keys, odd indexes are values.
+ */
+public class MapLiteral extends LiteralExpr {
+
+    /** Creates an empty map literal whose key and value types are both NULL. */
+    public MapLiteral() {
+        type = new MapType(Type.NULL, Type.NULL);
+        children = new ArrayList<>();
+    }
+
+    /**
+     * Creates a map literal from alternating key/value literals.
+     *
+     * @param exprs key0, value0, key1, value1, ...; must contain an even number of elements
+     * @throws AnalysisException if the number of elements is odd, or if the keys (or the
+     *         values) have no assignment-compatible common type
+     */
+    public MapLiteral(LiteralExpr... exprs) throws AnalysisException {
+        // Every key needs a value; an odd count would otherwise only fail later
+        // (e.g. in toSqlImpl, which reads children in pairs).
+        if (exprs.length % 2 != 0) {
+            throw new AnalysisException(
+                    "Map literal requires key/value pairs, but got " + exprs.length + " elements");
+        }
+        Type keyType = Type.NULL;
+        Type valueType = Type.NULL;
+        children = new ArrayList<>(exprs.length);
+        for (int idx = 0; idx < exprs.length; idx += 2) {
+            LiteralExpr key = exprs[idx];
+            LiteralExpr value = exprs[idx + 1];
+            keyType = mergeElementType(keyType, key.getType());
+            valueType = mergeElementType(valueType, value.getType());
+            children.add(key);
+            children.add(value);
+        }
+
+        type = new MapType(keyType, valueType);
+    }
+
+    protected MapLiteral(MapLiteral other) {
+        super(other);
+    }
+
+    /**
+     * Folds the type of one more element into the type accumulated so far.
+     * While nothing but NULL has been seen, the next element's type is taken as is.
+     */
+    private static Type mergeElementType(Type current, Type next) throws AnalysisException {
+        Type merged = (current == Type.NULL) ? next : Type.getAssignmentCompatibleType(current, next, true);
+        if (merged == Type.INVALID) {
+            throw new AnalysisException("Invalid element type in Map");
+        }
+        return merged;
+    }
+
+    /**
+     * Casts to another map type by casting every key to the target key type and every
+     * value to the target value type. Non-map targets are delegated to the superclass.
+     */
+    @Override
+    public Expr uncheckedCastTo(Type targetType) throws AnalysisException {
+        if (!targetType.isMapType()) {
+            return super.uncheckedCastTo(targetType);
+        }
+        MapLiteral literal = new MapLiteral(this);
+        Type keyType = ((MapType) targetType).getKeyType();
+        Type valueType = ((MapType) targetType).getValueType();
+
+        for (int i = 0; i < children.size(); ++i) {
+            Expr child = children.get(i);
+            // Even slots hold keys, odd slots hold values.
+            if ((i % 2) == 0) {
+                literal.children.set(i, child.uncheckedCastTo(keyType));
+            } else {
+                literal.children.set(i, child.uncheckedCastTo(valueType));
+            }
+        }
+        literal.setType(targetType);
+        return literal;
+    }
+
+    /** Validates every key and value. */
+    @Override
+    public void checkValueValid() throws AnalysisException {
+        for (Expr e : children) {
+            e.checkValueValid();
+        }
+    }
+
+    /** Renders the literal as {@code MAP{k0:v0, k1:v1, ...}}. */
+    @Override
+    protected String toSqlImpl() {
+        List<String> list = new ArrayList<>(children.size());
+        for (int i = 0; i < children.size(); i += 2) {
+            list.add(children.get(i).toSqlImpl() + ":" + children.get(i + 1).toSqlImpl());
+        }
+        return "MAP{" + StringUtils.join(list, ", ") + "}";
+    }
+
+    /** Marks the node as MAP_LITERAL and attaches the full (nested) type description. */
+    @Override
+    protected void toThrift(TExprNode msg) {
+        msg.node_type = TExprNodeType.MAP_LITERAL;
+        TTypeDesc container = new TTypeDesc();
+        container.setTypes(new ArrayList<TTypeNode>());
+        type.toThrift(container);
+        msg.setType(container);
+    }
+
+    @Override
+    public Expr clone() {
+        return new MapLiteral(this);
+    }
+
+    @Override
+    public boolean isMinValue() {
+        return false;
+    }
+
+    /** Always returns 0: no ordering between map literals is implemented here. */
+    @Override
+    public int compareLiteral(LiteralExpr expr) {
+        return 0;
+    }
+
+    /** Reads the superclass fields, then the child count followed by each child. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        int size = in.readInt();
+        children = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            children.add(Expr.readIn(in));
+        }
+    }
+
+    public static MapLiteral read(DataInput in) throws IOException {
+        MapLiteral literal = new MapLiteral();
+        literal.readFields(in);
+        return literal;
+    }
+
+    /** Mirror of {@link #readFields}: superclass fields, child count, then each child. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+        out.writeInt(children.size());
+        for (Expr e : children) {
+            Expr.writeTo(e, out);
+        }
+    }
+
+    @Override
+    public String getStringValue() {
+        return toSqlImpl();
+    }
+
+    /** Returns null: no array-element string form is provided for map literals. */
+    @Override
+    public String getStringValueForArray() {
+        return null;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 7b0de9b7e4..2c49d8593b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -61,6 +61,8 @@ public class Column implements Writable, GsonPostProcessable {
     private static final String COLUMN_ARRAY_CHILDREN = "item";
     private static final String COLUMN_STRUCT_CHILDREN = "field";
     public static final int COLUMN_UNIQUE_ID_INIT_VALUE = -1;
+    private static final String COLUMN_MAP_KEY = "key";
+    private static final String COLUMN_MAP_VALUE = "value";
 
     @SerializedName(value = "name")
     private String name;
@@ -188,6 +190,11 @@ public class Column implements Writable, 
GsonPostProcessable {
             Column c = new Column(COLUMN_ARRAY_CHILDREN, ((ArrayType) 
type).getItemType());
             c.setIsAllowNull(((ArrayType) type).getContainsNull());
             column.addChildrenColumn(c);
+        } else if (type.isMapType()) {
+            Column k = new Column(COLUMN_MAP_KEY, ((MapType) 
type).getKeyType());
+            Column v = new Column(COLUMN_MAP_VALUE, ((MapType) 
type).getValueType());
+            column.addChildrenColumn(k);
+            column.addChildrenColumn(v);
         } else if (type.isStructType()) {
             ArrayList<StructField> fields = ((StructType) type).getFields();
             for (StructField field : fields) {
@@ -438,6 +445,12 @@ public class Column implements Writable, 
GsonPostProcessable {
             Column children = column.getChildren().get(0);
             tColumn.setChildrenColumn(new ArrayList<>());
             setChildrenTColumn(children, tColumn);
+        } else if (column.type.isMapType()) {
+            Column k = column.getChildren().get(0);
+            Column v = column.getChildren().get(1);
+            tColumn.setChildrenColumn(new ArrayList<>());
+            setChildrenTColumn(k, tColumn);
+            setChildrenTColumn(v, tColumn);
         } else if (column.type.isStructType()) {
             List<Column> childrenColumns = column.getChildren();
             tColumn.setChildrenColumn(new ArrayList<>());
@@ -447,6 +460,7 @@ public class Column implements Writable, 
GsonPostProcessable {
         }
     }
 
+
     public void checkSchemaChangeAllowed(Column other) throws DdlException {
         if (Strings.isNullOrEmpty(other.name)) {
             throw new DdlException("Dest column name is empty");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
index 651df130e7..1c07867588 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
@@ -1272,10 +1272,9 @@ public class FunctionSet<T> {
         final Type[] candicateArgTypes = candicate.getArgs();
         if (!(descArgTypes[0] instanceof ScalarType)
                 || !(candicateArgTypes[0] instanceof ScalarType)) {
-            if (candicateArgTypes[0] instanceof ArrayType) {
+            if (candicateArgTypes[0] instanceof ArrayType || 
candicateArgTypes[0] instanceof MapType) {
                 return descArgTypes[0].matchesType(candicateArgTypes[0]);
             }
-
             return false;
         }
         final ScalarType descArgType = (ScalarType) descArgTypes[0];
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MapType.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MapType.java
index adf74f5c6a..986fb3658a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MapType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MapType.java
@@ -17,18 +17,25 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.thrift.TColumnType;
 import org.apache.doris.thrift.TTypeDesc;
 import org.apache.doris.thrift.TTypeNode;
 import org.apache.doris.thrift.TTypeNodeType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Objects;
 
 /**
  * Describes a MAP type. MAP types have a scalar key and an arbitrarily-typed 
value.
  */
 public class MapType extends Type {
+
+    @SerializedName(value = "keyType")
     private final Type keyType;
+    @SerializedName(value = "valueType")
     private final Type valueType;
 
     public MapType() {
@@ -75,6 +82,30 @@ public class MapType extends Type {
                 keyType.toSql(depth + 1), valueType.toSql(depth + 1));
     }
 
+    /**
+     * Returns true if {@code t} matches this map type: it is equal to this type, or it is
+     * a map type whose key and value types match this type's key and value types.
+     * A NULL key type on either side together with a NULL value type on either side
+     * is accepted without comparing the element types further.
+     */
+    @Override
+    public boolean matchesType(Type t) {
+        if (equals(t)) {
+            return true;
+        }
+
+        if (!t.isMapType()) {
+            return false;
+        }
+
+        MapType other = (MapType) t;
+        // The value-side check must look at the other map's VALUE type
+        // (it previously re-tested the key type).
+        if ((keyType.isNull() || other.getKeyType().isNull())
+                && (valueType.isNull() || other.getValueType().isNull())) {
+            return true;
+        }
+
+        return keyType.matchesType(other.getKeyType())
+                && valueType.matchesType(other.getValueType());
+    }
+
+    /** Upper-cased form of {@code toSql(0)}. */
+    @Override
+    public String toString() {
+        String sql = toSql(0);
+        return sql.toUpperCase();
+    }
+
     @Override
     protected String prettyPrint(int lpad) {
         String leftPadding = Strings.repeat(" ", lpad);
@@ -88,6 +119,11 @@ public class MapType extends Type {
         return String.format("%sMAP<%s,%s>", leftPadding, keyType.toSql(), 
structStr);
     }
 
+    /** A map is castable to another map when its key type and its value type are both castable. */
+    public static boolean canCastTo(MapType type, MapType targetType) {
+        // Keys are checked first; values are only checked when the keys are castable.
+        if (!Type.canCastTo(type.getKeyType(), targetType.getKeyType())) {
+            return false;
+        }
+        return Type.canCastTo(type.getValueType(), targetType.getValueType());
+    }
+
     @Override
     public boolean supportSubType(Type subType) {
         return true;
@@ -103,4 +139,16 @@ public class MapType extends Type {
         keyType.toThrift(container);
         valueType.toThrift(container);
     }
+
+    /** Builds a TColumnType carrying only the MAP primitive tag; key/value types are not set here. */
+    @Override
+    public TColumnType toColumnTypeThrift() {
+        final TColumnType columnType = new TColumnType();
+        columnType.type = PrimitiveType.MAP.toThrift();
+        return columnType;
+    }
+
+    /** Hash code derived from the key type and the value type. */
+    @Override
+    public int hashCode() {
+        final int hash = Objects.hash(keyType, valueType);
+        return hash;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
index 9e279c8d2d..cb12dc17bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
@@ -1106,6 +1106,10 @@ public enum PrimitiveType {
         return this == ARRAY;
     }
 
+    /** True only for the MAP primitive type. */
+    public boolean isMapType() {
+        return MAP == this;
+    }
+
     public boolean isComplexType() {
         return this == HLL || this == BITMAP;
     }
@@ -1167,6 +1171,8 @@ public enum PrimitiveType {
                 return MysqlColType.MYSQL_TYPE_BLOB;
             case JSONB:
                 return MysqlColType.MYSQL_TYPE_JSON;
+            case MAP:
+                return MysqlColType.MYSQL_TYPE_MAP;
             default:
                 return MysqlColType.MYSQL_TYPE_STRING;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
index 587266926c..250356a7d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
@@ -530,6 +530,8 @@ public abstract class Type {
             return ScalarType.canCastTo((ScalarType) sourceType, (ScalarType) 
targetType);
         } else if (sourceType.isArrayType() && targetType.isArrayType()) {
             return ArrayType.canCastTo((ArrayType) sourceType, (ArrayType) 
targetType);
+        } else if (sourceType.isMapType() && targetType.isMapType()) {
+            return MapType.canCastTo((MapType) sourceType, (MapType) 
targetType);
         } else if (targetType.isArrayType() && !((ArrayType) 
targetType).getItemType().isScalarType()
                 && !sourceType.isNull()) {
             // TODO: current not support cast any non-array type(except for 
null) to nested array type.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index 4dcb130087..877f4c28d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -87,6 +87,7 @@ public class Util {
         TYPE_STRING_MAP.put(PrimitiveType.BITMAP, "bitmap");
         TYPE_STRING_MAP.put(PrimitiveType.QUANTILE_STATE, "quantile_state");
         TYPE_STRING_MAP.put(PrimitiveType.ARRAY, "Array<%s>");
+        TYPE_STRING_MAP.put(PrimitiveType.MAP, "Map<%s,%s>");
         TYPE_STRING_MAP.put(PrimitiveType.NULL_TYPE, "null");
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlColType.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlColType.java
index d451b5ee38..75b13848ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlColType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlColType.java
@@ -52,7 +52,8 @@ public enum MysqlColType {
     MYSQL_TYPE_BLOB(252, "BLOB"),
     MYSQL_TYPE_VARSTRING(253, "VAR STRING"),
     MYSQL_TYPE_STRING(254, "STRING"),
-    MYSQL_TYPE_GEOMETRY(255, "GEOMETRY");
+    MYSQL_TYPE_GEOMETRY(255, "GEOMETRY"),
+    MYSQL_TYPE_MAP(400, "MAP");
 
     private MysqlColType(int code, String desc) {
         this.code = code;
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex 
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index c56743bd22..8fd6b7c0d9 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -664,6 +664,8 @@ EndOfLineComment = "--" 
!({HintContent}|{ContainsLineTerminator}) {LineTerminato
 "!" { return newToken(SqlParserSymbols.NOT, null); }
 "<" { return newToken(SqlParserSymbols.LESSTHAN, null); }
 ">" { return newToken(SqlParserSymbols.GREATERTHAN, null); }
+"{" { return newToken(SqlParserSymbols.LBRACE, null); }
+"}" { return newToken(SqlParserSymbols.RBRACE, null); }
 "\"" { return newToken(SqlParserSymbols.UNMATCHED_STRING_LITERAL, null); }
 "'" { return newToken(SqlParserSymbols.UNMATCHED_STRING_LITERAL, null); }
 "`" { return newToken(SqlParserSymbols.UNMATCHED_STRING_LITERAL, null); }
diff --git a/gensrc/script/doris_builtins_functions.py 
b/gensrc/script/doris_builtins_functions.py
index 9b231881de..9a37505764 100755
--- a/gensrc/script/doris_builtins_functions.py
+++ b/gensrc/script/doris_builtins_functions.py
@@ -148,6 +148,10 @@ visible_functions = [
     [['element_at', '%element_extract%'], 'VARCHAR', ['ARRAY_VARCHAR', 
'BIGINT'], '', '', '', 'vec', 'ALWAYS_NULLABLE'],
     [['element_at', '%element_extract%'], 'STRING', ['ARRAY_STRING', 
'BIGINT'], '', '', '', 'vec', 'ALWAYS_NULLABLE'],
 
+
+    # map element
+    [['element_at', '%element_extract%'], 'INT', ['MAP_STRING_INT', 'STRING'], 
'', '', '', 'vec', 'ALWAYS_NULLABLE'],
+
     [['arrays_overlap'], 'BOOLEAN', ['ARRAY_BOOLEAN', 'ARRAY_BOOLEAN'], '', 
'', '', 'vec', 'ALWAYS_NULLABLE'],
     [['arrays_overlap'], 'BOOLEAN', ['ARRAY_TINYINT', 'ARRAY_TINYINT'], '', 
'', '', 'vec', 'ALWAYS_NULLABLE'],
     [['arrays_overlap'], 'BOOLEAN', ['ARRAY_SMALLINT', 'ARRAY_SMALLINT'], '', 
'', '', 'vec', 'ALWAYS_NULLABLE'],
diff --git a/gensrc/script/gen_builtins_functions.py 
b/gensrc/script/gen_builtins_functions.py
index bd9a82e4c0..ab354734b3 100755
--- a/gensrc/script/gen_builtins_functions.py
+++ b/gensrc/script/gen_builtins_functions.py
@@ -53,6 +53,7 @@ java_registry_preamble = '\
 package org.apache.doris.builtins;\n\
 \n\
 import org.apache.doris.catalog.ArrayType;\n\
+import org.apache.doris.catalog.MapType;\n\
 import org.apache.doris.catalog.Type;\n\
 import org.apache.doris.catalog.Function;\n\
 import org.apache.doris.catalog.FunctionSet;\n\
@@ -107,12 +108,17 @@ for example:
     in[TINYINT]     --> out[Type.TINYINT]
     in[INT]         --> out[Type.INT]
     in[ARRAY_INT]   --> out[new ArrayType(Type.INT)]
+    in[MAP_STRING_INT]   --> out[new MapType(Type.STRING,Type.INT)]
 """
 def generate_fe_datatype(str_type):
     if str_type.startswith("ARRAY_"):
         vec_type = str_type.split('_', 1);
         if len(vec_type) > 1 and vec_type[0] == "ARRAY":
             return "new ArrayType(" + generate_fe_datatype(vec_type[1]) + ")"
+    if str_type.startswith("MAP_"):
+        vec_type = str_type.split('_', 2)
+        if len(vec_type) > 2 and vec_type[0] == "MAP": 
+            return "new MapType(" + generate_fe_datatype(vec_type[1]) + "," + 
generate_fe_datatype(vec_type[2])+")"
     if str_type == "DECIMALV2":
         return "Type.MAX_DECIMALV2_TYPE"
     if str_type == "DECIMAL32":
diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift
index d25dce63a1..770227402b 100644
--- a/gensrc/thrift/Exprs.thrift
+++ b/gensrc/thrift/Exprs.thrift
@@ -61,6 +61,9 @@ enum TExprNodeType {
 
   // for fulltext search
   MATCH_PRED,
+
+  // for map 
+  MAP_LITERAL,
 }
 
 //enum TAggregationOp {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to