xiaokang commented on code in PR #17330:
URL: https://github.com/apache/doris/pull/17330#discussion_r1125607634


##########
be/src/vec/data_types/data_type_map.cpp:
##########
@@ -21,12 +21,9 @@
 
 namespace doris::vectorized {
 
-DataTypeMap::DataTypeMap(const DataTypePtr& keys_, const DataTypePtr& values_) 
{
-    key_type = make_nullable(keys_);
-    value_type = make_nullable(values_);
-
-    keys = std::make_shared<DataTypeArray>(key_type);
-    values = std::make_shared<DataTypeArray>(value_type);
+DataTypeMap::DataTypeMap(const DataTypePtr& key, const DataTypePtr& value) {

Review Comment:
   key -> key_type_



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -25,50 +25,108 @@ namespace doris::vectorized {
 /** A column of map values.
   */
 std::string ColumnMap::get_name() const {
-    return "Map(" + keys->get_name() + ", " + values->get_name() + ")";
+    return "Map(" + keys_column->get_name() + ", " + values_column->get_name() 
+ ")";
 }
 
-ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values)
-        : keys(std::move(keys)), values(std::move(values)) {
-    check_size();
-}
+ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values, 
MutableColumnPtr&& offsets)
+        : keys_column(std::move(keys)),
+          values_column(std::move(values)),
+          offsets_column(std::move(offsets)) {
+    const COffsets* offsets_concrete = typeid_cast<const 
COffsets*>(offsets_column.get());
 
-ColumnArray::Offsets64& ColumnMap::get_offsets() const {
-    const ColumnArray& column_keys = assert_cast<const 
ColumnArray&>(get_keys());
-    // todo . did here check size ?
-    return const_cast<Offsets64&>(column_keys.get_offsets());
-}
+    if (!offsets_concrete) {
+        LOG(FATAL) << "offsets_column must be a ColumnUInt64";
+    }
 
-void ColumnMap::check_size() const {
-    const auto* key_array = typeid_cast<const ColumnArray*>(keys.get());
-    const auto* value_array = typeid_cast<const ColumnArray*>(values.get());
-    CHECK(key_array) << "ColumnMap keys can be created only from array";
-    CHECK(value_array) << "ColumnMap values can be created only from array";
-    CHECK_EQ(get_keys_ptr()->size(), get_values_ptr()->size());
+    if (!offsets_concrete->empty() && keys && values) {
+        auto last_offset = offsets_concrete->get_data().back();
+
+        /// This will also prevent possible overflow in offset.
+        if (keys_column->size() != last_offset) {
+            LOG(FATAL) << "offsets_column has data inconsistent with 
key_column";
+        }
+        if (values_column->size() != last_offset) {
+            LOG(FATAL) << "offsets_column has data inconsistent with 
value_column";
+        }
+    }
 }
 
 // todo. here to resize every row map
 MutableColumnPtr ColumnMap::clone_resized(size_t to_size) const {
-    auto res = ColumnMap::create(keys->clone_resized(to_size), 
values->clone_resized(to_size));
+    auto res = ColumnMap::create(get_keys().clone_empty(), 
get_values().clone_empty(),
+                                 COffsets::create());
+    if (to_size == 0) {
+        return res;
+    }
+
+    size_t from_size = size();
+
+    if (to_size <= from_size) {
+        res->get_offsets().assign(get_offsets().begin(), get_offsets().begin() 
+ to_size);
+        res->get_keys().insert_range_from(get_keys(), 0, get_offsets()[to_size 
- 1]);
+        res->get_values().insert_range_from(get_values(), 0, 
get_offsets()[to_size - 1]);
+    } else {
+        /// Copy column and append empty arrays for extra elements.
+        Offset64 offset = 0;
+        if (from_size > 0) {
+            res->get_offsets().assign(get_offsets().begin(), 
get_offsets().end());
+            res->get_keys().insert_range_from(get_keys(), 0, 
get_keys().size());
+            res->get_values().insert_range_from(get_values(), 0, 
get_values().size());
+            offset = get_offsets().back();
+        }
+        res->get_offsets().resize(to_size);
+        for (size_t i = from_size; i < to_size; ++i) {
+            res->get_offsets()[i] = offset;
+        }
+    }
     return res;
 }
 
 // to support field functions
 Field ColumnMap::operator[](size_t n) const {
-    // Map is FieldVector , see in field.h
-    Map res(2);
-    keys->get(n, res[0]);
-    values->get(n, res[1]);
+    // Map is FieldVector, now we keep key value in seperate  , see in field.h
+    Map m(2);
+    size_t start_offset = offset_at(n);
+    size_t element_size = size_at(n);
+
+    if (element_size > max_array_size_as_field) {
+        LOG(FATAL) << "element size " << start_offset
+                   << " is too large to be manipulated as single map field,"
+                   << "maximum size " << max_array_size_as_field;
+    }
 
-    return res;
+    Array k(element_size), v(element_size);
+
+    for (size_t i = 0; i < element_size; ++i) {
+        k[i] = get_keys()[start_offset + i];
+        v[i] = get_values()[start_offset + i];
+    }
+
+    m.push_back(k);
+    m.push_back(v);
+    return m;
 }
 
 // here to compare to below
 void ColumnMap::get(size_t n, Field& res) const {

Review Comment:
   Reuse operator[] to implement get() (or the reverse) to avoid duplicate code.



##########
be/src/vec/data_types/data_type_factory.cpp:
##########
@@ -363,9 +363,8 @@ DataTypePtr DataTypeFactory::create_data_type(const 
PColumnMeta& pcolumn) {
     case PGenericType::MAP:
         DCHECK(pcolumn.children_size() == 2);
         // here to check pcolumn is list?
-        nested = std::make_shared<vectorized::DataTypeMap>(
-                create_data_type(pcolumn.children(0).children(0)),

Review Comment:
   Why add an additional children(0)?



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +208,191 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    size_t array_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    char* pos = arena.alloc_continue(2 * sizeof(array_size), begin);
+    memcpy(pos, &array_size, 2 * sizeof(array_size));

Review Comment:
   This will overflow: it copies 2*sizeof(array_size) bytes from &array_size, which holds only sizeof(array_size) bytes.



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +208,191 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    size_t array_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    char* pos = arena.alloc_continue(2 * sizeof(array_size), begin);
+    memcpy(pos, &array_size, 2 * sizeof(array_size));
+    StringRef res(pos, 2 * sizeof(array_size));
+
+    for (size_t i = 0; i < array_size; ++i) {
+        auto value_ref = get_keys().serialize_value_into_arena(offset + i, 
arena, begin);
+        res.data = value_ref.data - res.size;
+        res.size += value_ref.size;
+    }
+
+    for (size_t i = 0; i < array_size; ++i) {
+        auto value_ref = get_values().serialize_value_into_arena(offset + i, 
arena, begin);
+        res.data = value_ref.data - res.size;
+        res.size += value_ref.size;
+    }
+
+    return res;
+}
+
 const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
-    pos = keys->deserialize_and_insert_from_arena(pos);
-    pos = values->deserialize_and_insert_from_arena(pos);
+    size_t array_size = unaligned_load<size_t>(pos);
+    pos += 2 * sizeof(array_size);
 
+    for (size_t i = 0; i < array_size; ++i) {
+        pos = get_keys().deserialize_and_insert_from_arena(pos);
+    }
+
+    for (size_t i = 0; i < array_size; ++i) {
+        pos = get_values().deserialize_and_insert_from_arena(pos);
+    }
+
+    get_offsets().push_back(get_offsets().back() + array_size);
     return pos;
 }
 
 void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
-    keys->update_hash_with_value(n, hash);
-    values->update_hash_with_value(n, hash);
+    size_t array_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    for (size_t i = 0; i < array_size; ++i) {
+        get_keys().update_hash_with_value(offset + i, hash);
+        get_values().update_hash_with_value(offset + i, hash);
+    }
 }
 
 void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t 
length) {
-    keys->insert_range_from(*assert_cast<const ColumnMap&>(src).keys, start, 
length);
-    values->insert_range_from(*assert_cast<const ColumnMap&>(src).values, 
start, length);
+    if (length == 0) {
+        return;
+    }
+
+    const ColumnMap& src_concrete = assert_cast<const ColumnMap&>(src);
+
+    if (start + length > src_concrete.get_offsets().size()) {
+        LOG(FATAL) << "Parameter out of bound in ColumnMap::insert_range_from 
method. [start("
+                   << std::to_string(start) << ") + length(" << 
std::to_string(length)
+                   << ") > offsets.size(" << 
std::to_string(src_concrete.get_offsets().size())
+                   << ")]";
+    }
+
+    size_t nested_offset = src_concrete.offset_at(start);
+    size_t nested_length = src_concrete.get_offsets()[start + length - 1] - 
nested_offset;
+
+    keys_column->insert_range_from(src_concrete.get_keys(), nested_offset, 
nested_length);
+    values_column->insert_range_from(src_concrete.get_values(), nested_offset, 
nested_length);
+
+    auto& cur_offsets = get_offsets();
+    const auto& src_offsets = src_concrete.get_offsets();
+
+    if (start == 0 && cur_offsets.empty()) {
+        cur_offsets.assign(src_offsets.begin(), src_offsets.begin() + length);
+    } else {
+        size_t old_size = cur_offsets.size();
+        // -1 is ok, because PaddedPODArray pads zeros on the left.
+        size_t prev_max_offset = cur_offsets.back();
+        cur_offsets.resize(old_size + length);
+
+        for (size_t i = 0; i < length; ++i) {
+            cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset 
+ prev_max_offset;
+        }
+    }
 }
 
 ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) 
const {
-    return ColumnMap::create(keys->filter(filt, result_size_hint),
-                             values->filter(filt, result_size_hint));
+    auto k_arr =
+            ColumnArray::create(keys_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->filter(filt, result_size_hint);
+    auto v_arr =
+            ColumnArray::create(values_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->filter(filt, result_size_hint);
+    return ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*k_arr).get_offsets_ptr());
 }
 
 size_t ColumnMap::filter(const Filter& filter) {
-    const auto key_result_size = keys->filter(filter);
-    const auto value_result_size = values->filter(filter);
-    CHECK_EQ(key_result_size, value_result_size);
-    return value_result_size;
+    return this->filter(filter, 0)->size();
 }
 
 Status ColumnMap::filter_by_selector(const uint16_t* sel, size_t sel_size, 
IColumn* col_ptr) {
     auto to = reinterpret_cast<vectorized::ColumnMap*>(col_ptr);
 
-    auto& array_keys = assert_cast<vectorized::ColumnArray&>(*keys);
-    array_keys.filter_by_selector(sel, sel_size, &to->get_keys());
+    auto& to_offsets = to->get_offsets();
+
+    size_t element_size = 0;
+    size_t max_offset = 0;
+    for (size_t i = 0; i < sel_size; ++i) {
+        element_size += size_at(sel[i]);
+        max_offset = std::max(max_offset, offset_at(sel[i]));
+    }
+    if (max_offset > std::numeric_limits<uint16_t>::max()) {
+        return Status::IOError("map elements too large than uint16_t::max");
+    }
 
-    auto& array_values = assert_cast<vectorized::ColumnArray&>(*values);
-    array_values.filter_by_selector(sel, sel_size, &to->get_values());
+    to_offsets.reserve(to_offsets.size() + sel_size);
+    auto nested_sel = std::make_unique<uint16_t[]>(element_size);
+    size_t nested_sel_size = 0;
+    for (size_t i = 0; i < sel_size; ++i) {
+        auto row_off = offset_at(sel[i]);
+        auto row_size = size_at(sel[i]);
+        to_offsets.push_back(to_offsets.back() + row_size);
+        for (auto j = 0; j < row_size; ++j) {
+            nested_sel[nested_sel_size++] = row_off + j;
+        }
+    }
 
+    if (nested_sel_size > 0) {
+        keys_column->filter_by_selector(nested_sel.get(), nested_sel_size, 
&to->get_keys());
+        values_column->filter_by_selector(nested_sel.get(), nested_sel_size, 
&to->get_values());
+    }
     return Status::OK();
 }
 
 ColumnPtr ColumnMap::permute(const Permutation& perm, size_t limit) const {
-    return ColumnMap::create(keys->permute(perm, limit), values->permute(perm, 
limit));
+    // Make a temp column array
+    auto k_arr =
+            ColumnArray::create(keys_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->permute(perm, limit);
+    auto v_arr =
+            ColumnArray::create(values_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->permute(perm, limit);
+
+    return ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*k_arr).get_offsets_ptr());
 }
 
 ColumnPtr ColumnMap::replicate(const Offsets& offsets) const {
-    return ColumnMap::create(keys->replicate(offsets), 
values->replicate(offsets));
+    // Make a temp column array for reusing its replicate function
+    auto k_arr =
+            ColumnArray::create(keys_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->replicate(offsets);
+    auto v_arr =
+            ColumnArray::create(values_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->replicate(offsets);
+    auto res = ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
+                                 assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
+                                 assert_cast<const 
ColumnArray&>(*k_arr).get_offsets_ptr());
+    return res;
 }
 
 void ColumnMap::reserve(size_t n) {
-    get_keys().reserve(n);
-    get_values().reserve(n);
+    get_offsets().reserve(n);
+    keys_column->reserve(n);

Review Comment:
   Reserving n * avg(kvs per row) for keys_column/values_column is better. avg(kvs per row)
can be calculated as get_offsets().back() / get_offsets().size().



##########
be/src/vec/olap/olap_data_convertor.cpp:
##########
@@ -810,30 +808,36 @@ Status 
OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
         const ColumnMap* column_map, const DataTypeMap* data_type_map) {
     ColumnPtr key_data = column_map->get_keys_ptr();
     ColumnPtr value_data = column_map->get_values_ptr();
-    if (column_map->get_keys().is_nullable()) {
-        const auto& key_nullable_column =
-                assert_cast<const ColumnNullable&>(column_map->get_keys());
-        key_data = key_nullable_column.get_nested_column_ptr();
-    }
 
-    if (column_map->get_values().is_nullable()) {
-        const auto& val_nullable_column =
-                assert_cast<const ColumnNullable&>(column_map->get_values());
-        value_data = val_nullable_column.get_nested_column_ptr();
-    }
+    // offsets data
+    auto& offsets = column_map->get_offsets();
+    // make first offset
+    auto offsets_col = ColumnArray::ColumnOffsets::create();
+
+    _offsets.reserve(offsets.size() + 1);
+    _offsets.push_back(_row_pos);

Review Comment:
   Add a comment explaining offsets.size()+1 and push_back(_row_pos).



##########
be/src/olap/rowset/segment_v2/column_writer.cpp:
##########
@@ -997,24 +1022,36 @@ Status MapColumnWriter::finish() {
 
 Status MapColumnWriter::append_nullable(const uint8_t* null_map, const 
uint8_t** ptr,
                                         size_t num_rows) {
+    RETURN_IF_ERROR(append_data(ptr, num_rows));
     if (is_nullable()) {
         RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
     }
-    RETURN_IF_ERROR(append_data(ptr, num_rows));
     return Status::OK();
 }
 
+// write key value data with offsets
 Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
     auto kv_ptr = reinterpret_cast<const uint64_t*>(*ptr);

Review Comment:
   The name kv_ptr is no longer precise.



##########
be/src/vec/data_types/data_type_map.cpp:
##########
@@ -201,6 +192,7 @@ Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* 
column) const {
             has_quota = false;
             StringRef value_element(rb.position(), rb.count());
             if (!next_slot_from_string(rb, value_element, has_quota)) {
+                map_column->pop_back(element_num);

Review Comment:
   should use nested_value_column->pop_back



##########
be/src/olap/rowset/segment_v2/column_writer.cpp:
##########
@@ -247,14 +248,67 @@ Status ColumnWriter::create(const ColumnWriterOptions& 
opts, const TabletColumn*
         }
         case FieldType::OLAP_FIELD_TYPE_MAP: {
             DCHECK(column->get_subtype_count() == 2);
+            // create key & value writer
+            std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
+            for (int i = 0; i < 2; ++i) {
+                const TabletColumn& item_column = column->get_sub_column(i);
+                // create item writer
+                ColumnWriterOptions item_options;
+                item_options.meta = opts.meta->mutable_children_columns(i);
+                item_options.need_zone_map = false;
+                item_options.need_bloom_filter = item_column.is_bf_column();
+                item_options.need_bitmap_index = 
item_column.has_bitmap_index();
+                item_options.inverted_index = nullptr;

Review Comment:
   set_unique_id and many others are not called for item_options



##########
be/src/vec/data_types/data_type_map.cpp:
##########
@@ -172,21 +166,18 @@ Status DataTypeMap::from_string(ReadBuffer& rb, IColumn* 
column) const {
         // {"aaa": 1, "bbb": 20}, need to handle key slot and value slot to 
make key column arr and value arr
         // skip "{"
         ++rb.position();
-        auto& keys_arr = 
reinterpret_cast<ColumnArray&>(map_column->get_keys());
-        ColumnArray::Offsets64& key_off = keys_arr.get_offsets();
-        auto& values_arr = 
reinterpret_cast<ColumnArray&>(map_column->get_values());
-        ColumnArray::Offsets64& val_off = values_arr.get_offsets();
-
-        IColumn& nested_key_column = keys_arr.get_data();
+        ColumnArray::Offsets64& map_off = map_column->get_offsets();
+        IColumn& nested_key_column = map_column->get_keys();
         DCHECK(nested_key_column.is_nullable());
-        IColumn& nested_val_column = values_arr.get_data();
+        IColumn& nested_val_column = map_column->get_values();
         DCHECK(nested_val_column.is_nullable());
 
         size_t element_num = 0;
         while (!rb.eof()) {
             StringRef key_element(rb.position(), rb.count());
             bool has_quota = false;
             if (!next_slot_from_string(rb, key_element, has_quota)) {
+                map_column->pop_back(element_num);

Review Comment:
   should use nested_key_column->pop_back



##########
be/src/olap/rowset/segment_v2/column_writer.cpp:
##########
@@ -997,24 +1022,36 @@ Status MapColumnWriter::finish() {
 
 Status MapColumnWriter::append_nullable(const uint8_t* null_map, const 
uint8_t** ptr,
                                         size_t num_rows) {
+    RETURN_IF_ERROR(append_data(ptr, num_rows));
     if (is_nullable()) {
         RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
     }
-    RETURN_IF_ERROR(append_data(ptr, num_rows));
     return Status::OK();
 }
 
+// write key value data with offsets
 Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
     auto kv_ptr = reinterpret_cast<const uint64_t*>(*ptr);

Review Comment:
   Change the name and add a comment describing its structure.



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -83,34 +141,45 @@ void ColumnMap::insert_data(const char*, size_t) {
 void ColumnMap::insert(const Field& x) {
     const auto& map = doris::vectorized::get<const Map&>(x);
     CHECK_EQ(map.size(), 2);
-    keys->insert(map[0]);
-    values->insert(map[1]);
+    const auto& k_f = doris::vectorized::get<const Array&>(map[0]);
+    const auto& v_f = doris::vectorized::get<const Array&>(map[1]);
+
+    size_t element_size = k_f.size();

Review Comment:
   add DCHECK_EQ(k_f.size(), v_f.size())



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +202,166 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    StringRef res(begin, 0);
+    auto keys_ref = keys_column->serialize_value_into_arena(n, arena, begin);
+    res.data = keys_ref.data - res.size;
+    res.size += keys_ref.size;
+    auto value_ref = values_column->serialize_value_into_arena(n, arena, 
begin);
+    res.data = value_ref.data - res.size;
+    res.size += value_ref.size;
+
+    return res;
+}
+
 const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
-    pos = keys->deserialize_and_insert_from_arena(pos);
-    pos = values->deserialize_and_insert_from_arena(pos);
+    pos = keys_column->deserialize_and_insert_from_arena(pos);
+    pos = values_column->deserialize_and_insert_from_arena(pos);
 
     return pos;
 }
 
 void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
-    keys->update_hash_with_value(n, hash);
-    values->update_hash_with_value(n, hash);
+    keys_column->update_hash_with_value(n, hash);
+    values_column->update_hash_with_value(n, hash);
 }
 
 void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t 
length) {
-    keys->insert_range_from(*assert_cast<const ColumnMap&>(src).keys, start, 
length);
-    values->insert_range_from(*assert_cast<const ColumnMap&>(src).values, 
start, length);
+    if (length == 0) {
+        return;
+    }
+
+    const ColumnMap& src_concrete = assert_cast<const ColumnMap&>(src);
+
+    if (start + length > src_concrete.get_offsets().size()) {
+        LOG(FATAL) << "Parameter out of bound in ColumnMap::insert_range_from 
method. [start("
+                   << std::to_string(start) << ") + length(" << 
std::to_string(length)
+                   << ") > offsets.size(" << 
std::to_string(src_concrete.get_offsets().size())
+                   << ")]";
+    }
+
+    size_t nested_offset = src_concrete.offset_at(start);
+    size_t nested_length = src_concrete.get_offsets()[start + length - 1] - 
nested_offset;
+
+    keys_column->insert_range_from(src_concrete.get_keys(), nested_offset, 
nested_length);
+    values_column->insert_range_from(src_concrete.get_values(), nested_offset, 
nested_length);
+
+    auto& cur_offsets = get_offsets();
+    const auto& src_offsets = src_concrete.get_offsets();
+
+    if (start == 0 && cur_offsets.empty()) {
+        cur_offsets.assign(src_offsets.begin(), src_offsets.begin() + length);
+    } else {
+        size_t old_size = cur_offsets.size();
+        // -1 is ok, because PaddedPODArray pads zeros on the left.
+        size_t prev_max_offset = cur_offsets.back();
+        cur_offsets.resize(old_size + length);
+
+        for (size_t i = 0; i < length; ++i) {
+            cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset 
+ prev_max_offset;
+        }
+    }
 }
 
 ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) 
const {
-    return ColumnMap::create(keys->filter(filt, result_size_hint),
-                             values->filter(filt, result_size_hint));
+    auto k_arr =

Review Comment:
   Is any data copied when converting to Array and back to Map?



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -83,34 +141,45 @@ void ColumnMap::insert_data(const char*, size_t) {
 void ColumnMap::insert(const Field& x) {
     const auto& map = doris::vectorized::get<const Map&>(x);
     CHECK_EQ(map.size(), 2);
-    keys->insert(map[0]);
-    values->insert(map[1]);
+    const auto& k_f = doris::vectorized::get<const Array&>(map[0]);
+    const auto& v_f = doris::vectorized::get<const Array&>(map[1]);
+
+    size_t element_size = k_f.size();
+
+    for (size_t i = 0; i < element_size; ++i) {
+        keys_column->insert(k_f[i]);

Review Comment:
   Can insert_many, if available, be used here for vectorization?



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -119,9 +188,13 @@ void ColumnMap::insert_from(const IColumn& src_, size_t n) 
{
                (get_values().is_nullable() && 
!src.get_values().is_nullable())) {
         DCHECK(false);
     } else {
-        keys->insert_from(*assert_cast<const ColumnMap&>(src_).keys, n);
-        values->insert_from(*assert_cast<const ColumnMap&>(src_).values, n);
+        keys_column->insert_range_from(assert_cast<const 
ColumnMap&>(src_).get_keys(), offset,

Review Comment:
   use casted src



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +208,191 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    size_t array_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    char* pos = arena.alloc_continue(2 * sizeof(array_size), begin);
+    memcpy(pos, &array_size, 2 * sizeof(array_size));
+    StringRef res(pos, 2 * sizeof(array_size));
+
+    for (size_t i = 0; i < array_size; ++i) {
+        auto value_ref = get_keys().serialize_value_into_arena(offset + i, 
arena, begin);
+        res.data = value_ref.data - res.size;
+        res.size += value_ref.size;
+    }
+
+    for (size_t i = 0; i < array_size; ++i) {
+        auto value_ref = get_values().serialize_value_into_arena(offset + i, 
arena, begin);
+        res.data = value_ref.data - res.size;
+        res.size += value_ref.size;
+    }
+
+    return res;
+}
+
 const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
-    pos = keys->deserialize_and_insert_from_arena(pos);
-    pos = values->deserialize_and_insert_from_arena(pos);
+    size_t array_size = unaligned_load<size_t>(pos);
+    pos += 2 * sizeof(array_size);
 
+    for (size_t i = 0; i < array_size; ++i) {
+        pos = get_keys().deserialize_and_insert_from_arena(pos);
+    }
+
+    for (size_t i = 0; i < array_size; ++i) {
+        pos = get_values().deserialize_and_insert_from_arena(pos);
+    }
+
+    get_offsets().push_back(get_offsets().back() + array_size);
     return pos;
 }
 
 void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
-    keys->update_hash_with_value(n, hash);
-    values->update_hash_with_value(n, hash);
+    size_t array_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    for (size_t i = 0; i < array_size; ++i) {
+        get_keys().update_hash_with_value(offset + i, hash);
+        get_values().update_hash_with_value(offset + i, hash);
+    }
 }
 
 void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t 
length) {
-    keys->insert_range_from(*assert_cast<const ColumnMap&>(src).keys, start, 
length);
-    values->insert_range_from(*assert_cast<const ColumnMap&>(src).values, 
start, length);
+    if (length == 0) {
+        return;
+    }
+
+    const ColumnMap& src_concrete = assert_cast<const ColumnMap&>(src);
+
+    if (start + length > src_concrete.get_offsets().size()) {

Review Comment:
   src_concrete.size()



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +208,191 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    size_t array_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    char* pos = arena.alloc_continue(2 * sizeof(array_size), begin);
+    memcpy(pos, &array_size, 2 * sizeof(array_size));
+    StringRef res(pos, 2 * sizeof(array_size));
+
+    for (size_t i = 0; i < array_size; ++i) {
+        auto value_ref = get_keys().serialize_value_into_arena(offset + i, 
arena, begin);
+        res.data = value_ref.data - res.size;
+        res.size += value_ref.size;
+    }
+
+    for (size_t i = 0; i < array_size; ++i) {
+        auto value_ref = get_values().serialize_value_into_arena(offset + i, 
arena, begin);
+        res.data = value_ref.data - res.size;
+        res.size += value_ref.size;
+    }
+
+    return res;
+}
+
 const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
-    pos = keys->deserialize_and_insert_from_arena(pos);
-    pos = values->deserialize_and_insert_from_arena(pos);
+    size_t array_size = unaligned_load<size_t>(pos);
+    pos += 2 * sizeof(array_size);
 
+    for (size_t i = 0; i < array_size; ++i) {
+        pos = get_keys().deserialize_and_insert_from_arena(pos);
+    }
+
+    for (size_t i = 0; i < array_size; ++i) {
+        pos = get_values().deserialize_and_insert_from_arena(pos);
+    }
+
+    get_offsets().push_back(get_offsets().back() + array_size);
     return pos;
 }
 
 void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
-    keys->update_hash_with_value(n, hash);
-    values->update_hash_with_value(n, hash);
+    size_t array_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    for (size_t i = 0; i < array_size; ++i) {
+        get_keys().update_hash_with_value(offset + i, hash);
+        get_values().update_hash_with_value(offset + i, hash);
+    }
 }
 
 void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t 
length) {
-    keys->insert_range_from(*assert_cast<const ColumnMap&>(src).keys, start, 
length);
-    values->insert_range_from(*assert_cast<const ColumnMap&>(src).values, 
start, length);
+    if (length == 0) {
+        return;
+    }
+
+    const ColumnMap& src_concrete = assert_cast<const ColumnMap&>(src);
+
+    if (start + length > src_concrete.get_offsets().size()) {
+        LOG(FATAL) << "Parameter out of bound in ColumnMap::insert_range_from 
method. [start("
+                   << std::to_string(start) << ") + length(" << 
std::to_string(length)
+                   << ") > offsets.size(" << 
std::to_string(src_concrete.get_offsets().size())
+                   << ")]";
+    }
+
+    size_t nested_offset = src_concrete.offset_at(start);
+    size_t nested_length = src_concrete.get_offsets()[start + length - 1] - 
nested_offset;
+
+    keys_column->insert_range_from(src_concrete.get_keys(), nested_offset, 
nested_length);
+    values_column->insert_range_from(src_concrete.get_values(), nested_offset, 
nested_length);
+
+    auto& cur_offsets = get_offsets();
+    const auto& src_offsets = src_concrete.get_offsets();
+
+    if (start == 0 && cur_offsets.empty()) {
+        cur_offsets.assign(src_offsets.begin(), src_offsets.begin() + length);
+    } else {
+        size_t old_size = cur_offsets.size();
+        // -1 is ok, because PaddedPODArray pads zeros on the left.
+        size_t prev_max_offset = cur_offsets.back();
+        cur_offsets.resize(old_size + length);
+
+        for (size_t i = 0; i < length; ++i) {
+            cur_offsets[old_size + i] = src_offsets[start + i] - nested_offset 
+ prev_max_offset;
+        }
+    }
 }
 
 ColumnPtr ColumnMap::filter(const Filter& filt, ssize_t result_size_hint) 
const {
-    return ColumnMap::create(keys->filter(filt, result_size_hint),
-                             values->filter(filt, result_size_hint));
+    auto k_arr =
+            ColumnArray::create(keys_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->filter(filt, result_size_hint);
+    auto v_arr =
+            ColumnArray::create(values_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->filter(filt, result_size_hint);
+    return ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*k_arr).get_offsets_ptr());
 }
 
 size_t ColumnMap::filter(const Filter& filter) {
-    const auto key_result_size = keys->filter(filter);
-    const auto value_result_size = values->filter(filter);
-    CHECK_EQ(key_result_size, value_result_size);
-    return value_result_size;
+    return this->filter(filter, 0)->size();
 }
 
 Status ColumnMap::filter_by_selector(const uint16_t* sel, size_t sel_size, 
IColumn* col_ptr) {
     auto to = reinterpret_cast<vectorized::ColumnMap*>(col_ptr);
 
-    auto& array_keys = assert_cast<vectorized::ColumnArray&>(*keys);
-    array_keys.filter_by_selector(sel, sel_size, &to->get_keys());
+    auto& to_offsets = to->get_offsets();
+
+    size_t element_size = 0;
+    size_t max_offset = 0;
+    for (size_t i = 0; i < sel_size; ++i) {
+        element_size += size_at(sel[i]);
+        max_offset = std::max(max_offset, offset_at(sel[i]));
+    }
+    if (max_offset > std::numeric_limits<uint16_t>::max()) {
+        return Status::IOError("map elements too large than uint16_t::max");
+    }
 
-    auto& array_values = assert_cast<vectorized::ColumnArray&>(*values);
-    array_values.filter_by_selector(sel, sel_size, &to->get_values());
+    to_offsets.reserve(to_offsets.size() + sel_size);
+    auto nested_sel = std::make_unique<uint16_t[]>(element_size);
+    size_t nested_sel_size = 0;
+    for (size_t i = 0; i < sel_size; ++i) {
+        auto row_off = offset_at(sel[i]);
+        auto row_size = size_at(sel[i]);
+        to_offsets.push_back(to_offsets.back() + row_size);
+        for (auto j = 0; j < row_size; ++j) {
+            nested_sel[nested_sel_size++] = row_off + j;
+        }
+    }
 
+    if (nested_sel_size > 0) {
+        keys_column->filter_by_selector(nested_sel.get(), nested_sel_size, 
&to->get_keys());
+        values_column->filter_by_selector(nested_sel.get(), nested_sel_size, 
&to->get_values());
+    }
     return Status::OK();
 }
 
 ColumnPtr ColumnMap::permute(const Permutation& perm, size_t limit) const {
-    return ColumnMap::create(keys->permute(perm, limit), values->permute(perm, 
limit));
+    // Make a temp column array
+    auto k_arr =
+            ColumnArray::create(keys_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->permute(perm, limit);
+    auto v_arr =
+            ColumnArray::create(values_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->permute(perm, limit);
+
+    return ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
+                             assert_cast<const 
ColumnArray&>(*k_arr).get_offsets_ptr());
 }
 
 ColumnPtr ColumnMap::replicate(const Offsets& offsets) const {
-    return ColumnMap::create(keys->replicate(offsets), 
values->replicate(offsets));
+    // Make a temp column array for reusing its replicate function
+    auto k_arr =
+            ColumnArray::create(keys_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->replicate(offsets);
+    auto v_arr =
+            ColumnArray::create(values_column->assume_mutable(), 
offsets_column->assume_mutable())
+                    ->replicate(offsets);
+    auto res = ColumnMap::create(assert_cast<const 
ColumnArray&>(*k_arr).get_data_ptr(),
+                                 assert_cast<const 
ColumnArray&>(*v_arr).get_data_ptr(),
+                                 assert_cast<const 
ColumnArray&>(*k_arr).get_offsets_ptr());
+    return res;
 }
 
 void ColumnMap::reserve(size_t n) {
-    get_keys().reserve(n);
-    get_values().reserve(n);
+    get_offsets().reserve(n);
+    keys_column->reserve(n);
+    values_column->reserve(n);
 }
 
 size_t ColumnMap::byte_size() const {
-    return get_keys().byte_size() + get_values().byte_size();
+    return keys_column->byte_size() + values_column->byte_size() +
+           get_offsets().size() * sizeof(get_offsets()[0]);

Review Comment:
   get_offsets().byte_size()



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -135,71 +208,191 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
+    size_t array_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    char* pos = arena.alloc_continue(2 * sizeof(array_size), begin);
+    memcpy(pos, &array_size, 2 * sizeof(array_size));
+    StringRef res(pos, 2 * sizeof(array_size));
+
+    for (size_t i = 0; i < array_size; ++i) {
+        auto value_ref = get_keys().serialize_value_into_arena(offset + i, 
arena, begin);
+        res.data = value_ref.data - res.size;
+        res.size += value_ref.size;
+    }
+
+    for (size_t i = 0; i < array_size; ++i) {
+        auto value_ref = get_values().serialize_value_into_arena(offset + i, 
arena, begin);
+        res.data = value_ref.data - res.size;

Review Comment:
   Is it right to just overwrite res.data? 



##########
be/src/olap/rowset/segment_v2/column_writer.cpp:
##########
@@ -965,9 +986,11 @@ MapColumnWriter::MapColumnWriter(const 
ColumnWriterOptions& opts, std::unique_pt
 }
 
 Status MapColumnWriter::init() {
+    RETURN_IF_ERROR(_offsets_writer->init());
     if (is_nullable()) {
         RETURN_IF_ERROR(_null_writer->init());
     }
+    _offsets_writer->register_flush_page_callback(this);

Review Comment:
   add comment for register_flush_page_callback



##########
be/src/vec/functions/array/function_array_element.h:
##########
@@ -236,25 +231,28 @@ class FunctionArrayElement : public IFunction {
                            const UInt8* src_null_map, UInt8* dst_null_map) {
         auto left_column = 
arguments[0].column->convert_to_full_column_if_const();
         DataTypePtr val_type =
-                reinterpret_cast<const 
DataTypeMap&>(*arguments[0].type).get_values();
+                reinterpret_cast<const 
DataTypeMap&>(*arguments[0].type).get_value_type();
         const auto& map_column = reinterpret_cast<const 
ColumnMap&>(*left_column);
 
-        const ColumnArray& column_keys = assert_cast<const 
ColumnArray&>(map_column.get_keys());
+        // create column array to find keys
+        auto key_arr = ColumnArray::create(map_column.get_keys_ptr(), 
map_column.get_offsets_ptr());

Review Comment:
   we can encapsulate get_key_array() and get_value_array() in ColumnMap



##########
be/src/vec/olap/olap_data_convertor.h:
##########
@@ -422,6 +422,7 @@ class OlapBlockDataConvertor {
         OlapColumnDataConvertorBaseUPtr _key_convertor;
         OlapColumnDataConvertorBaseUPtr _value_convertor;
         std::vector<const void*> _results;
+        PaddedPODArray<UInt64> _offsets;

Review Comment:
   It may not be safe to return the address of _offsets in _results, since the 
returned results may outlive the convertor



##########
be/src/olap/rowset/segment_v2/column_reader.cpp:
##########
@@ -680,9 +720,51 @@ Status StructFileColumnIterator::read_by_rowids(const 
rowid_t* rowids, const siz
 }
 
 
////////////////////////////////////////////////////////////////////////////////
+Status OffsetFileColumnIterator::init(const ColumnIteratorOptions& opts) {
+    RETURN_IF_ERROR(_offset_iterator->init(opts));
+    return Status::OK();
+}
+
+Status OffsetFileColumnIterator::next_batch(size_t* n, 
vectorized::MutableColumnPtr& dst,
+                                            bool* has_null) {
+    RETURN_IF_ERROR(_offset_iterator->next_batch(n, dst, has_null));
+    return Status::OK();
+}
+
+Status OffsetFileColumnIterator::_peek_one_offset(ordinal_t* offset) {
+    if (_offset_iterator->get_current_page()->has_remaining()) {
+        PageDecoder* offset_page_decoder = 
_offset_iterator->get_current_page()->data_decoder;
+        vectorized::MutableColumnPtr offset_col = 
vectorized::ColumnUInt64::create();
+        size_t n = 1;
+        RETURN_IF_ERROR(offset_page_decoder->peek_next_batch(&n, offset_col)); 
// not null
+        DCHECK(offset_col->size() == 1);
+        *offset = offset_col->get_uint(0);
+    } else {
+        *offset = 
_offset_iterator->get_current_page()->next_array_item_ordinal;
+    }
+    return Status::OK();
+}
 
+Status OffsetFileColumnIterator::_calculate_offsets(

Review Comment:
   add comment for this function



##########
be/src/olap/rowset/segment_v2/column_reader.cpp:
##########
@@ -580,13 +601,32 @@ Status MapFileColumnIterator::next_batch(size_t* n, 
vectorized::MutableColumnPtr
     const auto* column_map = 
vectorized::check_and_get_column<vectorized::ColumnMap>(
             dst->is_nullable() ? 
static_cast<vectorized::ColumnNullable&>(*dst).get_nested_column()
                                : *dst);
-    size_t num_read = *n;
-    auto column_key_ptr = column_map->get_keys().assume_mutable();
-    auto column_val_ptr = column_map->get_values().assume_mutable();
-    RETURN_IF_ERROR(_key_iterator->next_batch(&num_read, column_key_ptr, 
has_null));
-    RETURN_IF_ERROR(_val_iterator->next_batch(&num_read, column_val_ptr, 
has_null));
+    auto column_offsets_ptr = 
column_map->get_offsets_column().assume_mutable();
+    bool offsets_has_null = false;

Review Comment:
   xx_has_null is not used after xx->next_batch



##########
be/src/olap/rowset/segment_v2/column_reader.cpp:
##########
@@ -680,9 +720,51 @@ Status StructFileColumnIterator::read_by_rowids(const 
rowid_t* rowids, const siz
 }
 
 
////////////////////////////////////////////////////////////////////////////////
+Status OffsetFileColumnIterator::init(const ColumnIteratorOptions& opts) {
+    RETURN_IF_ERROR(_offset_iterator->init(opts));
+    return Status::OK();
+}
+
+Status OffsetFileColumnIterator::next_batch(size_t* n, 
vectorized::MutableColumnPtr& dst,
+                                            bool* has_null) {
+    RETURN_IF_ERROR(_offset_iterator->next_batch(n, dst, has_null));
+    return Status::OK();
+}
+
+Status OffsetFileColumnIterator::_peek_one_offset(ordinal_t* offset) {
+    if (_offset_iterator->get_current_page()->has_remaining()) {
+        PageDecoder* offset_page_decoder = 
_offset_iterator->get_current_page()->data_decoder;
+        vectorized::MutableColumnPtr offset_col = 
vectorized::ColumnUInt64::create();
+        size_t n = 1;
+        RETURN_IF_ERROR(offset_page_decoder->peek_next_batch(&n, offset_col)); 
// not null
+        DCHECK(offset_col->size() == 1);
+        *offset = offset_col->get_uint(0);
+    } else {
+        *offset = 
_offset_iterator->get_current_page()->next_array_item_ordinal;
+    }
+    return Status::OK();
+}
 
+Status OffsetFileColumnIterator::_calculate_offsets(

Review Comment:
   it seems that this function can be defined as static, since it does not use 
any class member 



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -25,50 +25,108 @@ namespace doris::vectorized {
 /** A column of map values.
   */
 std::string ColumnMap::get_name() const {
-    return "Map(" + keys->get_name() + ", " + values->get_name() + ")";
+    return "Map(" + keys_column->get_name() + ", " + values_column->get_name() 
+ ")";
 }
 
-ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values)
-        : keys(std::move(keys)), values(std::move(values)) {
-    check_size();
-}
+ColumnMap::ColumnMap(MutableColumnPtr&& keys, MutableColumnPtr&& values, 
MutableColumnPtr&& offsets)
+        : keys_column(std::move(keys)),
+          values_column(std::move(values)),
+          offsets_column(std::move(offsets)) {
+    const COffsets* offsets_concrete = typeid_cast<const 
COffsets*>(offsets_column.get());
 
-ColumnArray::Offsets64& ColumnMap::get_offsets() const {
-    const ColumnArray& column_keys = assert_cast<const 
ColumnArray&>(get_keys());
-    // todo . did here check size ?
-    return const_cast<Offsets64&>(column_keys.get_offsets());
-}
+    if (!offsets_concrete) {
+        LOG(FATAL) << "offsets_column must be a ColumnUInt64";
+    }
 
-void ColumnMap::check_size() const {
-    const auto* key_array = typeid_cast<const ColumnArray*>(keys.get());
-    const auto* value_array = typeid_cast<const ColumnArray*>(values.get());
-    CHECK(key_array) << "ColumnMap keys can be created only from array";
-    CHECK(value_array) << "ColumnMap values can be created only from array";
-    CHECK_EQ(get_keys_ptr()->size(), get_values_ptr()->size());
+    if (!offsets_concrete->empty() && keys && values) {
+        auto last_offset = offsets_concrete->get_data().back();
+
+        /// This will also prevent possible overflow in offset.
+        if (keys_column->size() != last_offset) {
+            LOG(FATAL) << "offsets_column has data inconsistent with 
key_column";
+        }
+        if (values_column->size() != last_offset) {
+            LOG(FATAL) << "offsets_column has data inconsistent with 
value_column";
+        }
+    }
 }
 
 // todo. here to resize every row map
 MutableColumnPtr ColumnMap::clone_resized(size_t to_size) const {
-    auto res = ColumnMap::create(keys->clone_resized(to_size), 
values->clone_resized(to_size));
+    auto res = ColumnMap::create(get_keys().clone_empty(), 
get_values().clone_empty(),
+                                 COffsets::create());
+    if (to_size == 0) {
+        return res;
+    }
+
+    size_t from_size = size();
+
+    if (to_size <= from_size) {
+        res->get_offsets().assign(get_offsets().begin(), get_offsets().begin() 
+ to_size);
+        res->get_keys().insert_range_from(get_keys(), 0, get_offsets()[to_size 
- 1]);
+        res->get_values().insert_range_from(get_values(), 0, 
get_offsets()[to_size - 1]);
+    } else {
+        /// Copy column and append empty arrays for extra elements.
+        Offset64 offset = 0;
+        if (from_size > 0) {
+            res->get_offsets().assign(get_offsets().begin(), 
get_offsets().end());
+            res->get_keys().insert_range_from(get_keys(), 0, 
get_keys().size());
+            res->get_values().insert_range_from(get_values(), 0, 
get_values().size());
+            offset = get_offsets().back();
+        }
+        res->get_offsets().resize(to_size);
+        for (size_t i = from_size; i < to_size; ++i) {
+            res->get_offsets()[i] = offset;
+        }
+    }
     return res;
 }
 
 // to support field functions
 Field ColumnMap::operator[](size_t n) const {
-    // Map is FieldVector , see in field.h
-    Map res(2);
-    keys->get(n, res[0]);
-    values->get(n, res[1]);
+    // Map is FieldVector, now we keep key value in seperate  , see in field.h
+    Map m(2);
+    size_t start_offset = offset_at(n);
+    size_t element_size = size_at(n);
+
+    if (element_size > max_array_size_as_field) {
+        LOG(FATAL) << "element size " << start_offset
+                   << " is too large to be manipulated as single map field,"
+                   << "maximum size " << max_array_size_as_field;
+    }
 
-    return res;
+    Array k(element_size), v(element_size);

Review Comment:
   Using the name `keys` instead of `k` would better indicate that it's an array of keys



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to