This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch variant-sparse in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push: new 92125f3b855 add stats 92125f3b855 is described below commit 92125f3b85589a87425da412c835b0233999e063 Author: eldenmoon <lihan...@selectdb.com> AuthorDate: Tue Dec 10 12:14:31 2024 +0800 add stats --- be/src/olap/rowset/segment_v2/segment_writer.cpp | 12 -- .../segment_v2/variant_column_writer_impl.cpp | 197 ++++++++++++++------- .../rowset/segment_v2/variant_column_writer_impl.h | 35 +++- .../rowset/segment_v2/vertical_segment_writer.cpp | 11 -- be/src/vec/columns/column_object.cpp | 170 +++++++----------- be/src/vec/columns/column_object.h | 31 ++-- be/src/vec/common/schema_util.cpp | 23 ++- be/src/vec/common/schema_util.h | 3 + gensrc/proto/segment_v2.proto | 9 +- 9 files changed, 275 insertions(+), 216 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index f9b3928298b..1bfcfbb999b 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -422,18 +422,6 @@ Status SegmentWriter::append_block_with_variant_subcolumns(vectorized::Block& da _flush_schema->append_column(tablet_column); _olap_data_convertor->clear_source_content(); } - // sparse_columns - for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns( - object_column.get_sparse_subcolumns())) { - TabletColumn sparse_tablet_column = generate_column_info(entry); - _flush_schema->mutable_column_by_uid(parent_column->unique_id()) - .append_sparse_column(sparse_tablet_column); - - // add sparse column to footer - auto* column_pb = _footer.mutable_columns(i); - init_column_meta(column_pb->add_sparse_columns(), -1, sparse_tablet_column, - _flush_schema); - } } // Update rowset schema, tablet's tablet schema will be updated when build Rowset diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp index 
72884ab775b..958df5780bd 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp @@ -38,78 +38,61 @@ VariantColumnWriterImpl::VariantColumnWriterImpl(const ColumnWriterOptions& opts _tablet_column = column; } -Status VariantColumnWriterImpl::finalize() { - auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get()); - ptr->finalize(vectorized::ColumnObject::FinalizeMode::WRITE_MODE); - - // convert each subcolumns to storage format and add data to sub columns writers buffer - auto olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>(); - - DCHECK(ptr->is_finalized()); - - if (ptr->is_null_root()) { - auto root_type = vectorized::make_nullable( - std::make_shared<vectorized::ColumnObject::MostCommonType>()); - auto root_col = root_type->create_column(); - root_col->insert_many_defaults(ptr->rows()); - ptr->create_root(root_type, std::move(root_col)); - } - - // common extracted columns - const auto& parent_column = *_tablet_column; - - // generate column info by entry info - auto generate_column_info = [&](const auto& entry) { - const std::string& column_name = - parent_column.name_lower_case() + "." 
+ entry->path.get_path(); - const vectorized::DataTypePtr& final_data_type_from_object = - entry->data.get_least_common_type(); - vectorized::PathInDataBuilder full_path_builder; - auto full_path = full_path_builder.append(parent_column.name_lower_case(), false) - .append(entry->path.get_parts(), false) - .build(); - // set unique_id and parent_unique_id, will use unique_id to get iterator correct - return vectorized::schema_util::get_column_by_type( - final_data_type_from_object, column_name, - vectorized::schema_util::ExtraInfo {.unique_id = parent_column.unique_id(), - .parent_unique_id = parent_column.unique_id(), - .path_info = full_path}); - }; +Status VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject* ptr, + vectorized::OlapBlockDataConvertor* converter, + size_t num_rows, int& column_id) { // root column ColumnWriterOptions root_opts = _opts; _root_writer = std::unique_ptr<ColumnWriter>(new ScalarColumnWriter( - _opts, std::unique_ptr<Field>(FieldFactory::create(parent_column)), _opts.file_writer)); + _opts, std::unique_ptr<Field>(FieldFactory::create(*_tablet_column)), + _opts.file_writer)); RETURN_IF_ERROR(_root_writer->init()); - // subcolumn - size_t num_rows = _column->size(); - for (auto& subcolumn : _subcolumn_writers) { - RETURN_IF_ERROR(subcolumn->init()); - } - // make sure the root type auto expected_root_type = vectorized::make_nullable(std::make_shared<vectorized::ColumnObject::MostCommonType>()); ptr->ensure_root_node_type(expected_root_type); - int column_id = 0; - // convert root column data from engine format to storage layer format - olap_data_convertor->add_column_data_convertor(parent_column); - RETURN_IF_ERROR(olap_data_convertor->set_source_content_with_specifid_column( + converter->add_column_data_convertor(*_tablet_column); + RETURN_IF_ERROR(converter->set_source_content_with_specifid_column( {ptr->get_root()->get_ptr(), nullptr, ""}, 0, num_rows, column_id)); - auto [status, column] = 
olap_data_convertor->convert_column_data(column_id); + auto [status, column] = converter->convert_column_data(column_id); if (!status.ok()) { return status; } - // use real null data instead of root const uint8_t* nullmap = vectorized::check_and_get_column<vectorized::ColumnUInt8>(_null_column.get()) ->get_data() .data(); RETURN_IF_ERROR(_root_writer->append(nullmap, column->get_data(), num_rows)); ++column_id; - olap_data_convertor->clear_source_content(); + converter->clear_source_content(); + + _opts.meta->set_num_rows(num_rows); + return Status::OK(); +} +Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* ptr, + vectorized::OlapBlockDataConvertor* converter, + size_t num_rows, int& column_id) { + // generate column info by entry info + auto generate_column_info = [&](const auto& entry) { + const std::string& column_name = + _tablet_column->name_lower_case() + "." + entry->path.get_path(); + const vectorized::DataTypePtr& final_data_type_from_object = + entry->data.get_least_common_type(); + vectorized::PathInDataBuilder full_path_builder; + auto full_path = full_path_builder.append(_tablet_column->name_lower_case(), false) + .append(entry->path.get_parts(), false) + .build(); + // set unique_id and parent_unique_id, will use unique_id to get iterator correct + return vectorized::schema_util::get_column_by_type( + final_data_type_from_object, column_name, + vectorized::schema_util::ExtraInfo {.unique_id = _tablet_column->unique_id(), + .parent_unique_id = _tablet_column->unique_id(), + .path_info = full_path}); + }; + _statistics._subcolumns_non_null_size.reserve(ptr->get_subcolumns().size()); // convert sub column data from engine format to storage layer format for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns(ptr->get_subcolumns())) { @@ -120,22 +103,111 @@ Status VariantColumnWriterImpl::finalize() { CHECK(entry->data.is_finalized()); int current_column_id = column_id++; TabletColumn tablet_column = 
generate_column_info(entry); - RETURN_IF_ERROR(_create_column_writer(current_column_id, tablet_column, parent_column, + RETURN_IF_ERROR(_create_column_writer(current_column_id, tablet_column, *_tablet_column, _opts.rowset_ctx->tablet_schema)); - olap_data_convertor->add_column_data_convertor(tablet_column); - RETURN_IF_ERROR(olap_data_convertor->set_source_content_with_specifid_column( + converter->add_column_data_convertor(tablet_column); + RETURN_IF_ERROR(converter->set_source_content_with_specifid_column( {entry->data.get_finalized_column_ptr()->get_ptr(), entry->data.get_least_common_type(), tablet_column.name()}, 0, num_rows, current_column_id)); - auto [status, column] = olap_data_convertor->convert_column_data(current_column_id); + auto [status, column] = converter->convert_column_data(current_column_id); if (!status.ok()) { return status; } const uint8_t* nullmap = column->get_nullmap(); RETURN_IF_ERROR(_subcolumn_writers[current_column_id - 1]->append( nullmap, column->get_data(), num_rows)); - olap_data_convertor->clear_source_content(); + converter->clear_source_content(); + _subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows); + + // get stastics + _statistics._subcolumns_non_null_size.push_back(entry->data.get_non_null_value_size()); + } + return Status::OK(); +} + +Status VariantColumnWriterImpl::_process_sparse_column( + vectorized::ColumnObject* ptr, vectorized::OlapBlockDataConvertor* converter, + size_t num_rows, int& column_id) { + // create sparse column writer + TabletColumn sparse_column = + vectorized::schema_util::create_sparse_column(_tablet_column->unique_id()); + ColumnWriterOptions sparse_writer_opts; + sparse_writer_opts.meta = _opts.footer->add_columns(); + + _init_column_meta(sparse_writer_opts.meta, column_id, sparse_column); + RETURN_IF_ERROR(ColumnWriter::create_map_writer(sparse_writer_opts, &sparse_column, + _opts.file_writer, &_sparse_column_writer)); + RETURN_IF_ERROR(_sparse_column_writer->init()); + + // 
convert root column data from engine format to storage layer format + converter->add_column_data_convertor(sparse_column); + RETURN_IF_ERROR(converter->set_source_content_with_specifid_column( + {ptr->get_sparse_column()->get_ptr(), nullptr, ""}, 0, num_rows, column_id)); + auto [status, column] = converter->convert_column_data(column_id); + if (!status.ok()) { + return status; + } + RETURN_IF_ERROR( + _sparse_column_writer->append(column->get_nullmap(), column->get_data(), num_rows)); + ++column_id; + converter->clear_source_content(); + + // get stastics + // todo: reuse the statics from collected stastics from compaction stage + std::unordered_map<std::string, size_t> sparse_data_paths_statistics; + const auto [sparse_data_paths, _] = ptr->get_sparse_data_paths_and_values(); + for (size_t i = 0; i != sparse_data_paths->size(); ++i) { + auto path = sparse_data_paths->get_data_at(i); + if (auto it = _statistics._sparse_column_non_null_size.find(path); + it != _statistics._sparse_column_non_null_size.end()) { + ++it->second; + } else if (_statistics._sparse_column_non_null_size.size() < + VariantStatistics::MAX_SHARED_DATA_STATISTICS_SIZE) { + _statistics._sparse_column_non_null_size.emplace(path, 1); + } + } + + sparse_writer_opts.meta->set_num_rows(num_rows); + return Status::OK(); +} + +void VariantStatistics::to_pb(VariantStatisticsPB* stats) const { + // TODO +} + +Status VariantColumnWriterImpl::finalize() { + auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get()); + RETURN_IF_ERROR(ptr->finalize(vectorized::ColumnObject::FinalizeMode::WRITE_MODE)); + + // convert each subcolumns to storage format and add data to sub columns writers buffer + auto olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>(); + + DCHECK(ptr->is_finalized()); + + if (ptr->is_null_root()) { + auto root_type = vectorized::make_nullable( + std::make_shared<vectorized::ColumnObject::MostCommonType>()); + auto root_col = root_type->create_column(); + 
root_col->insert_many_defaults(ptr->rows()); + ptr->create_root(root_type, std::move(root_col)); } + + size_t num_rows = _column->size(); + int column_id = 0; + + // convert root column data from engine format to storage layer format + RETURN_IF_ERROR(_process_root_column(ptr, olap_data_convertor.get(), num_rows, column_id)); + + // process and append each subcolumns to sub columns writers buffer + RETURN_IF_ERROR(_process_subcolumns(ptr, olap_data_convertor.get(), num_rows, column_id)); + + // process sparse column and append to sparse writer buffer + RETURN_IF_ERROR(_process_sparse_column(ptr, olap_data_convertor.get(), num_rows, column_id)); + + // set statistics info + _statistics.to_pb(_opts.meta->mutable_variant_statistics()); + _is_finalized = true; return Status::OK(); } @@ -164,6 +236,7 @@ uint64_t VariantColumnWriterImpl::estimate_buffer_size() { size += column_writer->estimate_buffer_size(); } size += _root_writer->estimate_buffer_size(); + size += _sparse_column_writer->estimate_buffer_size(); return size; } @@ -172,14 +245,10 @@ Status VariantColumnWriterImpl::finish() { RETURN_IF_ERROR(finalize()); } RETURN_IF_ERROR(_root_writer->finish()); + RETURN_IF_ERROR(_sparse_column_writer->finish()); for (auto& column_writer : _subcolumn_writers) { RETURN_IF_ERROR(column_writer->finish()); } - _opts.meta->set_num_rows(_root_writer->get_next_rowid()); - for (auto& suboptions : _subcolumn_opts) { - suboptions.meta->set_num_rows(_root_writer->get_next_rowid()); - } - return Status::OK(); return Status::OK(); } Status VariantColumnWriterImpl::write_data() { @@ -187,6 +256,7 @@ Status VariantColumnWriterImpl::write_data() { RETURN_IF_ERROR(finalize()); } RETURN_IF_ERROR(_root_writer->write_data()); + RETURN_IF_ERROR(_sparse_column_writer->write_data()); for (auto& column_writer : _subcolumn_writers) { RETURN_IF_ERROR(column_writer->write_data()); } @@ -197,6 +267,7 @@ Status VariantColumnWriterImpl::write_ordinal_index() { RETURN_IF_ERROR(finalize()); } 
RETURN_IF_ERROR(_root_writer->write_ordinal_index()); + RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index()); for (auto& column_writer : _subcolumn_writers) { RETURN_IF_ERROR(column_writer->write_ordinal_index()); } @@ -277,10 +348,6 @@ void VariantColumnWriterImpl::_init_column_meta(ColumnMetaPB* meta, uint32_t col for (uint32_t i = 0; i < column.get_subtype_count(); ++i) { _init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i)); } - // add sparse column to footer - for (uint32_t i = 0; i < column.num_sparse_columns(); i++) { - _init_column_meta(meta->add_sparse_columns(), -1, column.sparse_column_at(i)); - } }; Status VariantColumnWriterImpl::_create_column_writer(uint32_t cid, const TabletColumn& column, diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h index 348dd1ab0cb..87f67e7b1ef 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h @@ -24,11 +24,25 @@ #include "olap/tablet_schema.h" #include "vec/columns/column.h" -namespace doris::segment_v2 { +namespace doris { + +namespace vectorized { +class ColumnObject; +class OlapBlockDataConvertor; +} // namespace vectorized +namespace segment_v2 { class ColumnWriter; class ScalarColumnWriter; +struct VariantStatistics { + constexpr static size_t MAX_SHARED_DATA_STATISTICS_SIZE = 10000; + std::vector<size_t> _subcolumns_non_null_size; + std::map<StringRef, size_t> _sparse_column_non_null_size; + + void to_pb(VariantStatisticsPB* stats) const; +}; + class VariantColumnWriterImpl { public: VariantColumnWriterImpl(const ColumnWriterOptions& opts, const TabletColumn* column); @@ -54,15 +68,30 @@ private: Status _create_column_writer(uint32_t cid, const TabletColumn& column, const TabletColumn& parent_column, const TabletSchemaSPtr& tablet_schema); + Status _process_root_column(vectorized::ColumnObject* ptr, + 
vectorized::OlapBlockDataConvertor* converter, size_t num_rows, + int& column_id); + Status _process_sparse_column(vectorized::ColumnObject* ptr, + vectorized::OlapBlockDataConvertor* converter, size_t num_rows, + int& column_id); + Status _process_subcolumns(vectorized::ColumnObject* ptr, + vectorized::OlapBlockDataConvertor* converter, size_t num_rows, + int& column_id); // prepare a column for finalize doris::vectorized::MutableColumnPtr _column; doris::vectorized::MutableColumnPtr _null_column; ColumnWriterOptions _opts; const TabletColumn* _tablet_column = nullptr; bool _is_finalized = false; - // for sparse column and root column + // for root column std::unique_ptr<ColumnWriter> _root_writer; + // for sparse column + std::unique_ptr<ColumnWriter> _sparse_column_writer; std::vector<std::unique_ptr<ColumnWriter>> _subcolumn_writers; std::vector<ColumnWriterOptions> _subcolumn_opts; + + // staticstics which will be persisted in the footer + VariantStatistics _statistics; }; -} // namespace doris::segment_v2 \ No newline at end of file +} // namespace segment_v2 +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 5c72cd6384a..089dac218fe 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -1054,17 +1054,6 @@ Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& _flush_schema->append_column(tablet_column); _olap_data_convertor->clear_source_content(); } - // sparse_columns - for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns( - object_column.get_sparse_subcolumns())) { - TabletColumn sparse_tablet_column = generate_column_info(entry); - _flush_schema->mutable_column_by_uid(parent_column->unique_id()) - .append_sparse_column(sparse_tablet_column); - - // add sparse column to footer - auto* column_pb 
= _footer.mutable_columns(i); - _init_column_meta(column_pb->add_sparse_columns(), -1, sparse_tablet_column); - } } // Update rowset schema, tablet's tablet schema will be updated when build Rowset diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 0aef86e30e2..2983d799166 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -36,6 +36,7 @@ #include <memory> #include <optional> #include <sstream> +#include <unordered_map> #include <vector> #include "common/compiler_util.h" // IWYU pragma: keep @@ -1091,7 +1092,7 @@ void ColumnObject::insert_range_from(const IColumn& src, size_t start, size_t le } } num_rows += length; - finalize(FinalizeMode::READ_MODE); + finalize(); #ifndef NDEBUG check_consistency(); #endif @@ -1419,7 +1420,7 @@ void get_json_by_column_tree(rapidjson::Value& root, rapidjson::Document::Alloca Status ColumnObject::serialize_one_row_to_string(int64_t row, std::string* output) const { if (!is_finalized()) { - const_cast<ColumnObject*>(this)->finalize(FinalizeMode::READ_MODE); + const_cast<ColumnObject*>(this)->finalize(); } rapidjson::StringBuffer buf; if (is_scalar_variant()) { @@ -1435,7 +1436,7 @@ Status ColumnObject::serialize_one_row_to_string(int64_t row, std::string* outpu Status ColumnObject::serialize_one_row_to_string(int64_t row, BufferWritable& output) const { if (!is_finalized()) { - const_cast<ColumnObject*>(this)->finalize(FinalizeMode::READ_MODE); + const_cast<ColumnObject*>(this)->finalize(); } if (is_scalar_variant()) { auto type = get_root_type(); @@ -1504,99 +1505,13 @@ Status ColumnObject::serialize_one_row_to_json_format(int64_t row, rapidjson::St return Status::OK(); } -Status ColumnObject::merge_sparse_to_root_column() { - CHECK(is_finalized()); - if (sparse_columns.empty()) { - return Status::OK(); +size_t ColumnObject::Subcolumn::get_non_null_value_size() const { + size_t res = 0; + for (const auto& part : data) { + const auto& null_data = 
assert_cast<const ColumnNullable&>(*part).get_null_map_data(); + res += simd::count_zero_num((int8_t*)null_data.data(), null_data.size()); } - ColumnPtr src = subcolumns.get_mutable_root()->data.get_finalized_column_ptr(); - MutableColumnPtr mresult = src->clone_empty(); - const ColumnNullable* src_null = assert_cast<const ColumnNullable*>(src.get()); - const ColumnString* src_column_ptr = - assert_cast<const ColumnString*>(&src_null->get_nested_column()); - rapidjson::StringBuffer buffer; - doc_structure = std::make_shared<rapidjson::Document>(); - rapidjson::Document::AllocatorType& allocator = doc_structure->GetAllocator(); - get_json_by_column_tree(*doc_structure, allocator, sparse_columns.get_root()); - -#ifndef NDEBUG - VLOG_DEBUG << "dump structure " << JsonFunctions::print_json_value(*doc_structure); -#endif - - ColumnNullable* result_column_nullable = - assert_cast<ColumnNullable*>(mresult->assume_mutable().get()); - ColumnString* result_column_ptr = - assert_cast<ColumnString*>(&result_column_nullable->get_nested_column()); - result_column_nullable->reserve(num_rows); - // parse each row to jsonb - for (size_t i = 0; i < num_rows; ++i) { - // root is not null, store original value, eg. 
the root is scalar type like '[1]' - if (!src_null->empty() && !src_null->is_null_at(i)) { - result_column_ptr->insert_data(src_column_ptr->get_data_at(i).data, - src_column_ptr->get_data_at(i).size); - result_column_nullable->get_null_map_data().push_back(0); - continue; - } - - // parse and encode sparse columns - buffer.Clear(); - rapidjson::Value root(rapidjson::kNullType); - if (!doc_structure->IsNull()) { - root.CopyFrom(*doc_structure, doc_structure->GetAllocator()); - } - size_t null_count = 0; - Arena mem_pool; - for (const auto& subcolumn : sparse_columns) { - auto& column = subcolumn->data.get_finalized_column_ptr(); - if (assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(*column).is_null_at( - i)) { - ++null_count; - continue; - } - bool succ = find_and_set_leave_value( - column, subcolumn->path, subcolumn->data.get_least_common_type_serde(), - subcolumn->data.get_least_common_type(), - subcolumn->data.least_common_type.get_base_type_id(), root, - doc_structure->GetAllocator(), mem_pool, i); - if (succ && subcolumn->path.empty() && !root.IsObject()) { - // root was modified, only handle root node - break; - } - } - - // all null values, store null to sparse root - if (null_count == sparse_columns.size()) { - result_column_ptr->insert_default(); - result_column_nullable->get_null_map_data().push_back(1); - continue; - } - - // encode sparse columns into jsonb format - compact_null_values(root, doc_structure->GetAllocator()); - // parse as jsonb value and put back to rootnode - // TODO, we could convert to jsonb directly from rapidjson::Value for better performance, instead of parsing - JsonbParser parser; - rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); - root.Accept(writer); - bool res = parser.parse(buffer.GetString(), buffer.GetSize()); - if (!res) { - return Status::InvalidArgument( - "parse json failed, doc: {}" - ", row_num:{}" - ", error:{}", - std::string(buffer.GetString(), buffer.GetSize()), i, - 
JsonbErrMsg::getErrMsg(parser.getErrorCode())); - } - result_column_ptr->insert_data(parser.getWriter().getOutput()->getBuffer(), - parser.getWriter().getOutput()->getSize()); - result_column_nullable->get_null_map_data().push_back(0); - } - subcolumns.get_mutable_root()->data.get_finalized_column().clear(); - // assign merged column, do insert_range_from to make a copy, instead of replace the ptr itselft - // to make sure the root column ptr is not changed - subcolumns.get_mutable_root()->data.get_finalized_column().insert_range_from( - *mresult->get_ptr(), 0, num_rows); - return Status::OK(); + return res; } void ColumnObject::unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns) const { @@ -1634,13 +1549,50 @@ void ColumnObject::unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns) co } } -void ColumnObject::finalize(FinalizeMode mode) { +Status ColumnObject::finalize(FinalizeMode mode) { Subcolumns new_subcolumns; // finalize root first if (mode == FinalizeMode::WRITE_MODE || !is_null_root()) { new_subcolumns.create_root(subcolumns.get_root()->data); new_subcolumns.get_mutable_root()->data.finalize(mode); } + + // pick sparse columns + std::set<String> selected_subcolumns; + std::set<String> remaining_subcolumns; + if (subcolumns.size() > MAX_SUBCOLUMNS) { + // pick subcolumns sort by size of none null values + std::unordered_map<String, size_t> none_null_value_sizes; + // 1. get the none null value sizes + for (auto&& entry : subcolumns) { + if (entry->data.is_root) { + continue; + } + size_t size = entry->data.get_non_null_value_size(); + none_null_value_sizes[entry->path.get_path()] = size; + } + // 2. sort by the size + std::vector<std::pair<String, size_t>> sorted_by_size(none_null_value_sizes.begin(), + none_null_value_sizes.end()); + std::sort(sorted_by_size.begin(), sorted_by_size.end(), + [](const auto& a, const auto& b) { return a.second > b.second; }); + + // 3. 
pick MAX_SUBCOLUMNS selected subcolumns + std::set<String> selected_subcolumns; + for (size_t i = 0; i < std::min(MAX_SUBCOLUMNS, sorted_by_size.size()); ++i) { + selected_subcolumns.insert(sorted_by_size[i].first); + } + + // 4. put remaining subcolumns to remaining_subcolumns + std::vector<String> remaining_subcolumns; + for (const auto& entry : sorted_by_size) { + if (selected_subcolumns.find(entry.first) == selected_subcolumns.end()) { + remaining_subcolumns.push_back(entry.first); + } + } + } + + // finalize all subcolumns for (auto&& entry : subcolumns) { const auto& least_common_type = entry->data.get_least_common_type(); /// Do not add subcolumns, which consists only from NULLs @@ -1661,24 +1613,34 @@ void ColumnObject::finalize(FinalizeMode mode) { if (entry->data.is_root) { continue; } + } - // Check and spilit sparse subcolumns, not support nested array at present - if (mode == FinalizeMode::WRITE_MODE && (entry->data.check_if_sparse_column(num_rows)) && - !entry->path.has_nested_part()) { - // TODO seperate ambiguous path - sparse_columns.add(entry->path, entry->data); - continue; + // add selected subcolumns to new_subcolumns + for (auto&& entry : subcolumns) { + if (selected_subcolumns.find(entry->path.get_path()) != selected_subcolumns.end()) { + new_subcolumns.add(entry->path, entry->data); } + } - new_subcolumns.add(entry->path, entry->data); + std::map<String, Subcolumn> remaing_subcolumns; + // merge remaining subcolumns to sparse_column + for (auto&& entry : subcolumns) { + if (remaining_subcolumns.find(entry->path.get_path()) != selected_subcolumns.end()) { + remaing_subcolumns.emplace(entry->path.get_path(), entry->data); + } } + + // merge and encode sparse column + RETURN_IF_ERROR(merge_sparse_columns(remaing_subcolumns)); + std::swap(subcolumns, new_subcolumns); doc_structure = nullptr; _prev_positions.clear(); + return Status::OK(); } void ColumnObject::finalize() { - finalize(FinalizeMode::READ_MODE); + 
static_cast<void>(finalize(FinalizeMode::READ_MODE)); } void ColumnObject::ensure_root_node_type(const DataTypePtr& expected_root_type) { diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 21bb4469115..1475a168c23 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -38,6 +38,7 @@ #include "olap/tablet_schema.h" #include "util/jsonb_document.h" #include "vec/columns/column.h" +#include "vec/columns/column_map.h" #include "vec/columns/subcolumn_tree.h" #include "vec/common/cow.h" #include "vec/common/string_ref.h" @@ -97,6 +98,7 @@ public: constexpr static TypeIndex MOST_COMMON_TYPE_ID = TypeIndex::JSONB; // Nullable(Array(Nullable(Object))) const static DataTypePtr NESTED_TYPE; + const static size_t MAX_SUBCOLUMNS = 200; // Finlize mode for subcolumns, write mode will estimate which subcolumns are sparse columns(too many null values inside column), // merge and encode them into a shared column in root column. Only affects in flush block to segments. // Otherwise read mode should be as default mode. 
@@ -124,6 +126,8 @@ public: return least_common_type.get_base(); } + size_t get_non_null_value_size() const; + const DataTypeSerDeSPtr& get_least_common_type_serde() const { return least_common_type.get_serde(); } @@ -240,12 +244,8 @@ private: const bool is_nullable; Subcolumns subcolumns; size_t num_rows; - // sparse columns will be merge and encoded into root column - Subcolumns sparse_columns; - // The rapidjson document format of Subcolumns tree structure - // the leaves is null.In order to display whole document, copy - // this structure and fill with Subcolumns sub items - mutable std::shared_ptr<rapidjson::Document> doc_structure; + // sparse columns will be merge and encoded as ColumnMap<String, String> + WrappedPtr sparse_column; using SubColumnWithName = std::pair<PathInData, const Subcolumn*>; // Cached search results for previous row (keyed as index in JSON object) - used as a hint. @@ -280,12 +280,19 @@ public: Status serialize_one_row_to_json_format(int64_t row, rapidjson::StringBuffer* output, bool* is_null) const; - // merge multiple sub sparse columns into root - Status merge_sparse_to_root_column(); + // merge multiple sub sparse columns + Status merge_sparse_columns(const std::map<String, Subcolumn>& remaing_subcolumns); // ensure root node is a certain type void ensure_root_node_type(const DataTypePtr& type); + std::pair<ColumnString*, ColumnString*> get_sparse_data_paths_and_values() { + auto& column_map = assert_cast<ColumnMap&>(*sparse_column); + auto& key = assert_cast<ColumnString&>(column_map.get_keys()); + auto& value = assert_cast<ColumnString&>(column_map.get_values()); + return {&key, &value}; + } + // create jsonb root if missing // notice: should only using in VariantRootColumnIterator // since some datastructures(sparse columns are schema on read @@ -345,14 +352,14 @@ public: const Subcolumns& get_subcolumns() const { return subcolumns; } - const Subcolumns& get_sparse_subcolumns() const { return sparse_columns; } - Subcolumns& 
get_subcolumns() { return subcolumns; } + ColumnPtr get_sparse_column() { return sparse_column->convert_to_full_column_if_const(); } + PathsInData getKeys() const; // use sparse_subcolumns_schema to record sparse column's path info and type - void finalize(FinalizeMode mode); + Status finalize(FinalizeMode mode); /// Finalizes all subcolumns. void finalize() override; @@ -361,7 +368,7 @@ public: MutableColumnPtr clone_finalized() const { auto finalized = IColumn::mutate(get_ptr()); - static_cast<ColumnObject*>(finalized.get())->finalize(FinalizeMode::READ_MODE); + static_cast<ColumnObject*>(finalized.get())->finalize(); return finalized; } diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index fd50af3e1fc..42f9240646f 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -542,14 +542,6 @@ Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos, }); } -Status encode_variant_sparse_subcolumns(ColumnObject& column) { - // Make sure the root node is jsonb storage type - auto expected_root_type = make_nullable(std::make_shared<ColumnObject::MostCommonType>()); - column.ensure_root_node_type(expected_root_type); - RETURN_IF_ERROR(column.merge_sparse_to_root_column()); - return Status::OK(); -} - // sort by paths in lexicographical order vectorized::ColumnObject::Subcolumns get_sorted_subcolumns( const vectorized::ColumnObject::Subcolumns& subcolumns) { @@ -614,4 +606,19 @@ bool has_schema_index_diff(const TabletSchema* new_schema, const TabletSchema* o return new_schema_has_inverted_index != old_schema_has_inverted_index; } +TabletColumn create_sparse_column(int32_t parent_unique_id) { + TColumn tcolumn; + tcolumn.column_name = ".sparse"; + tcolumn.col_unique_id = parent_unique_id; + tcolumn.column_type = TColumnType {}; + tcolumn.column_type.type = TPrimitiveType::MAP; + + TColumn child_tcolumn; + tcolumn.column_type = TColumnType {}; + tcolumn.column_type.type = 
TPrimitiveType::STRING; + tcolumn.children_column.push_back(child_tcolumn); + tcolumn.children_column.push_back(child_tcolumn); + return TabletColumn {tcolumn}; +} + } // namespace doris::vectorized::schema_util diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index 7c228ed2cc0..0507c9e2fe6 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -126,4 +126,7 @@ std::string dump_column(DataTypePtr type, const ColumnPtr& col); bool has_schema_index_diff(const TabletSchema* new_schema, const TabletSchema* old_schema, int32_t new_col_idx, int32_t old_col_idx); +// create ColumnMap<String, String> +TabletColumn create_sparse_column(int32_t parent_unique_id); + } // namespace doris::vectorized::schema_util diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index 4c7183bae9a..37a4f0a70ee 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -159,6 +159,12 @@ message ColumnPathInfo { optional uint32 parrent_column_unique_id = 4; } +message VariantStatisticsPB { + // in the order of subcolumns in variant + repeated uint32 subcolumn_non_null_size = 1; + map<string, uint32> sparse_column_non_null_size = 2; +} + message ColumnMetaPB { // column id in table schema optional uint32 column_id = 1; @@ -192,11 +198,12 @@ message ColumnMetaPB { optional int32 precision = 15; // ColumnMessage.precision optional int32 frac = 16; // ColumnMessag - repeated ColumnMetaPB sparse_columns = 17; // sparse column within a variant column + repeated ColumnMetaPB sparse_columns = 17; // deprecated optional bool result_is_nullable = 18; // used on agg_state type optional string function_name = 19; // used on agg_state type optional int32 be_exec_version = 20; // used on agg_state type + optional VariantStatisticsPB variant_statistics = 21; // only used in variant type } message PrimaryKeyIndexMetaPB { --------------------------------------------------------------------- To unsubscribe, 
e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org