This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/variant-sparse by this push:
     new 553db1844da [fix] (variant) add serialize and deserialize (#45487)
553db1844da is described below

commit 553db1844da808caf4de3609a44b7b9d6bd87fc5
Author: Sun Chenyang <suncheny...@selectdb.com>
AuthorDate: Mon Dec 16 21:37:36 2024 +0800

    [fix] (variant) add serialize and deserialize (#45487)
---
 .../rowset/segment_v2/hierarchical_data_reader.cpp |   2 +-
 .../rowset/segment_v2/hierarchical_data_reader.h   |   4 +-
 be/src/vec/columns/column_object.cpp               | 367 +++++++++++++--------
 be/src/vec/columns/column_object.h                 |  38 +--
 .../vec/data_types/serde/data_type_array_serde.cpp |  18 +
 .../vec/data_types/serde/data_type_array_serde.h   |   3 +
 .../vec/data_types/serde/data_type_jsonb_serde.cpp |  12 +
 .../vec/data_types/serde/data_type_jsonb_serde.h   |   3 +
 .../data_types/serde/data_type_nullable_serde.cpp  |  15 +
 .../data_types/serde/data_type_nullable_serde.h    |   3 +
 .../data_types/serde/data_type_number_serde.cpp    |  10 +
 .../vec/data_types/serde/data_type_number_serde.h  |   3 +
 be/src/vec/data_types/serde/data_type_serde.h      |   2 +-
 .../vec/data_types/serde/data_type_string_serde.h  |  12 +
 14 files changed, 331 insertions(+), 161 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp 
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
index db6bac6b8b4..c85e4b429ad 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
@@ -206,7 +206,7 @@ Status 
ExtractReader::extract_to(vectorized::MutableColumnPtr& dst, size_t nrows
                  ""},
                 expected_type, &cast_column));
         variant.get_root()->insert_range_from(*cast_column, 0, nrows);
-        variant.set_num_rows(variant.get_root()->size());
+        // variant.set_num_rows(variant.get_root()->size());
     }
     if (dst->is_nullable()) {
         // fill nullmap
diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h 
b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
index f85038713ca..6c8ced89cd2 100644
--- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
+++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
@@ -240,10 +240,10 @@ private:
                 src_null_map.clear();
                 assert_cast<ColumnObject&>(
                         
assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column())
-                        .clear_subcolumns_data();
+                        .clear_column_data();
             } else {
                 ColumnObject& root_column = 
assert_cast<ColumnObject&>(*_root_reader->column);
-                root_column.clear_subcolumns_data();
+                root_column.clear_column_data();
             }
         } else {
             if (dst->is_nullable()) {
diff --git a/be/src/vec/columns/column_object.cpp 
b/be/src/vec/columns/column_object.cpp
index c1a50f6064b..9d6e260724b 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -616,18 +616,20 @@ bool ColumnObject::Subcolumn::is_finalized() const {
 }
 
 template <typename Func>
-MutableColumnPtr ColumnObject::apply_for_subcolumns(Func&& func) const {
+MutableColumnPtr ColumnObject::apply_for_columns(Func&& func) const {
     if (!is_finalized()) {
         auto finalized = clone_finalized();
         auto& finalized_object = assert_cast<ColumnObject&>(*finalized);
-        return finalized_object.apply_for_subcolumns(std::forward<Func>(func));
+        return finalized_object.apply_for_columns(std::forward<Func>(func));
     }
     auto res = ColumnObject::create(is_nullable, false);
     for (const auto& subcolumn : subcolumns) {
-        auto new_subcolumn = func(subcolumn->data.get_finalized_column());
+        auto new_subcolumn = func(subcolumn->data.get_finalized_column_ptr());
         res->add_sub_column(subcolumn->path, new_subcolumn->assume_mutable(),
                             subcolumn->data.get_least_common_type());
     }
+    auto sparse_column = func(serialized_sparse_column);
+    res->serialized_sparse_column = sparse_column->assume_mutable();
     check_consistency();
     return res;
 }
@@ -642,6 +644,7 @@ void ColumnObject::resize(size_t n) {
         for (auto& subcolumn : subcolumns) {
             subcolumn->data.pop_back(num_rows - n);
         }
+        serialized_sparse_column->pop_back(num_rows - n);
     }
     num_rows = n;
 }
@@ -809,8 +812,13 @@ ColumnObject::ColumnObject(Subcolumns&& subcolumns_, bool 
is_nullable_)
     check_consistency();
 }
 
+ColumnObject::ColumnObject(size_t num_rows) : is_nullable(true) {
+    insert_many_defaults(num_rows);
+    check_consistency();
+}
+
 void ColumnObject::check_consistency() const {
-    if (subcolumns.empty()) {
+    if (subcolumns.empty() && serialized_sparse_column->empty()) {
         return;
     }
     for (const auto& leaf : subcolumns) {
@@ -820,6 +828,11 @@ void ColumnObject::check_consistency() const {
                                    leaf->path.get_path(), num_rows, 
leaf->data.size());
         }
     }
+    if (num_rows != serialized_sparse_column->size()) {
+        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                               "unmatched sparse column:, expeted rows: {}, 
but meet: {}", num_rows,
+                               serialized_sparse_column->size());
+    }
 }
 
 size_t ColumnObject::size() const {
@@ -835,13 +848,11 @@ MutableColumnPtr ColumnObject::clone_resized(size_t 
new_size) const {
     }
     // If subcolumns are empty, then res will be empty but new_size > 0
     if (subcolumns.empty()) {
-        // Add an emtpy column with new_size rows
-        auto res = ColumnObject::create(true, false);
-        res->set_num_rows(new_size);
+        auto res = ColumnObject::create(new_size);
         return res;
     }
-    auto res = apply_for_subcolumns(
-            [&](const auto& subcolumn) { return 
subcolumn.clone_resized(new_size); });
+    auto res = apply_for_columns(
+            [&](const ColumnPtr column) { return 
column->clone_resized(new_size); });
     return res;
 }
 
