This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch variant-sparse
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/variant-sparse by this push:
     new 92125f3b855 add stats
92125f3b855 is described below

commit 92125f3b85589a87425da412c835b0233999e063
Author: eldenmoon <lihan...@selectdb.com>
AuthorDate: Tue Dec 10 12:14:31 2024 +0800

    add stats
---
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  12 --
 .../segment_v2/variant_column_writer_impl.cpp      | 197 ++++++++++++++-------
 .../rowset/segment_v2/variant_column_writer_impl.h |  35 +++-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  11 --
 be/src/vec/columns/column_object.cpp               | 170 +++++++-----------
 be/src/vec/columns/column_object.h                 |  31 ++--
 be/src/vec/common/schema_util.cpp                  |  23 ++-
 be/src/vec/common/schema_util.h                    |   3 +
 gensrc/proto/segment_v2.proto                      |   9 +-
 9 files changed, 275 insertions(+), 216 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index f9b3928298b..1bfcfbb999b 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -422,18 +422,6 @@ Status SegmentWriter::append_block_with_variant_subcolumns(vectorized::Block& da
             _flush_schema->append_column(tablet_column);
             _olap_data_convertor->clear_source_content();
         }
-        // sparse_columns
-        for (const auto& entry : 
vectorized::schema_util::get_sorted_subcolumns(
-                     object_column.get_sparse_subcolumns())) {
-            TabletColumn sparse_tablet_column = generate_column_info(entry);
-            _flush_schema->mutable_column_by_uid(parent_column->unique_id())
-                    .append_sparse_column(sparse_tablet_column);
-
-            // add sparse column to footer
-            auto* column_pb = _footer.mutable_columns(i);
-            init_column_meta(column_pb->add_sparse_columns(), -1, 
sparse_tablet_column,
-                             _flush_schema);
-        }
     }
 
     // Update rowset schema, tablet's tablet schema will be updated when build 
Rowset
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
index 72884ab775b..958df5780bd 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp
@@ -38,78 +38,61 @@ VariantColumnWriterImpl::VariantColumnWriterImpl(const ColumnWriterOptions& opts
     _tablet_column = column;
 }
 
-Status VariantColumnWriterImpl::finalize() {
-    auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
-    ptr->finalize(vectorized::ColumnObject::FinalizeMode::WRITE_MODE);
-
-    // convert each subcolumns to storage format and add data to sub columns 
writers buffer
-    auto olap_data_convertor = 
std::make_unique<vectorized::OlapBlockDataConvertor>();
-
-    DCHECK(ptr->is_finalized());
-
-    if (ptr->is_null_root()) {
-        auto root_type = vectorized::make_nullable(
-                std::make_shared<vectorized::ColumnObject::MostCommonType>());
-        auto root_col = root_type->create_column();
-        root_col->insert_many_defaults(ptr->rows());
-        ptr->create_root(root_type, std::move(root_col));
-    }
-
-    // common extracted columns
-    const auto& parent_column = *_tablet_column;
-
-    // generate column info by entry info
-    auto generate_column_info = [&](const auto& entry) {
-        const std::string& column_name =
-                parent_column.name_lower_case() + "." + entry->path.get_path();
-        const vectorized::DataTypePtr& final_data_type_from_object =
-                entry->data.get_least_common_type();
-        vectorized::PathInDataBuilder full_path_builder;
-        auto full_path = 
full_path_builder.append(parent_column.name_lower_case(), false)
-                                 .append(entry->path.get_parts(), false)
-                                 .build();
-        // set unique_id and parent_unique_id, will use unique_id to get 
iterator correct
-        return vectorized::schema_util::get_column_by_type(
-                final_data_type_from_object, column_name,
-                vectorized::schema_util::ExtraInfo {.unique_id = 
parent_column.unique_id(),
-                                                    .parent_unique_id = 
parent_column.unique_id(),
-                                                    .path_info = full_path});
-    };
+Status VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject* 
ptr,
+                                                     
vectorized::OlapBlockDataConvertor* converter,
+                                                     size_t num_rows, int& 
column_id) {
     // root column
     ColumnWriterOptions root_opts = _opts;
     _root_writer = std::unique_ptr<ColumnWriter>(new ScalarColumnWriter(
-            _opts, 
std::unique_ptr<Field>(FieldFactory::create(parent_column)), 
_opts.file_writer));
+            _opts, 
std::unique_ptr<Field>(FieldFactory::create(*_tablet_column)),
+            _opts.file_writer));
     RETURN_IF_ERROR(_root_writer->init());
 
-    // subcolumn
-    size_t num_rows = _column->size();
-    for (auto& subcolumn : _subcolumn_writers) {
-        RETURN_IF_ERROR(subcolumn->init());
-    }
-
     // make sure the root type
     auto expected_root_type =
             
