This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch variant-sparse in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push: new 98dd3ee257c support vertical compaction merge sparse (#48401) 98dd3ee257c is described below commit 98dd3ee257c06ab03074d4ac90abf23b39f7dd16 Author: lihangyu <lihan...@selectdb.com> AuthorDate: Tue Mar 4 10:19:56 2025 +0800 support vertical compaction merge sparse (#48401) --- be/src/cloud/cloud_base_compaction.h | 2 +- be/src/cloud/cloud_rowset_writer.cpp | 5 +- be/src/olap/base_tablet.cpp | 25 ++ be/src/olap/base_tablet.h | 3 + be/src/olap/compaction.cpp | 18 +- be/src/olap/compaction.h | 4 +- be/src/olap/iterators.h | 3 + be/src/olap/rowset/beta_rowset_writer.cpp | 9 +- be/src/olap/rowset/rowset_writer_context.h | 3 + be/src/olap/rowset/segment_v2/column_reader.cpp | 206 +++++++---- be/src/olap/rowset/segment_v2/column_reader.h | 12 +- be/src/olap/rowset/segment_v2/column_writer.cpp | 6 + be/src/olap/rowset/segment_v2/column_writer.h | 51 +++ .../rowset/segment_v2/hierarchical_data_reader.cpp | 177 ++++++++-- .../rowset/segment_v2/hierarchical_data_reader.h | 192 ++++++++++- be/src/olap/rowset/segment_v2/segment.cpp | 6 +- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 3 +- be/src/olap/rowset/segment_v2/segment_writer.cpp | 50 ++- be/src/olap/rowset/segment_v2/segment_writer.h | 1 + be/src/olap/rowset/segment_v2/stream_reader.h | 1 + .../segment_v2/variant_column_writer_impl.cpp | 382 +++++++++++++++------ .../rowset/segment_v2/variant_column_writer_impl.h | 6 +- .../rowset/segment_v2/vertical_segment_writer.cpp | 7 +- be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 1 + be/src/olap/tablet_schema.cpp | 7 + be/src/olap/tablet_schema.h | 36 +- be/src/vec/columns/column_dummy.h | 1 + be/src/vec/columns/column_object.cpp | 6 +- be/src/vec/columns/column_object.h | 22 +- be/src/vec/common/schema_util.cpp | 190 +++++++++- be/src/vec/common/schema_util.h | 14 +- be/src/vec/json/path_in_data.h | 5 + gensrc/proto/segment_v2.proto | 2 +- 
.../suites/variant_github_events_p2/load.groovy | 7 +- .../variant_p0/update/inverted_index/load.groovy | 9 +- .../variant_p0/update/inverted_index/query.groovy | 3 +- 36 files changed, 1193 insertions(+), 282 deletions(-) diff --git a/be/src/cloud/cloud_base_compaction.h b/be/src/cloud/cloud_base_compaction.h index 4240458f21b..13eeccb3d58 100644 --- a/be/src/cloud/cloud_base_compaction.h +++ b/be/src/cloud/cloud_base_compaction.h @@ -46,7 +46,7 @@ private: void _filter_input_rowset(); - void build_basic_info(); + Status build_basic_info(); ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; } diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp index ebc411697ee..343ccc23b27 100644 --- a/be/src/cloud/cloud_rowset_writer.cpp +++ b/be/src/cloud/cloud_rowset_writer.cpp @@ -60,7 +60,10 @@ Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) DCHECK_NE(_context.newest_write_timestamp, -1); _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); } - _rowset_meta->set_tablet_schema(_context.tablet_schema); + auto schema = _context.tablet_schema->need_record_variant_extended_schema() + ? 
_context.tablet_schema + : _context.tablet_schema->copy_without_variant_extracted_columns(); + _rowset_meta->set_tablet_schema(schema); _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this); _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this); return Status::OK(); diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index a4720f89d19..06ad16ab2af 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -30,6 +30,7 @@ #include "olap/delete_bitmap_calculator.h" #include "olap/iterators.h" #include "olap/memtable.h" +#include "olap/olap_common.h" #include "olap/partial_update_info.h" #include "olap/primary_key_index.h" #include "olap/rowid_conversion.h" @@ -163,6 +164,30 @@ TabletSchemaSPtr BaseTablet::tablet_schema_with_merged_max_schema_version( return target_schema; } +Status BaseTablet::get_compaction_schema(const std::vector<RowsetMetaSharedPtr>& rowset_metas, + TabletSchemaSPtr& target_schema) { + RowsetMetaSharedPtr max_schema_version_rs = *std::max_element( + rowset_metas.begin(), rowset_metas.end(), + [](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) { + return !a->tablet_schema() + ? true + : (!b->tablet_schema() + ? 
false + : a->tablet_schema()->schema_version() < + b->tablet_schema()->schema_version()); + }); + target_schema = max_schema_version_rs->tablet_schema(); + if (target_schema->num_variant_columns() > 0) { + RowsetIdUnorderedSet rowset_ids; + for (const RowsetMetaSharedPtr& rs_meta : rowset_metas) { + rowset_ids.emplace(rs_meta->rowset_id()); + } + RETURN_IF_ERROR(vectorized::schema_util::get_compaction_schema( + get_rowset_by_ids(&rowset_ids), target_schema)); + } + return Status::OK(); +} + Status BaseTablet::set_tablet_state(TabletState state) { if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) { return Status::Error<META_INVALID_ARGUMENT>( diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index c6de447200f..1af9327b4a8 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -135,6 +135,9 @@ public: static TabletSchemaSPtr tablet_schema_with_merged_max_schema_version( const std::vector<RowsetMetaSharedPtr>& rowset_metas); + Status get_compaction_schema(const std::vector<RowsetMetaSharedPtr>& rowset_metas, + TabletSchemaSPtr& target_schema); + //////////////////////////////////////////////////////////////////////////// // begin MoW functions //////////////////////////////////////////////////////////////////////////// diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 571ec7f9525..acfcc34470f 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -289,7 +289,7 @@ Tablet* CompactionMixin::tablet() { } Status CompactionMixin::do_compact_ordered_rowsets() { - build_basic_info(); + RETURN_IF_ERROR(build_basic_info()); RowsetWriterContext ctx; RETURN_IF_ERROR(construct_output_rowset_writer(ctx)); @@ -323,7 +323,7 @@ Status CompactionMixin::do_compact_ordered_rowsets() { return Status::OK(); } -void CompactionMixin::build_basic_info() { +Status CompactionMixin::build_basic_info() { for (auto& rowset : _input_rowsets) { _input_rowsets_data_size += 
rowset->data_disk_size(); _input_rowsets_index_size += rowset->index_disk_size(); @@ -344,6 +344,10 @@ void CompactionMixin::build_basic_info() { std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(), [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); }); _cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas); + if (!_cur_tablet_schema->need_record_variant_extended_schema()) { + RETURN_IF_ERROR(_tablet->get_compaction_schema(rowset_metas, _cur_tablet_schema)); + } + return Status::OK(); } bool CompactionMixin::handle_ordered_data_compaction() { @@ -461,7 +465,7 @@ Status CompactionMixin::execute_compact_impl(int64_t permits) { _state = CompactionState::SUCCESS; return Status::OK(); } - build_basic_info(); + RETURN_IF_ERROR(build_basic_info()); VLOG_DEBUG << "dump tablet schema: " << _cur_tablet_schema->dump_structure(); @@ -1348,7 +1352,7 @@ void Compaction::_load_segment_to_cache() { } } -void CloudCompactionMixin::build_basic_info() { +Status CloudCompactionMixin::build_basic_info() { _output_version = Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()); @@ -1358,6 +1362,10 @@ void CloudCompactionMixin::build_basic_info() { std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(), [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); }); _cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas); + if (!_cur_tablet_schema->need_record_variant_extended_schema()) { + RETURN_IF_ERROR(_tablet->get_compaction_schema(rowset_metas, _cur_tablet_schema)); + } + return Status::OK(); } int64_t CloudCompactionMixin::get_compaction_permits() { @@ -1375,7 +1383,7 @@ CloudCompactionMixin::CloudCompactionMixin(CloudStorageEngine& engine, CloudTabl Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { OlapStopWatch watch; - build_basic_info(); + RETURN_IF_ERROR(build_basic_info()); 
LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->tablet_id() << ", output_version=" << _output_version << ", permits: " << permits; diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 057f4084b06..973d4e9d0ab 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -165,7 +165,7 @@ protected: private: Status execute_compact_impl(int64_t permits); - void build_basic_info(); + Status build_basic_info(); // Return true if do ordered data compaction successfully bool handle_ordered_data_compaction(); @@ -204,7 +204,7 @@ private: Status execute_compact_impl(int64_t permits); - void build_basic_info(); + Status build_basic_info(); virtual Status modify_rowsets(); diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 1d3c2ddf6b6..963f4d23598 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -28,6 +28,7 @@ #include "olap/rowset/segment_v2/row_ranges.h" #include "olap/tablet_schema.h" #include "runtime/runtime_state.h" +#include "vec/columns/column.h" #include "vec/core/block.h" #include "vec/exprs/vexpr.h" @@ -119,6 +120,8 @@ public: std::map<std::string, TypeDescriptor> target_cast_type_for_variants; RowRanges row_ranges; size_t topn_limit = 0; + // Cache for sparse column data to avoid redundant reads + vectorized::ColumnPtr sparse_column_cache; }; struct CompactionSampleInfo { diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index dc155efe016..a7745d2bc56 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -301,7 +301,10 @@ Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_conte _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); } _rowset_meta->set_tablet_uid(_context.tablet_uid); - _rowset_meta->set_tablet_schema(_context.tablet_schema); + auto schema = _context.tablet_schema->need_record_variant_extended_schema() + ? 
_context.tablet_schema + : _context.tablet_schema->copy_without_variant_extracted_columns(); + _rowset_meta->set_tablet_schema(schema); _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this); _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this); return Status::OK(); @@ -801,6 +804,10 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { // update rowset meta tablet schema if tablet schema updated auto rowset_schema = _context.merged_tablet_schema != nullptr ? _context.merged_tablet_schema : _context.tablet_schema; + + rowset_schema = rowset_schema->need_record_variant_extended_schema() + ? rowset_schema + : rowset_schema->copy_without_variant_extracted_columns(); _rowset_meta->set_tablet_schema(rowset_schema); // If segment compaction occurs, the idx file info will become inaccurate. diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index cbdba6991ae..44902459b11 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -19,6 +19,9 @@ #include <gen_cpp/olap_file.pb.h> +#include <string_view> +#include <unordered_map> + #include "olap/olap_define.h" #include "olap/partial_update_info.h" #include "olap/storage_policy.h" diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index c71b45b82c1..dfe7c02c7a8 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -32,6 +32,7 @@ #include "common/status.h" #include "io/fs/file_reader.h" #include "io/fs/file_system.h" +#include "io/io_common.h" #include "olap/block_column_predicate.h" #include "olap/column_predicate.h" #include "olap/comparison_predicate.h" @@ -223,11 +224,35 @@ Status ColumnReader::create_agg_state(const ColumnReaderOptions& opts, const Col agg_state_type->get_name(), int(type)); } +bool 
ColumnReader::is_compaction_reader_type(ReaderType type) { + return type == ReaderType::READER_BASE_COMPACTION || + type == ReaderType::READER_CUMULATIVE_COMPACTION || + type == ReaderType::READER_COLD_DATA_COMPACTION || + type == ReaderType::READER_SEGMENT_COMPACTION || + type == ReaderType::READER_FULL_COMPACTION; +} + const SubcolumnColumnReaders::Node* VariantColumnReader::get_reader_by_path( const vectorized::PathInData& relative_path) const { return _subcolumn_readers->find_leaf(relative_path); } +bool VariantColumnReader::exist_in_sparse_column( + const vectorized::PathInData& relative_path) const { + // Check if path exist in sparse column + bool existed_in_sparse_column = + !_statistics->sparse_column_non_null_size.empty() && + _statistics->sparse_column_non_null_size.find(relative_path.get_path()) != + _statistics->sparse_column_non_null_size.end(); + const std::string& prefix = relative_path.get_path() + "."; + bool prefix_existed_in_sparse_column = + !_statistics->sparse_column_non_null_size.empty() && + (_statistics->sparse_column_non_null_size.lower_bound(prefix) != + _statistics->sparse_column_non_null_size.end()) && + _statistics->sparse_column_non_null_size.lower_bound(prefix)->first.starts_with(prefix); + return existed_in_sparse_column || prefix_existed_in_sparse_column; +} + int64_t VariantColumnReader::get_metadata_size() const { int64_t size = ColumnReader::get_metadata_size(); if (_statistics) { @@ -268,30 +293,40 @@ Status VariantColumnReader::_create_hierarchical_reader(ColumnIterator** reader, return Status::OK(); } -bool VariantColumnReader::_read_flat_leaves(ReaderType type, const TabletColumn& target_col) { - auto relative_path = target_col.path_info_ptr()->copy_pop_front(); - bool is_compaction_type = - (type == ReaderType::READER_BASE_COMPACTION || - type == ReaderType::READER_CUMULATIVE_COMPACTION || - type == ReaderType::READER_COLD_DATA_COMPACTION || - type == ReaderType::READER_SEGMENT_COMPACTION || - type == 
ReaderType::READER_FULL_COMPACTION || type == ReaderType::READER_CHECKSUM); - // For compaction operations (e.g., base compaction, cumulative compaction, cold data compaction, - // segment compaction, full compaction, or checksum reading), a legacy compaction style is applied - // when reading variant columns. - // - // Specifically: - // 1. If the target column is a root column (i.e., relative_path is empty) and it does not have any - // subcolumns (i.e., target_col.variant_max_subcolumns_count() <= 0), then the legacy compaction style - // is used. - // 2. If the target column is a nested subcolumn (i.e., relative_path is not empty), then the legacy - // compaction style is also used. - // - // This ensures that during compaction, the reading behavior for variant columns remains consistent - // with historical processing methods, preventing potential data amplification issues. - return is_compaction_type && - ((relative_path.empty() && target_col.variant_max_subcolumns_count() <= 0) || - !relative_path.empty()); +Status VariantColumnReader::_create_sparse_merge_reader(ColumnIterator** iterator, + const StorageReadOptions* opts, + const TabletColumn& target_col, + ColumnIterator* inner_iter) { + // Get subcolumns path set from tablet schema + const auto& path_set_info = opts->tablet_schema->path_set_info(target_col.parent_unique_id()); + + // Build substream reader tree for merging subcolumns into sparse column + SubstreamReaderTree src_subcolumns_for_sparse; + for (const auto& subcolumn_reader : *_subcolumn_readers) { + const auto& path = subcolumn_reader->path.get_path(); + if (path_set_info.sparse_path_set.find(StringRef(path)) == + path_set_info.sparse_path_set.end()) { + // The subcolumn is not a sparse column, skip it + continue; + } + // Create subcolumn iterator + ColumnIterator* it; + RETURN_IF_ERROR(subcolumn_reader->data.reader->new_iterator(&it)); + std::unique_ptr<ColumnIterator> it_ptr(it); + + // Create substream reader and add to tree + 
SubstreamIterator reader(subcolumn_reader->data.file_column_type->create_column(), + std::move(it_ptr), subcolumn_reader->data.file_column_type); + if (!src_subcolumns_for_sparse.add(subcolumn_reader->path, std::move(reader))) { + return Status::InternalError("Failed to add node path {}", path); + } + } + + // Create sparse column merge reader + *iterator = new SparseColumnMergeReader( + path_set_info.sub_path_set, std::unique_ptr<ColumnIterator>(inner_iter), + std::move(src_subcolumns_for_sparse), const_cast<StorageReadOptions*>(opts)); + return Status::OK(); } Status VariantColumnReader::_new_default_iter_with_same_nested(ColumnIterator** iterator, @@ -331,20 +366,43 @@ Status VariantColumnReader::_new_default_iter_with_same_nested(ColumnIterator** } Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIterator** iterator, - const TabletColumn& target_col) { + const TabletColumn& target_col, + const StorageReadOptions* opts, + bool exceeded_sparse_column_limit, + bool existed_in_sparse_column) { + DCHECK(opts != nullptr); auto relative_path = target_col.path_info_ptr()->copy_pop_front(); // compaction need to read flat leaves nodes data to prevent from amplification const auto* node = target_col.has_path_info() ? 
_subcolumn_readers->find_leaf(relative_path) : nullptr; if (!node) { + if (existed_in_sparse_column || exceeded_sparse_column_limit) { + // Sparse column exists or reached sparse size limit, read sparse column + ColumnIterator* inner_iter; + RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter)); + DCHECK(opts); + *iterator = new SparseColumnExtractReader( + relative_path.get_path(), std::unique_ptr<ColumnIterator>(inner_iter), + // need to modify sparse_column_cache, so use const_cast here + const_cast<StorageReadOptions*>(opts)); + return Status::OK(); + } + if (relative_path.get_path() == SPARSE_COLUMN_PATH) { + // read sparse column and filter extracted columns in subcolumn_path_map + ColumnIterator* inner_iter; + RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter)); + // get subcolumns in sparse path set which will be merged into sparse column + RETURN_IF_ERROR(_create_sparse_merge_reader(iterator, opts, target_col, inner_iter)); + return Status::OK(); + } if (target_col.is_nested_subcolumn()) { // using the sibling of the nested column to fill the target nested column RETURN_IF_ERROR(_new_default_iter_with_same_nested(iterator, target_col)); - } else { - std::unique_ptr<ColumnIterator> it; - RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &it)); - *iterator = it.release(); + return Status::OK(); } + std::unique_ptr<ColumnIterator> it; + RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &it)); + *iterator = it.release(); return Status::OK(); } if (relative_path.empty()) { @@ -365,9 +423,48 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const Tablet const auto* node = target_col.has_path_info() ? 
_subcolumn_readers->find_exact(relative_path) : nullptr; - if (opt != nullptr && _read_flat_leaves(opt->io_ctx.reader_type, target_col)) { + // Check if path exist in sparse column + bool existed_in_sparse_column = + !_statistics->sparse_column_non_null_size.empty() && + _statistics->sparse_column_non_null_size.find(relative_path.get_path()) != + _statistics->sparse_column_non_null_size.end(); + + // Otherwise the prefix is not exist and the sparse column size is reached limit + // which means the path maybe exist in sparse_column + bool exceeded_sparse_column_limit = !_statistics->sparse_column_non_null_size.empty() && + _statistics->sparse_column_non_null_size.size() > + VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE; + + // For compaction operations, read flat leaves, otherwise read hierarchical data + // Since the variant subcolumns are flattened in schema_util::get_compaction_schema + if (opt != nullptr && is_compaction_reader_type(opt->io_ctx.reader_type)) { // original path, compaction with wide schema - return _new_iterator_with_flat_leaves(iterator, target_col); + return _new_iterator_with_flat_leaves( + iterator, target_col, opt, exceeded_sparse_column_limit, existed_in_sparse_column); + } + + // Check if path is prefix, example sparse columns path: a.b.c, a.b.e, access prefix: a.b. 
+ // then we must read the sparse columns + const std::string& prefix = relative_path.get_path() + "."; + bool prefix_existed_in_sparse_column = + !_statistics->sparse_column_non_null_size.empty() && + (_statistics->sparse_column_non_null_size.lower_bound(prefix) != + _statistics->sparse_column_non_null_size.end()) && + _statistics->sparse_column_non_null_size.lower_bound(prefix)->first.starts_with(prefix); + // if prefix exists in sparse column, read sparse column with hierarchical reader + if (prefix_existed_in_sparse_column) { + return _create_hierarchical_reader(iterator, relative_path, nullptr, root); + } + + // if path exists in sparse column, read sparse column with extract reader + if (existed_in_sparse_column) { + // Sparse column exists or reached sparse size limit, read sparse column + ColumnIterator* inner_iter; + RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter)); + DCHECK(opt); + *iterator = new SparseColumnExtractReader( + relative_path.get_path(), std::unique_ptr<ColumnIterator>(inner_iter), nullptr); + return Status::OK(); } if (node != nullptr) { @@ -381,34 +478,7 @@ Status VariantColumnReader::new_iterator(ColumnIterator** iterator, const Tablet RETURN_IF_ERROR(_create_hierarchical_reader(iterator, relative_path, node, root)); } } else { - // Check if path exist in sparse column - bool existed_in_sparse_column = - _statistics && - _statistics->sparse_column_non_null_size.find(relative_path.get_path()) != - _statistics->sparse_column_non_null_size.end(); - if (existed_in_sparse_column) { - // Sparse column exists or reached sparse size limit, read sparse column - ColumnIterator* inner_iter; - RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter)); - *iterator = new SparseColumnExtractReader(relative_path.get_path(), - std::unique_ptr<ColumnIterator>(inner_iter)); - return Status::OK(); - } - // Check if path is prefix, example sparse columns path: a.b.c, a.b.e, access prefix: a.b. 
- // then we must read the sparse columns - bool prefix_existed_in_sparse_column = - _statistics && - (_statistics->sparse_column_non_null_size.lower_bound(relative_path.get_path()) != - _statistics->sparse_column_non_null_size.end()) && - _statistics->sparse_column_non_null_size.lower_bound(relative_path.get_path()) - ->first.starts_with(relative_path.get_path() + "."); - - // Otherwise the prefix is not exist and the sparse column size is reached limit - // which means the path maybe exist in sparse_column - bool exceeded_sparse_column_limit = - _statistics && _statistics->sparse_column_non_null_size.size() > - VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE; - if (prefix_existed_in_sparse_column || exceeded_sparse_column_limit) { + if (exceeded_sparse_column_limit) { return _create_hierarchical_reader(iterator, relative_path, nullptr, root); } // Sparse column not exists and not reached stats limit, then the target path is not exist, get a default iterator @@ -424,6 +494,7 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF io::FileReaderSPtr file_reader) { // init sub columns _subcolumn_readers = std::make_unique<SubcolumnColumnReaders>(); + _statistics = std::make_unique<VariantStatistics>(); const ColumnMetaPB& self_column_pb = footer.columns(column_id); for (const ColumnMetaPB& column_pb : footer.columns()) { // Find all columns belonging to the current variant column @@ -449,12 +520,20 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF ColumnReader::create(opts, column_pb, footer.num_rows(), file_reader, &reader)); vectorized::PathInData path; path.from_protobuf(column_pb.column_path_info()); + // init sparse column - if (path.get_path() == SPARSE_COLUMN_PATH) { + if (path.copy_pop_front().get_path() == SPARSE_COLUMN_PATH) { + DCHECK(column_pb.has_variant_statistics()); + const auto& variant_stats = column_pb.variant_statistics(); + for (const auto& [path, size] : 
variant_stats.sparse_column_non_null_size()) { + _statistics->sparse_column_non_null_size.emplace(path, size); + } RETURN_IF_ERROR(ColumnReader::create(opts, column_pb, footer.num_rows(), file_reader, &_sparse_column_reader)); continue; } + + // init subcolumns auto relative_path = path.copy_pop_front(); auto get_data_type_fn = [&]() { // root subcolumn is ColumnObject::MostCommonType which is jsonb @@ -476,6 +555,10 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF SubcolumnReader {std::move(reader), get_data_type_fn()}); } else { // check the root is already a leaf node + if (column_pb.has_none_null_size()) { + _statistics->subcolumns_non_null_size.emplace(relative_path.get_path(), + column_pb.none_null_size()); + } _subcolumn_readers->add(relative_path, SubcolumnReader {std::move(reader), get_data_type_fn()}); // init TabletIndex for subcolumns @@ -500,9 +583,6 @@ Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentF for (const auto& [path, size] : variant_stats.sparse_column_non_null_size()) { _statistics->sparse_column_non_null_size.emplace(path, size); } - for (const auto& [path, size] : variant_stats.subcolumn_non_null_size()) { - _statistics->subcolumns_non_null_size.emplace(path, size); - } } return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 49058ef511f..311be3f9d73 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -144,6 +144,8 @@ public: std::unique_ptr<ColumnReader>* reader); enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING, ALL_DICT_ENCODING }; + static bool is_compaction_reader_type(ReaderType type); + ~ColumnReader() override; // create a new column iterator. 
Client should delete returned iterator @@ -329,15 +331,21 @@ public: TabletIndex* find_subcolumn_tablet_index(const std::string&); + bool exist_in_sparse_column(const vectorized::PathInData& path) const; + private: - bool _read_flat_leaves(ReaderType type, const TabletColumn& target_col); // init for compaction read Status _new_default_iter_with_same_nested(ColumnIterator** iterator, const TabletColumn& col); - Status _new_iterator_with_flat_leaves(ColumnIterator** iterator, const TabletColumn& col); + Status _new_iterator_with_flat_leaves(ColumnIterator** iterator, const TabletColumn& col, + const StorageReadOptions* opts, + bool exceeded_sparse_column_limit, + bool existed_in_sparse_column); Status _create_hierarchical_reader(ColumnIterator** reader, vectorized::PathInData path, const SubcolumnColumnReaders::Node* node, const SubcolumnColumnReaders::Node* root); + Status _create_sparse_merge_reader(ColumnIterator** iterator, const StorageReadOptions* opts, + const TabletColumn& target_col, ColumnIterator* inner_iter); std::unique_ptr<SubcolumnColumnReaders> _subcolumn_readers; std::unique_ptr<ColumnReader> _sparse_column_reader; std::unique_ptr<VariantStatistics> _statistics; diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index 3f001cb5671..eb42acb539d 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -296,6 +296,12 @@ Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts, Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts, const TabletColumn* column, io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) { + if (column->is_extracted_column()) { + VLOG_DEBUG << "gen subwriter for " << column->path_info_ptr()->get_path(); + *writer = std::unique_ptr<ColumnWriter>(new VariantSubcolumnWriter( + opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)))); + return 
Status::OK(); + } *writer = std::unique_ptr<ColumnWriter>(new VariantColumnWriter( opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)))); return Status::OK(); diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index 53d7c5d0234..41464982476 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -35,6 +35,7 @@ #include "olap/rowset/segment_v2/inverted_index_writer.h" #include "util/bitmap.h" // for BitmapChange #include "util/slice.h" // for OwnedSlice +#include "vec/columns/column.h" namespace doris { @@ -475,6 +476,56 @@ private: ColumnWriterOptions _opts; }; +// used for compaction to write sub variant column +class VariantSubcolumnWriter : public ColumnWriter { +public: + explicit VariantSubcolumnWriter(const ColumnWriterOptions& opts, const TabletColumn* column, + std::unique_ptr<Field> field); + + ~VariantSubcolumnWriter() override = default; + + Status init() override; + + Status append_data(const uint8_t** ptr, size_t num_rows) override; + + uint64_t estimate_buffer_size() override; + + Status finish() override; + Status write_data() override; + Status write_ordinal_index() override; + + Status write_zone_map() override; + + Status write_bitmap_index() override; + Status write_inverted_index() override; + Status write_bloom_filter_index() override; + ordinal_t get_next_rowid() const override { return _next_rowid; } + + Status append_nulls(size_t num_rows) override { + return Status::NotSupported("variant writer can not append_nulls"); + } + Status append_nullable(const uint8_t* null_map, const uint8_t** ptr, size_t num_rows) override; + + Status finish_current_page() override { + return Status::NotSupported("variant writer has no data, can not finish_current_page"); + } + + size_t get_non_null_size() const { return none_null_size; } + + Status finalize(); + +private: + bool is_finalized() const; + bool _is_finalized = false; + 
ordinal_t _next_rowid = 0; + size_t none_null_size = 0; + vectorized::MutableColumnPtr _column; + const TabletColumn* _tablet_column = nullptr; + ColumnWriterOptions _opts; + std::unique_ptr<ColumnWriter> _writer; + std::unique_ptr<TabletIndex> _index; +}; + class VariantColumnWriter : public ColumnWriter { public: explicit VariantColumnWriter(const ColumnWriterOptions& opts, const TabletColumn* column, diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp index b0788c10b54..e739028a3e7 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp @@ -381,7 +381,12 @@ Status HierarchicalDataReader::_process_sparse_column(vectorized::ColumnObject& // b maybe in sparse column, and b.c is in subolumn, put `b` into root column to distinguish // from "" which is empty path and root if (container_variant.is_null_root()) { - container_variant.add_sub_column({}, sparse_data_offsets.size()); + // root was created with nrows with Nothing type, resize it to fit the size of sparse column + container_variant.get_root()->resize(sparse_data_offsets.size()); + // bool added = container_variant.add_sub_column({}, sparse_data_offsets.size()); + // if (!added) { + // return Status::InternalError("Failed to add subcolumn for sparse column"); + // } } const auto& data = ColumnObject::deserialize_from_sparse_column( &src_sparse_data_values, lower_bound_index); @@ -435,16 +440,12 @@ Status HierarchicalDataReader::_init_null_map_and_clear_columns( return Status::OK(); } -Status SparseColumnExtractReader::init(const ColumnIteratorOptions& opts) { - return _sparse_column_reader->init(opts); -} - -Status SparseColumnExtractReader::seek_to_first() { - return _sparse_column_reader->seek_to_first(); -} - -Status SparseColumnExtractReader::seek_to_ordinal(ordinal_t ord) { - return _sparse_column_reader->seek_to_ordinal(ord); +bool 
is_compaction_type(ReaderType type) { + return type == ReaderType::READER_BASE_COMPACTION || + type == ReaderType::READER_CUMULATIVE_COMPACTION || + type == ReaderType::READER_COLD_DATA_COMPACTION || + type == ReaderType::READER_SEGMENT_COMPACTION || + type == ReaderType::READER_FULL_COMPACTION; } void SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr& dst) { @@ -469,39 +470,147 @@ void SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr& #ifndef NDEBUG var.check_consistency(); #endif - _sparse_column->clear(); + // _sparse_column->clear(); } -Status SparseColumnExtractReader::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, - bool* has_null) { - _sparse_column->clear(); - RETURN_IF_ERROR(_sparse_column_reader->next_batch(n, _sparse_column, has_null)); - const auto& offsets = assert_cast<const vectorized::ColumnMap&>(*_sparse_column).get_offsets(); - // Check if we don't have any paths in shared data in current range. - if (offsets.back() == offsets[-1]) { - dst->insert_many_defaults(*n); - } else { - _fill_path_column(dst); +Status SparseColumnMergeReader::seek_to_first() { + RETURN_IF_ERROR(_sparse_column_reader->seek_to_first()); + for (auto& entry : _src_subcolumns_for_sparse) { + RETURN_IF_ERROR(entry->data.iterator->seek_to_first()); } return Status::OK(); } -Status SparseColumnExtractReader::read_by_rowids(const rowid_t* rowids, const size_t count, - vectorized::MutableColumnPtr& dst) { - _sparse_column->clear(); - RETURN_IF_ERROR(_sparse_column_reader->read_by_rowids(rowids, count, _sparse_column)); - const auto& offsets = assert_cast<const vectorized::ColumnMap&>(*_sparse_column).get_offsets(); - // Check if we don't have any paths in shared data in current range. 
- if (offsets.back() == offsets[-1]) { - dst->insert_many_defaults(count); - } else { - _fill_path_column(dst); +Status SparseColumnMergeReader::seek_to_ordinal(ordinal_t ord) { + RETURN_IF_ERROR(_sparse_column_reader->seek_to_ordinal(ord)); + for (auto& entry : _src_subcolumns_for_sparse) { + RETURN_IF_ERROR(entry->data.iterator->seek_to_ordinal(ord)); + } + return Status::OK(); +} + +Status SparseColumnMergeReader::init(const ColumnIteratorOptions& opts) { + RETURN_IF_ERROR(_sparse_column_reader->init(opts)); + for (auto& entry : _src_subcolumns_for_sparse) { + entry->data.serde = entry->data.type->get_serde(); + RETURN_IF_ERROR(entry->data.iterator->init(opts)); + const auto& path = entry->path.get_path(); + _sorted_src_subcolumn_for_sparse.emplace_back(StringRef(path.data(), path.size()), entry); } + + // sort src subcolumns by path + std::sort( + _sorted_src_subcolumn_for_sparse.begin(), _sorted_src_subcolumn_for_sparse.end(), + [](const auto& lhsItem, const auto& rhsItem) { return lhsItem.first < rhsItem.first; }); return Status::OK(); } -ordinal_t SparseColumnExtractReader::get_current_ordinal() const { - return _sparse_column_reader->get_current_ordinal(); +void SparseColumnMergeReader::_serialize_nullable_column_to_sparse( + const SubstreamReaderTree::Node* src_subcolumn, + vectorized::ColumnString& dst_sparse_column_paths, + vectorized::ColumnString& dst_sparse_column_values, const StringRef& src_path, size_t row) { + // every subcolumn is always Nullable + const auto& nullable_serde = + assert_cast<vectorized::DataTypeNullableSerDe&>(*src_subcolumn->data.serde); + const auto& nullable_col = + assert_cast<const vectorized::ColumnNullable&, TypeCheckOnRelease::DISABLE>( + *src_subcolumn->data.column); + if (nullable_col.is_null_at(row)) { + return; + } + // insert key + dst_sparse_column_paths.insert_data(src_path.data, src_path.size); + // insert value + vectorized::ColumnString::Chars& chars = dst_sparse_column_values.get_chars(); + 
nullable_serde.get_nested_serde()->write_one_cell_to_binary(nullable_col.get_nested_column(), + chars, row); + dst_sparse_column_values.get_offsets().push_back(chars.size()); +} + +void SparseColumnMergeReader::_process_data_without_sparse_column(vectorized::MutableColumnPtr& dst, + size_t num_rows) { + if (_src_subcolumns_for_sparse.empty()) { + dst->insert_many_defaults(num_rows); + } else { + // merge subcolumns to sparse column + // Otherwise insert required src dense columns into sparse column. + auto& map_column = assert_cast<vectorized::ColumnMap&>(*dst); + auto& sparse_column_keys = assert_cast<vectorized::ColumnString&>(map_column.get_keys()); + auto& sparse_column_values = + assert_cast<vectorized::ColumnString&>(map_column.get_values()); + auto& sparse_column_offsets = map_column.get_offsets(); + for (size_t i = 0; i != num_rows; ++i) { + // Paths in sorted_src_subcolumn_for_sparse_column are already sorted. + for (const auto& entry : _sorted_src_subcolumn_for_sparse) { + const auto& path = entry.first; + _serialize_nullable_column_to_sparse(entry.second.get(), sparse_column_keys, + sparse_column_values, path, i); + } + sparse_column_offsets.push_back(sparse_column_keys.size()); + } + } +} + +void SparseColumnMergeReader::_merge_to(vectorized::MutableColumnPtr& dst) { + auto& column_map = assert_cast<vectorized::ColumnMap&>(*dst); + auto& dst_sparse_column_paths = assert_cast<vectorized::ColumnString&>(column_map.get_keys()); + auto& dst_sparse_column_values = + assert_cast<vectorized::ColumnString&>(column_map.get_values()); + auto& dst_sparse_column_offsets = column_map.get_offsets(); + + const auto& src_column_map = assert_cast<const vectorized::ColumnMap&>(*_sparse_column); + const auto& src_sparse_column_paths = + assert_cast<const vectorized::ColumnString&>(*src_column_map.get_keys_ptr()); + const auto& src_sparse_column_values = + assert_cast<const vectorized::ColumnString&>(*src_column_map.get_values_ptr()); + const auto& 
src_serialized_sparse_column_offsets = src_column_map.get_offsets(); + DCHECK_EQ(src_sparse_column_paths.size(), src_sparse_column_values.size()); + // Src object column contains some paths in serialized sparse column in specified range. + // Iterate over this range and insert all required paths into serialized sparse column or subcolumns. + for (size_t row = 0; row != _sparse_column->size(); ++row) { + // Use separate index to iterate over sorted sorted_src_subcolumn_for_sparse_column. + size_t sorted_src_subcolumn_for_sparse_column_idx = 0; + size_t sorted_src_subcolumn_for_sparse_column_size = _src_subcolumns_for_sparse.size(); + + size_t offset = src_serialized_sparse_column_offsets[row - 1]; + size_t end = src_serialized_sparse_column_offsets[row]; + // Iterator over [path, binary value] + for (size_t i = offset; i != end; ++i) { + const StringRef src_sparse_path_string = src_sparse_column_paths.get_data_at(i); + // Check if we have this path in subcolumns. This path already materialized in subcolumns. + // So we don't need to insert it into sparse column. + if (!_src_subcolumn_map.contains(src_sparse_path_string)) { + // Before inserting this path into sparse column check if we need to + // insert subcolumns from sorted_src_subcolumn_for_sparse_column before. + while (sorted_src_subcolumn_for_sparse_column_idx < + sorted_src_subcolumn_for_sparse_column_size && + _sorted_src_subcolumn_for_sparse[sorted_src_subcolumn_for_sparse_column_idx] + .first < src_sparse_path_string) { + auto& [src_path, src_subcolumn] = _sorted_src_subcolumn_for_sparse + [sorted_src_subcolumn_for_sparse_column_idx++]; + _serialize_nullable_column_to_sparse(src_subcolumn.get(), + dst_sparse_column_paths, + dst_sparse_column_values, src_path, row); + } + + /// Insert path and value from src sparse column to our sparse column. 
+ dst_sparse_column_paths.insert_from(src_sparse_column_paths, i); + dst_sparse_column_values.insert_from(src_sparse_column_values, i); + } + } + + // Insert remaining dynamic paths from src_dynamic_paths_for_sparse_data. + while (sorted_src_subcolumn_for_sparse_column_idx < + sorted_src_subcolumn_for_sparse_column_size) { + auto& [src_path, src_subcolumn] = + _sorted_src_subcolumn_for_sparse[sorted_src_subcolumn_for_sparse_column_idx++]; + _serialize_nullable_column_to_sparse(src_subcolumn.get(), dst_sparse_column_paths, + dst_sparse_column_values, src_path, row); + } + + // All the sparse columns in this row are null. + dst_sparse_column_offsets.push_back(dst_sparse_column_paths.size()); + } } } // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h index a99c15bb12a..591b706e0e7 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h @@ -17,6 +17,8 @@ #pragma once +#include <parallel_hashmap/phmap.h> + #include <memory> #include <string_view> #include <unordered_map> @@ -164,34 +166,194 @@ private: } }; -// Extract path from sparse column -class SparseColumnExtractReader : public ColumnIterator { +// Base class for sparse column processors with common functionality +class BaseSparseColumnProcessor : public ColumnIterator { +protected: + vectorized::MutableColumnPtr _sparse_column; + StorageReadOptions* _read_opts; // Shared cache pointer + std::unique_ptr<ColumnIterator> _sparse_column_reader; + + // Pure virtual method for data processing when encounter existing sparse columns(to be implemented by subclasses) + virtual void _process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst, + size_t num_rows) = 0; + + // Pure virtual method for data processing when no sparse columns(to be implemented by subclasses) + virtual void 
_process_data_without_sparse_column(vectorized::MutableColumnPtr& dst, + size_t num_rows) = 0; + public: - SparseColumnExtractReader(std::string path, - std::unique_ptr<ColumnIterator>&& sparse_column_reader) - : _path(std::move(path)), _sparse_column_reader(std::move(sparse_column_reader)) { + BaseSparseColumnProcessor(std::unique_ptr<ColumnIterator>&& reader, StorageReadOptions* opts) + : _read_opts(opts), _sparse_column_reader(std::move(reader)) { _sparse_column = vectorized::ColumnObject::create_sparse_column_fn(); } - Status init(const ColumnIteratorOptions& opts) override; + // Common initialization for all processors + Status init(const ColumnIteratorOptions& opts) override { + return _sparse_column_reader->init(opts); + } - Status seek_to_first() override; + // Standard seek implementations + Status seek_to_first() override { return _sparse_column_reader->seek_to_first(); } - Status seek_to_ordinal(ordinal_t ord) override; + Status seek_to_ordinal(ordinal_t ord) override { + return _sparse_column_reader->seek_to_ordinal(ord); + } - Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override; + ordinal_t get_current_ordinal() const override { + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "not implement"); + } + + // Template method pattern for batch processing + template <typename ReadMethod> + Status _process_batch(ReadMethod&& read_method, size_t nrows, + vectorized::MutableColumnPtr& dst) { + // Cache check and population logic + if (_read_opts && _read_opts->sparse_column_cache && + ColumnReader::is_compaction_reader_type(_read_opts->io_ctx.reader_type)) { + _sparse_column = _read_opts->sparse_column_cache->assume_mutable(); + } else { + _sparse_column->clear(); + RETURN_IF_ERROR(read_method()); + + if (_read_opts) { + _read_opts->sparse_column_cache = _sparse_column->assume_mutable(); + } + } + const auto& offsets = + assert_cast<const vectorized::ColumnMap&>(*_sparse_column).get_offsets(); + if (offsets.back() 
== offsets[-1]) { + // no sparse column in this batch + _process_data_without_sparse_column(dst, nrows); + } else { + // merge subcolumns to existing sparse columns + _process_data_with_existing_sparse_column(dst, nrows); + } + return Status::OK(); + } + + // Batch processing using template method + Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override { + return _process_batch( + [&]() { return _sparse_column_reader->next_batch(n, _sparse_column, has_null); }, + *n, dst); + } + + // RowID-based read using template method Status read_by_rowids(const rowid_t* rowids, const size_t count, - vectorized::MutableColumnPtr& dst) override; + vectorized::MutableColumnPtr& dst) override { + return _process_batch( + [&]() { + return _sparse_column_reader->read_by_rowids(rowids, count, _sparse_column); + }, + count, dst); + } +}; - ordinal_t get_current_ordinal() const override; +// Implementation for path extraction processor +class SparseColumnExtractReader : public BaseSparseColumnProcessor { +public: + SparseColumnExtractReader(std::string_view path, std::unique_ptr<ColumnIterator> reader, + StorageReadOptions* opts) + : BaseSparseColumnProcessor(std::move(reader), opts), _path(path) {} private: - void _fill_path_column(vectorized::MutableColumnPtr& dst); - vectorized::MutableColumnPtr _sparse_column; std::string _path; - // may shared among different column iterators - std::unique_ptr<ColumnIterator> _sparse_column_reader; + + // Fill column by finding path in sparse column + void _process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst, + size_t num_rows) override { + _fill_path_column(dst); + } + + void _fill_path_column(vectorized::MutableColumnPtr& dst); + + void _process_data_without_sparse_column(vectorized::MutableColumnPtr& dst, + size_t num_rows) override { + dst->insert_many_defaults(num_rows); + } +}; + +// Implementation for merge processor +class SparseColumnMergeReader : public BaseSparseColumnProcessor { 
+public: + SparseColumnMergeReader(const TabletSchema::PathSet& path_map, + std::unique_ptr<ColumnIterator>&& sparse_column_reader, + SubstreamReaderTree&& src_subcolumns_for_sparse, + StorageReadOptions* opts) + : BaseSparseColumnProcessor(std::move(sparse_column_reader), opts), + _src_subcolumn_map(path_map), + _src_subcolumns_for_sparse(src_subcolumns_for_sparse) {} + Status init(const ColumnIteratorOptions& opts) override; + + // Batch processing using template method + Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override { + // read subcolumns first + RETURN_IF_ERROR(_read_subcolumns([&](SubstreamReaderTree::Node* entry) { + bool has_null = false; + return entry->data.iterator->next_batch(n, entry->data.column, &has_null); + })); + // then read sparse column + return _process_batch( + [&]() { return _sparse_column_reader->next_batch(n, _sparse_column, has_null); }, + *n, dst); + } + + // RowID-based read using template method + Status read_by_rowids(const rowid_t* rowids, const size_t count, + vectorized::MutableColumnPtr& dst) override { + // read subcolumns first + RETURN_IF_ERROR(_read_subcolumns([&](SubstreamReaderTree::Node* entry) { + return entry->data.iterator->read_by_rowids(rowids, count, entry->data.column); + })); + // then read sparse column + return _process_batch( + [&]() { + return _sparse_column_reader->read_by_rowids(rowids, count, _sparse_column); + }, + count, dst); + } + + Status seek_to_first() override; + + Status seek_to_ordinal(ordinal_t ord) override; + +private: + template <typename ReadFunction> + Status _read_subcolumns(ReadFunction&& read_func) { + // clear previous data + for (auto& entry : _src_subcolumns_for_sparse) { + entry->data.column->clear(); + } + // read subcolumns + for (auto& entry : _src_subcolumns_for_sparse) { + RETURN_IF_ERROR(read_func(entry.get())); + } + return Status::OK(); + } + + // subcolumns in src tablet schema, which will be filtered + const TabletSchema::PathSet& 
_src_subcolumn_map; + // subcolumns to merge to sparse column + SubstreamReaderTree _src_subcolumns_for_sparse; + std::vector<std::pair<StringRef, std::shared_ptr<SubstreamReaderTree::Node>>> + _sorted_src_subcolumn_for_sparse; + + // Path filtering implementation + void _process_data_with_existing_sparse_column(vectorized::MutableColumnPtr& dst, + size_t num_rows) override { + _merge_to(dst); + } + + void _merge_to(vectorized::MutableColumnPtr& dst); + + void _process_data_without_sparse_column(vectorized::MutableColumnPtr& dst, + size_t num_rows) override; + + void _serialize_nullable_column_to_sparse(const SubstreamReaderTree::Node* src_subcolumn, + vectorized::ColumnString& dst_sparse_column_paths, + vectorized::ColumnString& dst_sparse_column_values, + const StringRef& src_path, size_t row); }; } // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index df7e47854f4..58fb5341e92 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -583,7 +583,9 @@ vectorized::DataTypePtr Segment::get_data_type_of(const TabletColumn& column, ->get_reader_by_path(relative_path) : nullptr; if (node) { - if (read_flat_leaves || (node->children.empty())) { + bool exist_in_sparse = ((VariantColumnReader*)(_column_readers.at(unique_id).get())) + ->exist_in_sparse_column(relative_path); + if (read_flat_leaves || (node->children.empty() && !exist_in_sparse)) { return node->data.file_column_type; } } @@ -591,7 +593,7 @@ vectorized::DataTypePtr Segment::get_data_type_of(const TabletColumn& column, if (read_flat_leaves && !node) { return nullptr; } - // it contains children or column missing in storage, so treat it as variant + // it contains children, exist in sparse column or column missing in storage, so treat it as variant return column.is_nullable() ? 
vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>( column.variant_max_subcolumns_count())) diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 5e2f6dcbed7..a9db0a81307 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -2023,7 +2023,8 @@ void SegmentIterator::_clear_iterators() { Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { bool is_mem_reuse = block->mem_reuse(); DCHECK(is_mem_reuse); - + // Clear the sparse column cache before processing a new batch + _opts.sparse_column_cache = nullptr; SCOPED_RAW_TIMER(&_opts.stats->block_load_ns); if (UNLIKELY(!_lazy_inited)) { RETURN_IF_ERROR(_lazy_init()); diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index ece06ffdcf4..1720b0ddba4 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -39,6 +39,7 @@ #include "olap/data_dir.h" #include "olap/key_coder.h" #include "olap/olap_common.h" +#include "olap/olap_define.h" #include "olap/partial_update_info.h" #include "olap/primary_key_index.h" #include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep @@ -161,14 +162,12 @@ void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id, init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i), tablet_schema); } - // add sparse column to footer - for (uint32_t i = 0; i < column.num_sparse_columns(); i++) { - init_column_meta(meta->add_sparse_columns(), -1, column.sparse_column_at(i), tablet_schema); - } meta->set_result_is_nullable(column.get_result_is_nullable()); meta->set_function_name(column.get_aggregation_name()); meta->set_be_exec_version(column.get_be_exec_version()); - meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count()); + if 
(column.is_variant_type()) { + meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count()); + } } Status SegmentWriter::init() { @@ -364,9 +363,7 @@ void SegmentWriter::_maybe_invalid_row_cache(const std::string& key) { // which will be used to contruct the new schema for rowset Status SegmentWriter::append_block_with_variant_subcolumns(vectorized::Block& data) { if (_tablet_schema->num_variant_columns() == 0 || - // need to handle sparse columns if variant_max_subcolumns_count > 0 - std::any_of(_tablet_schema->columns().begin(), _tablet_schema->columns().end(), - [](const auto& col) { return col->variant_max_subcolumns_count() > 0; })) { + !_tablet_schema->need_record_variant_extended_schema()) { return Status::OK(); } size_t column_id = _tablet_schema->num_columns(); @@ -809,6 +806,10 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po } RETURN_IF_ERROR(_column_writers[id]->append(converted_result.second->get_nullmap(), converted_result.second->get_data(), num_rows)); + + // caculate stats for variant type + // TODO it's tricky here, maybe come up with a better idea + _maybe_calculate_variant_stats(block, id, cid); } if (_has_key) { if (_is_mow_with_cluster_key()) { @@ -1324,5 +1325,38 @@ inline bool SegmentWriter::_is_mow() { inline bool SegmentWriter::_is_mow_with_cluster_key() { return _is_mow() && !_tablet_schema->cluster_key_uids().empty(); } + +// Compaction will extend sparse column and is visible during read and write, in order to +// persit variant stats info, we should do extra caculation during flushing segment, otherwise +// the info is lost +void SegmentWriter::_maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, + size_t cid) { + // Only process sparse columns during compaction + if (!_tablet_schema->columns()[cid]->is_sparse_column() || + _opts.write_type != DataWriteType::TYPE_COMPACTION) { + return; + } + + // Get parent column's unique ID for matching + int64_t 
parent_unique_id = _tablet_schema->columns()[cid]->parent_unique_id(); + + // Find matching column in footer + for (auto& column : *_footer.mutable_columns()) { + // Check if this is the target sparse column + if (!column.has_column_path_info() || + !column.column_path_info().path().ends_with(SPARSE_COLUMN_PATH) || + column.column_path_info().parrent_column_unique_id() != parent_unique_id) { + continue; + } + + // Found matching column, calculate statistics + auto* stats = column.mutable_variant_statistics(); + vectorized::schema_util::calculate_variant_stats(*block->get_by_position(id).column, stats); + + VLOG_DEBUG << "sparse stats columns " << stats->sparse_column_non_null_size_size(); + break; + } +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 60300383d72..f505aaeaebb 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -175,6 +175,7 @@ private: Status _write_footer(); Status _write_raw_data(const std::vector<Slice>& slices); void _maybe_invalid_row_cache(const std::string& key); + void _maybe_calculate_variant_stats(const vectorized::Block* block, size_t id, size_t cid); std::string _encode_keys(const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos); // used for unique-key with merge on write and segment min_max key diff --git a/be/src/olap/rowset/segment_v2/stream_reader.h b/be/src/olap/rowset/segment_v2/stream_reader.h index 5b71e00101f..1b18436e086 100644 --- a/be/src/olap/rowset/segment_v2/stream_reader.h +++ b/be/src/olap/rowset/segment_v2/stream_reader.h @@ -37,6 +37,7 @@ struct SubstreamIterator { vectorized::MutableColumnPtr column; std::unique_ptr<ColumnIterator> iterator; std::shared_ptr<const vectorized::IDataType> type; + std::shared_ptr<vectorized::DataTypeSerDe> serde; bool inited = false; size_t rows_read = 0; SubstreamIterator() = 
default; diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp index 2f19309520a..087de2b5e02 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp @@ -19,24 +19,121 @@ #include <fmt/core.h> #include <gen_cpp/segment_v2.pb.h> +#include <memory> +#include <set> + #include "common/config.h" #include "common/status.h" +#include "exec/decompressor.h" #include "olap/olap_common.h" +#include "olap/olap_define.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset_fwd.h" #include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/segment_v2/column_writer.h" #include "olap/segment_loader.h" +#include "olap/tablet_schema.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_object.h" #include "vec/columns/columns_number.h" #include "vec/common/schema_util.h" +#include "vec/data_types/data_type_factory.hpp" #include "vec/json/path_in_data.h" #include "vec/olap/olap_data_convertor.h" namespace doris::segment_v2 { +void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column, + CompressionTypePB compression_type) { + meta->Clear(); + meta->set_column_id(column_id); + meta->set_type(int(column.type())); + meta->set_length(column.length()); + meta->set_encoding(DEFAULT_ENCODING); + meta->set_compression(compression_type); + meta->set_is_nullable(column.is_nullable()); + meta->set_default_value(column.default_value()); + meta->set_precision(column.precision()); + meta->set_frac(column.frac()); + if (column.has_path_info()) { + column.path_info_ptr()->to_protobuf(meta->mutable_column_path_info(), + column.parent_unique_id()); + } + meta->set_unique_id(column.unique_id()); + for (uint32_t i = 0; i < column.get_subtype_count(); ++i) { + _init_column_meta(meta->add_children_columns(), column_id, 
column.get_sub_column(i), + compression_type); + } + if (column.is_variant_type()) { + meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count()); + } +}; + +Status _create_column_writer(uint32_t cid, const TabletColumn& column, + const TabletSchemaSPtr& tablet_schema, + InvertedIndexFileWriter* inverted_index_file_writer, + std::unique_ptr<ColumnWriter>* writer, + std::unique_ptr<TabletIndex>& subcolumn_index, + ColumnWriterOptions* opt, size_t none_null_value_size) { + _init_column_meta(opt->meta, cid, column, opt->compression_type); + // record none null value size for statistics + opt->meta->set_none_null_size(none_null_value_size); + opt->need_zone_map = tablet_schema->keys_type() != KeysType::AGG_KEYS; + opt->need_bloom_filter = column.is_bf_column(); + opt->need_bitmap_index = column.has_bitmap_index(); + const auto& index = tablet_schema->inverted_index(column.parent_unique_id()); + + // init inverted index + if (index != nullptr && + segment_v2::InvertedIndexColumnWriter::check_support_inverted_index(column)) { + subcolumn_index = std::make_unique<TabletIndex>(*index); + subcolumn_index->set_escaped_escaped_index_suffix_path(column.path_info_ptr()->get_path()); + opt->inverted_index = subcolumn_index.get(); + opt->need_inverted_index = true; + DCHECK(inverted_index_file_writer != nullptr); + opt->inverted_index_file_writer = inverted_index_file_writer; + } + +#define DISABLE_INDEX_IF_FIELD_TYPE(TYPE, type_name) \ + if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \ + opt->need_zone_map = false; \ + opt->need_bloom_filter = false; \ + opt->need_bitmap_index = false; \ + } + + DISABLE_INDEX_IF_FIELD_TYPE(ARRAY, "array") + DISABLE_INDEX_IF_FIELD_TYPE(JSONB, "jsonb") + DISABLE_INDEX_IF_FIELD_TYPE(VARIANT, "variant") + +#undef DISABLE_INDEX_IF_FIELD_TYPE + +#undef CHECK_FIELD_TYPE + + RETURN_IF_ERROR(ColumnWriter::create(*opt, &column, opt->file_writer, writer)); + RETURN_IF_ERROR((*writer)->init()); + + return Status::OK(); +} + 
+Status convert_and_write_column(vectorized::OlapBlockDataConvertor* converter, + const TabletColumn& column, ColumnWriter* writer, + + const vectorized::ColumnPtr& src_column, size_t num_rows, + int column_id) { + converter->add_column_data_convertor(column); + RETURN_IF_ERROR(converter->set_source_content_with_specifid_column({src_column, nullptr, ""}, 0, + num_rows, column_id)); + auto [status, converted_column] = converter->convert_column_data(column_id); + RETURN_IF_ERROR(status); + + const uint8_t* nullmap = converted_column->get_nullmap(); + RETURN_IF_ERROR(writer->append(nullmap, converted_column->get_data(), num_rows)); + + converter->clear_source_content(); + return Status::OK(); +} + VariantColumnWriterImpl::VariantColumnWriterImpl(const ColumnWriterOptions& opts, const TabletColumn* column) { _opts = opts; @@ -174,6 +271,7 @@ Status VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject* p vectorized::make_nullable(std::make_shared<vectorized::ColumnObject::MostCommonType>()); ptr->ensure_root_node_type(expected_root_type); + DCHECK_EQ(ptr->get_root()->get_ptr()->size(), num_rows); converter->add_column_data_convertor(*_tablet_column); DCHECK_EQ(ptr->get_root()->get_ptr()->size(), num_rows); RETURN_IF_ERROR(converter->set_source_content_with_specifid_column( @@ -194,6 +292,7 @@ Status VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject* p _opts.meta->set_num_rows(num_rows); return Status::OK(); + return Status::OK(); } Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* ptr, @@ -209,13 +308,15 @@ Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt auto full_path = full_path_builder.append(_tablet_column->name_lower_case(), false) .append(entry->path.get_parts(), false) .build(); - // set unique_id and parent_unique_id, will use parent_unique_id to get iterator correct - return vectorized::schema_util::get_column_by_type( + // set unique_id and parent_unique_id, 
will use unique_id to get iterator correct + auto column = vectorized::schema_util::get_column_by_type( final_data_type_from_object, column_name, vectorized::schema_util::ExtraInfo {.unique_id = _tablet_column->unique_id(), .parent_unique_id = _tablet_column->unique_id(), .path_info = full_path}); + return column; }; + _subcolumns_indexes.resize(ptr->get_subcolumns().size()); // convert sub column data from engine format to storage layer format for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns(ptr->get_subcolumns())) { @@ -229,26 +330,45 @@ Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt continue; } CHECK(entry->data.is_finalized()); + + // create subcolumn writer int current_column_id = column_id++; TabletColumn tablet_column = generate_column_info(entry); + ColumnWriterOptions opts; + opts.meta = _opts.footer->add_columns(); + opts.inverted_index_file_writer = _opts.inverted_index_file_writer; + opts.compression_type = _opts.compression_type; + opts.rowset_ctx = _opts.rowset_ctx; + opts.file_writer = _opts.file_writer; + std::unique_ptr<ColumnWriter> writer; vectorized::schema_util::inherit_column_attributes(*_tablet_column, tablet_column); - RETURN_IF_ERROR(_create_column_writer(current_column_id, tablet_column, - _opts.rowset_ctx->tablet_schema)); - converter->add_column_data_convertor(tablet_column); - RETURN_IF_ERROR(converter->set_source_content_with_specifid_column( - {entry->data.get_finalized_column_ptr()->get_ptr(), - entry->data.get_least_common_type(), tablet_column.name()}, - 0, num_rows, current_column_id)); - auto [status, column] = converter->convert_column_data(current_column_id); - if (!status.ok()) { - return status; - } - const uint8_t* nullmap = column->get_nullmap(); - RETURN_IF_ERROR(_subcolumn_writers[current_column_id - 1]->append( - nullmap, column->get_data(), num_rows)); - converter->clear_source_content(); + RETURN_IF_ERROR(_create_column_writer( + current_column_id, tablet_column, 
_opts.rowset_ctx->tablet_schema, + _opts.inverted_index_file_writer, &writer, _subcolumns_indexes[current_column_id], + &opts, entry->data.get_non_null_value_size())); + _subcolumn_writers.push_back(std::move(writer)); + _subcolumn_opts.push_back(opts); + + // set convertors + // converter->add_column_data_convertor(tablet_column); + // RETURN_IF_ERROR(converter->set_source_content_with_specifid_column( + // {entry->data.get_finalized_column_ptr()->get_ptr(), + // entry->data.get_least_common_type(), tablet_column.name()}, + // 0, num_rows, current_column_id)); + // auto [status, column] = converter->convert_column_data(current_column_id); + // if (!status.ok()) { + // return status; + // } + // const uint8_t* nullmap = column->get_nullmap(); + // RETURN_IF_ERROR(_subcolumn_writers[current_column_id - 1]->append( + // nullmap, column->get_data(), num_rows)); + // converter->clear_source_content(); _subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows); + RETURN_IF_ERROR(convert_and_write_column( + converter, tablet_column, _subcolumn_writers[current_column_id - 1].get(), + entry->data.get_finalized_column_ptr()->get_ptr(), ptr->rows(), current_column_id)); + // get stastics _statistics.subcolumns_non_null_size.emplace(entry->path.get_path(), entry->data.get_non_null_value_size()); @@ -264,7 +384,7 @@ Status VariantColumnWriterImpl::_process_sparse_column( ColumnWriterOptions sparse_writer_opts; sparse_writer_opts.meta = _opts.footer->add_columns(); - _init_column_meta(sparse_writer_opts.meta, column_id, sparse_column); + _init_column_meta(sparse_writer_opts.meta, column_id, sparse_column, _opts.compression_type); RETURN_IF_ERROR(ColumnWriter::create_map_writer(sparse_writer_opts, &sparse_column, _opts.file_writer, &_sparse_column_writer)); RETURN_IF_ERROR(_sparse_column_writer->init()); @@ -306,14 +426,13 @@ Status VariantColumnWriterImpl::_process_sparse_column( for (const auto& [path, size] : sparse_data_paths_statistics) { 
_statistics.sparse_column_non_null_size.emplace(path.to_string(), size); } + // set statistics info + _statistics.to_pb(sparse_writer_opts.meta->mutable_variant_statistics()); sparse_writer_opts.meta->set_num_rows(num_rows); return Status::OK(); } void VariantStatistics::to_pb(VariantStatisticsPB* stats) const { - for (const auto& [path, value] : subcolumns_non_null_size) { - stats->mutable_subcolumn_non_null_size()->emplace(path, value); - } for (const auto& [path, value] : sparse_column_non_null_size) { stats->mutable_sparse_column_non_null_size()->emplace(path, value); } @@ -322,10 +441,6 @@ void VariantStatistics::to_pb(VariantStatisticsPB* stats) const { } void VariantStatistics::from_pb(const VariantStatisticsPB& stats) { - // make sure the ref of path, todo not use ref - for (const auto& [path, value] : stats.subcolumn_non_null_size()) { - subcolumns_non_null_size[path] = value; - } for (const auto& [path, value] : stats.sparse_column_non_null_size()) { sparse_column_non_null_size[path] = value; } @@ -359,14 +474,14 @@ Status VariantColumnWriterImpl::finalize() { // convert root column data from engine format to storage layer format RETURN_IF_ERROR(_process_root_column(ptr, olap_data_convertor.get(), num_rows, column_id)); - // process and append each subcolumns to sub columns writers buffer - RETURN_IF_ERROR(_process_subcolumns(ptr, olap_data_convertor.get(), num_rows, column_id)); + if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION) { + // process and append each subcolumns to sub columns writers buffer + RETURN_IF_ERROR(_process_subcolumns(ptr, olap_data_convertor.get(), num_rows, column_id)); - // process sparse column and append to sparse writer buffer - RETURN_IF_ERROR(_process_sparse_column(ptr, olap_data_convertor.get(), num_rows, column_id)); - - // set statistics info - _statistics.to_pb(_opts.meta->mutable_variant_statistics()); + // process sparse column and append to sparse writer buffer + RETURN_IF_ERROR( + 
_process_sparse_column(ptr, olap_data_convertor.get(), num_rows, column_id)); + } _is_finalized = true; return Status::OK(); @@ -396,7 +511,7 @@ uint64_t VariantColumnWriterImpl::estimate_buffer_size() { for (auto& column_writer : _subcolumn_writers) { size += column_writer->estimate_buffer_size(); } - size += _sparse_column_writer->estimate_buffer_size(); + size += _sparse_column_writer ? _sparse_column_writer->estimate_buffer_size() : 0; return size; } @@ -408,7 +523,9 @@ Status VariantColumnWriterImpl::finish() { for (auto& column_writer : _subcolumn_writers) { RETURN_IF_ERROR(column_writer->finish()); } - RETURN_IF_ERROR(_sparse_column_writer->finish()); + if (_sparse_column_writer) { + RETURN_IF_ERROR(_sparse_column_writer->finish()); + } return Status::OK(); } Status VariantColumnWriterImpl::write_data() { @@ -419,7 +536,9 @@ Status VariantColumnWriterImpl::write_data() { for (auto& column_writer : _subcolumn_writers) { RETURN_IF_ERROR(column_writer->write_data()); } - RETURN_IF_ERROR(_sparse_column_writer->write_data()); + if (_sparse_column_writer) { + RETURN_IF_ERROR(_sparse_column_writer->write_data()); + } return Status::OK(); } Status VariantColumnWriterImpl::write_ordinal_index() { @@ -430,7 +549,9 @@ Status VariantColumnWriterImpl::write_ordinal_index() { for (auto& column_writer : _subcolumn_writers) { RETURN_IF_ERROR(column_writer->write_ordinal_index()); } - RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index()); + if (_sparse_column_writer) { + RETURN_IF_ERROR(_sparse_column_writer->write_ordinal_index()); + } return Status::OK(); } @@ -489,91 +610,126 @@ Status VariantColumnWriterImpl::append_nullable(const uint8_t* null_map, const u return Status::OK(); } -void VariantColumnWriterImpl::_init_column_meta(ColumnMetaPB* meta, uint32_t column_id, - const TabletColumn& column) { - meta->set_column_id(column_id); - meta->set_type(int(column.type())); - meta->set_length(column.length()); - meta->set_encoding(DEFAULT_ENCODING); - 
meta->set_compression(_opts.compression_type); - meta->set_is_nullable(column.is_nullable()); - meta->set_default_value(column.default_value()); - meta->set_precision(column.precision()); - meta->set_frac(column.frac()); - if (column.has_path_info()) { - column.path_info_ptr()->to_protobuf(meta->mutable_column_path_info(), - column.parent_unique_id()); - } - meta->set_unique_id(column.unique_id()); - for (uint32_t i = 0; i < column.get_subtype_count(); ++i) { - _init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i)); - } -}; +VariantSubcolumnWriter::VariantSubcolumnWriter(const ColumnWriterOptions& opts, + const TabletColumn* column, + std::unique_ptr<Field> field) + : ColumnWriter(std::move(field), opts.meta->is_nullable()) { + // + _tablet_column = column; + _opts = opts; + _column = vectorized::ColumnObject::create(true); +} -Status VariantColumnWriterImpl::_create_column_writer(uint32_t cid, const TabletColumn& column, - const TabletSchemaSPtr& tablet_schema) { - ColumnWriterOptions opts; - opts.meta = _opts.footer->add_columns(); - - _init_column_meta(opts.meta, cid, column); - - opts.need_zone_map = tablet_schema->keys_type() != KeysType::AGG_KEYS; - opts.need_bloom_filter = column.is_bf_column(); - - // const auto* tablet_index = tablet_schema->get_ngram_bf_index(parent_column.unique_id()); - // if (tablet_index) { - // opts.need_bloom_filter = true; - // opts.is_ngram_bf_index = true; - // //narrow convert from int32_t to uint8_t and uint16_t which is dangerous - // auto gram_size = tablet_index->get_gram_size(); - // auto gram_bf_size = tablet_index->get_gram_bf_size(); - // if (gram_size > 256 || gram_size < 1) { - // return Status::NotSupported("Do not support ngram bloom filter for ngram_size: ", - // gram_size); - // } - // if (gram_bf_size > 65535 || gram_bf_size < 64) { - // return Status::NotSupported("Do not support ngram bloom filter for bf_size: ", - // gram_bf_size); - // } - // opts.gram_size = gram_size; - // 
opts.gram_bf_size = gram_bf_size; - // } +Status VariantSubcolumnWriter::init() { + return Status::OK(); +} - opts.need_bitmap_index = column.has_bitmap_index(); - const auto& index = tablet_schema->inverted_index(column.parent_unique_id()); - if (index != nullptr && - segment_v2::InvertedIndexColumnWriter::check_support_inverted_index(column)) { - auto subcolumn_index = std::make_unique<TabletIndex>(*index); - subcolumn_index->set_escaped_escaped_index_suffix_path(column.path_info_ptr()->get_path()); - opts.inverted_index = subcolumn_index.get(); - opts.need_inverted_index = true; - DCHECK(_opts.inverted_index_file_writer != nullptr); - opts.inverted_index_file_writer = _opts.inverted_index_file_writer; - _subcolumns_indexes.emplace_back(std::move(subcolumn_index)); - } +Status VariantSubcolumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { + const auto& src = *reinterpret_cast<const vectorized::ColumnObject*>(*ptr); + auto* dst_ptr = assert_cast<vectorized::ColumnObject*>(_column.get()); + dst_ptr->insert_range_from(src, 0, num_rows); + return Status::OK(); +} -#define DISABLE_INDEX_IF_FIELD_TYPE(TYPE, type_name) \ - if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \ - opts.need_zone_map = false; \ - opts.need_bloom_filter = false; \ - opts.need_bitmap_index = false; \ - } +uint64_t VariantSubcolumnWriter::estimate_buffer_size() { + return _column->byte_size(); +} - DISABLE_INDEX_IF_FIELD_TYPE(ARRAY, "array") - DISABLE_INDEX_IF_FIELD_TYPE(JSONB, "jsonb") - DISABLE_INDEX_IF_FIELD_TYPE(VARIANT, "variant") +bool VariantSubcolumnWriter::is_finalized() const { + const auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get()); + return ptr->is_finalized() && _is_finalized; +} -#undef DISABLE_INDEX_IF_FIELD_TYPE +Status VariantSubcolumnWriter::finalize() { + auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get()); + ptr->finalize(); -#undef CHECK_FIELD_TYPE + DCHECK(ptr->is_finalized()); - std::unique_ptr<ColumnWriter> writer; - 
RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _opts.file_writer, &writer)); - RETURN_IF_ERROR(writer->init()); - _subcolumn_writers.push_back(std::move(writer)); - _subcolumn_opts.push_back(opts); + TabletColumn flush_column = vectorized::schema_util::get_column_by_type( + ptr->get_root_type(), _tablet_column->name(), + vectorized::schema_util::ExtraInfo { + .unique_id = _tablet_column->unique_id(), + .parent_unique_id = _tablet_column->parent_unique_id(), + .path_info = *_tablet_column->path_info_ptr()}); + ColumnWriterOptions opts = _opts; + const auto& parent_column = + _opts.rowset_ctx->tablet_schema->column_by_uid(_tablet_column->parent_unique_id()); + // refresh opts and get writer with flush column + vectorized::schema_util::inherit_column_attributes(parent_column, flush_column); + RETURN_IF_ERROR(_create_column_writer( + 0, flush_column, _opts.rowset_ctx->tablet_schema, _opts.inverted_index_file_writer, + &_writer, _index, &opts, + ptr->get_subcolumns().get_root()->data.get_non_null_value_size())); + _opts = opts; + auto olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>(); + int column_id = 0; + RETURN_IF_ERROR(convert_and_write_column(olap_data_convertor.get(), flush_column, _writer.get(), + ptr->get_root()->get_ptr(), ptr->rows(), column_id)); + _is_finalized = true; return Status::OK(); -}; +} + +Status VariantSubcolumnWriter::finish() { + if (!is_finalized()) { + RETURN_IF_ERROR(finalize()); + } + RETURN_IF_ERROR(_writer->finish()); + return Status::OK(); +} +Status VariantSubcolumnWriter::write_data() { + if (!is_finalized()) { + RETURN_IF_ERROR(finalize()); + } + RETURN_IF_ERROR(_writer->write_data()); + return Status::OK(); +} +Status VariantSubcolumnWriter::write_ordinal_index() { + if (!is_finalized()) { + RETURN_IF_ERROR(finalize()); + } + RETURN_IF_ERROR(_writer->write_ordinal_index()); + return Status::OK(); +} + +Status VariantSubcolumnWriter::write_zone_map() { + if (!is_finalized()) { + 
RETURN_IF_ERROR(finalize()); + } + if (_opts.need_zone_map) { + RETURN_IF_ERROR(_writer->write_zone_map()); + } + return Status::OK(); +} + +Status VariantSubcolumnWriter::write_bitmap_index() { + return Status::OK(); +} +Status VariantSubcolumnWriter::write_inverted_index() { + if (!is_finalized()) { + RETURN_IF_ERROR(finalize()); + } + if (_opts.need_inverted_index) { + RETURN_IF_ERROR(_writer->write_inverted_index()); + } + return Status::OK(); +} +Status VariantSubcolumnWriter::write_bloom_filter_index() { + if (!is_finalized()) { + RETURN_IF_ERROR(finalize()); + } + if (_opts.need_bloom_filter) { + RETURN_IF_ERROR(_writer->write_bloom_filter_index()); + } + return Status::OK(); +} + +Status VariantSubcolumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr, + size_t num_rows) { + // the root contains the same nullable info + RETURN_IF_ERROR(append_data(ptr, num_rows)); + return Status::OK(); +} } // namespace doris::segment_v2 \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h index 5835868c33f..d9974dd6f2e 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h @@ -65,14 +65,9 @@ public: Status append_nullable(const uint8_t* null_map, const uint8_t** ptr, size_t num_rows); private: - // not including root column - void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column); - // subcolumn path from variant stats info to distinguish from sparse column Status _get_subcolumn_paths_from_stats(std::set<std::string>& paths); - Status _create_column_writer(uint32_t cid, const TabletColumn& column, - const TabletSchemaSPtr& tablet_schema); Status _process_root_column(vectorized::ColumnObject* ptr, vectorized::OlapBlockDataConvertor* converter, size_t num_rows, int& column_id); @@ -98,6 +93,7 @@ private: // staticstics which will be 
persisted in the footer VariantStatistics _statistics; + // hold the references of subcolumns indexes std::vector<std::unique_ptr<TabletIndex>> _subcolumns_indexes; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 89442676670..14694f8648f 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -164,7 +164,9 @@ void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t colum for (uint32_t i = 0; i < column.get_subtype_count(); ++i) { _init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i)); } - meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count()); + if (column.is_variant_type()) { + meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count()); + } } Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column, @@ -1025,8 +1027,7 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t // which will be used to contruct the new schema for rowset Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) { if (_tablet_schema->num_variant_columns() == 0 || - std::any_of(_tablet_schema->columns().begin(), _tablet_schema->columns().end(), - [](const auto& col) { return col->variant_max_subcolumns_count() > 0; })) { + !_tablet_schema->need_record_variant_extended_schema()) { return Status::OK(); } size_t column_id = _tablet_schema->num_columns(); diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index f493f21ac97..624140d3bf4 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -180,6 +180,7 @@ Status VerticalBetaRowsetWriter<T>::_create_segment_writer( 
writer_options.enable_unique_key_merge_on_write = context.enable_unique_key_merge_on_write; writer_options.rowset_ctx = &context; writer_options.max_rows_per_segment = context.max_rows_per_segment; + writer_options.write_type = context.write_type; // TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index *writer = std::make_unique<segment_v2::SegmentWriter>( segment_file_writer.get(), seg_id, context.tablet_schema, context.tablet, diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 4508e1c4145..782a6dd4eca 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -27,6 +27,8 @@ #include <algorithm> #include <cctype> // IWYU pragma: no_include <bits/std_abs.h> +#include <vec/common/schema_util.h> + #include <cmath> // IWYU pragma: keep #include <memory> #include <ostream> @@ -1067,6 +1069,7 @@ void TabletSchema::copy_from(const TabletSchema& tablet_schema) { tablet_schema.to_schema_pb(&tablet_schema_pb); init_from_pb(tablet_schema_pb); _table_id = tablet_schema.table_id(); + _path_set_info_map = tablet_schema._path_set_info_map; } void TabletSchema::shawdow_copy_without_columns(const TabletSchema& tablet_schema) { @@ -1339,6 +1342,10 @@ const TabletColumn& TabletColumn::sparse_column_at(size_t ordinal) const { return *_sparse_cols[ordinal]; } +bool TabletColumn::is_sparse_column() const { + return _column_path != nullptr && _column_path->get_relative_path() == SPARSE_COLUMN_PATH; +} + const TabletColumn& TabletSchema::column_by_uid(int32_t col_unique_id) const { return *_cols.at(_field_id_to_index.at(col_unique_id)); } diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 9ff39af14ca..707ecd1bac8 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -165,7 +165,9 @@ public: // If it is an extracted column from variant column bool is_extracted_column() const { return _column_path != nullptr && !_column_path->empty() && 
_parent_col_unique_id > 0; - }; + } + // If it is sparse column of variant type + bool is_sparse_column() const; std::string suffix_path() const { return is_extracted_column() ? _column_path->get_path() : ""; } @@ -404,6 +406,19 @@ public: void set_storage_page_size(long storage_page_size) { _storage_page_size = storage_page_size; } long storage_page_size() const { return _storage_page_size; } + // Currently if variant_max_subcolumns_count = 0, then we need to record variant extended schema + // for compability reason + bool need_record_variant_extended_schema() const { return variant_max_subcolumns_count() == 0; } + + int32_t variant_max_subcolumns_count() const { + for (const auto& col : _cols) { + if (col->is_variant_type()) { + return col->variant_max_subcolumns_count(); + } + } + return 0; + } + const std::vector<const TabletIndex*> inverted_indexes() const { std::vector<const TabletIndex*> inverted_indexes; for (const auto& index : _indexes) { @@ -537,6 +552,21 @@ public: int64_t get_metadata_size() const override; + using PathSet = phmap::flat_hash_set<std::string>; + + struct PathsSetInfo { + PathSet sub_path_set; // extracted columns + PathSet sparse_path_set; // sparse columns + }; + + const PathsSetInfo& path_set_info(int32_t unique_id) const { + return _path_set_info_map.at(unique_id); + } + + void set_path_set_info(std::unordered_map<int32_t, PathsSetInfo>& path_set_info_map) { + _path_set_info_map = path_set_info_map; + } + private: friend bool operator==(const TabletSchema& a, const TabletSchema& b); friend bool operator!=(const TabletSchema& a, const TabletSchema& b); @@ -591,6 +621,10 @@ private: bool _variant_enable_flatten_nested = false; int64_t _vl_field_mem_size {0}; // variable length field + + // key: unique_id of column + // value: extracted path set and sparse path set + std::unordered_map<int32_t, PathsSetInfo> _path_set_info_map; }; bool operator==(const TabletSchema& a, const TabletSchema& b); diff --git 
a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h index b8c2363fe6a..27dda8fea1f 100644 --- a/be/src/vec/columns/column_dummy.h +++ b/be/src/vec/columns/column_dummy.h @@ -40,6 +40,7 @@ public: MutableColumnPtr clone_resized(size_t s) const override { return clone_dummy(s); } size_t size() const override { return s; } + void resize(size_t _s) override { s = _s; } void insert_default() override { ++s; } void pop_back(size_t n) override { s -= n; } size_t byte_size() const override { return 0; } diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 6b2a69ca8c5..d2528bdb76c 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -2252,9 +2252,11 @@ bool ColumnObject::is_null_root() const { } bool ColumnObject::is_scalar_variant() const { - // Only root itself + const auto& sparse_offsets = serialized_sparse_column_offsets().data(); + // Only root itself is scalar, and no sparse data return !is_null_root() && subcolumns.get_leaves().size() == 1 && - subcolumns.get_root()->is_scalar(); + subcolumns.get_root()->is_scalar() && + sparse_offsets[num_rows - 1] == 0; // no sparse data } const DataTypePtr ColumnObject::NESTED_TYPE = std::make_shared<vectorized::DataTypeNullable>( diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index bef2b62e822..8d90cc1aac1 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -294,7 +294,7 @@ public: void check_consistency() const; MutableColumnPtr get_root() { - if (subcolumns.empty() || is_nothing(subcolumns.get_root()->data.get_least_common_type())) { + if (subcolumns.empty()) { return nullptr; } return subcolumns.get_mutable_root()->data.get_finalized_column_ptr()->assume_mutable(); @@ -592,6 +592,16 @@ public: const auto& value = assert_cast<const ColumnString&>(column_map.get_values()); return {&key, &value}; } + + ColumnArray::Offsets64& ALWAYS_INLINE 
serialized_sparse_column_offsets() { + auto& column_map = assert_cast<ColumnMap&>(*serialized_sparse_column); + return column_map.get_offsets(); + } + + const ColumnArray::Offsets64& ALWAYS_INLINE serialized_sparse_column_offsets() const { + const auto& column_map = assert_cast<const ColumnMap&>(*serialized_sparse_column); + return column_map.get_offsets(); + } // Insert all the data from sparse data with specified path to sub column. static void fill_path_column_from_sparse_data(Subcolumn& subcolumn, NullMap* null_map, StringRef path, @@ -622,16 +632,6 @@ private: // unnest nested type columns, and flat them into finlized array subcolumns void unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns) const; - ColumnArray::Offsets64& ALWAYS_INLINE serialized_sparse_column_offsets() { - auto& column_map = assert_cast<ColumnMap&>(*serialized_sparse_column); - return column_map.get_offsets(); - } - - const ColumnArray::Offsets64& ALWAYS_INLINE serialized_sparse_column_offsets() const { - const auto& column_map = assert_cast<const ColumnMap&>(*serialized_sparse_column); - return column_map.get_offsets(); - } - void insert_from_sparse_column_and_fill_remaing_dense_column( const ColumnObject& src, std::vector<std::pair<std::string_view, Subcolumn>>&& diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index aa07adf05a7..fb48714ef8a 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -42,6 +42,13 @@ #include "common/status.h" #include "exprs/json_functions.h" #include "olap/olap_common.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_fwd.h" +#include "olap/rowset/segment_v2/variant_column_writer_impl.h" +#include "olap/segment_loader.h" +#include "olap/tablet.h" +#include "olap/tablet_fwd.h" #include "olap/tablet_schema.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" @@ -49,6 +56,7 @@ #include "util/defer_op.h" #include 
"vec/columns/column.h" #include "vec/columns/column_array.h" +#include "vec/columns/column_map.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_object.h" #include "vec/columns/columns_number.h" @@ -620,11 +628,12 @@ bool has_schema_index_diff(const TabletSchema* new_schema, const TabletSchema* o TabletColumn create_sparse_column(const TabletColumn& variant) { TabletColumn res; - res.set_name(SPARSE_COLUMN_PATH); - res.set_unique_id(variant.unique_id()); + res.set_name(variant.name_lower_case() + "." + SPARSE_COLUMN_PATH); + res.set_unique_id(variant.parent_unique_id() > 0 ? variant.parent_unique_id() + : variant.unique_id()); res.set_type(FieldType::OLAP_FIELD_TYPE_MAP); res.set_aggregation_method(variant.aggregation()); - res.set_path_info(PathInData {SPARSE_COLUMN_PATH}); + res.set_path_info(PathInData {variant.name_lower_case() + "." + SPARSE_COLUMN_PATH}); res.set_parent_unique_id(variant.unique_id()); TabletColumn child_tcolumn; @@ -634,4 +643,179 @@ TabletColumn create_sparse_column(const TabletColumn& variant) { return res; } +using PathToNoneNullValues = std::unordered_map<std::string, size_t>; + +Status collect_path_stats(const RowsetSharedPtr& rs, + std::unordered_map<int32_t, PathToNoneNullValues>& uid_to_path_stats) { + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( + std::static_pointer_cast<BetaRowset>(rs), &segment_cache)); + + for (const auto& column : rs->tablet_schema()->columns()) { + if (!column->is_variant_type()) { + continue; + } + + for (const auto& segment : segment_cache.get_segments()) { + auto column_reader_or = segment->get_column_reader(column->unique_id()); + if (!column_reader_or.has_value()) { + continue; + } + auto* column_reader = column_reader_or.value(); + if (!column_reader) { + continue; + } + + CHECK(column_reader->get_meta_type() == FieldType::OLAP_FIELD_TYPE_VARIANT); + const auto* source_stats = + static_cast<const 
segment_v2::VariantColumnReader*>(column_reader)->get_stats(); + CHECK(source_stats); + + // 合并子列统计信息 + for (const auto& [path, size] : source_stats->subcolumns_non_null_size) { + uid_to_path_stats[column->unique_id()][path] += size; + } + + // 合并稀疏列统计信息 + for (const auto& [path, size] : source_stats->sparse_column_non_null_size) { + CHECK(!path.empty()); + uid_to_path_stats[column->unique_id()][path] += size; + } + } + } + return Status::OK(); +} + +void get_subpaths(const TabletColumn& variant, + const std::unordered_map<int32_t, PathToNoneNullValues>& path_stats, + std::unordered_map<int32_t, TabletSchema::PathsSetInfo>& uid_to_paths_set_info) { + for (const auto& [uid, stats] : path_stats) { + if (stats.size() > variant.variant_max_subcolumns_count()) { + // 按非空值数量排序 + std::vector<std::pair<size_t, std::string_view>> paths_with_sizes; + paths_with_sizes.reserve(stats.size()); + for (const auto& [path, size] : stats) { + paths_with_sizes.emplace_back(size, path); + } + std::sort(paths_with_sizes.begin(), paths_with_sizes.end(), std::greater()); + + // 选取前N个路径作为子列,其余路径作为稀疏列 + for (const auto& [size, path] : paths_with_sizes) { + if (uid_to_paths_set_info[uid].sub_path_set.size() < + variant.variant_max_subcolumns_count()) { + uid_to_paths_set_info[uid].sub_path_set.emplace(path); + } else { + uid_to_paths_set_info[uid].sparse_path_set.emplace(path); + } + } + } else { + // 使用所有路径 + for (const auto& [path, _] : stats) { + uid_to_paths_set_info[uid].sub_path_set.emplace(path); + } + } + } +} + +// Build the temporary schema for compaction +// 1. collect path stats from all rowsets +// 2. get the subpaths and sparse paths for each unique id +// 3. build the output schema with the subpaths and sparse paths +// 4. set the path set info for each unique id +// 5. append the subpaths and sparse paths to the output schema +// 6. 
return the output schema +Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets, + TabletSchemaSPtr& target) { + std::unordered_map<int32_t, PathToNoneNullValues> uid_to_path_stats; + + // 收集统计信息 + for (const auto& rs : rowsets) { + RETURN_IF_ERROR(collect_path_stats(rs, uid_to_path_stats)); + } + + // 构建输出schema + TabletSchemaSPtr output_schema = std::make_shared<TabletSchema>(); + output_schema->shawdow_copy_without_columns(*target); + std::unordered_map<int32_t, TabletSchema::PathsSetInfo> uid_to_paths_set_info; + for (const TabletColumnPtr& column : target->columns()) { + output_schema->append_column(*column); + if (!column->is_variant_type()) { + continue; + } + + // 获取子路径 + get_subpaths(*column, uid_to_path_stats, uid_to_paths_set_info); + std::vector<StringRef> sorted_subpaths( + uid_to_paths_set_info[column->unique_id()].sub_path_set.begin(), + uid_to_paths_set_info[column->unique_id()].sub_path_set.end()); + std::sort(sorted_subpaths.begin(), sorted_subpaths.end()); + // 添加子列 + for (const auto& subpath : sorted_subpaths) { + TabletColumn subcolumn; + subcolumn.set_name(column->name() + "." + subpath.to_string()); + subcolumn.set_type(FieldType::OLAP_FIELD_TYPE_VARIANT); + subcolumn.set_parent_unique_id(column->unique_id()); + subcolumn.set_path_info(PathInData(column->name() + "." 
+ subpath.to_string())); + subcolumn.set_aggregation_method(column->aggregation()); + subcolumn.set_variant_max_subcolumns_count(column->variant_max_subcolumns_count()); + subcolumn.set_is_nullable(true); + output_schema->append_column(subcolumn); + } + // 添加稀疏列 + TabletColumn sparse_column = create_sparse_column(*column); + output_schema->append_column(sparse_column); + } + + target = output_schema; + // used to merge & filter path to sparse column during reading in compaction + target->set_path_set_info(uid_to_paths_set_info); + VLOG_DEBUG << "dump schema " << target->dump_full_schema(); + return Status::OK(); +} + +// Calculate statistics about variant data paths from the encoded sparse column +void calculate_variant_stats(const IColumn& encoded_sparse_column, + segment_v2::VariantStatisticsPB* stats) { + // Cast input column to ColumnMap type since sparse column is stored as a map + const auto& map_column = assert_cast<const ColumnMap&>(encoded_sparse_column); + + // Map to store path frequencies - tracks how many times each path appears + std::unordered_map<StringRef, size_t> sparse_data_paths_statistics; + + // Get the keys column which contains the paths as strings + const auto& sparse_data_paths = + assert_cast<const ColumnString*>(map_column.get_keys_ptr().get()); + + // Iterate through all paths in the sparse column + for (size_t i = 0; i != sparse_data_paths->size(); ++i) { + auto path = sparse_data_paths->get_data_at(i); + + // If path already exists in statistics, increment its count + if (auto it = sparse_data_paths_statistics.find(path); + it != sparse_data_paths_statistics.end()) { + ++it->second; + } + // If path doesn't exist and we haven't hit the max statistics size limit, + // add it with count 1 + else if (sparse_data_paths_statistics.size() < + VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) { + sparse_data_paths_statistics.emplace(path, 1); + } + } + + // Copy the collected statistics into the protobuf stats object + // This maps each 
path string to its frequency count + for (const auto& [path, size] : sparse_data_paths_statistics) { + const auto& sparse_path = path.to_string(); + auto it = stats->sparse_column_non_null_size().find(sparse_path); + if (it == stats->sparse_column_non_null_size().end()) { + stats->mutable_sparse_column_non_null_size()->emplace(sparse_path, size); + } else { + size_t original_size = it->second; + stats->mutable_sparse_column_non_null_size()->emplace(sparse_path, + original_size + size); + } + } +} + } // namespace doris::vectorized::schema_util diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index 4a916618da6..6e3d049f199 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -27,6 +27,7 @@ #include <string> #include "common/status.h" +#include "olap/tablet_fwd.h" #include "olap/tablet_schema.h" #include "udf/udf.h" #include "vec/aggregate_functions/aggregate_function.h" @@ -41,7 +42,9 @@ namespace doris { enum class FieldType; - +namespace segment_v2 { +struct VariantStatisticsPB; +} // namespace segment_v2 namespace vectorized { class Block; class IColumn; @@ -87,7 +90,7 @@ TabletColumn get_column_by_type(const vectorized::DataTypePtr& data_type, const // 3. encode sparse sub columns Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos, const ParseConfig& config); -Status encode_variant_sparse_subcolumns(ColumnObject& column); +// Status encode_variant_sparse_subcolumns(ColumnObject& column); // Pick the tablet schema with the highest schema version as the reference. // Then update all variant columns to there least common types. 
@@ -130,4 +133,11 @@ bool has_schema_index_diff(const TabletSchema* new_schema, const TabletSchema* o // create ColumnMap<String, String> TabletColumn create_sparse_column(const TabletColumn& variant); +// Build the temporary schema for compaction, this will reduce the memory usage of compacting variant columns +Status get_compaction_schema(const std::vector<RowsetSharedPtr>& rowsets, TabletSchemaSPtr& target); + +// Calculate statistics about variant data paths from the encoded sparse column +void calculate_variant_stats(const IColumn& encoded_sparse_column, + segment_v2::VariantStatisticsPB* stats); + } // namespace doris::vectorized::schema_util diff --git a/be/src/vec/json/path_in_data.h b/be/src/vec/json/path_in_data.h index 8d94b02f37a..d4a84323231 100644 --- a/be/src/vec/json/path_in_data.h +++ b/be/src/vec/json/path_in_data.h @@ -73,6 +73,14 @@ public: static UInt128 get_parts_hash(const Parts& parts_); bool empty() const { return parts.empty(); } const vectorized::String& get_path() const { return path; } + // if path is v.a.b, then relative path will return a.b + // returns an empty view if the path has nothing after its first part (e.g. v) + std::string_view get_relative_path() const { + if (parts.empty() || path.size() <= parts[0].key.size() + 1) { + return {}; + } + return std::string_view(path).substr(parts[0].key.size() + 1); + } const Parts& get_parts() const { return parts; } bool is_nested(size_t i) const { return parts[i].is_nested; } bool has_nested_part() const { return has_nested; } diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index c51982a8dab..37a208ad5b6 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -161,7 +161,8 @@ message ColumnPathInfo { message VariantStatisticsPB { - // in the order of subcolumns in variant - map<string, uint32> subcolumn_non_null_size = 1; + // field 1 (subcolumn_non_null_size) was removed; reserve it so it is never reused + reserved 1; + reserved "subcolumn_non_null_size"; 
map<string, uint32> sparse_column_non_null_size = 2; } @@ -205,6 +206,7 @@ message ColumnMetaPB { optional int32 be_exec_version = 20; // used on agg_state type optional VariantStatisticsPB variant_statistics = 21; // only used in variant type
optional int32 variant_max_subcolumns_count = 22 [default = 0]; + optional uint32 none_null_size = 23; } message PrimaryKeyIndexMetaPB { diff --git a/regression-test/suites/variant_github_events_p2/load.groovy b/regression-test/suites/variant_github_events_p2/load.groovy index 48a3507f303..2e06d207d17 100644 --- a/regression-test/suites/variant_github_events_p2/load.groovy +++ b/regression-test/suites/variant_github_events_p2/load.groovy @@ -150,6 +150,8 @@ suite("regression_test_variant_github_events_p2", "nonConcurrent,p2"){ def table_name = "github_events" sql """DROP TABLE IF EXISTS ${table_name}""" table_name = "github_events" + int rand_subcolumns_count = Math.floor(Math.random() * (611 - 511 + 1)) + 511 + // int rand_subcolumns_count = 0; sql """ CREATE TABLE IF NOT EXISTS ${table_name} ( k bigint, @@ -158,7 +160,7 @@ suite("regression_test_variant_github_events_p2", "nonConcurrent,p2"){ ) DUPLICATE KEY(`k`) DISTRIBUTED BY HASH(k) BUCKETS 4 - properties("replication_num" = "1", "disable_auto_compaction" = "true", "variant_enable_flatten_nested" = "false"); + properties("replication_num" = "1", "disable_auto_compaction" = "true", "variant_enable_flatten_nested" = "false", "variant_max_subcolumns_count" = "${rand_subcolumns_count}"); """ // 2015 @@ -227,7 +229,8 @@ suite("regression_test_variant_github_events_p2", "nonConcurrent,p2"){ ) UNIQUE KEY(`k`) DISTRIBUTED BY HASH(k) BUCKETS 4 - properties("replication_num" = "1", "disable_auto_compaction" = "false", "variant_enable_flatten_nested" = "false"); + properties("replication_num" = "1", "disable_auto_compaction" = "false", "variant_enable_flatten_nested" = "false", + "variant_max_subcolumns_count" = "${rand_subcolumns_count}"); """ sql """insert into github_events2 select * from github_events order by k""" sql """select v['payload']['commits'] from github_events order by k ;""" diff --git a/regression-test/suites/variant_p0/update/inverted_index/load.groovy 
b/regression-test/suites/variant_p0/update/inverted_index/load.groovy index 79f602d2a16..495be8bcfca 100644 --- a/regression-test/suites/variant_p0/update/inverted_index/load.groovy +++ b/regression-test/suites/variant_p0/update/inverted_index/load.groovy @@ -14,8 +14,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - -suite("update_test_index_load", "p0") { + +suite("update_test_index_load", "nonConcurrent,p0") { def load_json_data = {table_name, file_name -> // load the json data @@ -61,7 +61,8 @@ suite("update_test_index_load", "p0") { "replication_num" = "1", "disable_auto_compaction" = "true", "bloom_filter_columns" = "v", - "inverted_index_storage_format" = ${format} + "inverted_index_storage_format" = ${format}, + "variant_max_subcolumns_count" = "9999" ); """ @@ -70,6 +71,7 @@ suite("update_test_index_load", "p0") { } try { GetDebugPoint().enableDebugPointForAllBEs("segment_iterator.apply_inverted_index") + sql "set enable_common_expr_pushdown = true" sql """set enable_match_without_inverted_index = false""" sql """ set inverted_index_skip_threshold = 0 """ sql """ set enable_inverted_index_query = true """ @@ -104,3 +106,4 @@ suite("update_test_index_load", "p0") { create_table_load_data.call("test_update_index_compact2_v1", "V1") create_table_load_data.call("test_update_index_compact2_v2", "V2") } + \ No newline at end of file diff --git a/regression-test/suites/variant_p0/update/inverted_index/query.groovy b/regression-test/suites/variant_p0/update/inverted_index/query.groovy index d5bdfcc5f72..d78f2d41b59 100644 --- a/regression-test/suites/variant_p0/update/inverted_index/query.groovy +++ b/regression-test/suites/variant_p0/update/inverted_index/query.groovy @@ -19,7 +19,7 @@ import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import org.awaitility.Awaitility -suite("update_test_index_query", "p0") { +suite("update_test_index_query", 
"nonConcurrent,p0") { def load_json_data = {table_name, file_name -> // load the json data @@ -103,6 +103,7 @@ suite("update_test_index_query", "p0") { def normal_check = {check_table_name-> try { GetDebugPoint().enableDebugPointForAllBEs("segment_iterator.apply_inverted_index") + sql "set enable_common_expr_pushdown = true" sql """set enable_match_without_inverted_index = false""" sql """ set inverted_index_skip_threshold = 0 """ sql """ set enable_inverted_index_query = true """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org