@@ -850,6 +861,7 @@ size_t ColumnObject::byte_size() const {
     for (const auto& entry : subcolumns) {
         res += entry->data.byteSize();
     }
+    res += serialized_sparse_column->byte_size();
     return res;
 }
 
@@ -858,6 +870,7 @@ size_t ColumnObject::allocated_bytes() const {
     for (const auto& entry : subcolumns) {
         res += entry->data.allocatedBytes();
     }
+    res += serialized_sparse_column->allocated_bytes();
     return res;
 }
 
@@ -940,6 +953,7 @@ void ColumnObject::insert_default() {
     for (auto& entry : subcolumns) {
         entry->data.insert_default();
     }
+    serialized_sparse_column->insert_default();
     ++num_rows;
 }
 
@@ -1000,16 +1014,18 @@ void 
ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std:
 
     // remove default
     row -= num_of_defaults_in_prefix;
-    is_null = false;
     for (size_t i = 0; i < data.size(); ++i) {
         const auto& part = data[i];
         if (row < part->size()) {
-            // insert key
-            key->insert_data(path.data(), path.size());
-            // insert value
-            const auto& part_type = data_types[i];
-            const auto& serde = part_type->get_serde();
-            serde->write_one_cell_to_binary(*part, value, row);
+            if (assert_cast<const ColumnNullable&>(*part).is_null_at(row)) {
+                is_null = true;
+            } else {
+                is_null = false;
+                // insert key
+                key->insert_data(path.data(), path.size());
+                // insert value
+                data_types[i]->get_serde()->write_one_cell_to_binary(*part, 
value, row);
+            }
             return;
         }
 
@@ -1020,6 +1036,110 @@ void 
ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std:
                            "Index ({}) for serialize to sparse column is out 
of range", row);
 }
 