vectorized::make_nullable(std::make_shared<vectorized::ColumnObject::MostCommonType>());
     ptr->ensure_root_node_type(expected_root_type);
 
-    int column_id = 0;
-    // convert root column data from engine format to storage layer format
-    olap_data_convertor->add_column_data_convertor(parent_column);
-    
RETURN_IF_ERROR(olap_data_convertor->set_source_content_with_specifid_column(
+    converter->add_column_data_convertor(*_tablet_column);
+    RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
             {ptr->get_root()->get_ptr(), nullptr, ""}, 0, num_rows, 
column_id));
-    auto [status, column] = 
olap_data_convertor->convert_column_data(column_id);
+    auto [status, column] = converter->convert_column_data(column_id);
     if (!status.ok()) {
         return status;
     }
-    // use real null data instead of root
     const uint8_t* nullmap =
             
vectorized::check_and_get_column<vectorized::ColumnUInt8>(_null_column.get())
                     ->get_data()
                     .data();
     RETURN_IF_ERROR(_root_writer->append(nullmap, column->get_data(), 
num_rows));
     ++column_id;
-    olap_data_convertor->clear_source_content();
+    converter->clear_source_content();
+
+    _opts.meta->set_num_rows(num_rows);
+    return Status::OK();
+}
 
+Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* 
ptr,
+                                                    
vectorized::OlapBlockDataConvertor* converter,
+                                                    size_t num_rows, int& 
column_id) {
+    // generate column info by entry info
+    auto generate_column_info = [&](const auto& entry) {
+        const std::string& column_name =
+                _tablet_column->name_lower_case() + "." + 
entry->path.get_path();
+        const vectorized::DataTypePtr& final_data_type_from_object =
+                entry->data.get_least_common_type();
+        vectorized::PathInDataBuilder full_path_builder;
+        auto full_path = 
full_path_builder.append(_tablet_column->name_lower_case(), false)
+                                 .append(entry->path.get_parts(), false)
+                                 .build();
+        // set unique_id and parent_unique_id, will use unique_id to get 
iterator correct
+        return vectorized::schema_util::get_column_by_type(
+                final_data_type_from_object, column_name,
+                vectorized::schema_util::ExtraInfo {.unique_id = 
_tablet_column->unique_id(),
+                                                    .parent_unique_id = 
_tablet_column->unique_id(),
+                                                    .path_info = full_path});
+    };
+    
_statistics._subcolumns_non_null_size.reserve(ptr->get_subcolumns().size());
     // convert sub column data from engine format to storage layer format
     for (const auto& entry :
          
vectorized::schema_util::get_sorted_subcolumns(ptr->get_subcolumns())) {
@@ -120,22 +103,111 @@ Status VariantColumnWriterImpl::finalize() {
         CHECK(entry->data.is_finalized());
         int current_column_id = column_id++;
         TabletColumn tablet_column = generate_column_info(entry);
-        RETURN_IF_ERROR(_create_column_writer(current_column_id, 
tablet_column, parent_column,
+        RETURN_IF_ERROR(_create_column_writer(current_column_id, 
tablet_column, *_tablet_column,
                                               
_opts.rowset_ctx->tablet_schema));
-        olap_data_convertor->add_column_data_convertor(tablet_column);
-        
RETURN_IF_ERROR(olap_data_convertor->set_source_content_with_specifid_column(
+        converter->add_column_data_convertor(tablet_column);
+        RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
                 {entry->data.get_finalized_column_ptr()->get_ptr(),
                  entry->data.get_least_common_type(), tablet_column.name()},
                 0, num_rows, current_column_id));
-        auto [status, column] = 
olap_data_convertor->convert_column_data(current_column_id);
+        auto [status, column] = 
converter->convert_column_data(current_column_id);
         if (!status.ok()) {
             return status;
         }
         const uint8_t* nullmap = column->get_nullmap();
         RETURN_IF_ERROR(_subcolumn_writers[current_column_id - 1]->append(
                 nullmap, column->get_data(), num_rows));
-        olap_data_convertor->clear_source_content();
+        converter->clear_source_content();
+        _subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows);
+
+        // get stastics
+        
_statistics._subcolumns_non_null_size.push_back(entry->data.get_non_null_value_size());
+    }
+    return Status::OK();
+}
+
+Status VariantColumnWriterImpl::_process_sparse_column(
+        vectorized::ColumnObject* ptr, vectorized::OlapBlockDataConvertor* 
converter,
+        size_t num_rows, int& column_id) {
+    // create sparse column writer
+    TabletColumn sparse_column =
+            
vectorized::schema_util::create_sparse_column(_tablet_column->unique_id());
+    ColumnWriterOptions sparse_writer_opts;
+    sparse_writer_opts.meta = _opts.footer->add_columns();
+
+    _init_column_meta(sparse_writer_opts.meta, column_id, sparse_column);
+    RETURN_IF_ERROR(ColumnWriter::create_map_writer(sparse_writer_opts, 
&sparse_column,
+                                                    _opts.file_writer, 
&_sparse_column_writer));
+    RETURN_IF_ERROR(_sparse_column_writer->init());
+
+    // convert root column data from engine format to storage layer format
+    converter->add_column_data_convertor(sparse_column);
+    RETURN_IF_ERROR(converter->set_source_content_with_specifid_column(
+            {ptr->get_sparse_column()->get_ptr(), nullptr, ""}, 0, num_rows, 
column_id));
+    auto [status, column] = converter->convert_column_data(column_id);
+    if (!status.ok()) {
+        return status;
+    }
+    RETURN_IF_ERROR(
+            _sparse_column_writer->append(column->get_nullmap(), 
column->get_data(), num_rows));
+    ++column_id;
+    converter->clear_source_content();
+
+    // get stastics
+    // todo: reuse the statics from collected stastics from compaction stage
+    std::unordered_map<std::string, size_t> sparse_data_paths_statistics;
+    const auto [sparse_data_paths, _] = 
ptr->get_sparse_data_paths_and_values();
+    for (size_t i = 0; i != sparse_data_paths->size(); ++i) {
+        auto path = sparse_data_paths->get_data_at(i);
+        if (auto it = _statistics._sparse_column_non_null_size.find(path);
+            it != _statistics._sparse_column_non_null_size.end()) {
+            ++it->second;
+        } else if (_statistics._sparse_column_non_null_size.size() <
+                   VariantStatistics::MAX_SHARED_DATA_STATISTICS_SIZE) {
+            _statistics._sparse_column_non_null_size.emplace(path, 1);
+        }
+    }
+
+    sparse_writer_opts.meta->set_num_rows(num_rows);
+    return Status::OK();
+}
+
+void VariantStatistics::to_pb(VariantStatisticsPB* stats) const {
+    // TODO
+}
+
+Status VariantColumnWriterImpl::finalize() {
+    auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
+    
RETURN_IF_ERROR(ptr->finalize(vectorized::ColumnObject::FinalizeMode::WRITE_MODE));
+
+    // convert each subcolumns to storage format and add data to sub columns 
writers buffer
+    auto olap_data_convertor = 
std::make_unique<vectorized::OlapBlockDataConvertor>();
+
+    DCHECK(ptr->is_finalized());
+
+    if (ptr->is_null_root()) {
+        auto root_type = vectorized::make_nullable(
+                std::make_shared<vectorized::ColumnObject::MostCommonType>());
+        auto root_col = root_type->create_column();
+        root_col->insert_many_defaults(ptr->rows());
+        ptr->create_root(root_type, std::move(root_col));
     }
+
+    size_t num_rows = _column->size();
+    int column_id = 0;
+
+    // convert root column data from engine format to storage layer format
+    RETURN_IF_ERROR(_process_root_column(ptr, olap_data_convertor.get(), 
num_rows, column_id));
+
+    // process and append each subcolumns to sub columns writers buffer
+    RETURN_IF_ERROR(_process_subcolumns(ptr, olap_data_convertor.get(), 
num_rows, column_id));
+
+    // process sparse column and append to sparse writer buffer
+    RETURN_IF_ERROR(_process_sparse_column(ptr, olap_data_convertor.get(), 
num_rows, column_id));
+
+    // set statistics info
+    _statistics.to_pb(_opts.meta->mutable_variant_statistics());
+
     _is_finalized = true;
     return Status::OK();
 }
@@ -164,6 +236,7 @@ uint64_t VariantColumnWriterImpl::estimate_buffer_size() {
         size += column_writer->estimate_buffer_size();
     }
     size += _root_writer->estimate_buffer_size();
+    size += _sparse_column_writer->estimate_buffer_size();
     return size;
 }
 
@@ -172,14 +245,10 @@ Status VariantColumnWriterImpl::finish() {
         RETURN_IF_ERROR(finalize());
     }
     RETURN_IF_ERROR(_root_writer->finish());
+    RETURN_IF_ERROR(_sparse_column_writer->finish());
     for (auto& column_writer : _subcolumn_writers) {
         RETURN_IF_ERROR(column_writer->finish());
     }
-    _opts.meta->set_num_rows(_root_writer->get_next_rowid());
-    for (auto& suboptions : _subcolumn_opts) {
-        suboptions.meta->set_num_rows(_root_writer->get_next_rowid());
-    }
-    return Status::OK();
     return Status::OK();
 }
 Status VariantColumnWriterImpl::write_data() {
@@ -187,6 +256,7 @@ Status VariantColumnWriterImpl::write_data() {
         RETURN_IF_ERROR(finalize());
     }
     RETURN_IF_ERROR(_root_writer->write_data());
+    RETURN_IF_ERROR(_sparse_column_writer->write_data());
     for (auto& column_writer : _subcolumn_writers) {
         RETURN_IF_ERROR(column_writer->write_data());
     }
@@ -197,6 +267,7 @@ Status VariantColumnWriterImpl::write_ordinal_index() {
         RETURN_IF_ERROR(finalize());
     }
     RETURN_IF_ERROR(_root_writer->write_ordinal_index());
+    RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index());
     for (auto& column_writer : _subcolumn_writers) {
         RETURN_IF_ERROR(column_writer->write_ordinal_index());
     }