+const char* parse_binary_from_sparse_column(TypeIndex type, const char* data, 
Field& res,
+                                            FieldInfo& info_res) {
+    const char* end = data;
+    switch (type) {
+    case TypeIndex::String: {
+        const size_t size = *reinterpret_cast<const size_t*>(data);
+        data += sizeof(size_t);
+        res = Field(String(data, size));
+        end = data + size;
+        break;
+    }
+    case TypeIndex::Int8: {
+        res = *reinterpret_cast<const Int8*>(data);
+        end = data + sizeof(Int8);
+        break;
+    }
+    case TypeIndex::Int16: {
+        res = *reinterpret_cast<const Int16*>(data);
+        end = data + sizeof(Int16);
+        break;
+    }
+    case TypeIndex::Int32: {
+        res = *reinterpret_cast<const Int32*>(data);
+        end = data + sizeof(Int32);
+        break;
+    }
+    case TypeIndex::Int64: {
+        res = *reinterpret_cast<const Int64*>(data);
+        end = data + sizeof(Int64);
+        break;
+    }
+    case TypeIndex::Float32: {
+        res = *reinterpret_cast<const Float32*>(data);
+        end = data + sizeof(Float32);
+        break;
+    }
+    case TypeIndex::Float64: {
+        res = *reinterpret_cast<const Float64*>(data);
+        end = data + sizeof(Float64);
+        break;
+    }
+    case TypeIndex::JSONB: {
+        size_t size = *reinterpret_cast<const size_t*>(data);
+        data += sizeof(size_t);
+        res = JsonbField(data, size);
+        end = data + size;
+        break;
+    }
+    case TypeIndex::Array: {
+        const size_t size = *reinterpret_cast<const size_t*>(data);
+        data += sizeof(size_t);
+        res = Array(size);
+        vectorized::Array& array = res.get<Array>();
+        info_res.num_dimensions++;
+        for (size_t i = 0; i < size; ++i) {
+            const uint8_t is_null = *reinterpret_cast<const uint8_t*>(data++);
+            if (is_null) {
+                array.emplace_back(Null());
+                continue;
+            }
+            Field nested_field;
+            const TypeIndex nested_type =
+                    assert_cast<const TypeIndex>(*reinterpret_cast<const 
uint8_t*>(data++));
+            data = parse_binary_from_sparse_column(nested_type, data, 
nested_field, info_res);
+            array.emplace_back(std::move(nested_field));
+        }
+        end = data;
+        break;
+    }
+    default:
+        throw doris::Exception(ErrorCode::OUT_OF_BOUND,
+                               "Type ({}) for deserialize_from_sparse_column 
is invalid", type);
+    }
+    return end;
+}
+
+std::pair<Field, FieldInfo> ColumnObject::deserialize_from_sparse_column(const 
ColumnString* value,
+                                                                         
size_t row) const {
+    const auto& data_ref = value->get_data_at(row);
+    const char* data = data_ref.data;
+    DCHECK(data_ref.size > 0);
+
+    FieldInfo info_res = {
+            .scalar_type_id = TypeIndex::Nothing,
+            .have_nulls = false,
+            .need_convert = false,
+            .num_dimensions = 1,
+    };
+    // 0 is null
+    const uint8_t is_null = *reinterpret_cast<const uint8_t*>(data++);
+    if (is_null) {
+        DCHECK(data_ref.size == 1);
+        return {Null(), info_res};
+    }
+
+    DCHECK(data_ref.size > 1);
+    const TypeIndex type = assert_cast<const 
TypeIndex>(*reinterpret_cast<const uint8_t*>(data++));
+    info_res.scalar_type_id = type;
+    Field res;
+    const char* end = parse_binary_from_sparse_column(type, data, res, 
info_res);
+    DCHECK_EQ(end - data_ref.data, data_ref.size);
+    return {std::move(res), std::move(info_res)};
+}
+
 Field ColumnObject::operator[](size_t n) const {
     Field object;
     get(n, object);
@@ -1043,6 +1163,18 @@ void ColumnObject::get(size_t n, Field& res) const {
             object.try_emplace(entry->path.get_path(), field);
         }
     }
+
+    const auto& [path, value] = get_sparse_data_paths_and_values();
+    auto& sparse_column_offsets = serialized_sparse_column_offsets();
+    size_t offset = sparse_column_offsets[n - 1];
+    size_t end = sparse_column_offsets[n];
+    // Iterator over [path, binary value]
+    for (size_t i = offset; i != end; ++i) {
+        const StringRef path_data = path->get_data_at(i);
+        const auto& data = deserialize_from_sparse_column(value, i);
+        object.try_emplace(std::string(path_data.data, path_data.size), 
data.first);
+    }
+
     if (object.empty()) {
         res = Null();
     }
@@ -1159,7 +1291,7 @@ void 
ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
     const auto& src_serialized_sparse_column_offsets = 
src.serialized_sparse_column_offsets();
     if (src_serialized_sparse_column_offsets[start - 1] ==
         src_serialized_sparse_column_offsets[start + length - 1]) {
-        size_t current_size = size();
+        size_t current_size = num_rows;
 
         /// If no src subcolumns should be inserted into sparse column, insert 
defaults.
         if (sorted_src_subcolumn_for_sparse_column.empty()) {
@@ -1228,7 +1360,8 @@ void 
ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
             const PathInData column_path(src_sparse_path);
             if (auto* subcolumn = get_subcolumn(column_path); subcolumn != 
nullptr) {
                 // Deserialize binary value into subcolumn from src serialized 
sparse column data.
-                
subcolumn->deserialize_from_sparse_column(src_sparse_column_values, i);
+                const auto& data = 
src.deserialize_from_sparse_column(src_sparse_column_values, i);
+                subcolumn->insert(data.first, data.second);
             } else {
                 // Before inserting this path into sparse column check if we 
need to
                 // insert suibcolumns from 
sorted_src_subcolumn_for_sparse_column before.
@@ -1284,16 +1417,6 @@ void 
ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column(
     return;
 }
 
-ColumnPtr ColumnObject::replicate(const Offsets& offsets) const {
-    if (subcolumns.empty()) {
-        // Add an emtpy column with offsets.back rows
-        auto res = ColumnObject::create(true, false);
-        res->set_num_rows(offsets.back());
-    }
-    return apply_for_subcolumns(
-            [&](const auto& subcolumn) { return subcolumn.replicate(offsets); 
});
-}
-
 ColumnPtr ColumnObject::permute(const Permutation& perm, size_t limit) const {
     if (subcolumns.empty()) {
         if (limit == 0) {
@@ -1306,19 +1429,17 @@ ColumnPtr ColumnObject::permute(const Permutation& 
perm, size_t limit) const {
             throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                    "Size of permutation is less than 
required.");
         }
-        // Add an emtpy column with limit rows
-        auto res = ColumnObject::create(true, false);
-        res->set_num_rows(limit);
+        auto res = ColumnObject::create(limit);
         return res;
     }
-    return apply_for_subcolumns(
-            [&](const auto& subcolumn) { return subcolumn.permute(perm, 
limit); });
+    return apply_for_columns([&](const ColumnPtr column) { return 
column->permute(perm, limit); });
 }
 
 void ColumnObject::pop_back(size_t length) {
     for (auto& entry : subcolumns) {
         entry->data.pop_back(length);
     }
+    serialized_sparse_column->pop_back(length);
     num_rows -= length;
 }
 
@@ -1441,15 +1562,6 @@ bool ColumnObject::add_sub_column(const PathInData& key, 
size_t new_size) {
     return true;
 }
 
-PathsInData ColumnObject::getKeys() const {
-    PathsInData keys;
-    keys.reserve(subcolumns.size());
-    for (const auto& entry : subcolumns) {
-        keys.emplace_back(entry->path);
-    }
-    return keys;
-}
-
 bool ColumnObject::is_finalized() const {
     return std::all_of(subcolumns.begin(), subcolumns.end(),
                        [](const auto& entry) { return 
entry->data.is_finalized(); });
@@ -1704,8 +1816,6 @@ Status ColumnObject::serialize_sparse_columns(
         std::map<std::string_view, Subcolumn>&& remaing_subcolumns) {
     CHECK(is_finalized());
 
-    serialized_sparse_column = ColumnMap::create(ColumnString::create(), 
ColumnString::create(),
-                                                 
ColumnArray::ColumnOffsets::create());
     if (remaing_subcolumns.empty()) {
         serialized_sparse_column->insert_many_defaults(num_rows);
         return Status::OK();
@@ -1784,39 +1894,6 @@ Status ColumnObject::finalize(FinalizeMode mode) {
         new_subcolumns.get_mutable_root()->data.finalize(mode);
     }
 
-    // pick sparse columns
-    std::set<std::string_view> selected_path;
-    std::vector<std::string_view> remaining_path;
-    if (subcolumns.size() > MAX_SUBCOLUMNS) {
-        // pick subcolumns sort by size of none null values
-        std::unordered_map<std::string_view, size_t> none_null_value_sizes;
-        // 1. get the none null value sizes
-        for (auto&& entry : subcolumns) {
-            if (entry->data.is_root) {
-                continue;
-            }
-            size_t size = entry->data.get_non_null_value_size();
-            none_null_value_sizes[entry->path.get_path()] = size;
-        }
-        // 2. sort by the size
-        std::vector<std::pair<std::string_view, size_t>> sorted_by_size(
-                none_null_value_sizes.begin(), none_null_value_sizes.end());
-        std::sort(sorted_by_size.begin(), sorted_by_size.end(),
-                  [](const auto& a, const auto& b) { return a.second > 
b.second; });
-
-        // 3. pick MAX_SUBCOLUMNS selected subcolumns
-        for (size_t i = 0; i < std::min(MAX_SUBCOLUMNS, 
sorted_by_size.size()); ++i) {
-            selected_path.insert(sorted_by_size[i].first);
-        }
-
-        // 4. put remaining subcolumns to remaining_subcolumns
-        for (const auto& entry : sorted_by_size) {
-            if (selected_path.find(entry.first) == selected_path.end()) {
-                remaining_path.emplace_back(entry.first);
-            }
-        }
-    }
-
     // finalize all subcolumns
     for (auto&& entry : subcolumns) {
         const auto& least_common_type = entry->data.get_least_common_type();
@@ -1840,24 +1917,57 @@ Status ColumnObject::finalize(FinalizeMode mode) {
         }
     }
 
-    // add selected subcolumns to new_subcolumns
-    for (auto&& entry : subcolumns) {
-        if (selected_path.find(entry->path.get_path()) != selected_path.end()) 
{
-            new_subcolumns.add(entry->path, entry->data);
+    // merge and encode sparse column
+    if (mode == FinalizeMode::WRITE_MODE) {
+        // pick sparse columns
+        std::set<std::string_view> selected_path;
+        std::vector<std::string_view> remaining_path;
+        if (subcolumns.size() > MAX_SUBCOLUMNS) {
+            // pick subcolumns sort by size of none null values
+            std::unordered_map<std::string_view, size_t> none_null_value_sizes;
+            // 1. get the none null value sizes
+            for (auto&& entry : subcolumns) {
+                if (entry->data.is_root) {
+                    continue;
+                }
+                size_t size = entry->data.get_non_null_value_size();
+                none_null_value_sizes[entry->path.get_path()] = size;
+            }
+            // 2. sort by the size
+            std::vector<std::pair<std::string_view, size_t>> sorted_by_size(
+                    none_null_value_sizes.begin(), 
none_null_value_sizes.end());
+            std::sort(sorted_by_size.begin(), sorted_by_size.end(),
+                      [](const auto& a, const auto& b) { return a.second > 
b.second; });
+
+            // 3. pick MAX_SUBCOLUMNS selected subcolumns
+            for (size_t i = 0; i < std::min(MAX_SUBCOLUMNS, 
sorted_by_size.size()); ++i) {
+                selected_path.insert(sorted_by_size[i].first);
+            }
+
+            // 4. put remaining subcolumns to remaining_subcolumns
+            for (const auto& entry : sorted_by_size) {
+                if (selected_path.find(entry.first) == selected_path.end()) {
+                    remaining_path.emplace_back(entry.first);
+                }
+            }
+        }
+        // add selected subcolumns to new_subcolumns
+        for (auto&& entry : subcolumns) {
+            if (selected_path.find(entry->path.get_path()) != 
selected_path.end()) {
+                new_subcolumns.add(entry->path, entry->data);
+            }
         }
-    }
 
-    std::map<std::string_view, Subcolumn> remaing_subcolumns;
-    // merge remaining subcolumns to sparse_column
-    for (auto&& entry : subcolumns) {
-        if (selected_path.find(entry->path.get_path()) != selected_path.end()) 
{
-            remaing_subcolumns.emplace(entry->path.get_path(), entry->data);
+        std::map<std::string_view, Subcolumn> remaing_subcolumns;
+        // merge remaining subcolumns to sparse_column
+        for (auto&& entry : subcolumns) {
+            if (selected_path.find(entry->path.get_path()) != 
selected_path.end()) {
+                remaing_subcolumns.emplace(entry->path.get_path(), 
entry->data);
+            }
         }
+        
RETURN_IF_ERROR(serialize_sparse_columns(std::move(remaing_subcolumns)));
     }
 
-    // merge and encode sparse column
-    RETURN_IF_ERROR(serialize_sparse_columns(std::move(remaing_subcolumns)));
-
     std::swap(subcolumns, new_subcolumns);
     doc_structure = nullptr;
     _prev_positions.clear();
@@ -1894,6 +2004,7 @@ ColumnPtr get_base_column_of_array(const ColumnPtr& 
column) {
     return column;
 }
 
+// ----
 ColumnPtr ColumnObject::filter(const Filter& filter, ssize_t count) const {
     if (!is_finalized()) {
         auto finalized = clone_finalized();
@@ -1901,9 +2012,7 @@ ColumnPtr ColumnObject::filter(const Filter& filter, 
ssize_t count) const {
         return finalized_object.filter(filter, count);
     }
     if (subcolumns.empty()) {
-        // Add an emtpy column with filtered rows
-        auto res = ColumnObject::create(true, false);
-        res->set_num_rows(count_bytes_in_filter(filter));
+        auto res = ColumnObject::create(count_bytes_in_filter(filter));
         return res;
     }
     auto new_column = ColumnObject::create(true, false);
@@ -1912,35 +2021,17 @@ ColumnPtr ColumnObject::filter(const Filter& filter, 
ssize_t count) const {
         new_column->add_sub_column(entry->path, subcolumn->assume_mutable(),
                                    entry->data.get_least_common_type());
     }
+    // filter
     return new_column;
 }
 
-Status ColumnObject::filter_by_selector(const uint16_t* sel, size_t sel_size, 
IColumn* col_ptr) {
-    if (!is_finalized()) {
-        finalize();
-    }
-    if (subcolumns.empty()) {
-        assert_cast<ColumnObject*>(col_ptr)->insert_many_defaults(sel_size);
-        return Status::OK();
-    }
-    auto* res = assert_cast<ColumnObject*>(col_ptr);
-    for (const auto& subcolumn : subcolumns) {
-        auto new_subcolumn = 
subcolumn->data.get_least_common_type()->create_column();
-        
RETURN_IF_ERROR(subcolumn->data.get_finalized_column().filter_by_selector(
-                sel, sel_size, new_subcolumn.get()));
-        res->add_sub_column(subcolumn->path, new_subcolumn->assume_mutable(),
-                            subcolumn->data.get_least_common_type());
-    }
-    return Status::OK();
-}
-
 size_t ColumnObject::filter(const Filter& filter) {
     if (!is_finalized()) {
         finalize();
     }
     size_t count = filter.size() - 
simd::count_zero_num((int8_t*)filter.data(), filter.size());
     if (count == 0) {
-        for_each_subcolumn([](auto& part) { part->clear(); });
+        clear();
     } else {
         for_each_subcolumn([&](auto& part) {
             if (part->size() != count) {
@@ -1958,6 +2049,14 @@ size_t ColumnObject::filter(const Filter& filter) {
                 }
             }
         });
+        const auto result_size = serialized_sparse_column->filter(filter);
+        if (result_size != count) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "result_size not euqal with filter_size, 
result_size={}, "
+                            "filter_size={}",
+                            result_size, count);
+        }
+        CHECK_EQ(result_size, count);
     }
     num_rows = count;
 #ifndef NDEBUG
@@ -1966,7 +2065,7 @@ size_t ColumnObject::filter(const Filter& filter) {
     return count;
 }
 
-void ColumnObject::clear_subcolumns_data() {
+void ColumnObject::clear_column_data() {
     for (auto& entry : subcolumns) {
         for (auto& part : entry->data.data) {
             DCHECK_EQ(part->use_count(), 1);
@@ -1974,12 +2073,14 @@ void ColumnObject::clear_subcolumns_data() {
         }
         entry->data.num_of_defaults_in_prefix = 0;
     }
+    serialized_sparse_column->clear();
     num_rows = 0;
 }
 
 void ColumnObject::clear() {
     Subcolumns empty;
     std::swap(empty, subcolumns);
+    serialized_sparse_column->clear();
     num_rows = 0;
     _prev_positions.clear();
 }
@@ -2063,61 +2164,53 @@ void ColumnObject::insert_indices_from(const IColumn& 
src, const uint32_t* indic
     }
 }
 
-void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback 
callback) const {
+template <typename Func>
+void ColumnObject::for_each_imutable_column(Func&& callback) const {
     if (!is_finalized()) {
         auto finalized = clone_finalized();
         auto& finalized_object = assert_cast<ColumnObject&>(*finalized);
-        finalized_object.for_each_imutable_subcolumn(callback);
+        finalized_object.for_each_imutable_column(callback);
         return;
     }
     for (const auto& entry : subcolumns) {
         for (auto& part : entry->data.data) {
-            callback(*part);
+            callback(part);
         }
     }
-}
-
-bool ColumnObject::is_exclusive() const {
-    bool is_exclusive = IColumn::is_exclusive();
-    for_each_imutable_subcolumn([&](const auto& subcolumn) {
-        if (!subcolumn.is_exclusive()) {
-            is_exclusive = false;
-        }
-    });
-    return is_exclusive;
+    callback(serialized_sparse_column);
 }
 
 void ColumnObject::update_hash_with_value(size_t n, SipHash& hash) const {
-    for_each_imutable_subcolumn(
-            [&](const auto& subcolumn) { return 
subcolumn.update_hash_with_value(n, hash); });
+    for_each_imutable_column(
+            [&](const ColumnPtr column) { return 
column->update_hash_with_value(n, hash); });
 }
 
 void ColumnObject::update_hashes_with_value(uint64_t* __restrict hashes,
                                             const uint8_t* __restrict 
null_data) const {
-    for_each_imutable_subcolumn([&](const auto& subcolumn) {
-        return subcolumn.update_hashes_with_value(hashes, nullptr);
+    for_each_imutable_column([&](const ColumnPtr column) {
+        return column->update_hashes_with_value(hashes, nullptr);
     });
 }
 
 void ColumnObject::update_xxHash_with_value(size_t start, size_t end, 
uint64_t& hash,
                                             const uint8_t* __restrict 
null_data) const {
-    for_each_imutable_subcolumn([&](const auto& subcolumn) {
-        return subcolumn.update_xxHash_with_value(start, end, hash, nullptr);
+    for_each_imutable_column([&](const ColumnPtr column) {
+        return column->update_xxHash_with_value(start, end, hash, nullptr);
     });
 }
 
 void ColumnObject::update_crcs_with_value(uint32_t* __restrict hash, 
PrimitiveType type,
                                           uint32_t rows, uint32_t offset,
                                           const uint8_t* __restrict null_data) 
const {
-    for_each_imutable_subcolumn([&](const auto& subcolumn) {
-        return subcolumn.update_crcs_with_value(hash, type, rows, offset, 
nullptr);
+    for_each_imutable_column([&](const ColumnPtr column) {
+        return column->update_crcs_with_value(hash, type, rows, offset, 
nullptr);
     });
 }
 
 void ColumnObject::update_crc_with_value(size_t start, size_t end, uint32_t& 
hash,
                                          const uint8_t* __restrict null_data) 
const {
-    for_each_imutable_subcolumn([&](const auto& subcolumn) {
-        return subcolumn.update_crc_with_value(start, end, hash, nullptr);
+    for_each_imutable_column([&](const ColumnPtr column) {
+        return column->update_crc_with_value(start, end, hash, nullptr);
     });
 }
 
diff --git a/be/src/vec/columns/column_object.h 
b/be/src/vec/columns/column_object.h
index 38ed5478f02..72cc783caf8 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -98,7 +98,7 @@ public:
     constexpr static TypeIndex MOST_COMMON_TYPE_ID = TypeIndex::JSONB;
     // Nullable(Array(Nullable(Object)))
     const static DataTypePtr NESTED_TYPE;
-    const size_t MAX_SUBCOLUMNS = 200;
+    const size_t MAX_SUBCOLUMNS = 3;
     // Finlize mode for subcolumns, write mode will estimate which subcolumns 
are sparse columns(too many null values inside column),
     // merge and encode them into a shared column in root column. Only affects 
in flush block to segments.
     // Otherwise read mode should be as default mode.
@@ -185,9 +185,6 @@ public:
         void serialize_to_sparse_column(ColumnString* key, std::string_view 
path,
                                         ColumnString* value, size_t row, bool& 
is_null);
 
-        // Deserialize the i-th row of the column from the sparse column.
-        void deserialize_from_sparse_column(const ColumnString* value, size_t 
row) {}
-
         friend class ColumnObject;
 
     private:
@@ -263,7 +260,8 @@ private:
 
     // It's filled when the number of subcolumns reaches the limit.
     // It has type Map(String, String) and stores a map (path, binary 
serialized subcolumn value) for each row.
-    WrappedPtr serialized_sparse_column;
+    WrappedPtr serialized_sparse_column = ColumnMap::create(
+            ColumnString::create(), ColumnString::create(), 
ColumnArray::ColumnOffsets::create());
 
 public:
     static constexpr auto COLUMN_NAME_DUMMY = "_dummy";
@@ -272,6 +270,9 @@ public:
 
     explicit ColumnObject(bool is_nullable_, DataTypePtr type, 
MutableColumnPtr&& column);
 
+    // create without root, num_rows = size
+    explicit ColumnObject(size_t size);
+
     ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_);
 
     ~ColumnObject() override = default;
@@ -316,8 +317,6 @@ public:
     // Only single scalar root column
     bool is_scalar_variant() const;
 
-    bool is_exclusive() const override;
-
     ColumnPtr get_root() const { return 
subcolumns.get_root()->data.get_finalized_column_ptr(); }
 
     bool has_subcolumn(const PathInData& key) const;
@@ -365,8 +364,6 @@ public:
         return serialized_sparse_column->convert_to_full_column_if_const();
     }
 
-    PathsInData getKeys() const;
-
     // use sparse_subcolumns_schema to record sparse column's path info and 
type
     Status finalize(FinalizeMode mode);
 
@@ -385,7 +382,7 @@ public:
 
     void resize(size_t n) override;
 
-    void clear_subcolumns_data();
+    void clear_column_data();
 
     std::string get_name() const override {
         if (is_scalar_variant()) {
@@ -416,8 +413,6 @@ public:
 
     void insert_default() override;
 
-    ColumnPtr replicate(const Offsets& offsets) const override;
-
     void pop_back(size_t length) override;
 
     Field operator[](size_t n) const override;
@@ -428,8 +423,6 @@ public:
 
     ColumnPtr filter(const Filter&, ssize_t) const override;
 
-    Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* 
col_ptr) override;
-
     size_t filter(const Filter&) override;
 
     ColumnPtr permute(const Permutation&, size_t) const override;
@@ -437,7 +430,7 @@ public:
     bool is_variable_length() const override { return true; }
 
     template <typename Func>
-    MutableColumnPtr apply_for_subcolumns(Func&& func) const;
+    MutableColumnPtr apply_for_columns(Func&& func) const;
 
     // Extract path from root column and output to dst
     Status extract_root(const PathInData& path, MutableColumnPtr& dst) const;
@@ -462,6 +455,10 @@ public:
     void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
                                const uint8_t* __restrict null_data) const 
override;
 
+    // replicate() is not supported by ColumnObject: every call throws a
+    // doris::Exception carrying ErrorCode::NOT_IMPLEMENTED_ERROR.
+    ColumnPtr replicate(const Offsets& offsets) const override {
+        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "replicate" + 
get_name());
+    }
+
     Int64 get_int(size_t /*n*/) const override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "get_int" + 
get_name());
     }
@@ -529,10 +526,6 @@ public:
                                "deserialize_vec_with_null_map" + get_name());
     }
 
-    Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* 
col_ptr) const {
-        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, 
"filter_by_selector" + get_name());
-    }
-
     bool structure_equals(const IColumn&) const override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, 
"structure_equals" + get_name());
     }
@@ -579,6 +572,10 @@ public:
         return {&key, &value};
     }
 
+    // Deserialize the i-th row of the column from the sparse column.
+    std::pair<Field, FieldInfo> deserialize_from_sparse_column(const 
ColumnString* value,
+                                                               size_t row) 
const;
+
 private:
     // May throw execption
     void try_insert(const Field& field);
@@ -586,7 +583,8 @@ private:
     /// It's used to get shared sized of Nested to insert correct default 
values.
     const Subcolumns::Node* get_leaf_of_the_same_nested(const 
Subcolumns::NodePtr& entry) const;
 
-    void for_each_imutable_subcolumn(ImutableColumnCallback callback) const;
+    template <typename Func>
+    void for_each_imutable_column(Func&& callback) const;
 
     // return null if not found
     const Subcolumn* get_subcolumn_with_cache(const PathInData& key, size_t 
index_hint) const;
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.cpp 
b/be/src/vec/data_types/serde/data_type_array_serde.cpp
index a56eb00dbdd..1b812781805 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_array_serde.cpp
@@ -434,5 +434,23 @@ Status DataTypeArraySerDe::read_column_from_pb(IColumn& 
column, const PValues& a
     }
     return Status::OK();
 }
+
+// Writes the array cell at `row_num` of `src_column` into `dst_column` as:
+//   1. a one-byte TypeIndex::Array tag,
+//   2. the element count as a raw size_t,
+//   3. every element of the row, serialized in order by `nested_serde`.
+// NOTE(review): each piece is handed to ColumnString::insert_data separately; if that
+// call starts a new row per invocation the cell is spread over several rows — confirm.
+void DataTypeArraySerDe::write_one_cell_to_binary(const IColumn& src_column,
+                                                  ColumnString* dst_column, int64_t row_num) const {
+    const auto type_tag = static_cast<uint8_t>(TypeIndex::Array);
+    dst_column->insert_data(reinterpret_cast<const char*>(&type_tag), sizeof(type_tag));
+
+    const auto& array_column = assert_cast<const ColumnArray&>(src_column);
+    const auto& row_offsets = array_column.get_offsets();
+    // The row's elements occupy [first, last) in the flattened nested column.
+    // NOTE(review): row_num == 0 reads index -1; presumably the offsets container
+    // tolerates that — confirm.
+    const size_t first = row_offsets[row_num - 1];
+    const size_t last = row_offsets[row_num];
+    const size_t element_count = last - first;
+    dst_column->insert_data(reinterpret_cast<const char*>(&element_count), sizeof(element_count));
+
+    const IColumn& elements = array_column.get_data();
+    for (size_t idx = first; idx != last; ++idx) {
+        nested_serde->write_one_cell_to_binary(elements, dst_column, idx);
+    }
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_array_serde.h 
b/be/src/vec/data_types/serde/data_type_array_serde.h
index 5b15f48f502..aaf1a425512 100644
--- a/be/src/vec/data_types/serde/data_type_array_serde.h
+++ b/be/src/vec/data_types/serde/data_type_array_serde.h
@@ -101,6 +101,9 @@ public:
         nested_serde->set_return_object_as_string(value);
     }
 
+    void write_one_cell_to_binary(const IColumn& src_column, ColumnString* 
dst_column,
+                                  int64_t row_num) const override;
+
 private:
     template <bool is_binary_format>
     Status _write_column_to_mysql(const IColumn& column, 
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp 
b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
index 10218e4164d..f56bccc298c 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
@@ -277,5 +277,17 @@ Status DataTypeJsonbSerDe::read_column_from_pb(IColumn& 
column, const PValues& a
     }
     return Status::OK();
 }
+
+// Writes the JSONB cell at `row_num` of `src_column` into `dst_column` as a one-byte
+// TypeIndex::JSONB tag, the payload length as a raw size_t, then the payload bytes.
+// NOTE(review): the three pieces go through separate ColumnString::insert_data calls;
+// if each call starts a new row they will not land in one cell — confirm.
+void DataTypeJsonbSerDe::write_one_cell_to_binary(const IColumn& src_column,
+                                                  ColumnString* dst_column, int64_t row_num) const {
+    const auto& jsonb_column = assert_cast<const ColumnString&>(src_column);
+    const auto& payload = jsonb_column.get_data_at(row_num);
+
+    const auto type_tag = static_cast<uint8_t>(TypeIndex::JSONB);
+    dst_column->insert_data(reinterpret_cast<const char*>(&type_tag), sizeof(type_tag));
+
+    const size_t payload_size = payload.size;
+    dst_column->insert_data(reinterpret_cast<const char*>(&payload_size), sizeof(payload_size));
+    dst_column->insert_data(payload.data, payload_size);
+}
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.h 
b/be/src/vec/data_types/serde/data_type_jsonb_serde.h
index 5080b1ba46e..d6d29cce556 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.h
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.h
@@ -71,6 +71,9 @@ public:
                               int64_t end) const override;
     Status read_column_from_pb(IColumn& column, const PValues& arg) const 
override;
 
+    void write_one_cell_to_binary(const IColumn& src_column, ColumnString* 
dst_column,
+                                  int64_t row_num) const override;
+
 private:
     template <bool is_binary_format>
     Status _write_column_to_mysql(const IColumn& column, 
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp 
b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
index f21f160fb0a..d45b39c6d63 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -393,5 +393,20 @@ Status 
DataTypeNullableSerDe::read_one_cell_from_json(IColumn& column,
     return Status::OK();
 }
 
+// Serializes the cell at `row_num` of a nullable column into `dst_column`.
+// A one-byte null flag is written first (1 = NULL, 0 = not NULL). For a non-NULL cell
+// the nested value follows, serialized by `nested_serde`; a NULL cell writes only the flag.
+// NOTE(review): the flag and the nested value go through separate insert_data calls;
+// if ColumnString::insert_data starts a new row per call they will not share a cell — confirm.
+void DataTypeNullableSerDe::write_one_cell_to_binary(const IColumn& src_column,
+                                                     ColumnString* dst_column,
+                                                     int64_t row_num) const {
+    const auto& col = assert_cast<const ColumnNullable&>(src_column);
+    const uint8_t is_null = col.is_null_at(row_num) ? 1 : 0;
+    // Pass the flag's address. Casting the flag *value* to a pointer would make
+    // insert_data read from address 0x0 / 0x1 instead of from the flag byte.
+    dst_column->insert_data(reinterpret_cast<const char*>(&is_null), sizeof(uint8_t));
+    if (is_null) [[unlikely]] {
+        return;
+    }
+    nested_serde->write_one_cell_to_binary(col.get_nested_column(), dst_column, row_num);
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h 
b/be/src/vec/data_types/serde/data_type_nullable_serde.h
index 6051c7f722d..51cbf54eaed 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h
@@ -99,6 +99,9 @@ public:
                                   int64_t row_num) const override;
     Status read_one_cell_from_json(IColumn& column, const rapidjson::Value& 
result) const override;
 
+    void write_one_cell_to_binary(const IColumn& src_column, ColumnString* 
dst_column,
+                                  int64_t row_num) const override;
+
 private:
     template <bool is_binary_format>
     Status _write_column_to_mysql(const IColumn& column, 
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp 
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index 9416fc9a8b3..5ba7fdf293a 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -393,6 +393,16 @@ Status DataTypeNumberSerDe<T>::write_column_to_orc(const 
std::string& timezone,
     return Status::OK();
 }
 
+// Writes the numeric cell at `row_num` of `src_column` into `dst_column` as a one-byte
+// tag (TypeId<T>::value narrowed to uint8_t) followed by the value's bytes exactly as
+// returned by get_data_at().
+// NOTE(review): tag and value go through separate ColumnString::insert_data calls;
+// if each call starts a new row they will not land in one cell — confirm.
+template <typename T>
+void DataTypeNumberSerDe<T>::write_one_cell_to_binary(const IColumn& src_column,
+                                                      ColumnString* dst_column,
+                                                      int64_t row_num) const {
+    const auto type_tag = static_cast<uint8_t>(TypeId<T>::value);
+    dst_column->insert_data(reinterpret_cast<const char*>(&type_tag), sizeof(type_tag));
+
+    const auto& number_column = assert_cast<const ColumnType&>(src_column);
+    const auto& value_bytes = number_column.get_data_at(row_num);
+    dst_column->insert_data(value_bytes.data, value_bytes.size);
+}
+
 /// Explicit template instantiations - to avoid code bloat in headers.
 template class DataTypeNumberSerDe<UInt8>;
 template class DataTypeNumberSerDe<UInt16>;
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.h 
b/be/src/vec/data_types/serde/data_type_number_serde.h
index 203cd9dbf46..c9073f5e868 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.h
+++ b/be/src/vec/data_types/serde/data_type_number_serde.h
@@ -107,6 +107,9 @@ public:
                                   int64_t row_num) const override;
     Status read_one_cell_from_json(IColumn& column, const rapidjson::Value& 
result) const override;
 
+    void write_one_cell_to_binary(const IColumn& src_column, ColumnString* 
dst_column,
+                                  int64_t row_num) const override;
+
 private:
     template <bool is_binary_format>
     Status _write_column_to_mysql(const IColumn& column, 
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_serde.h 
b/be/src/vec/data_types/serde/data_type_serde.h
index 5b0e8fab65e..4634afa1449 100644
--- a/be/src/vec/data_types/serde/data_type_serde.h
+++ b/be/src/vec/data_types/serde/data_type_serde.h
@@ -338,7 +338,7 @@ public:
     virtual Status read_one_cell_from_json(IColumn& column, const 
rapidjson::Value& result) const;
 
     virtual void write_one_cell_to_binary(const IColumn& src_column, 
ColumnString* dst_column,
-                                          int64_t row_num) {
+                                          int64_t row_num) const {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, 
"write_one_cell_to_binary");
     }
 
diff --git a/be/src/vec/data_types/serde/data_type_string_serde.h 
b/be/src/vec/data_types/serde/data_type_string_serde.h
index 69a8cc26171..98cf89ada1e 100644
--- a/be/src/vec/data_types/serde/data_type_string_serde.h
+++ b/be/src/vec/data_types/serde/data_type_string_serde.h
@@ -366,6 +366,18 @@ public:
         return Status::OK();
     }
 
+    // Writes the string cell at `row_num` of `src_column` into `dst_column` as a one-byte
+    // TypeIndex::String tag, the byte length as a raw size_t, then the string bytes.
+    // NOTE(review): the three pieces go through separate ColumnString::insert_data calls;
+    // if each call starts a new row they will not land in one cell — confirm.
+    void write_one_cell_to_binary(const IColumn& src_column, ColumnString* dst_column,
+                                  int64_t row_num) const override {
+        const auto& string_column = assert_cast<const ColumnType&>(src_column);
+        const auto& payload = string_column.get_data_at(row_num);
+
+        const auto type_tag = static_cast<uint8_t>(TypeIndex::String);
+        dst_column->insert_data(reinterpret_cast<const char*>(&type_tag), sizeof(type_tag));
+
+        const size_t payload_size = payload.size;
+        dst_column->insert_data(reinterpret_cast<const char*>(&payload_size),
+                                sizeof(payload_size));
+        dst_column->insert_data(payload.data, payload_size);
+    }
+
 private:
     template <bool is_binary_format>
     Status _write_column_to_mysql(const IColumn& column, 
MysqlRowBuffer<is_binary_format>& result,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to