@@ -277,10 +348,6 @@ void VariantColumnWriterImpl::_init_column_meta(ColumnMetaPB* meta, uint32_t col
     for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
         _init_column_meta(meta->add_children_columns(), column_id, 
column.get_sub_column(i));
     }
-    // add sparse column to footer
-    for (uint32_t i = 0; i < column.num_sparse_columns(); i++) {
-        _init_column_meta(meta->add_sparse_columns(), -1, 
column.sparse_column_at(i));
-    }
 };
 
 Status VariantColumnWriterImpl::_create_column_writer(uint32_t cid, const 
TabletColumn& column,
diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
index 348dd1ab0cb..87f67e7b1ef 100644
--- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
+++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h
@@ -24,11 +24,25 @@
 #include "olap/tablet_schema.h"
 #include "vec/columns/column.h"
 
-namespace doris::segment_v2 {
+namespace doris {
+
+namespace vectorized {
+class ColumnObject;
+class OlapBlockDataConvertor;
+} // namespace vectorized
+namespace segment_v2 {
 
 class ColumnWriter;
 class ScalarColumnWriter;
 
+struct VariantStatistics {
+    constexpr static size_t MAX_SHARED_DATA_STATISTICS_SIZE = 10000;
+    std::vector<size_t> _subcolumns_non_null_size;
+    std::map<StringRef, size_t> _sparse_column_non_null_size;
+
+    void to_pb(VariantStatisticsPB* stats) const;
+};
+
 class VariantColumnWriterImpl {
 public:
     VariantColumnWriterImpl(const ColumnWriterOptions& opts, const 
TabletColumn* column);
@@ -54,15 +68,30 @@ private:
     Status _create_column_writer(uint32_t cid, const TabletColumn& column,
                                  const TabletColumn& parent_column,
                                  const TabletSchemaSPtr& tablet_schema);
+    Status _process_root_column(vectorized::ColumnObject* ptr,
+                                vectorized::OlapBlockDataConvertor* converter, 
size_t num_rows,
+                                int& column_id);
+    Status _process_sparse_column(vectorized::ColumnObject* ptr,
+                                  vectorized::OlapBlockDataConvertor* 
converter, size_t num_rows,
+                                  int& column_id);
+    Status _process_subcolumns(vectorized::ColumnObject* ptr,
+                               vectorized::OlapBlockDataConvertor* converter, 
size_t num_rows,
+                               int& column_id);
     // prepare a column for finalize
     doris::vectorized::MutableColumnPtr _column;
     doris::vectorized::MutableColumnPtr _null_column;
     ColumnWriterOptions _opts;
     const TabletColumn* _tablet_column = nullptr;
     bool _is_finalized = false;
-    // for sparse column and root column
+    // for root column
     std::unique_ptr<ColumnWriter> _root_writer;
+    // for sparse column
+    std::unique_ptr<ColumnWriter> _sparse_column_writer;
     std::vector<std::unique_ptr<ColumnWriter>> _subcolumn_writers;
     std::vector<ColumnWriterOptions> _subcolumn_opts;
+
+    // staticstics which will be persisted in the footer
+    VariantStatistics _statistics;
 };
-} // namespace doris::segment_v2
\ No newline at end of file
+} // namespace segment_v2
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 5c72cd6384a..089dac218fe 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -1054,17 +1054,6 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock&
             _flush_schema->append_column(tablet_column);
             _olap_data_convertor->clear_source_content();
         }
-        // sparse_columns
-        for (const auto& entry : 
vectorized::schema_util::get_sorted_subcolumns(
-                     object_column.get_sparse_subcolumns())) {
-            TabletColumn sparse_tablet_column = generate_column_info(entry);
-            _flush_schema->mutable_column_by_uid(parent_column->unique_id())
-                    .append_sparse_column(sparse_tablet_column);
-
-            // add sparse column to footer
-            auto* column_pb = _footer.mutable_columns(i);
-            _init_column_meta(column_pb->add_sparse_columns(), -1, 
sparse_tablet_column);
-        }
     }
 
     // Update rowset schema, tablet's tablet schema will be updated when build 
Rowset
diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp
index 0aef86e30e2..2983d799166 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -36,6 +36,7 @@
 #include <memory>
 #include <optional>
 #include <sstream>
+#include <unordered_map>
 #include <vector>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
@@ -1091,7 +1092,7 @@ void ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t le
         }
     }
     num_rows += length;
-    finalize(FinalizeMode::READ_MODE);
+    finalize();
 #ifndef NDEBUG
     check_consistency();
 #endif
@@ -1419,7 +1420,7 @@ void get_json_by_column_tree(rapidjson::Value& root, rapidjson::Document::Alloca
 
 Status ColumnObject::serialize_one_row_to_string(int64_t row, std::string* 
output) const {
     if (!is_finalized()) {
-        const_cast<ColumnObject*>(this)->finalize(FinalizeMode::READ_MODE);
+        const_cast<ColumnObject*>(this)->finalize();
     }
     rapidjson::StringBuffer buf;
     if (is_scalar_variant()) {
@@ -1435,7 +1436,7 @@ Status ColumnObject::serialize_one_row_to_string(int64_t row, std::string* outpu
 
 Status ColumnObject::serialize_one_row_to_string(int64_t row, BufferWritable& 
output) const {
     if (!is_finalized()) {
-        const_cast<ColumnObject*>(this)->finalize(FinalizeMode::READ_MODE);
+        const_cast<ColumnObject*>(this)->finalize();
     }
     if (is_scalar_variant()) {
         auto type = get_root_type();
@@ -1504,99 +1505,13 @@ Status ColumnObject::serialize_one_row_to_json_format(int64_t row, rapidjson::St
     return Status::OK();
 }
 
-Status ColumnObject::merge_sparse_to_root_column() {
-    CHECK(is_finalized());
-    if (sparse_columns.empty()) {
-        return Status::OK();
+size_t ColumnObject::Subcolumn::get_non_null_value_size() const {
+    size_t res = 0;
+    for (const auto& part : data) {
+        const auto& null_data = assert_cast<const 
ColumnNullable&>(*part).get_null_map_data();
+        res += simd::count_zero_num((int8_t*)null_data.data(), 
null_data.size());
     }
-    ColumnPtr src = 
subcolumns.get_mutable_root()->data.get_finalized_column_ptr();
-    MutableColumnPtr mresult = src->clone_empty();
-    const ColumnNullable* src_null = assert_cast<const 
ColumnNullable*>(src.get());
-    const ColumnString* src_column_ptr =
-            assert_cast<const ColumnString*>(&src_null->get_nested_column());
-    rapidjson::StringBuffer buffer;
-    doc_structure = std::make_shared<rapidjson::Document>();
-    rapidjson::Document::AllocatorType& allocator = 
doc_structure->GetAllocator();
-    get_json_by_column_tree(*doc_structure, allocator, 
sparse_columns.get_root());
-
-#ifndef NDEBUG
-    VLOG_DEBUG << "dump structure " << 
JsonFunctions::print_json_value(*doc_structure);
-#endif
-
-    ColumnNullable* result_column_nullable =
-            assert_cast<ColumnNullable*>(mresult->assume_mutable().get());
-    ColumnString* result_column_ptr =
-            
assert_cast<ColumnString*>(&result_column_nullable->get_nested_column());
-    result_column_nullable->reserve(num_rows);
-    // parse each row to jsonb
-    for (size_t i = 0; i < num_rows; ++i) {
-        // root is not null, store original value, eg. the root is scalar type 
like '[1]'
-        if (!src_null->empty() && !src_null->is_null_at(i)) {
-            result_column_ptr->insert_data(src_column_ptr->get_data_at(i).data,
-                                           
src_column_ptr->get_data_at(i).size);
-            result_column_nullable->get_null_map_data().push_back(0);
-            continue;
-        }
-
-        // parse and encode sparse columns
-        buffer.Clear();
-        rapidjson::Value root(rapidjson::kNullType);
-        if (!doc_structure->IsNull()) {
-            root.CopyFrom(*doc_structure, doc_structure->GetAllocator());
-        }
-        size_t null_count = 0;
-        Arena mem_pool;
-        for (const auto& subcolumn : sparse_columns) {
-            auto& column = subcolumn->data.get_finalized_column_ptr();
-            if (assert_cast<const ColumnNullable&, 
TypeCheckOnRelease::DISABLE>(*column).is_null_at(
-                        i)) {
-                ++null_count;
-                continue;
-            }
-            bool succ = find_and_set_leave_value(
-                    column, subcolumn->path, 
subcolumn->data.get_least_common_type_serde(),
-                    subcolumn->data.get_least_common_type(),
-                    subcolumn->data.least_common_type.get_base_type_id(), root,
-                    doc_structure->GetAllocator(), mem_pool, i);
-            if (succ && subcolumn->path.empty() && !root.IsObject()) {
-                // root was modified, only handle root node
-                break;
-            }
-        }
-
-        // all null values, store null to sparse root
-        if (null_count == sparse_columns.size()) {
-            result_column_ptr->insert_default();
-            result_column_nullable->get_null_map_data().push_back(1);
-            continue;
-        }
-
-        // encode sparse columns into jsonb format
-        compact_null_values(root, doc_structure->GetAllocator());
-        // parse as jsonb value and put back to rootnode
-        // TODO, we could convert to jsonb directly from rapidjson::Value for 
better performance, instead of parsing
-        JsonbParser parser;
-        rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
-        root.Accept(writer);
-        bool res = parser.parse(buffer.GetString(), buffer.GetSize());
-        if (!res) {
-            return Status::InvalidArgument(
-                    "parse json failed, doc: {}"
-                    ", row_num:{}"
-                    ", error:{}",
-                    std::string(buffer.GetString(), buffer.GetSize()), i,
-                    JsonbErrMsg::getErrMsg(parser.getErrorCode()));
-        }
-        
result_column_ptr->insert_data(parser.getWriter().getOutput()->getBuffer(),
-                                       
parser.getWriter().getOutput()->getSize());
-        result_column_nullable->get_null_map_data().push_back(0);
-    }
-    subcolumns.get_mutable_root()->data.get_finalized_column().clear();
-    // assign merged column, do insert_range_from to make a copy, instead of 
replace the ptr itselft
-    // to make sure the root column ptr is not changed
-    
subcolumns.get_mutable_root()->data.get_finalized_column().insert_range_from(
-            *mresult->get_ptr(), 0, num_rows);
-    return Status::OK();
+    return res;
 }
 
 void ColumnObject::unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns) 
const {
@@ -1634,13 +1549,50 @@ void ColumnObject::unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns) co
     }
 }
 
-void ColumnObject::finalize(FinalizeMode mode) {
+Status ColumnObject::finalize(FinalizeMode mode) {
     Subcolumns new_subcolumns;
     // finalize root first
     if (mode == FinalizeMode::WRITE_MODE || !is_null_root()) {
         new_subcolumns.create_root(subcolumns.get_root()->data);
         new_subcolumns.get_mutable_root()->data.finalize(mode);
     }
+
+    // pick sparse columns
+    std::set<String> selected_subcolumns;
+    std::set<String> remaining_subcolumns;
+    if (subcolumns.size() > MAX_SUBCOLUMNS) {
+        // pick subcolumns sort by size of none null values
+        std::unordered_map<String, size_t> none_null_value_sizes;
+        // 1. get the none null value sizes
+        for (auto&& entry : subcolumns) {
+            if (entry->data.is_root) {
+                continue;
+            }
+            size_t size = entry->data.get_non_null_value_size();
+            none_null_value_sizes[entry->path.get_path()] = size;
+        }
+        // 2. sort by the size
+        std::vector<std::pair<String, size_t>> 
sorted_by_size(none_null_value_sizes.begin(),
+                                                              
none_null_value_sizes.end());
+        std::sort(sorted_by_size.begin(), sorted_by_size.end(),
+                  [](const auto& a, const auto& b) { return a.second > 
b.second; });
+
+        // 3. pick MAX_SUBCOLUMNS selected subcolumns
+        std::set<String> selected_subcolumns;
+        for (size_t i = 0; i < std::min(MAX_SUBCOLUMNS, 
sorted_by_size.size()); ++i) {
+            selected_subcolumns.insert(sorted_by_size[i].first);
+        }
+
+        // 4. put remaining subcolumns to remaining_subcolumns
+        std::vector<String> remaining_subcolumns;
+        for (const auto& entry : sorted_by_size) {
+            if (selected_subcolumns.find(entry.first) == 
selected_subcolumns.end()) {
+                remaining_subcolumns.push_back(entry.first);
+            }
+        }
+    }
+
+    // finalize all subcolumns
     for (auto&& entry : subcolumns) {
         const auto& least_common_type = entry->data.get_least_common_type();
         /// Do not add subcolumns, which consists only from NULLs
@@ -1661,24 +1613,34 @@ void ColumnObject::finalize(FinalizeMode mode) {
         if (entry->data.is_root) {
             continue;
         }
+    }
 
-        // Check and spilit sparse subcolumns, not support nested array at 
present
-        if (mode == FinalizeMode::WRITE_MODE && 
(entry->data.check_if_sparse_column(num_rows)) &&
-            !entry->path.has_nested_part()) {
-            // TODO seperate ambiguous path
-            sparse_columns.add(entry->path, entry->data);
-            continue;
+    // add selected subcolumns to new_subcolumns
+    for (auto&& entry : subcolumns) {
+        if (selected_subcolumns.find(entry->path.get_path()) != 
selected_subcolumns.end()) {
+            new_subcolumns.add(entry->path, entry->data);
         }
+    }
 
-        new_subcolumns.add(entry->path, entry->data);
+    std::map<String, Subcolumn> remaing_subcolumns;
+    // merge remaining subcolumns to sparse_column
+    for (auto&& entry : subcolumns) {
+        if (remaining_subcolumns.find(entry->path.get_path()) != 
selected_subcolumns.end()) {
+            remaing_subcolumns.emplace(entry->path.get_path(), entry->data);
+        }
     }
+
+    // merge and encode sparse column
+    RETURN_IF_ERROR(merge_sparse_columns(remaing_subcolumns));
+
     std::swap(subcolumns, new_subcolumns);
     doc_structure = nullptr;
     _prev_positions.clear();
+    return Status::OK();
 }
 
 void ColumnObject::finalize() {
-    finalize(FinalizeMode::READ_MODE);
+    static_cast<void>(finalize(FinalizeMode::READ_MODE));
 }
 
 void ColumnObject::ensure_root_node_type(const DataTypePtr& 
expected_root_type) {
diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h
index 21bb4469115..1475a168c23 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -38,6 +38,7 @@
 #include "olap/tablet_schema.h"
 #include "util/jsonb_document.h"
 #include "vec/columns/column.h"
+#include "vec/columns/column_map.h"
 #include "vec/columns/subcolumn_tree.h"
 #include "vec/common/cow.h"
 #include "vec/common/string_ref.h"
@@ -97,6 +98,7 @@ public:
     constexpr static TypeIndex MOST_COMMON_TYPE_ID = TypeIndex::JSONB;
     // Nullable(Array(Nullable(Object)))
     const static DataTypePtr NESTED_TYPE;
+    const static size_t MAX_SUBCOLUMNS = 200;
     // Finlize mode for subcolumns, write mode will estimate which subcolumns 
are sparse columns(too many null values inside column),
     // merge and encode them into a shared column in root column. Only affects 
in flush block to segments.
     // Otherwise read mode should be as default mode.
@@ -124,6 +126,8 @@ public:
             return least_common_type.get_base();
         }
 
+        size_t get_non_null_value_size() const;
+
         const DataTypeSerDeSPtr& get_least_common_type_serde() const {
             return least_common_type.get_serde();
         }
@@ -240,12 +244,8 @@ private:
     const bool is_nullable;
     Subcolumns subcolumns;
     size_t num_rows;
-    // sparse columns will be merge and encoded into root column
-    Subcolumns sparse_columns;
-    // The rapidjson document format of Subcolumns tree structure
-    // the leaves is null.In order to display whole document, copy
-    // this structure and fill with Subcolumns sub items
-    mutable std::shared_ptr<rapidjson::Document> doc_structure;
+    // sparse columns will be merged and encoded as ColumnMap<String, String>
+    WrappedPtr sparse_column;
 
     using SubColumnWithName = std::pair<PathInData, const Subcolumn*>;
     // Cached search results for previous row (keyed as index in JSON object) 
- used as a hint.
@@ -280,12 +280,19 @@ public:
     Status serialize_one_row_to_json_format(int64_t row, 
rapidjson::StringBuffer* output,
                                             bool* is_null) const;
 
-    // merge multiple sub sparse columns into root
-    Status merge_sparse_to_root_column();
+    // Merge and encode the given subcolumns into sparse_column.
+    Status merge_sparse_columns(const std::map<String, Subcolumn>& remaining_subcolumns);
 
     // ensure root node is a certain type
     void ensure_root_node_type(const DataTypePtr& type);
 
+    // Views sparse_column as a ColumnMap and returns raw pointers to its key column
+    // (paths) and value column (presumably the encoded subcolumn values — confirm).
+    // assert_cast checks that the map and both of its children have the expected types.
+    std::pair<ColumnString*, ColumnString*> get_sparse_data_paths_and_values() 
{
        auto& column_map = assert_cast<ColumnMap&>(*sparse_column);
        auto& key = assert_cast<ColumnString&>(column_map.get_keys());
        auto& value = assert_cast<ColumnString&>(column_map.get_values());
        return {&key, &value};
    }
+
     // create jsonb root if missing
     // notice: should only using in VariantRootColumnIterator
     // since some datastructures(sparse columns are schema on read
@@ -345,14 +352,14 @@ public:
 
     const Subcolumns& get_subcolumns() const { return subcolumns; }
 
-    const Subcolumns& get_sparse_subcolumns() const { return sparse_columns; }
-
     Subcolumns& get_subcolumns() { return subcolumns; }
 
+    // Returns sparse_column, materialized via convert_to_full_column_if_const().
+    ColumnPtr get_sparse_column() { return 
sparse_column->convert_to_full_column_if_const(); }
+
     PathsInData getKeys() const;
 
     // use sparse_subcolumns_schema to record sparse column's path info and 
type
-    void finalize(FinalizeMode mode);
+    Status finalize(FinalizeMode mode);
 
     /// Finalizes all subcolumns.
     void finalize() override;
@@ -361,7 +368,7 @@ public:
 
     MutableColumnPtr clone_finalized() const {
         auto finalized = IColumn::mutate(get_ptr());
-        
static_cast<ColumnObject*>(finalized.get())->finalize(FinalizeMode::READ_MODE);
+        // finalize() delegates to finalize(FinalizeMode::READ_MODE) and discards its Status.
+        static_cast<ColumnObject*>(finalized.get())->finalize();
         return finalized;
     }
 
diff --git a/be/src/vec/common/schema_util.cpp 
b/be/src/vec/common/schema_util.cpp
index fd50af3e1fc..42f9240646f 100644
--- a/be/src/vec/common/schema_util.cpp
+++ b/be/src/vec/common/schema_util.cpp
@@ -542,14 +542,6 @@ Status parse_variant_columns(Block& block, const 
std::vector<int>& variant_pos,
     });
 }
 
-Status encode_variant_sparse_subcolumns(ColumnObject& column) {
-    // Make sure the root node is jsonb storage type
-    auto expected_root_type = 
make_nullable(std::make_shared<ColumnObject::MostCommonType>());
-    column.ensure_root_node_type(expected_root_type);
-    RETURN_IF_ERROR(column.merge_sparse_to_root_column());
-    return Status::OK();
-}
-
 // sort by paths in lexicographical order
 vectorized::ColumnObject::Subcolumns get_sorted_subcolumns(
         const vectorized::ColumnObject::Subcolumns& subcolumns) {
@@ -614,4 +606,19 @@ bool has_schema_index_diff(const TabletSchema* new_schema, 
const TabletSchema* o
     return new_schema_has_inverted_index != old_schema_has_inverted_index;
 }
 
+// Build the TabletColumn describing a variant's sparse column: a MAP<STRING, STRING>
+// named ".sparse" that reuses the parent variant's unique id.
+TabletColumn create_sparse_column(int32_t parent_unique_id) {
+    TColumn tcolumn;
+    tcolumn.column_name = ".sparse";
+    tcolumn.col_unique_id = parent_unique_id;
+    tcolumn.column_type = TColumnType {};
+    tcolumn.column_type.type = TPrimitiveType::MAP;
+
+    // Both map children (key and value) are STRING. The type must be set on the child,
+    // not on tcolumn; otherwise the parent's MAP type is overwritten with STRING and
+    // the children are left untyped.
+    TColumn child_tcolumn;
+    child_tcolumn.column_type = TColumnType {};
+    child_tcolumn.column_type.type = TPrimitiveType::STRING;
+    tcolumn.children_column.push_back(child_tcolumn);
+    tcolumn.children_column.push_back(child_tcolumn);
+    return TabletColumn {tcolumn};
+}
+
 } // namespace doris::vectorized::schema_util
diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h
index 7c228ed2cc0..0507c9e2fe6 100644
--- a/be/src/vec/common/schema_util.h
+++ b/be/src/vec/common/schema_util.h
@@ -126,4 +126,7 @@ std::string dump_column(DataTypePtr type, const ColumnPtr& 
col);
 bool has_schema_index_diff(const TabletSchema* new_schema, const TabletSchema* 
old_schema,
                            int32_t new_col_idx, int32_t old_col_idx);
 
+// Create the TabletColumn for a variant's sparse column: named ".sparse", sharing
+// parent_unique_id, intended to have type MAP<STRING, STRING> (stored as ColumnMap<String, String>).
+TabletColumn create_sparse_column(int32_t parent_unique_id);
+
 } // namespace  doris::vectorized::schema_util
diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto
index 4c7183bae9a..37a4f0a70ee 100644
--- a/gensrc/proto/segment_v2.proto
+++ b/gensrc/proto/segment_v2.proto
@@ -159,6 +159,12 @@ message ColumnPathInfo {
     optional uint32 parrent_column_unique_id = 4;
 }
 
+// Per-variant-column statistics stored in ColumnMetaPB.variant_statistics.
+message VariantStatisticsPB {
+    // Non-null value count per subcolumn, in the order of subcolumns in the variant.
+    repeated uint32 subcolumn_non_null_size = 1;
+    // Non-null value count for paths merged into the sparse column
+    // (presumably keyed by subcolumn path — confirm against the writer).
+    map<string, uint32> sparse_column_non_null_size = 2;
+}
+
 message ColumnMetaPB {
     // column id in table schema
     optional uint32 column_id = 1;
@@ -192,11 +198,12 @@ message ColumnMetaPB {
     optional int32 precision = 15; // ColumnMessage.precision
     optional int32 frac = 16; // ColumnMessag
 
-    repeated ColumnMetaPB sparse_columns = 17; // sparse column within a 
variant column
+    repeated ColumnMetaPB sparse_columns = 17; // deprecated
 
     optional bool result_is_nullable = 18; // used on agg_state type
     optional string function_name = 19; // used on agg_state type
     optional int32 be_exec_version = 20; // used on agg_state type
+    optional VariantStatisticsPB variant_statistics = 21; // only used in 
variant type
 }
 
 message PrimaryKeyIndexMetaPB {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to