This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch variant-sparse in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/variant-sparse by this push: new 824ed73153a refactor and implement sparse column reader and stats(#45492) 824ed73153a is described below commit 824ed73153a18939d15f9b56ff6f1ca6888d0312 Author: lihangyu <lihan...@selectdb.com> AuthorDate: Mon Dec 16 22:07:10 2024 +0800 refactor and implement sparse column reader and stats(#45492) --- be/src/olap/compaction.cpp | 1 + be/src/olap/rowset/rowset_writer_context.h | 3 + be/src/olap/rowset/segment_v2/column_reader.cpp | 146 ++++++++ be/src/olap/rowset/segment_v2/column_reader.h | 37 +- be/src/olap/rowset/segment_v2/column_writer.cpp | 6 +- be/src/olap/rowset/segment_v2/column_writer.h | 4 +- .../rowset/segment_v2/hierarchical_data_reader.cpp | 364 ++++++++++++++----- .../rowset/segment_v2/hierarchical_data_reader.h | 197 +++-------- be/src/olap/rowset/segment_v2/segment.cpp | 330 ++++++------------ be/src/olap/rowset/segment_v2/segment.h | 31 +- be/src/olap/rowset/segment_v2/segment_writer.cpp | 1 + be/src/olap/rowset/segment_v2/stream_reader.h | 7 +- .../segment_v2/variant_column_writer_impl.cpp | 113 +++++- .../rowset/segment_v2/variant_column_writer_impl.h | 11 +- .../rowset/segment_v2/vertical_segment_writer.cpp | 1 + be/src/vec/columns/column_object.cpp | 387 +++++++++++++++++---- be/src/vec/columns/column_object.h | 34 +- be/src/vec/common/schema_util.cpp | 6 +- be/src/vec/common/schema_util.h | 1 + be/src/vec/common/string_buffer.hpp | 86 +++++ .../data_types/serde/data_type_object_serde.cpp | 6 +- gensrc/proto/segment_v2.proto | 2 +- 22 files changed, 1213 insertions(+), 561 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 68ed0322a9e..1e53ddc7364 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -172,6 +172,7 @@ Status Compaction::merge_input_rowsets() { } RowsetWriterContext ctx; + ctx.input_rs_readers = input_rs_readers; RETURN_IF_ERROR(construct_output_rowset_writer(ctx)); // write merged rows to 
output rowset diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index cb0fda83e60..cbdba6991ae 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -115,6 +115,9 @@ struct RowsetWriterContext { // For remote rowset std::optional<StorageResource> storage_resource; + // For collect segment statistics for compaction + std::vector<RowsetReaderSharedPtr> input_rs_readers; + bool is_local_rowset() const { return !storage_resource; } std::string segment_path(int seg_id) const { diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index b96cf4f7e67..745ff3d93a3 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -44,6 +44,7 @@ #include "olap/rowset/segment_v2/bloom_filter.h" #include "olap/rowset/segment_v2/bloom_filter_index_reader.h" #include "olap/rowset/segment_v2/encoding_info.h" // for EncodingInfo +#include "olap/rowset/segment_v2/hierarchical_data_reader.h" #include "olap/rowset/segment_v2/inverted_index_file_reader.h" #include "olap/rowset/segment_v2/inverted_index_reader.h" #include "olap/rowset/segment_v2/page_decoder.h" @@ -52,6 +53,7 @@ #include "olap/rowset/segment_v2/page_pointer.h" // for PagePointer #include "olap/rowset/segment_v2/row_ranges.h" #include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/variant_column_writer_impl.h" #include "olap/rowset/segment_v2/zone_map_index.h" #include "olap/tablet_schema.h" #include "olap/types.h" // for TypeInfo @@ -220,6 +222,146 @@ Status ColumnReader::create_agg_state(const ColumnReaderOptions& opts, const Col agg_state_type->get_name(), int(type)); } +const SubcolumnColumnReaders::Node* VariantColumnReader::get_reader_by_path( + const vectorized::PathInData& relative_path) const { + return _subcolumn_readers->find_leaf(relative_path); +} + +Status 
VariantColumnReader::new_iterator(ColumnIterator** iterator, + const TabletColumn& target_col) { + // root column use unique id, leaf column use parent_unique_id + auto relative_path = target_col.path_info_ptr()->copy_pop_front(); + const auto* root = _subcolumn_readers->get_root(); + const auto* node = + target_col.has_path_info() ? _subcolumn_readers->find_exact(relative_path) : nullptr; + + if (node != nullptr) { + if (node->is_leaf_node()) { + // Node contains column without any child sub columns and no corresponding sparse columns + // Direct read extracted columns + const auto* node = _subcolumn_readers->find_leaf(relative_path); + RETURN_IF_ERROR(node->data.reader->new_iterator(iterator)); + } else { + // Node contains column with children columns or has correspoding sparse columns + // Create reader with hirachical data. + std::unique_ptr<ColumnIterator> sparse_iter; + if (!_sparse_column_set_in_stats.empty()) { + // Sparse column exists or reached sparse size limit, read sparse column + ColumnIterator* iter; + RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&iter)); + sparse_iter.reset(iter); + } + // If read the full path of variant read in MERGE_ROOT, otherwise READ_DIRECT + HierarchicalDataReader::ReadType read_type = + (relative_path == root->path) ? 
HierarchicalDataReader::ReadType::MERGE_ROOT + : HierarchicalDataReader::ReadType::READ_DIRECT; + RETURN_IF_ERROR(HierarchicalDataReader::create(iterator, relative_path, node, root, + read_type, std::move(sparse_iter))); + } + } else { + if (_sparse_column_set_in_stats.contains(StringRef {relative_path.get_path()}) || + _sparse_column_set_in_stats.size() > + VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) { + // Sparse column exists or reached sparse size limit, read sparse column + ColumnIterator* inner_iter; + RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter)); + *iterator = new SparseColumnExtractReader(relative_path.get_path(), + std::unique_ptr<ColumnIterator>(inner_iter)); + } else { + // Sparse column not exists and not reached stats limit, then the target path is not exist, get a default iterator + std::unique_ptr<ColumnIterator> iter; + RETURN_IF_ERROR(Segment::new_default_iterator(target_col, &iter)); + *iterator = iter.release(); + } + } + return Status::OK(); +} + +Status VariantColumnReader::init(const ColumnReaderOptions& opts, const SegmentFooterPB& footer, + uint32_t column_id, uint64_t num_rows, + io::FileReaderSPtr file_reader) { + // init sub columns + _subcolumn_readers = std::make_unique<SubcolumnColumnReaders>(); + std::unordered_map<vectorized::PathInData, uint32_t, vectorized::PathInData::Hash> + column_path_to_footer_ordinal; + for (uint32_t ordinal = 0; ordinal < footer.columns().size(); ++ordinal) { + const auto& column_pb = footer.columns(ordinal); + // column path for accessing subcolumns of variant + if (column_pb.has_column_path_info()) { + vectorized::PathInData path; + path.from_protobuf(column_pb.column_path_info()); + column_path_to_footer_ordinal.emplace(path, ordinal); + } + } + + const ColumnMetaPB& self_column_pb = footer.columns(column_id); + for (const ColumnMetaPB& column_pb : footer.columns()) { + if (column_pb.unique_id() != self_column_pb.unique_id()) { + continue; + } + 
DCHECK(column_pb.has_column_path_info()); + std::unique_ptr<ColumnReader> reader; + RETURN_IF_ERROR( + ColumnReader::create(opts, column_pb, footer.num_rows(), file_reader, &reader)); + vectorized::PathInData path; + path.from_protobuf(column_pb.column_path_info()); + // init sparse column + if (path.get_path() == SPARSE_COLUMN_PATH) { + RETURN_IF_ERROR(ColumnReader::create(opts, column_pb, footer.num_rows(), file_reader, + &_sparse_column_reader)); + continue; + } + // init subcolumns + auto relative_path = path.copy_pop_front(); + if (_subcolumn_readers->get_root() == nullptr) { + _subcolumn_readers->create_root(SubcolumnReader {nullptr, nullptr}); + } + if (relative_path.empty()) { + // root column + _subcolumn_readers->get_mutable_root()->modify_to_scalar(SubcolumnReader { + std::move(reader), + vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); + } else { + // check the root is already a leaf node + _subcolumn_readers->add( + relative_path, + SubcolumnReader { + std::move(reader), + vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); + } + } + + // init sparse column set in stats + if (self_column_pb.has_variant_statistics()) { + const auto& variant_stats = self_column_pb.variant_statistics(); + for (const auto& [path, _] : variant_stats.sparse_column_non_null_size()) { + _sparse_column_set_in_stats.emplace(path.data(), path.size()); + } + } + return Status::OK(); +} + +Status ColumnReader::create_variant(const ColumnReaderOptions& opts, const SegmentFooterPB& footer, + uint32_t column_id, uint64_t num_rows, + const io::FileReaderSPtr& file_reader, + std::unique_ptr<ColumnReader>* reader) { + std::unique_ptr<VariantColumnReader> reader_local(new VariantColumnReader()); + RETURN_IF_ERROR(reader_local->init(opts, footer, column_id, num_rows, file_reader)); + *reader = std::move(reader_local); + return Status::OK(); +} + +Status ColumnReader::create(const ColumnReaderOptions& opts, const SegmentFooterPB& footer, + uint32_t 
column_id, uint64_t num_rows, + const io::FileReaderSPtr& file_reader, + std::unique_ptr<ColumnReader>* reader) { + if ((FieldType)footer.columns(column_id).type() != FieldType::OLAP_FIELD_TYPE_VARIANT) { + return ColumnReader::create(opts, footer.columns(column_id), num_rows, file_reader, reader); + } + // create variant column reader with extracted columns info in footer + return create_variant(opts, footer, column_id, num_rows, file_reader, reader); +} + Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows, const io::FileReaderSPtr& file_reader, std::unique_ptr<ColumnReader>* reader) { @@ -706,6 +848,10 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterat return Status::OK(); } +Status ColumnReader::new_iterator(ColumnIterator** iterator, const TabletColumn& col) { + return new_iterator(iterator); +} + Status ColumnReader::new_iterator(ColumnIterator** iterator) { if (is_empty()) { *iterator = new EmptyFileColumnIterator(); diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index d72d802f977..d61393e820c 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -41,6 +41,7 @@ #include "olap/rowset/segment_v2/page_handle.h" // for PageHandle #include "olap/rowset/segment_v2/page_pointer.h" #include "olap/rowset/segment_v2/parsed_page.h" // for ParsedPage +#include "olap/rowset/segment_v2/stream_reader.h" #include "olap/types.h" #include "olap/utils.h" #include "util/once.h" @@ -78,6 +79,8 @@ class InvertedIndexFileReader; class PageDecoder; class RowRanges; class ZoneMapIndexReader; +// struct SubcolumnReader; +// using SubcolumnColumnReaders = vectorized::SubcolumnsTree<SubcolumnReader>; struct ColumnReaderOptions { // whether verify checksum when read page @@ -112,11 +115,16 @@ struct ColumnIteratorOptions { // This will cache data shared by all reader class 
ColumnReader : public MetadataAdder<ColumnReader> { public: + ColumnReader() = default; // Create an initialized ColumnReader in *reader. // This should be a lightweight operation without I/O. static Status create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows, const io::FileReaderSPtr& file_reader, std::unique_ptr<ColumnReader>* reader); + static Status create(const ColumnReaderOptions& opts, const SegmentFooterPB& footer, + uint32_t column_id, uint64_t num_rows, + const io::FileReaderSPtr& file_reader, + std::unique_ptr<ColumnReader>* reader); static Status create_array(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, const io::FileReaderSPtr& file_reader, std::unique_ptr<ColumnReader>* reader); @@ -129,11 +137,16 @@ public: static Status create_agg_state(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows, const io::FileReaderSPtr& file_reader, std::unique_ptr<ColumnReader>* reader); + static Status create_variant(const ColumnReaderOptions& opts, const SegmentFooterPB& footer, + uint32_t column_id, uint64_t num_rows, + const io::FileReaderSPtr& file_reader, + std::unique_ptr<ColumnReader>* reader); enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING, ALL_DICT_ENCODING }; - virtual ~ColumnReader(); + ~ColumnReader() override; // create a new column iterator. 
Client should delete returned iterator + virtual Status new_iterator(ColumnIterator** iterator, const TabletColumn& col); Status new_iterator(ColumnIterator** iterator); Status new_array_iterator(ColumnIterator** iterator); Status new_struct_iterator(ColumnIterator** iterator); @@ -283,6 +296,28 @@ private: DorisCallOnce<Status> _set_dict_encoding_type_once; }; +class VariantColumnReader : public ColumnReader { +public: + VariantColumnReader() = default; + + Status init(const ColumnReaderOptions& opts, const SegmentFooterPB& footer, uint32_t column_id, + uint64_t num_rows, io::FileReaderSPtr file_reader); + Status new_iterator(ColumnIterator** iterator, const TabletColumn& col) override; + + const SubcolumnColumnReaders::Node* get_reader_by_path( + const vectorized::PathInData& relative_path) const; + + ~VariantColumnReader() override = default; + +private: + std::unique_ptr<SubcolumnColumnReaders> _subcolumn_readers; + std::unique_ptr<ColumnReader> _sparse_column_reader; + // Some sparse column record in stats, use StringRef to reduce memory usage, + // notice: make sure the ref is not released before the ColumnReader is destructed, + // used to decide whether to read from sparse column + std::unordered_set<StringRef> _sparse_column_set_in_stats; +}; + // Base iterator to read one column data class ColumnIterator { public: diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index e3cd3b17144..895589d1cd3 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -1168,7 +1168,11 @@ VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts, const TabletColumn* column, std::unique_ptr<Field> field) : ColumnWriter(std::move(field), opts.meta->is_nullable()) { _impl = std::make_unique<VariantColumnWriterImpl>(opts, column); -}; +} + +Status VariantColumnWriter::init() { + return _impl->init(); +} Status 
VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { _next_rowid += num_rows; diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index b664332ea8e..53d7c5d0234 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -73,6 +73,8 @@ struct ColumnWriterOptions { io::FileWriter* file_writer = nullptr; CompressionTypePB compression_type = UNKNOWN_COMPRESSION; RowsetWriterContext* rowset_ctx = nullptr; + // For collect segment statistics for compaction + std::vector<RowsetReaderSharedPtr> input_rs_readers; std::string to_string() const { std::stringstream ss; ss << std::boolalpha << "meta=" << meta->DebugString() @@ -480,7 +482,7 @@ public: ~VariantColumnWriter() override = default; - Status init() override { return Status::OK(); } + Status init() override; Status append_data(const uint8_t** ptr, size_t num_rows) override; diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp index c85e4b429ad..2b8e58d47f1 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp @@ -23,6 +23,7 @@ #include "io/io_common.h" #include "olap/rowset/segment_v2/column_reader.h" #include "vec/columns/column.h" +#include "vec/columns/column_map.h" #include "vec/columns/column_object.h" #include "vec/common/assert_cast.h" #include "vec/common/schema_util.h" @@ -30,14 +31,12 @@ #include "vec/data_types/data_type_nullable.h" #include "vec/json/path_in_data.h" -namespace doris { -namespace segment_v2 { +namespace doris::segment_v2 { -Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader, - vectorized::PathInData path, +Status HierarchicalDataReader::create(ColumnIterator** reader, vectorized::PathInData path, const SubcolumnColumnReaders::Node* node, - const SubcolumnColumnReaders::Node* 
root, - ReadType read_type) { + const SubcolumnColumnReaders::Node* root, ReadType read_type, + std::unique_ptr<ColumnIterator>&& sparse_reader) { // None leave node need merge with root auto* stream_iter = new HierarchicalDataReader(path); std::vector<const SubcolumnColumnReaders::Node*> leaves; @@ -54,14 +53,21 @@ Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader, // Eg. {"a" : "b" : {"c" : 1}}, access the `a.b` path and merge with root path so that // we could make sure the data could be fully merged, since some column may not be extracted but remains in root // like {"a" : "b" : {"e" : 1.1}} in jsonb format - if (read_type == ReadType::MERGE_SPARSE) { + if (read_type == ReadType::MERGE_ROOT) { ColumnIterator* it; RETURN_IF_ERROR(root->data.reader->new_iterator(&it)); stream_iter->set_root(std::make_unique<SubstreamIterator>( root->data.file_column_type->create_column(), std::unique_ptr<ColumnIterator>(it), root->data.file_column_type)); } - reader->reset(stream_iter); + // need read from sparse column + if (sparse_reader) { + vectorized::MutableColumnPtr sparse_column = + vectorized::ColumnObject::create_sparse_column_fn(); + stream_iter->_sparse_column_reader = std::make_unique<SubstreamIterator>( + std::move(sparse_column), std::move(sparse_reader), nullptr); + }; + *reader = stream_iter; return Status::OK(); } @@ -104,7 +110,7 @@ Status HierarchicalDataReader::next_batch(size_t* n, vectorized::MutableColumnPt CHECK(reader.inited); RETURN_IF_ERROR(reader.iterator->next_batch(n, reader.column, has_null)); VLOG_DEBUG << fmt::format("{} next_batch {} rows, type={}", path.get_path(), *n, - type->get_name()); + type ? 
type->get_name() : "null"); reader.rows_read += *n; return Status::OK(); }, @@ -119,7 +125,7 @@ Status HierarchicalDataReader::read_by_rowids(const rowid_t* rowids, const size_ CHECK(reader.inited); RETURN_IF_ERROR(reader.iterator->read_by_rowids(rowids, count, reader.column)); VLOG_DEBUG << fmt::format("{} read_by_rowids {} rows, type={}", path.get_path(), - count, type->get_name()); + count, type ? type->get_name() : "null"); reader.rows_read += count; return Status::OK(); }, @@ -150,95 +156,295 @@ ordinal_t HierarchicalDataReader::get_current_ordinal() const { return (*_substream_reader.begin())->data.iterator->get_current_ordinal(); } -Status ExtractReader::init(const ColumnIteratorOptions& opts) { - if (!_root_reader->inited) { - RETURN_IF_ERROR(_root_reader->iterator->init(opts)); - _root_reader->inited = true; +Status HierarchicalDataReader::_process_sub_columns( + vectorized::ColumnObject& container_variant, + const vectorized::PathsWithColumnAndType& non_nested_subcolumns) { + for (const auto& entry : non_nested_subcolumns) { + DCHECK(!entry.path.has_nested_part()); + bool add = container_variant.add_sub_column(entry.path, entry.column->assume_mutable(), + entry.type); + if (!add) { + return Status::InternalError("Duplicated {}, type {}", entry.path.get_path(), + entry.type->get_name()); + } } return Status::OK(); } -Status ExtractReader::seek_to_first() { - LOG(FATAL) << "Not implemented"; - __builtin_unreachable(); +Status HierarchicalDataReader::_process_nested_columns( + vectorized::ColumnObject& container_variant, + const std::map<vectorized::PathInData, vectorized::PathsWithColumnAndType>& + nested_subcolumns) { + using namespace vectorized; + // Iterate nested subcolumns and flatten them, the entry contains the nested subcolumns of the same nested parent + // first we pick the first subcolumn as base array and using it's offset info. 
Then we flatten all nested subcolumns + // into a new object column and wrap it with array column using the first element offsets.The wrapped array column + // will type the type of ColumnObject::NESTED_TYPE, whih is Nullable<ColumnArray<NULLABLE(ColumnObject)>>. + for (const auto& entry : nested_subcolumns) { + MutableColumnPtr nested_object = ColumnObject::create(true, false); + const auto* base_array = + check_and_get_column<ColumnArray>(remove_nullable(entry.second[0].column)); + MutableColumnPtr offset = base_array->get_offsets_ptr()->assume_mutable(); + auto* nested_object_ptr = assert_cast<ColumnObject*>(nested_object.get()); + // flatten nested arrays + for (const auto& subcolumn : entry.second) { + const auto& column = subcolumn.column; + const auto& type = subcolumn.type; + if (!remove_nullable(column)->is_column_array()) { + return Status::InvalidArgument( + "Meet none array column when flatten nested array, path {}, type {}", + subcolumn.path.get_path(), subcolumn.type->get_name()); + } + const auto* target_array = + check_and_get_column<ColumnArray>(remove_nullable(subcolumn.column).get()); +#ifndef NDEBUG + if (!base_array->has_equal_offsets(*target_array)) { + return Status::InvalidArgument( + "Meet none equal offsets array when flatten nested array, path {}, " + "type {}", + subcolumn.path.get_path(), subcolumn.type->get_name()); + } +#endif + MutableColumnPtr flattend_column = target_array->get_data_ptr()->assume_mutable(); + DataTypePtr flattend_type = + check_and_get_data_type<DataTypeArray>(remove_nullable(type).get()) + ->get_nested_type(); + // add sub path without parent prefix + nested_object_ptr->add_sub_column( + subcolumn.path.copy_pop_nfront(entry.first.get_parts().size()), + std::move(flattend_column), std::move(flattend_type)); + } + nested_object = make_nullable(nested_object->get_ptr())->assume_mutable(); + auto array = + make_nullable(ColumnArray::create(std::move(nested_object), std::move(offset))); + PathInDataBuilder builder; + 
// add parent prefix + builder.append(entry.first.get_parts(), false); + PathInData parent_path = builder.build(); + // unset nested parts + parent_path.unset_nested(); + DCHECK(!parent_path.has_nested_part()); + container_variant.add_sub_column(parent_path, array->assume_mutable(), + ColumnObject::NESTED_TYPE); + } + return Status::OK(); } -Status ExtractReader::seek_to_ordinal(ordinal_t ord) { - CHECK(_root_reader->inited); - return _root_reader->iterator->seek_to_ordinal(ord); -} +Status HierarchicalDataReader::_init_container(vectorized::MutableColumnPtr& container, + size_t nrows) { + using namespace vectorized; + // build variant as container + container = ColumnObject::create(true, false); + auto& container_variant = assert_cast<ColumnObject&>(*container); -Status ExtractReader::extract_to(vectorized::MutableColumnPtr& dst, size_t nrows) { - DCHECK(_root_reader); - DCHECK(_root_reader->inited); - vectorized::ColumnNullable* nullable_column = nullptr; - if (dst->is_nullable()) { - nullable_column = assert_cast<vectorized::ColumnNullable*>(dst.get()); + // add root first + if (_path.get_parts().empty() && _root_reader) { + auto& root_var = + _root_reader->column->is_nullable() + ? assert_cast<vectorized::ColumnObject&>( + assert_cast<vectorized::ColumnNullable&>(*_root_reader->column) + .get_nested_column()) + : assert_cast<vectorized::ColumnObject&>(*_root_reader->column); + auto column = root_var.get_root(); + auto type = root_var.get_root_type(); + container_variant.add_sub_column({}, std::move(column), type); } - auto& variant = - nullable_column == nullptr - ? assert_cast<vectorized::ColumnObject&>(*dst) - : assert_cast<vectorized::ColumnObject&>(nullable_column->get_nested_column()); - const auto& root = - _root_reader->column->is_nullable() - ? 
assert_cast<vectorized::ColumnObject&>( - assert_cast<vectorized::ColumnNullable&>(*_root_reader->column) - .get_nested_column()) - : assert_cast<const vectorized::ColumnObject&>(*_root_reader->column); - // extract root value with path, we can't modify the original root column - // since some other column may depend on it. - vectorized::MutableColumnPtr extracted_column; - RETURN_IF_ERROR(root.extract_root( // trim the root name, eg. v.a.b -> a.b - _col.path_info_ptr()->copy_pop_front(), extracted_column)); - - if (_target_type_hint != nullptr) { - variant.create_root(_target_type_hint, _target_type_hint->create_column()); + // parent path -> subcolumns + std::map<PathInData, PathsWithColumnAndType> nested_subcolumns; + PathsWithColumnAndType non_nested_subcolumns; + RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) { + MutableColumnPtr column = node.data.column->get_ptr(); + PathInData relative_path = node.path.copy_pop_nfront(_path.get_parts().size()); + + if (node.path.has_nested_part()) { + CHECK_EQ(getTypeName(remove_nullable(node.data.type)->get_type_id()), + getTypeName(TypeIndex::Array)); + PathInData parent_path = + node.path.get_nested_prefix_path().copy_pop_nfront(_path.get_parts().size()); + nested_subcolumns[parent_path].emplace_back(relative_path, column->get_ptr(), + node.data.type); + } else { + non_nested_subcolumns.emplace_back(relative_path, column->get_ptr(), node.data.type); + } + return Status::OK(); + })); + + RETURN_IF_ERROR(_process_sub_columns(container_variant, non_nested_subcolumns)); + + RETURN_IF_ERROR(_process_nested_columns(container_variant, nested_subcolumns)); + + RETURN_IF_ERROR(_process_sparse_column(container_variant, nrows)); + return Status::OK(); +} + +// Return sub-path by specified prefix. 
+// For example, for prefix a.b: +// a.b.c.d -> c.d, a.b.c -> c +static std::string_view get_sub_path(const std::string_view& path, const std::string_view& prefix) { + return path.substr(prefix.size() + 1); +} + +Status HierarchicalDataReader::_process_sparse_column(vectorized::ColumnObject& container_variant, + size_t nrows) { + using namespace vectorized; + if (!_sparse_column_reader) { + container_variant.get_sparse_column()->assume_mutable()->insert_many_defaults(nrows); + return Status::OK(); } - if (variant.empty() || variant.is_null_root()) { - variant.create_root(root.get_root_type(), std::move(extracted_column)); + // process sparse column + if (_path.get_parts().empty()) { + // directly use sparse column if access root + container_variant.set_sparse_column(_sparse_column_reader->column->get_ptr()); } else { - vectorized::ColumnPtr cast_column; - const auto& expected_type = variant.get_root_type(); - RETURN_IF_ERROR(vectorized::schema_util::cast_column( - {extracted_column->get_ptr(), - vectorized::make_nullable( - std::make_shared<vectorized::ColumnObject::MostCommonType>()), - ""}, - expected_type, &cast_column)); - variant.get_root()->insert_range_from(*cast_column, 0, nrows); - // variant.set_num_rows(variant.get_root()->size()); + const auto& offsets = + assert_cast<const ColumnMap&>(*_sparse_column_reader->column).get_offsets(); + /// Check if there is no data in shared data in current range. 
+ if (offsets.back() == offsets[-1]) { + container_variant.get_sparse_column()->assume_mutable()->insert_many_defaults(nrows); + } else { + // Read for variant sparse column + // Example path: a.b + // data: a.b.c : int|123 + // a.b.d : string|"456" + // a.e.d : string|"789" + // then the extracted sparse column will be: + // c : int|123 + // d : string|"456" + const auto& sparse_data_map = + assert_cast<const ColumnMap&>(*_sparse_column_reader->column); + const auto& src_sparse_data_offsets = sparse_data_map.get_offsets(); + const auto& src_sparse_data_paths = + assert_cast<const ColumnString&>(sparse_data_map.get_keys()); + const auto& src_sparse_data_values = + assert_cast<const ColumnString&>(sparse_data_map.get_values()); + + auto& sparse_data_offsets = + assert_cast<ColumnMap&>( + *container_variant.get_sparse_column()->assume_mutable()) + .get_offsets(); + auto [sparse_data_paths, sparse_data_values] = + container_variant.get_sparse_data_paths_and_values(); + StringRef prefix_ref(_path.get_path()); + std::string_view path_prefix(prefix_ref.data, prefix_ref.size); + for (size_t i = 0; i != src_sparse_data_offsets.size(); ++i) { + size_t start = src_sparse_data_offsets[ssize_t(i) - 1]; + size_t end = src_sparse_data_offsets[ssize_t(i)]; + size_t lower_bound_index = + vectorized::ColumnObject::find_path_lower_bound_in_sparse_data( + prefix_ref, src_sparse_data_paths, start, end); + for (; lower_bound_index != end; ++lower_bound_index) { + auto path_ref = src_sparse_data_paths.get_data_at(lower_bound_index); + std::string_view path(path_ref.data, path_ref.size); + if (!path.starts_with(path_prefix)) { + break; + } + // Don't include path that is equal to the prefix. 
+ if (path.size() != path_prefix.size()) { + auto sub_path = get_sub_path(path, path_prefix); + sparse_data_paths->insert_data(sub_path.data(), sub_path.size()); + sparse_data_values->insert_from(src_sparse_data_values, lower_bound_index); + } + } + sparse_data_offsets.push_back(sparse_data_paths->size()); + } + } } - if (dst->is_nullable()) { - // fill nullmap - vectorized::ColumnUInt8& dst_null_map = - assert_cast<vectorized::ColumnNullable&>(*dst).get_null_map_column(); - vectorized::ColumnUInt8& src_null_map = - assert_cast<vectorized::ColumnNullable&>(*variant.get_root()).get_null_map_column(); - dst_null_map.insert_range_from(src_null_map, 0, src_null_map.size()); + return Status::OK(); +} + +Status HierarchicalDataReader::_init_null_map_and_clear_columns( + vectorized::MutableColumnPtr& container, vectorized::MutableColumnPtr& dst, size_t nrows) { + using namespace vectorized; + // clear data in nodes + RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) { + node.data.column->clear(); + return Status::OK(); + })); + container->clear(); + _sparse_column_reader->column->clear(); + if (_root_reader) { + if (_root_reader->column->is_nullable()) { + // fill nullmap + DCHECK(dst->is_nullable()); + ColumnUInt8& dst_null_map = assert_cast<ColumnNullable&>(*dst).get_null_map_column(); + ColumnUInt8& src_null_map = + assert_cast<ColumnNullable&>(*_root_reader->column).get_null_map_column(); + dst_null_map.insert_range_from(src_null_map, 0, src_null_map.size()); + // clear nullmap and inner data + src_null_map.clear(); + assert_cast<ColumnObject&>( + assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column()) + .clear_column_data(); + } else { + auto& root_column = assert_cast<ColumnObject&>(*_root_reader->column); + root_column.clear_column_data(); + } + } else { + if (dst->is_nullable()) { + // No nullable info exist in hirearchical data, fill nullmap with all none null + ColumnUInt8& dst_null_map = 
assert_cast<ColumnNullable&>(*dst).get_null_map_column(); + auto fake_nullable_column = ColumnUInt8::create(nrows, 0); + dst_null_map.insert_range_from(*fake_nullable_column, 0, nrows); + } } - _root_reader->column->clear(); -#ifndef NDEBUG - variant.check_consistency(); -#endif return Status::OK(); } -Status ExtractReader::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) { - RETURN_IF_ERROR(_root_reader->iterator->next_batch(n, _root_reader->column)); - RETURN_IF_ERROR(extract_to(dst, *n)); +Status SparseColumnExtractReader::init(const ColumnIteratorOptions& opts) { + return _sparse_column_reader->init(opts); +} + +Status SparseColumnExtractReader::seek_to_first() { + return _sparse_column_reader->seek_to_first(); +} + +Status SparseColumnExtractReader::seek_to_ordinal(ordinal_t ord) { + return _sparse_column_reader->seek_to_ordinal(ord); +} + +void SparseColumnExtractReader::_fill_path_column(vectorized::MutableColumnPtr& dst) { + vectorized::ColumnObject& var = + dst->is_nullable() + ? assert_cast<vectorized::ColumnObject&>( + assert_cast<vectorized::ColumnNullable&>(*dst).get_nested_column()) + : assert_cast<vectorized::ColumnObject&>(*dst); + DCHECK(!var.is_null_root()); + vectorized::ColumnObject::fill_path_olumn_from_sparse_data( + *var.get_subcolumn({}) /*root*/, StringRef {_path.data(), _path.size()}, + _sparse_column->get_ptr(), 0, _sparse_column->size()); + _sparse_column->clear(); +} + +Status SparseColumnExtractReader::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, + bool* has_null) { + _sparse_column->clear(); + RETURN_IF_ERROR(_sparse_column_reader->next_batch(n, _sparse_column, has_null)); + const auto& offsets = assert_cast<const vectorized::ColumnMap&>(*_sparse_column).get_offsets(); + // Check if we don't have any paths in shared data in current range. 
+ if (offsets.back() == offsets[-1]) { + dst->insert_many_defaults(*n); + } else { + _fill_path_column(dst); + } return Status::OK(); } -Status ExtractReader::read_by_rowids(const rowid_t* rowids, const size_t count, - vectorized::MutableColumnPtr& dst) { - RETURN_IF_ERROR(_root_reader->iterator->read_by_rowids(rowids, count, _root_reader->column)); - RETURN_IF_ERROR(extract_to(dst, count)); +Status SparseColumnExtractReader::read_by_rowids(const rowid_t* rowids, const size_t count, + vectorized::MutableColumnPtr& dst) { + _sparse_column->clear(); + RETURN_IF_ERROR(_sparse_column_reader->read_by_rowids(rowids, count, _sparse_column)); + const auto& offsets = assert_cast<const vectorized::ColumnMap&>(*_sparse_column).get_offsets(); + // Check if we don't have any paths in shared data in current range. + if (offsets.back() == offsets[-1]) { + dst->insert_many_defaults(count); + } else { + _fill_path_column(dst); + } return Status::OK(); } -ordinal_t ExtractReader::get_current_ordinal() const { - return _root_reader->iterator->get_current_ordinal(); +ordinal_t SparseColumnExtractReader::get_current_ordinal() const { + return _sparse_column_reader->get_current_ordinal(); } -} // namespace segment_v2 -} // namespace doris +} // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h index 6c8ced89cd2..5d58f666f62 100644 --- a/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h +++ b/be/src/olap/rowset/segment_v2/hierarchical_data_reader.h @@ -18,8 +18,11 @@ #pragma once #include <memory> +#include <string_view> #include <unordered_map> +#include <utility> +#include "common/status.h" #include "io/io_common.h" #include "olap/field.h" #include "olap/iterators.h" @@ -49,13 +52,14 @@ namespace doris::segment_v2 { class HierarchicalDataReader : public ColumnIterator { public: // Currently two types of read, merge sparse columns with root columns, or read directly - enum 
class ReadType { MERGE_SPARSE, READ_DIRECT }; + enum class ReadType { MERGE_ROOT, READ_DIRECT }; HierarchicalDataReader(const vectorized::PathInData& path) : _path(path) {} - static Status create(std::unique_ptr<ColumnIterator>* reader, vectorized::PathInData path, + static Status create(ColumnIterator** reader, vectorized::PathInData path, const SubcolumnColumnReaders::Node* target_node, - const SubcolumnColumnReaders::Node* root, ReadType read_type); + const SubcolumnColumnReaders::Node* root, ReadType read_type, + std::unique_ptr<ColumnIterator>&& sparse_reader); Status init(const ColumnIteratorOptions& opts) override; @@ -77,6 +81,7 @@ public: private: SubstreamReaderTree _substream_reader; std::unique_ptr<SubstreamIterator> _root_reader; + std::unique_ptr<SubstreamIterator> _sparse_column_reader; size_t _rows_read = 0; vectorized::PathInData _path; @@ -87,6 +92,29 @@ private: } return Status::OK(); } + + Status _process_sub_columns(vectorized::ColumnObject& container_variant, + const vectorized::PathsWithColumnAndType& non_nested_subcolumns); + + Status _process_nested_columns( + vectorized::ColumnObject& container_variant, + const std::map<vectorized::PathInData, vectorized::PathsWithColumnAndType>& + nested_subcolumns); + + Status _process_sparse_column(vectorized::ColumnObject& container_variant, size_t nrows); + + // 1. add root column + // 2. collect path for subcolumns and nested subcolumns + // 3. init container with subcolumns + // 4. init container with nested subcolumns + // 5. 
init container with sparse column + Status _init_container(vectorized::MutableColumnPtr& container, size_t nrows); + + // clear all subcolumns's column data for next batch read + // set null map for nullable column + Status _init_null_map_and_clear_columns(vectorized::MutableColumnPtr& container, + vectorized::MutableColumnPtr& dst, size_t nrows); + // process read template <typename ReadFunction> Status process_read(ReadFunction&& read_func, vectorized::MutableColumnPtr& dst, size_t nrows) { @@ -112,162 +140,36 @@ private: return Status::OK(); })); - // build variant as container - auto container = ColumnObject::create(true, false); - auto& container_variant = assert_cast<ColumnObject&>(*container); - - // add root first - if (_path.get_parts().empty() && _root_reader) { - auto& root_var = - _root_reader->column->is_nullable() - ? assert_cast<vectorized::ColumnObject&>( - assert_cast<vectorized::ColumnNullable&>( - *_root_reader->column) - .get_nested_column()) - : assert_cast<vectorized::ColumnObject&>(*_root_reader->column); - auto column = root_var.get_root(); - auto type = root_var.get_root_type(); - container_variant.add_sub_column({}, std::move(column), type); + // read sparse column + if (_sparse_column_reader) { + RETURN_IF_ERROR(read_func(*_sparse_column_reader, {}, nullptr)); } - // parent path -> subcolumns - std::map<PathInData, PathsWithColumnAndType> nested_subcolumns; - PathsWithColumnAndType non_nested_subcolumns; - RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) { - MutableColumnPtr column = node.data.column->get_ptr(); - PathInData relative_path = node.path.copy_pop_nfront(_path.get_parts().size()); - - if (node.path.has_nested_part()) { - CHECK_EQ(getTypeName(remove_nullable(node.data.type)->get_type_id()), - getTypeName(TypeIndex::Array)); - PathInData parent_path = node.path.get_nested_prefix_path().copy_pop_nfront( - _path.get_parts().size()); - nested_subcolumns[parent_path].emplace_back(relative_path, column->get_ptr(), - 
node.data.type); - } else { - non_nested_subcolumns.emplace_back(relative_path, column->get_ptr(), - node.data.type); - } - return Status::OK(); - })); - for (auto& entry : non_nested_subcolumns) { - DCHECK(!entry.path.has_nested_part()); - bool add = container_variant.add_sub_column(entry.path, entry.column->assume_mutable(), - entry.type); - if (!add) { - return Status::InternalError("Duplicated {}, type {}", entry.path.get_path(), - entry.type->get_name()); - } - } - // Iterate nested subcolumns and flatten them, the entry contains the nested subcolumns of the same nested parent - // first we pick the first subcolumn as base array and using it's offset info. Then we flatten all nested subcolumns - // into a new object column and wrap it with array column using the first element offsets.The wrapped array column - // will type the type of ColumnObject::NESTED_TYPE, whih is Nullable<ColumnArray<NULLABLE(ColumnObject)>>. - for (auto& entry : nested_subcolumns) { - MutableColumnPtr nested_object = ColumnObject::create(true, false); - const auto* base_array = - check_and_get_column<ColumnArray>(remove_nullable(entry.second[0].column)); - MutableColumnPtr offset = base_array->get_offsets_ptr()->assume_mutable(); - auto* nested_object_ptr = assert_cast<ColumnObject*>(nested_object.get()); - // flatten nested arrays - for (const auto& subcolumn : entry.second) { - const auto& column = subcolumn.column; - const auto& type = subcolumn.type; - if (!remove_nullable(column)->is_column_array()) { - return Status::InvalidArgument( - "Meet none array column when flatten nested array, path {}, type {}", - subcolumn.path.get_path(), subcolumn.type->get_name()); - } - const auto* target_array = - check_and_get_column<ColumnArray>(remove_nullable(subcolumn.column).get()); -#ifndef NDEBUG - if (!base_array->has_equal_offsets(*target_array)) { - return Status::InvalidArgument( - "Meet none equal offsets array when flatten nested array, path {}, " - "type {}", - 
subcolumn.path.get_path(), subcolumn.type->get_name()); - } -#endif - MutableColumnPtr flattend_column = target_array->get_data_ptr()->assume_mutable(); - DataTypePtr flattend_type = - check_and_get_data_type<DataTypeArray>(remove_nullable(type).get()) - ->get_nested_type(); - // add sub path without parent prefix - nested_object_ptr->add_sub_column( - subcolumn.path.copy_pop_nfront(entry.first.get_parts().size()), - std::move(flattend_column), std::move(flattend_type)); - } - nested_object = make_nullable(nested_object->get_ptr())->assume_mutable(); - auto array = - make_nullable(ColumnArray::create(std::move(nested_object), std::move(offset))); - PathInDataBuilder builder; - // add parent prefix - builder.append(entry.first.get_parts(), false); - PathInData parent_path = builder.build(); - // unset nested parts - parent_path.unset_nested(); - DCHECK(!parent_path.has_nested_part()); - container_variant.add_sub_column(parent_path, array->assume_mutable(), - ColumnObject::NESTED_TYPE); - } + MutableColumnPtr container; + RETURN_IF_ERROR(_init_container(container, nrows)); + auto& container_variant = assert_cast<ColumnObject&>(*container); - // TODO select v:b -> v.b / v.b.c but v.d maybe in v - // copy container variant to dst variant, todo avoid copy variant.insert_range_from(container_variant, 0, nrows); - // variant.set_num_rows(nrows); _rows_read += nrows; variant.finalize(); #ifndef NDEBUG variant.check_consistency(); #endif - // clear data in nodes - RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) { - node.data.column->clear(); - return Status::OK(); - })); - container->clear(); - if (_root_reader) { - if (_root_reader->column->is_nullable()) { - // fill nullmap - DCHECK(dst->is_nullable()); - ColumnUInt8& dst_null_map = - assert_cast<ColumnNullable&>(*dst).get_null_map_column(); - ColumnUInt8& src_null_map = - assert_cast<ColumnNullable&>(*_root_reader->column).get_null_map_column(); - dst_null_map.insert_range_from(src_null_map, 0, 
src_null_map.size()); - // clear nullmap and inner data - src_null_map.clear(); - assert_cast<ColumnObject&>( - assert_cast<ColumnNullable&>(*_root_reader->column).get_nested_column()) - .clear_column_data(); - } else { - ColumnObject& root_column = assert_cast<ColumnObject&>(*_root_reader->column); - root_column.clear_column_data(); - } - } else { - if (dst->is_nullable()) { - // No nullable info exist in hirearchical data, fill nullmap with all none null - ColumnUInt8& dst_null_map = - assert_cast<ColumnNullable&>(*dst).get_null_map_column(); - auto fake_nullable_column = ColumnUInt8::create(nrows, 0); - dst_null_map.insert_range_from(*fake_nullable_column, 0, nrows); - } - } + RETURN_IF_ERROR(_init_null_map_and_clear_columns(container, dst, nrows)); return Status::OK(); } }; -// Extract from root column of variant, since root column of variant -// encodes sparse columns that are not materialized -class ExtractReader : public ColumnIterator { +// Extract path from sparse column +class SparseColumnExtractReader : public ColumnIterator { public: - ExtractReader(const TabletColumn& col, std::unique_ptr<SubstreamIterator>&& root_reader, - vectorized::DataTypePtr target_type_hint) - : _col(col), - _root_reader(std::move(root_reader)), - _target_type_hint(target_type_hint) {} + SparseColumnExtractReader(std::string path, + std::unique_ptr<ColumnIterator>&& sparse_column_reader) + : _path(std::move(path)), _sparse_column_reader(std::move(sparse_column_reader)) { + _sparse_column = vectorized::ColumnObject::create_sparse_column_fn(); + } Status init(const ColumnIteratorOptions& opts) override; @@ -283,12 +185,11 @@ public: ordinal_t get_current_ordinal() const override; private: - Status extract_to(vectorized::MutableColumnPtr& dst, size_t nrows); - - TabletColumn _col; + void _fill_path_column(vectorized::MutableColumnPtr& dst); + vectorized::MutableColumnPtr _sparse_column; + std::string _path; // may shared among different column iterators - 
std::unique_ptr<SubstreamIterator> _root_reader; - vectorized::DataTypePtr _target_type_hint; + std::unique_ptr<ColumnIterator> _sparse_column_reader; }; } // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index a50ada112f9..441e839e6ef 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -43,7 +43,6 @@ #include "olap/rowset/rowset_reader_context.h" #include "olap/rowset/segment_v2/column_reader.h" #include "olap/rowset/segment_v2/empty_segment_iterator.h" -#include "olap/rowset/segment_v2/hierarchical_data_reader.h" #include "olap/rowset/segment_v2/indexed_column_reader.h" #include "olap/rowset/segment_v2/inverted_index_file_reader.h" #include "olap/rowset/segment_v2/page_io.h" @@ -51,6 +50,7 @@ #include "olap/rowset/segment_v2/segment_iterator.h" #include "olap/rowset/segment_v2/segment_writer.h" // k_segment_magic_length #include "olap/rowset/segment_v2/stream_reader.h" +#include "olap/rowset/segment_v2/variant_column_writer_impl.h" #include "olap/schema.h" #include "olap/short_key_index.h" #include "olap/tablet_schema.h" @@ -201,9 +201,24 @@ Status Segment::_open() { // 0.01 comes from PrimaryKeyIndexBuilder::init _meta_mem_usage += BloomFilter::optimal_bit_num(_num_rows, 0.01) / 8; + uint32_t ordinal = 0; + for (const auto& column_meta : _footer_pb->columns()) { + // unique_id < 0 means this column is extracted column from variant + if (static_cast<int>(column_meta.unique_id()) >= 0) { + _column_id_to_footer_ordinal[column_meta.unique_id()] = ordinal++; + } + } return Status::OK(); } +const ColumnMetaPB* Segment::get_column_meta(int32_t unique_id) const { + auto it = _column_id_to_footer_ordinal.find(unique_id); + if (it == _column_id_to_footer_ordinal.end()) { + return nullptr; + } + return &_footer_pb->columns(it->second); +} + Status Segment::_open_inverted_index() { _inverted_index_file_reader = 
std::make_shared<InvertedIndexFileReader>( _fs, @@ -233,7 +248,8 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o if (col.is_extracted_column()) { auto relative_path = col.path_info_ptr()->copy_pop_front(); int32_t unique_id = col.unique_id() > 0 ? col.unique_id() : col.parent_unique_id(); - const auto* node = _sub_column_tree[unique_id].find_exact(relative_path); + const auto* node = ((VariantColumnReader*)(_column_readers.at(unique_id).get())) + ->get_reader_by_path(relative_path); reader = node != nullptr ? node->data.reader.get() : nullptr; } else { reader = _column_readers.contains(col.unique_id()) @@ -558,20 +574,17 @@ vectorized::DataTypePtr Segment::get_data_type_of(const ColumnIdentifier& identi auto relative_path = identifier.path->copy_pop_front(); int32_t unique_id = identifier.unique_id > 0 ? identifier.unique_id : identifier.parent_unique_id; - const auto* node = _sub_column_tree.contains(unique_id) - ? _sub_column_tree.at(unique_id).find_leaf(relative_path) + const auto* node = _column_readers.contains(unique_id) + ? ((VariantColumnReader*)(_column_readers.at(unique_id).get())) + ->get_reader_by_path(relative_path) : nullptr; - const auto* sparse_node = - _sparse_column_tree.contains(unique_id) - ? 
_sparse_column_tree.at(unique_id).find_exact(relative_path) - : nullptr; if (node) { - if (read_flat_leaves || (node->children.empty() && sparse_node == nullptr)) { + if (read_flat_leaves || (node->children.empty())) { return node->data.file_column_type; } } // missing in storage, treat it using input data type - if (read_flat_leaves && !node && !sparse_node) { + if (read_flat_leaves && !node) { return nullptr; } // it contains children or column missing in storage, so treat it as variant @@ -592,28 +605,11 @@ Status Segment::_create_column_readers_once() { } Status Segment::_create_column_readers(const SegmentFooterPB& footer) { - std::unordered_map<uint32_t, uint32_t> column_id_to_footer_ordinal; - std::unordered_map<vectorized::PathInData, uint32_t, vectorized::PathInData::Hash> - column_path_to_footer_ordinal; - for (uint32_t ordinal = 0; ordinal < footer.columns().size(); ++ordinal) { - const auto& column_pb = footer.columns(ordinal); - // column path for accessing subcolumns of variant - if (column_pb.has_column_path_info()) { - vectorized::PathInData path; - path.from_protobuf(column_pb.column_path_info()); - column_path_to_footer_ordinal.emplace(path, ordinal); - } - // unique_id is unsigned, -1 meaning no unique id(e.g. 
an extracted column from variant) - if (static_cast<int>(column_pb.unique_id()) >= 0) { - // unique id - column_id_to_footer_ordinal.emplace(column_pb.unique_id(), ordinal); - } - } // init by unique_id for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns(); ++ordinal) { const auto& column = _tablet_schema->column(ordinal); - auto iter = column_id_to_footer_ordinal.find(column.unique_id()); - if (iter == column_id_to_footer_ordinal.end()) { + auto iter = _column_id_to_footer_ordinal.find(column.unique_id()); + if (iter == _column_id_to_footer_ordinal.end()) { continue; } @@ -622,40 +618,40 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) { .be_exec_version = _be_exec_version, }; std::unique_ptr<ColumnReader> reader; - RETURN_IF_ERROR(ColumnReader::create(opts, footer.columns(iter->second), footer.num_rows(), + RETURN_IF_ERROR(ColumnReader::create(opts, footer, iter->second, footer.num_rows(), _file_reader, &reader)); _column_readers.emplace(column.unique_id(), std::move(reader)); } - for (const auto& [path, ordinal] : column_path_to_footer_ordinal) { - const ColumnMetaPB& column_pb = footer.columns(ordinal); - ColumnReaderOptions opts { - .kept_in_memory = _tablet_schema->is_in_memory(), - .be_exec_version = _be_exec_version, - }; - std::unique_ptr<ColumnReader> reader; - RETURN_IF_ERROR( - ColumnReader::create(opts, column_pb, footer.num_rows(), _file_reader, &reader)); - int32_t unique_id = column_pb.unique_id(); - auto relative_path = path.copy_pop_front(); - if (_sub_column_tree[unique_id].get_root() == nullptr) { - _sub_column_tree[unique_id].create_root(SubcolumnReader {nullptr, nullptr}); - } - if (relative_path.empty()) { - // root column - _sub_column_tree[unique_id].get_mutable_root()->modify_to_scalar(SubcolumnReader { - std::move(reader), - vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); - } else { - // check the root is already a leaf node - // 
DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty()); - _sub_column_tree[unique_id].add( - relative_path, - SubcolumnReader { - std::move(reader), - vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); - } - } + // for (const auto& [path, ordinal] : column_path_to_footer_ordinal) { + // const ColumnMetaPB& column_pb = footer.columns(ordinal); + // ColumnReaderOptions opts { + // .kept_in_memory = _tablet_schema->is_in_memory(), + // .be_exec_version = _be_exec_version, + // }; + // std::unique_ptr<ColumnReader> reader; + // RETURN_IF_ERROR( + // ColumnReader::create(opts, column_pb, footer.num_rows(), _file_reader, &reader)); + // int32_t unique_id = column_pb.unique_id(); + // auto relative_path = path.copy_pop_front(); + // if (_sub_column_tree[unique_id].get_root() == nullptr) { + // _sub_column_tree[unique_id].create_root(SubcolumnReader {nullptr, nullptr}); + // } + // if (relative_path.empty()) { + // // root column + // _sub_column_tree[unique_id].get_mutable_root()->modify_to_scalar(SubcolumnReader { + // std::move(reader), + // vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); + // } else { + // // check the root is already a leaf node + // // DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty()); + // _sub_column_tree[unique_id].add( + // relative_path, + // SubcolumnReader { + // std::move(reader), + // vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); + // } + // } // compability reason use tablet schema // init by column path @@ -716,8 +712,8 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) { return Status::OK(); } -static Status new_default_iterator(const TabletColumn& tablet_column, - std::unique_ptr<ColumnIterator>* iter) { +Status Segment::new_default_iterator(const TabletColumn& tablet_column, + std::unique_ptr<ColumnIterator>* iter) { if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) { return Status::InternalError( 
"invalid nonexistent column without default value. column_uid={}, column_name={}, " @@ -736,147 +732,48 @@ static Status new_default_iterator(const TabletColumn& tablet_column, return Status::OK(); } -Status Segment::_new_iterator_with_variant_root(const TabletColumn& tablet_column, - std::unique_ptr<ColumnIterator>* iter, - const SubcolumnColumnReaders::Node* root, - vectorized::DataTypePtr target_type_hint) { - ColumnIterator* it; - RETURN_IF_ERROR(root->data.reader->new_iterator(&it)); - auto* stream_iter = new ExtractReader( - tablet_column, - std::make_unique<SubstreamIterator>(root->data.file_column_type->create_column(), - std::unique_ptr<ColumnIterator>(it), - root->data.file_column_type), - target_type_hint); - iter->reset(stream_iter); - return Status::OK(); -} - -Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, - std::unique_ptr<ColumnIterator>* iter, - const StorageReadOptions* opt) { - // root column use unique id, leaf column use parent_unique_id - int32_t unique_id = tablet_column.unique_id() > 0 ? tablet_column.unique_id() - : tablet_column.parent_unique_id(); - if (!_sub_column_tree.contains(unique_id)) { - // No such variant column in this segment, get a default one - RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); - return Status::OK(); - } - auto relative_path = tablet_column.path_info_ptr()->copy_pop_front(); - const auto* root = _sub_column_tree[unique_id].get_root(); - const auto* node = tablet_column.has_path_info() - ? _sub_column_tree[unique_id].find_exact(relative_path) - : nullptr; - const auto* sparse_node = - tablet_column.has_path_info() && _sparse_column_tree.contains(unique_id) - ? 
_sparse_column_tree[unique_id].find_exact(relative_path) - : nullptr; - // // Currently only compaction and checksum need to read flat leaves - // // They both use tablet_schema_with_merged_max_schema_version as read schema - // auto type_to_read_flat_leaves = [](ReaderType type) { - // return type == ReaderType::READER_BASE_COMPACTION || - // type == ReaderType::READER_CUMULATIVE_COMPACTION || - // type == ReaderType::READER_COLD_DATA_COMPACTION || - // type == ReaderType::READER_SEGMENT_COMPACTION || - // type == ReaderType::READER_FULL_COMPACTION || type == ReaderType::READER_CHECKSUM; - // }; - - // // find the sibling of the nested column to fill the target nested column - // auto new_default_iter_with_same_nested = [&](const TabletColumn& tablet_column, - // std::unique_ptr<ColumnIterator>* iter) { - // // We find node that represents the same Nested type as path. - // const auto* parent = _sub_column_tree[unique_id].find_best_match(relative_path); - // VLOG_DEBUG << "find with path " << tablet_column.path_info_ptr()->get_path() << " parent " - // << (parent ? parent->path.get_path() : "nullptr") << ", type " - // << ", parent is nested " << (parent ? parent->is_nested() : false) << ", " - // << TabletColumn::get_string_by_field_type(tablet_column.type()); - // // find it's common parent with nested part - // // why not use parent->path->has_nested_part? because parent may not be a leaf node - // // none leaf node may not contain path info - // // Example: - // // {"payload" : {"commits" : [{"issue" : {"id" : 123, "email" : "a@b"}}]}} - // // nested node path : payload.commits(NESTED) - // // tablet_column path_info : payload.commits.issue.id(SCALAR) - // // parent path node : payload.commits.issue(TUPLE) - // // leaf path_info : payload.commits.issue.email(SCALAR) - // if (parent && SubcolumnColumnReaders::find_parent( - // parent, [](const auto& node) { return node.is_nested(); })) { - // /// Find any leaf of Nested subcolumn. 
- // const auto* leaf = SubcolumnColumnReaders::find_leaf( - // parent, [](const auto& node) { return node.path.has_nested_part(); }); - // assert(leaf); - // std::unique_ptr<ColumnIterator> sibling_iter; - // ColumnIterator* sibling_iter_ptr; - // RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr)); - // sibling_iter.reset(sibling_iter_ptr); - // *iter = std::make_unique<DefaultNestedColumnIterator>(std::move(sibling_iter), - // leaf->data.file_column_type); - // } else { - // *iter = std::make_unique<DefaultNestedColumnIterator>(nullptr, nullptr); - // } - // return Status::OK(); - // }; - - // if (opt != nullptr && type_to_read_flat_leaves(opt->io_ctx.reader_type)) { - // // compaction need to read flat leaves nodes data to prevent from amplification - // const auto* node = tablet_column.has_path_info() - // ? _sub_column_tree[unique_id].find_leaf(relative_path) - // : nullptr; - // if (!node) { - // // sparse_columns have this path, read from root - // if (sparse_node != nullptr && sparse_node->is_leaf_node()) { - // RETURN_IF_ERROR(_new_iterator_with_variant_root( - // tablet_column, iter, root, sparse_node->data.file_column_type)); - // } else { - // if (tablet_column.is_nested_subcolumn()) { - // // using the sibling of the nested column to fill the target nested column - // RETURN_IF_ERROR(new_default_iter_with_same_nested(tablet_column, iter)); - // } else { - // RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); - // } - // } - // return Status::OK(); - // } - // ColumnIterator* it; - // RETURN_IF_ERROR(node->data.reader->new_iterator(&it)); - // iter->reset(it); - // return Status::OK(); - // } - - if (node != nullptr) { - if (node->is_leaf_node() && sparse_node == nullptr) { - // Node contains column without any child sub columns and no corresponding sparse columns - // Direct read extracted columns - const auto* node = _sub_column_tree[unique_id].find_leaf(relative_path); - ColumnIterator* it; - 
RETURN_IF_ERROR(node->data.reader->new_iterator(&it)); - iter->reset(it); - } else { - // Node contains column with children columns or has correspoding sparse columns - // Create reader with hirachical data. - // If sparse column exists or read the full path of variant read in MERGE_SPARSE, otherwise READ_DIRECT - HierarchicalDataReader::ReadType read_type = - (relative_path == root->path) || sparse_node != nullptr - ? HierarchicalDataReader::ReadType::MERGE_SPARSE - : HierarchicalDataReader::ReadType::READ_DIRECT; - RETURN_IF_ERROR( - HierarchicalDataReader::create(iter, relative_path, node, root, read_type)); - } - } else { - // No such node, read from either sparse column or default column - if (sparse_node != nullptr) { - // sparse columns have this path, read from root - RETURN_IF_ERROR(_new_iterator_with_variant_root(tablet_column, iter, root, - sparse_node->data.file_column_type)); - } else { - // No such variant column in this segment, get a default one - RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); - } - } - - return Status::OK(); -} +// Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, +// std::unique_ptr<ColumnIterator>* iter, +// const StorageReadOptions* opt) { +// // root column use unique id, leaf column use parent_unique_id +// int32_t unique_id = tablet_column.unique_id() > 0 ? tablet_column.unique_id() +// : tablet_column.parent_unique_id(); +// if (!_sub_column_tree.contains(unique_id)) { +// // No such variant column in this segment, get a default one +// RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); +// return Status::OK(); +// } +// auto relative_path = tablet_column.path_info_ptr()->copy_pop_front(); +// const auto* root = _sub_column_tree[unique_id].get_root(); +// const auto* node = tablet_column.has_path_info() +// ? 
_sub_column_tree[unique_id].find_exact(relative_path) +// : nullptr; +// +// if (node != nullptr) { +// if (node->is_leaf_node()) { +// // Node contains column without any child sub columns and no corresponding sparse columns +// // Direct read extracted columns +// const auto* node = _sub_column_tree[unique_id].find_leaf(relative_path); +// ColumnIterator* it; +// RETURN_IF_ERROR(node->data.reader->new_iterator(&it)); +// iter->reset(it); +// } else { +// // Node contains column with children columns or has correspoding sparse columns +// // Create reader with hirachical data. +// // If sparse column exists or read the full path of variant read in MERGE_ROOT, otherwise READ_DIRECT +// HierarchicalDataReader::ReadType read_type = +// (relative_path == root->path) ? HierarchicalDataReader::ReadType::MERGE_ROOT +// : HierarchicalDataReader::ReadType::READ_DIRECT; +// RETURN_IF_ERROR( +// HierarchicalDataReader::create(iter, relative_path, node, root, read_type)); +// } +// } else { +// // No such node, read from sparse column +// // TODO test if in VariantStatisticsPB.sparse_column_non_null_size, otherwise generate a default iterator +// } +// +// return Status::OK(); +// } // Not use cid anymore, for example original table schema is colA int, then user do following actions // 1.add column b @@ -894,46 +791,43 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column, RETURN_IF_ERROR(_create_column_readers_once()); // init column iterator by path info - if (tablet_column.has_path_info() || tablet_column.is_variant_type()) { - return new_column_iterator_with_path(tablet_column, iter, opt); - } + // if (tablet_column.has_path_info() || tablet_column.is_variant_type()) { + // return new_column_iterator_with_path(tablet_column, iter, opt); + // } + + // For compability reason unique_id may less than 0 for variant extracted column + int32_t unique_id = tablet_column.unique_id() > 0 ? 
tablet_column.unique_id() + : tablet_column.parent_unique_id(); // init default iterator - if (!_column_readers.contains(tablet_column.unique_id())) { + if (!_column_readers.contains(unique_id)) { RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); return Status::OK(); } // init iterator by unique id ColumnIterator* it; - RETURN_IF_ERROR(_column_readers.at(tablet_column.unique_id())->new_iterator(&it)); + RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it, tablet_column)); iter->reset(it); if (config::enable_column_type_check && !tablet_column.is_agg_state_type() && - tablet_column.type() != _column_readers.at(tablet_column.unique_id())->get_meta_type()) { + tablet_column.type() != _column_readers.at(unique_id)->get_meta_type()) { LOG(WARNING) << "different type between schema and column reader," << " column schema name: " << tablet_column.name() << " column schema type: " << int(tablet_column.type()) << " column reader meta type: " - << int(_column_readers.at(tablet_column.unique_id())->get_meta_type()); + << int(_column_readers.at(unique_id)->get_meta_type()); return Status::InternalError("different type between schema and column reader"); } return Status::OK(); } -Status Segment::new_column_iterator(int32_t unique_id, std::unique_ptr<ColumnIterator>* iter) { - RETURN_IF_ERROR(_create_column_readers_once()); - ColumnIterator* it; - RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it)); - iter->reset(it); - return Status::OK(); -} - ColumnReader* Segment::_get_column_reader(const TabletColumn& col) { // init column iterator by path info if (col.has_path_info() || col.is_variant_type()) { auto relative_path = col.path_info_ptr()->copy_pop_front(); int32_t unique_id = col.unique_id() > 0 ? col.unique_id() : col.parent_unique_id(); const auto* node = col.has_path_info() - ? _sub_column_tree[unique_id].find_exact(relative_path) + ? 
((VariantColumnReader*)(_column_readers.at(unique_id).get())) + ->get_reader_by_path(relative_path) : nullptr; if (node != nullptr) { return node->data.reader.get(); diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index bc5ab1e1fdc..877f74ae1c3 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -37,14 +37,12 @@ #include "olap/olap_common.h" #include "olap/rowset/segment_v2/column_reader.h" // ColumnReader #include "olap/rowset/segment_v2/page_handle.h" -#include "olap/rowset/segment_v2/stream_reader.h" #include "olap/schema.h" #include "olap/tablet_schema.h" #include "runtime/descriptors.h" #include "util/once.h" #include "util/slice.h" #include "vec/columns/column.h" -#include "vec/columns/subcolumn_tree.h" #include "vec/data_types/data_type.h" #include "vec/data_types/data_type_nullable.h" #include "vec/json/path_in_data.h" @@ -107,11 +105,9 @@ public: std::unique_ptr<ColumnIterator>* iter, const StorageReadOptions* opt); - Status new_column_iterator_with_path(const TabletColumn& tablet_column, - std::unique_ptr<ColumnIterator>* iter, - const StorageReadOptions* opt); - - Status new_column_iterator(int32_t unique_id, std::unique_ptr<ColumnIterator>* iter); + // Status new_column_iterator_with_path(const TabletColumn& tablet_column, + // std::unique_ptr<ColumnIterator>* iter, + // const StorageReadOptions* opt); Status new_bitmap_index_iterator(const TabletColumn& tablet_column, std::unique_ptr<BitmapIndexIterator>* iter); @@ -120,7 +116,8 @@ public: const TabletIndex* index_meta, const StorageReadOptions& read_options, std::unique_ptr<InvertedIndexIterator>* iter); - + static Status new_default_iterator(const TabletColumn& tablet_column, + std::unique_ptr<ColumnIterator>* iter); const ShortKeyIndexDecoder* get_short_key_index() const { DCHECK(_load_index_once.has_called() && _load_index_once.stored_result().ok()); return _sk_index_decoder.get(); @@ -211,6 +208,8 @@ 
public: const TabletSchemaSPtr& tablet_schema() { return _tablet_schema; } + const ColumnMetaPB* get_column_meta(int32_t unique_id) const; + private: DISALLOW_COPY_AND_ASSIGN(Segment); Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, @@ -226,11 +225,6 @@ private: Status _load_pk_bloom_filter(); ColumnReader* _get_column_reader(const TabletColumn& col); - // Get Iterator which will read variant root column and extract with paths and types info - Status _new_iterator_with_variant_root(const TabletColumn& tablet_column, - std::unique_ptr<ColumnIterator>* iter, - const SubcolumnColumnReaders::Node* root, - vectorized::DataTypePtr target_type_hint); Status _write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data, io::IOContext& io_ctx); @@ -269,15 +263,6 @@ private: // map column unique id ---> it's inner data type std::map<int32_t, std::shared_ptr<const vectorized::IDataType>> _file_column_types; - // Each node in the tree represents the sub column reader and type - // for variants. 
- // map column unique id --> it's sub column readers - std::map<int32_t, SubcolumnColumnReaders> _sub_column_tree; - - // each sprase column's path and types info - // map column unique id --> it's sparse sub column readers - std::map<int32_t, SubcolumnColumnReaders> _sparse_column_tree; - // used to guarantee that short key index will be loaded at most once in a thread-safe way DorisCallOnce<Status> _load_index_once; // used to guarantee that primary key bloom filter will be loaded at most once in a thread-safe way @@ -303,6 +288,8 @@ private: int _be_exec_version = BeExecVersionManager::get_newest_version(); OlapReaderStatistics* _pk_index_load_stats = nullptr; + // unique_id -> idx in footer.columns() + std::unordered_map<int32_t, uint32_t> _column_id_to_footer_ordinal; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 1bfcfbb999b..b76acf68978 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -285,6 +285,7 @@ Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& co opts.file_writer = _file_writer; opts.compression_type = _opts.compression_type; opts.footer = &_footer; + opts.input_rs_readers = _opts.rowset_ctx->input_rs_readers; std::unique_ptr<ColumnWriter> writer; RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer)); diff --git a/be/src/olap/rowset/segment_v2/stream_reader.h b/be/src/olap/rowset/segment_v2/stream_reader.h index 9aac3c0f232..5b71e00101f 100644 --- a/be/src/olap/rowset/segment_v2/stream_reader.h +++ b/be/src/olap/rowset/segment_v2/stream_reader.h @@ -19,9 +19,14 @@ #include <memory> -#include "olap/rowset/segment_v2/column_reader.h" +// #include "olap/rowset/segment_v2/column_reader.h" +#include "vec/columns/column.h" +#include "vec/columns/subcolumn_tree.h" +#include "vec/data_types/data_type.h" namespace doris::segment_v2 { +class 
ColumnIterator; +class ColumnReader; // This file Defined ColumnIterator and ColumnReader for reading variant subcolumns. The types from read schema and from storage are // different, so we need to wrap the ColumnIterator from execution phase and storage column reading phase.And we also diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp index 958df5780bd..761cbec8c49 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp @@ -16,9 +16,14 @@ // under the License. #include "olap/rowset/segment_v2/variant_column_writer_impl.h" +#include <gen_cpp/segment_v2.pb.h> + #include "common/status.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/rowset_fwd.h" #include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/segment_v2/column_writer.h" +#include "olap/segment_loader.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_object.h" @@ -31,11 +36,90 @@ namespace doris::segment_v2 { VariantColumnWriterImpl::VariantColumnWriterImpl(const ColumnWriterOptions& opts, const TabletColumn* column) { _opts = opts; - _column = vectorized::ColumnObject::create(true, false); - if (column->is_nullable()) { + _tablet_column = column; +} + +Status VariantColumnWriterImpl::init() { + // caculate stats info + std::set<std::string> dynamic_paths; + RETURN_IF_ERROR(_get_subcolumn_paths_from_stats(dynamic_paths)); + if (dynamic_paths.empty()) { + _column = vectorized::ColumnObject::create(true, false); + } else { + vectorized::ColumnObject::Subcolumns dynamic_subcolumns; + for (const auto& path : dynamic_paths) { + dynamic_subcolumns.add(vectorized::PathInData(path), + vectorized::ColumnObject::Subcolumn {0, true}); + } + _column = vectorized::ColumnObject::create(std::move(dynamic_subcolumns), true); + } + if (_tablet_column->is_nullable()) { 
_null_column = vectorized::ColumnUInt8::create(0); } - _tablet_column = column; + return Status::OK(); +} + +Status VariantColumnWriterImpl::_get_subcolumn_paths_from_stats(std::set<std::string>& paths) { + std::unordered_map<std::string, size_t> path_to_total_number_of_non_null_values; + + // Merge and collect all stats info from all input rowsets/segments + for (RowsetReaderSharedPtr reader : _opts.input_rs_readers) { + SegmentCacheHandle segment_cache; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( + std::static_pointer_cast<BetaRowset>(reader->rowset()), &segment_cache)); + for (const auto& segment : segment_cache.get_segments()) { + const auto* column_meta_pb = segment->get_column_meta(_tablet_column->unique_id()); + if (!column_meta_pb) { + continue; + } + if (!column_meta_pb->has_variant_statistics()) { + continue; + } + const VariantStatisticsPB& source_statistics = column_meta_pb->variant_statistics(); + for (const auto& [path, size] : source_statistics.subcolumn_non_null_size()) { + auto it = path_to_total_number_of_non_null_values.find(path); + if (it == path_to_total_number_of_non_null_values.end()) { + it = path_to_total_number_of_non_null_values.emplace(path, 0).first; + } + it->second += size; + } + for (const auto& [path, size] : source_statistics.sparse_column_non_null_size()) { + auto it = path_to_total_number_of_non_null_values.find(path); + if (it == path_to_total_number_of_non_null_values.end()) { + it = path_to_total_number_of_non_null_values.emplace(path, 0).first; + } + it->second += size; + } + } + } + // Check if the number of all dynamic paths exceeds the limit. + if (path_to_total_number_of_non_null_values.size() > vectorized::ColumnObject::MAX_SUBCOLUMNS) { + // Sort paths by total number of non null values. 
+ std::vector<std::pair<size_t, std::string_view>> paths_with_sizes; + paths_with_sizes.reserve(path_to_total_number_of_non_null_values.size()); + for (const auto& [path, size] : path_to_total_number_of_non_null_values) { + paths_with_sizes.emplace_back(size, path); + } + std::sort(paths_with_sizes.begin(), paths_with_sizes.end(), std::greater()); + + // Fill dynamic_paths with first max_dynamic_paths paths in sorted list. + for (const auto& [size, path] : paths_with_sizes) { + if (paths.size() < vectorized::ColumnObject::MAX_SUBCOLUMNS) { + paths.emplace(path); + } + // // todo : Add all remaining paths into shared data statistics until we reach its max size; + // else if (new_statistics.sparse_data_paths_statistics.size() < Statistics::MAX_SPARSE_DATA_STATISTICS_SIZE) { + // new_statistics.sparse_data_paths_statistics.emplace(path, size); + // } + } + } else { + // Use all dynamic paths from all source columns. + for (const auto& [path, _] : path_to_total_number_of_non_null_values) { + paths.emplace(path); + } + } + + return Status::OK(); } Status VariantColumnWriterImpl::_process_root_column(vectorized::ColumnObject* ptr, @@ -92,7 +176,6 @@ Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt .parent_unique_id = _tablet_column->unique_id(), .path_info = full_path}); }; - _statistics._subcolumns_non_null_size.reserve(ptr->get_subcolumns().size()); // convert sub column data from engine format to storage layer format for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns(ptr->get_subcolumns())) { @@ -121,7 +204,8 @@ Status VariantColumnWriterImpl::_process_subcolumns(vectorized::ColumnObject* pt _subcolumn_opts[current_column_id - 1].meta->set_num_rows(num_rows); // get stastics - _statistics._subcolumns_non_null_size.push_back(entry->data.get_non_null_value_size()); + _statistics._subcolumns_non_null_size.emplace(entry->path.get_path(), + entry->data.get_non_null_value_size()); } return Status::OK(); } @@ -163,7 +247,7 
@@ Status VariantColumnWriterImpl::_process_sparse_column( it != _statistics._sparse_column_non_null_size.end()) { ++it->second; } else if (_statistics._sparse_column_non_null_size.size() < - VariantStatistics::MAX_SHARED_DATA_STATISTICS_SIZE) { + VariantStatistics::MAX_SPARSE_DATA_STATISTICS_SIZE) { _statistics._sparse_column_non_null_size.emplace(path, 1); } } @@ -173,7 +257,22 @@ Status VariantColumnWriterImpl::_process_sparse_column( } void VariantStatistics::to_pb(VariantStatisticsPB* stats) const { - // TODO + for (const auto& [path, value] : _subcolumns_non_null_size) { + stats->mutable_subcolumn_non_null_size()->emplace(path.to_string(), value); + } + for (const auto& [path, value] : _sparse_column_non_null_size) { + stats->mutable_sparse_column_non_null_size()->emplace(path.to_string(), value); + } +} + +void VariantStatistics::from_pb(const VariantStatisticsPB& stats) { + // make sure the ref of path, todo not use ref + for (const auto& [path, value] : stats.subcolumn_non_null_size()) { + _subcolumns_non_null_size[StringRef(path.data(), path.size())] = value; + } + for (const auto& [path, value] : stats.sparse_column_non_null_size()) { + _sparse_column_non_null_size[StringRef(path.data(), path.size())] = value; + } } Status VariantColumnWriterImpl::finalize() { diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h index 87f67e7b1ef..66c5269e7ce 100644 --- a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h @@ -36,18 +36,20 @@ class ColumnWriter; class ScalarColumnWriter; struct VariantStatistics { - constexpr static size_t MAX_SHARED_DATA_STATISTICS_SIZE = 10000; - std::vector<size_t> _subcolumns_non_null_size; + // If reached the size of this, we should stop writing statistics for sparse data + constexpr static size_t MAX_SPARSE_DATA_STATISTICS_SIZE = 10000; + std::map<StringRef, size_t>
_subcolumns_non_null_size; std::map<StringRef, size_t> _sparse_column_non_null_size; void to_pb(VariantStatisticsPB* stats) const; + void from_pb(const VariantStatisticsPB& stats); }; class VariantColumnWriterImpl { public: VariantColumnWriterImpl(const ColumnWriterOptions& opts, const TabletColumn* column); Status finalize(); - + Status init(); bool is_finalized() const; Status append_data(const uint8_t** ptr, size_t num_rows); @@ -65,6 +67,9 @@ public: private: void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column); + // subcolumn path from variant stats info to distinguish from sparse column + Status _get_subcolumn_paths_from_stats(std::set<std::string>& paths); + Status _create_column_writer(uint32_t cid, const TabletColumn& column, const TabletColumn& parent_column, const TabletSchemaSPtr& tablet_schema); diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 089dac218fe..6abc2f5c16e 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -280,6 +280,7 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo opts.file_writer = _file_writer; opts.compression_type = _opts.compression_type; opts.footer = &_footer; + opts.input_rs_readers = _opts.rowset_ctx->input_rs_readers; std::unique_ptr<ColumnWriter> writer; RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer)); diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 9d6e260724b..11130e628e9 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -356,6 +356,7 @@ ColumnObject::Subcolumn::Subcolumn(MutableColumnPtr&& data_, DataTypePtr type, b : least_common_type(type), is_nullable(is_nullable_), is_root(is_root_) { data.push_back(std::move(data_)); data_types.push_back(type); + 
data_serdes.push_back(type->get_serde()); } ColumnObject::Subcolumn::Subcolumn(size_t size_, bool is_nullable_, bool is_root_) @@ -398,6 +399,7 @@ void ColumnObject::Subcolumn::add_new_column_part(DataTypePtr type) { data.push_back(type->create_column()); least_common_type = LeastCommonType {type}; data_types.push_back(type); + data_serdes.push_back(type->get_serde()); } void ColumnObject::Subcolumn::insert(Field field, FieldInfo info) { @@ -800,6 +802,9 @@ ColumnObject::ColumnObject(bool is_nullable_, bool create_root_) } } +ColumnObject::ColumnObject(MutableColumnPtr&& sparse_column) + : is_nullable(true), serialized_sparse_column(std::move(sparse_column)) {} + ColumnObject::ColumnObject(bool is_nullable_, DataTypePtr type, MutableColumnPtr&& column) : is_nullable(is_nullable_) { add_sub_column({}, std::move(column), type); @@ -957,6 +962,27 @@ void ColumnObject::insert_default() { ++num_rows; } +bool ColumnObject::Subcolumn::is_null_at(size_t n) const { + if (least_common_type.get_base_type_id() == TypeIndex::Nothing) { + return true; + } + size_t ind = n; + if (ind < num_of_defaults_in_prefix) { + return true; + } + + ind -= num_of_defaults_in_prefix; + for (const auto& part : data) { + if (ind < part->size()) { + return assert_cast<const ColumnNullable&>(*part).is_null_at(ind); + } + ind -= part->size(); + } + + throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting field is out of range", + n); +} + void ColumnObject::Subcolumn::get(size_t n, Field& res) const { if (least_common_type.get_base_type_id() == TypeIndex::Nothing) { res = Null(); @@ -1023,8 +1049,9 @@ void ColumnObject::Subcolumn::serialize_to_sparse_column(ColumnString* key, std: is_null = false; // insert key key->insert_data(path.data(), path.size()); + const auto& part_type_serde = data_serdes[i]; // insert value - data_types[i]->get_serde()->write_one_cell_to_binary(*part, value, row); + part_type_serde->write_one_cell_to_binary(*part, value, row); } return; } @@ -1088,7 
+1115,7 @@ const char* parse_binary_from_sparse_column(TypeIndex type, const char* data, Fi const size_t size = *reinterpret_cast<const size_t*>(data); data += sizeof(size_t); res = Array(size); - vectorized::Array& array = res.get<Array>(); + auto& array = res.get<Array>(); info_res.num_dimensions++; for (size_t i = 0; i < size; ++i) { const uint8_t is_null = *reinterpret_cast<const uint8_t*>(data++); @@ -1097,7 +1124,7 @@ const char* parse_binary_from_sparse_column(TypeIndex type, const char* data, Fi continue; } Field nested_field; - const TypeIndex nested_type = + const auto nested_type = assert_cast<const TypeIndex>(*reinterpret_cast<const uint8_t*>(data++)); data = parse_binary_from_sparse_column(nested_type, data, nested_field, info_res); array.emplace_back(std::move(nested_field)); @@ -1113,7 +1140,7 @@ const char* parse_binary_from_sparse_column(TypeIndex type, const char* data, Fi } std::pair<Field, FieldInfo> ColumnObject::deserialize_from_sparse_column(const ColumnString* value, - size_t row) const { + size_t row) { const auto& data_ref = value->get_data_at(row); const char* data = data_ref.data; DCHECK(data_ref.size > 0); @@ -1132,7 +1159,7 @@ std::pair<Field, FieldInfo> ColumnObject::deserialize_from_sparse_column(const C } DCHECK(data_ref.size > 1); - const TypeIndex type = assert_cast<const TypeIndex>(*reinterpret_cast<const uint8_t*>(data++)); + const auto type = assert_cast<const TypeIndex>(*reinterpret_cast<const uint8_t*>(data++)); info_res.scalar_type_id = type; Field res; const char* end = parse_binary_from_sparse_column(type, data, res, info_res); @@ -1171,7 +1198,7 @@ void ColumnObject::get(size_t n, Field& res) const { // Iterator over [path, binary value] for (size_t i = offset; i != end; ++i) { const StringRef path_data = path->get_data_at(i); - const auto& data = deserialize_from_sparse_column(value, i); + const auto& data = ColumnObject::deserialize_from_sparse_column(value, i); object.try_emplace(std::string(path_data.data, 
path_data.size), data.first); } @@ -1360,7 +1387,8 @@ void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column( const PathInData column_path(src_sparse_path); if (auto* subcolumn = get_subcolumn(column_path); subcolumn != nullptr) { // Deserialize binary value into subcolumn from src serialized sparse column data. - const auto& data = src.deserialize_from_sparse_column(src_sparse_column_values, i); + const auto& data = + ColumnObject::deserialize_from_sparse_column(src_sparse_column_values, i); subcolumn->insert(data.first, data.second); } else { // Before inserting this path into sparse column check if we need to @@ -1386,7 +1414,7 @@ void ColumnObject::insert_from_sparse_column_and_fill_remaing_dense_column( } } - // Insert remaining dynamic paths from src_dynamic_paths_for_shared_data. + // Insert remaining dynamic paths from src_dynamic_paths_for_sparse_data. while (sorted_src_subcolumn_for_sparse_column_idx < sorted_src_subcolumn_for_sparse_column_size) { auto& [src_path, src_subcolumn] = sorted_src_subcolumn_for_sparse_column @@ -1452,6 +1480,37 @@ const ColumnObject::Subcolumn* ColumnObject::get_subcolumn(const PathInData& key return &node->data; } +size_t ColumnObject::Subcolumn::serialize_text_json(size_t n, BufferWritable& output) const { + if (least_common_type.get_base_type_id() == TypeIndex::Nothing) { + output.write(DataTypeSerDe::NULL_IN_COMPLEX_TYPE.data(), + DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size()); + return DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size(); + } + + size_t ind = n; + if (ind < num_of_defaults_in_prefix) { + output.write(DataTypeSerDe::NULL_IN_COMPLEX_TYPE.data(), + DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size()); + return DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size(); + } + + ind -= num_of_defaults_in_prefix; + DataTypeSerDe::FormatOptions opt; + for (size_t i = 0; i < data.size(); ++i) { + const auto& part = data[i]; + const auto& part_type_serde = data_serdes[i]; + + if (ind < part->size()) { + return 
part_type_serde->serialize_one_cell_to_json(*part, ind, output, opt); + } + + ind -= part->size(); + } + + throw doris::Exception(ErrorCode::OUT_OF_BOUND, + "Index ({}) for serializing JSON is out of range", n); +} + const ColumnObject::Subcolumn* ColumnObject::get_subcolumn_with_cache(const PathInData& key, size_t key_index) const { // Optimization by caching the order of fields (which is almost always the same) @@ -1717,89 +1776,203 @@ void get_json_by_column_tree(rapidjson::Value& root, rapidjson::Document::Alloca } Status ColumnObject::serialize_one_row_to_string(int64_t row, std::string* output) const { - if (!is_finalized()) { - const_cast<ColumnObject*>(this)->finalize(); - } - rapidjson::StringBuffer buf; - if (is_scalar_variant()) { + // if (!is_finalized()) { + // const_cast<ColumnObject*>(this)->finalize(); + // } + if (is_scalar_variant() && is_finalized()) { auto type = get_root_type(); *output = type->to_string(*get_root(), row); return Status::OK(); } - RETURN_IF_ERROR(serialize_one_row_to_json_format(row, &buf, nullptr)); - // TODO avoid copy - *output = std::string(buf.GetString(), buf.GetSize()); + // TODO preallocate memory + auto tmp_col = ColumnString::create(); + VectorBufferWriter write_buffer(*tmp_col.get()); + RETURN_IF_ERROR(serialize_one_row_to_json_format(row, write_buffer, nullptr)); + write_buffer.commit(); + auto str_ref = tmp_col->get_data_at(0); + *output = std::string(str_ref.data, str_ref.size); return Status::OK(); } Status ColumnObject::serialize_one_row_to_string(int64_t row, BufferWritable& output) const { - if (!is_finalized()) { - const_cast<ColumnObject*>(this)->finalize(); - } - if (is_scalar_variant()) { + // if (!is_finalized()) { + // const_cast<ColumnObject*>(this)->finalize(); + // } + if (is_scalar_variant() && is_finalized()) { auto type = get_root_type(); type->to_string(*get_root(), row, output); return Status::OK(); } - rapidjson::StringBuffer buf; - RETURN_IF_ERROR(serialize_one_row_to_json_format(row, &buf, 
nullptr)); - output.write(buf.GetString(), buf.GetLength()); + RETURN_IF_ERROR(serialize_one_row_to_json_format(row, output, nullptr)); return Status::OK(); } -Status ColumnObject::serialize_one_row_to_json_format(int64_t row, rapidjson::StringBuffer* output, - bool* is_null) const { - CHECK(is_finalized()); - if (subcolumns.empty()) { - if (is_null != nullptr) { - *is_null = true; - } else { - rapidjson::Value root(rapidjson::kNullType); - rapidjson::Writer<rapidjson::StringBuffer> writer(*output); - if (!root.Accept(writer)) { - return Status::InternalError("Failed to serialize json value"); +/// Struct that represents elements of the JSON path. +/// "a.b.c" -> ["a", "b", "c"] +struct PathElements { + explicit PathElements(const String& path) { + const char* start = path.data(); + const char* end = start + path.size(); + const char* pos = start; + const char* last_dot_pos = pos - 1; + for (pos = start; pos != end; ++pos) { + if (*pos == '.') { + elements.emplace_back(last_dot_pos + 1, size_t(pos - last_dot_pos - 1)); + last_dot_pos = pos; } } - return Status::OK(); + + elements.emplace_back(last_dot_pos + 1, size_t(pos - last_dot_pos - 1)); } - CHECK(size() > row); - rapidjson::StringBuffer buffer; - rapidjson::Value root(rapidjson::kNullType); - if (doc_structure == nullptr) { - doc_structure = std::make_shared<rapidjson::Document>(); - rapidjson::Document::AllocatorType& allocator = doc_structure->GetAllocator(); - get_json_by_column_tree(*doc_structure, allocator, subcolumns.get_root()); + + size_t size() const { return elements.size(); } + + std::vector<std::string_view> elements; +}; + +/// Struct that represents a prefix of a JSON path. Used during output of the JSON object. +struct Prefix { + /// Shrink current prefix to the common prefix of current prefix and specified path. + /// For example, if current prefix is a.b.c.d and path is a.b.e, then shrink the prefix to a.b. 
+ void shrink_to_common_prefix(const PathElements& path_elements) { + /// Don't include last element in path_elements in the prefix. + size_t i = 0; + while (i != elements.size() && i != (path_elements.elements.size() - 1) && + elements[i].first == path_elements.elements[i]) + ++i; + elements.resize(i); } - if (!doc_structure->IsNull()) { - root.CopyFrom(*doc_structure, doc_structure->GetAllocator()); + + /// Check is_first flag in current object. + bool is_first_in_current_object() const { + if (elements.empty()) return root_is_first_flag; + return elements.back().second; } - Arena mem_pool; -#ifndef NDEBUG - VLOG_DEBUG << "dump structure " << JsonFunctions::print_json_value(*doc_structure); -#endif - for (const auto& subcolumn : subcolumns) { - RETURN_IF_ERROR(find_and_set_leave_value( - subcolumn->data.get_finalized_column_ptr(), subcolumn->path, - subcolumn->data.get_least_common_type_serde(), - subcolumn->data.get_least_common_type(), - subcolumn->data.least_common_type.get_base_type_id(), root, - doc_structure->GetAllocator(), mem_pool, row)); - if (subcolumn->path.empty() && !root.IsObject()) { - // root was modified, only handle root node - break; - } + + /// Set flag is_first = false in current object. + void set_not_first_in_current_object() { + if (elements.empty()) + root_is_first_flag = false; + else + elements.back().second = false; } - compact_null_values(root, doc_structure->GetAllocator()); - if (root.IsNull() && is_null != nullptr) { - // Fast path - *is_null = true; - } else { - output->Clear(); - rapidjson::Writer<rapidjson::StringBuffer> writer(*output); - if (!root.Accept(writer)) { - return Status::InternalError("Failed to serialize json value"); + + size_t size() const { return elements.size(); } + + /// Elements of the prefix: (path element, is_first flag in this prefix). + /// is_first flag indicates if we already serialized some key in the object with such prefix. 
+ std::vector<std::pair<std::string_view, bool>> elements; + bool root_is_first_flag = true; +}; + +Status ColumnObject::serialize_one_row_to_json_format(int64_t row_num, BufferWritable& output, + bool* is_null) const { + const auto& column_map = assert_cast<const ColumnMap&>(*serialized_sparse_column); + const auto& sparse_data_offsets = column_map.get_offsets(); + const auto [sparse_data_paths, sparse_data_values] = get_sparse_data_paths_and_values(); + size_t sparse_data_offset = sparse_data_offsets[static_cast<ssize_t>(row_num) - 1]; + size_t sparse_data_end = sparse_data_offsets[static_cast<ssize_t>(row_num)]; + + // We need to convert the set of paths in this row to a JSON object. + // To do it, we first collect all the paths from current row, then we sort them + // and construct the resulting JSON object by iterating over sorted list of paths. + // For example: + // b.c, a.b, a.a, b.e, g, h.u.t -> a.a, a.b, b.c, b.e, g, h.u.t -> {"a" : {"a" : ..., "b" : ...}, "b" : {"c" : ..., "e" : ...}, "g" : ..., "h" : {"u" : {"t" : ...}}}. + std::vector<String> sorted_paths; + std::map<std::string, Subcolumn> subcolumn_path_map; + sorted_paths.reserve(get_subcolumns().size() + (sparse_data_end - sparse_data_offset)); + for (const auto& subcolumn : get_subcolumns()) { + /// We consider null value and absence of the path in a row as equivalent cases, because we cannot actually distinguish them. + /// So, we don't output null values at all. 
+ if (!subcolumn->data.is_null_at(row_num)) { + sorted_paths.emplace_back(subcolumn->path.get_path()); + } + subcolumn_path_map.emplace(subcolumn->path.get_path(), subcolumn->data); + } + for (size_t i = sparse_data_offset; i != sparse_data_end; ++i) { + auto path = sparse_data_paths->get_data_at(i).to_string(); + sorted_paths.emplace_back(path); + } + + std::sort(sorted_paths.begin(), sorted_paths.end()); + + writeChar('{', output); + size_t index_in_sparse_data_values = sparse_data_offset; + // current_prefix represents the path of the object we are currently serializing keys in. + Prefix current_prefix; + for (const auto& path : sorted_paths) { + PathElements path_elements(path); + // Change prefix to common prefix between current prefix and current path. + // If prefix changed (it can only decrease), close all finished objects. + // For example: + // Current prefix: a.b.c.d + // Current path: a.b.e.f + // It means now we have : {..., "a" : {"b" : {"c" : {"d" : ... + // Common prefix will be a.b, so it means we should close objects a.b.c.d and a.b.c: {..., "a" : {"b" : {"c" : {"d" : ...}} + // and continue serializing keys in object a.b + size_t prev_prefix_size = current_prefix.size(); + current_prefix.shrink_to_common_prefix(path_elements); + size_t prefix_size = current_prefix.size(); + if (prefix_size != prev_prefix_size) { + size_t objects_to_close = prev_prefix_size - prefix_size; + for (size_t i = 0; i != objects_to_close; ++i) { + writeChar('}', output); + } + } + + // Now we are inside object that has common prefix with current path. + // We should go inside all objects in current path. + // From the example above we should open object a.b.e: + // {..., "a" : {"b" : {"c" : {"d" : ...}}, "e" : { + if (prefix_size + 1 < path_elements.size()) { + for (size_t i = prefix_size; i != path_elements.size() - 1; ++i) { + /// Write comma before the key if it's not the first key in this prefix. 
+ if (!current_prefix.is_first_in_current_object()) { + writeChar(',', output); + } else { + current_prefix.set_not_first_in_current_object(); + } + + writeJSONString(path_elements.elements[i], output); + writeCString(":{", output); + + // Update current prefix. + current_prefix.elements.emplace_back(path_elements.elements[i], true); + } + } + + // Write comma before the key if it's not the first key in this prefix. + if (!current_prefix.is_first_in_current_object()) { + writeChar(',', output); + } else { + current_prefix.set_not_first_in_current_object(); + } + + writeJSONString(path_elements.elements.back(), output); + writeCString(":", output); + + // Serialize value of current path. + if (auto subcolumn_it = subcolumn_path_map.find(path); + subcolumn_it != subcolumn_path_map.end()) { + subcolumn_it->second.serialize_text_json(row_num, output); + } else { + // To serialize value stored in shared data we should first deserialize it from binary format. + Subcolumn tmp_subcolumn(0, true); + const auto& data = ColumnObject::deserialize_from_sparse_column( + sparse_data_values, index_in_sparse_data_values++); + tmp_subcolumn.insert(data.first, data.second); + tmp_subcolumn.serialize_text_json(0, output); } } + + // Close all remaining open objects. 
+ for (size_t i = 0; i != current_prefix.elements.size(); ++i) { + writeChar('}', output); + } + writeChar('}', output); +#ifndef NDEBUG + // check if it is a valid json +#endif return Status::OK(); } @@ -2126,6 +2299,8 @@ const DataTypePtr ColumnObject::NESTED_TYPE = std::make_shared<vectorized::DataT std::make_shared<vectorized::DataTypeArray>(std::make_shared<vectorized::DataTypeNullable>( std::make_shared<vectorized::DataTypeObject>()))); +const size_t ColumnObject::MAX_SUBCOLUMNS = 5; + DataTypePtr ColumnObject::get_root_type() const { return subcolumns.get_root()->data.get_least_common_type(); } @@ -2312,4 +2487,80 @@ bool ColumnObject::try_insert_default_from_nested(const Subcolumns::NodePtr& ent return true; } +size_t ColumnObject::find_path_lower_bound_in_sparse_data(StringRef path, + const ColumnString& sparse_data_paths, + size_t start, size_t end) { + // Simple random access iterator over values in ColumnString in specified range. + class Iterator { + public: + using difference_type = size_t; + using value_type = StringRef; + using iterator_category = std::random_access_iterator_tag; + using pointer = StringRef*; + using reference = StringRef&; + + Iterator() = delete; + Iterator(const ColumnString* data_, size_t index_) : data(data_), index(index_) {} + Iterator(const Iterator& rhs) = default; + Iterator& operator=(const Iterator& rhs) = default; + inline Iterator& operator+=(difference_type rhs) { + index += rhs; + return *this; + } + inline StringRef operator*() const { return data->get_data_at(index); } + + inline Iterator& operator++() { + ++index; + return *this; + } + inline Iterator& operator--() { + --index; + return *this; + } + inline difference_type operator-(const Iterator& rhs) const { return index - rhs.index; } + + const ColumnString* data; + size_t index; + }; + + Iterator start_it(&sparse_data_paths, start); + Iterator end_it(&sparse_data_paths, end); + auto it = std::lower_bound(start_it, end_it, path); + return it.index; +} + +void 
ColumnObject::fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, StringRef path, + const ColumnPtr& sparse_data_column, + size_t start, size_t end) { + const auto& sparse_data_map = assert_cast<const ColumnMap&>(*sparse_data_column); + const auto& sparse_data_offsets = sparse_data_map.get_offsets(); + size_t first_offset = sparse_data_offsets[static_cast<ssize_t>(start) - 1]; + size_t last_offset = sparse_data_offsets[static_cast<ssize_t>(end) - 1]; + // Check if we have at least one row with data. + if (first_offset == last_offset) { + subcolumn.insert_many_defaults(end - start); + return; + } + + const auto& sparse_data_paths = assert_cast<const ColumnString&>(sparse_data_map.get_keys()); + const auto& sparse_data_values = assert_cast<const ColumnString&>(sparse_data_map.get_values()); + for (size_t i = start; i != end; ++i) { + size_t paths_start = sparse_data_offsets[static_cast<ssize_t>(i) - 1]; + size_t paths_end = sparse_data_offsets[static_cast<ssize_t>(i)]; + auto lower_bound_path_index = ColumnObject::find_path_lower_bound_in_sparse_data( + path, sparse_data_paths, paths_start, paths_end); + if (lower_bound_path_index != paths_end && + sparse_data_paths.get_data_at(lower_bound_path_index) == path) { + // auto value_data = sparse_data_values.get_data_at(lower_bound_path_index); + // ReadBufferFromMemory buf(value_data.data, value_data.size); + // dynamic_serialization->deserializeBinary(path_column, buf, getFormatSettings()); + const auto& data = ColumnObject::deserialize_from_sparse_column(&sparse_data_values, + lower_bound_path_index); + subcolumn.insert(data.first, data.second); + } else { + subcolumn.insert_default(); + } + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 72cc783caf8..b63c0c5c0d8 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -98,7 +98,7 @@ public: constexpr static TypeIndex MOST_COMMON_TYPE_ID = 
TypeIndex::JSONB; // Nullable(Array(Nullable(Object))) const static DataTypePtr NESTED_TYPE; - const size_t MAX_SUBCOLUMNS = 3; + const static size_t MAX_SUBCOLUMNS; // Finlize mode for subcolumns, write mode will estimate which subcolumns are sparse columns(too many null values inside column), // merge and encode them into a shared column in root column. Only affects in flush block to segments. // Otherwise read mode should be as default mode. @@ -128,6 +128,8 @@ public: size_t get_non_null_value_size() const; + size_t serialize_text_json(size_t n, BufferWritable& output) const; + const DataTypeSerDeSPtr& get_least_common_type_serde() const { return least_common_type.get_serde(); } @@ -136,6 +138,8 @@ public: void get(size_t n, Field& res) const; + bool is_null_at(size_t n) const; + /// Inserts a field, which scalars can be arbitrary, but number of /// dimensions should be consistent with current common type. /// throws InvalidArgument when meet conflict types @@ -233,6 +237,7 @@ public: /// and it's the supertype for all type of column from 0 to i-1. std::vector<WrappedPtr> data; std::vector<DataTypePtr> data_types; + std::vector<DataTypeSerDeSPtr> data_serdes; /// Until we insert any non-default field we don't know further /// least common type and we count number of defaults in prefix, /// which will be converted to the default type of final common type. 
@@ -248,7 +253,6 @@ private: const bool is_nullable; Subcolumns subcolumns; size_t num_rows; - // The rapidjson document format of Subcolumns tree structure // the leaves is null.In order to display whole document, copy // this structure and fill with Subcolumns sub items @@ -268,6 +272,8 @@ public: explicit ColumnObject(bool is_nullable_, bool create_root = true); + explicit ColumnObject(MutableColumnPtr&& sparse_column); + explicit ColumnObject(bool is_nullable_, DataTypePtr type, MutableColumnPtr&& column); // create without root, num_rows = size @@ -292,7 +298,7 @@ public: Status serialize_one_row_to_string(int64_t row, BufferWritable& output) const; // serialize one row to json format - Status serialize_one_row_to_json_format(int64_t row, rapidjson::StringBuffer* output, + Status serialize_one_row_to_json_format(int64_t row, BufferWritable& output, bool* is_null) const; // Fill the `serialized_sparse_column` @@ -360,11 +366,19 @@ public: Subcolumns& get_subcolumns() { return subcolumns; } - ColumnPtr get_sparse_column() { + ColumnPtr get_sparse_column() const { return serialized_sparse_column->convert_to_full_column_if_const(); } // use sparse_subcolumns_schema to record sparse column's path info and type + static MutableColumnPtr create_sparse_column_fn() { + return vectorized::ColumnMap::create(vectorized::ColumnString::create(), + vectorized::ColumnString::create(), + vectorized::ColumnArray::ColumnOffsets::create()); + } + + void set_sparse_column(ColumnPtr column) { serialized_sparse_column = column; } + Status finalize(FinalizeMode mode); /// Finalizes all subcolumns. @@ -571,10 +585,18 @@ public: const auto& value = assert_cast<const ColumnString&>(column_map.get_values()); return {&key, &value}; } + // Insert all the data from sparse data with specified path to sub column. 
+ static void fill_path_olumn_from_sparse_data(Subcolumn& subcolumn, StringRef path, + const ColumnPtr& sparse_data_column, size_t start, + size_t end); + + static size_t find_path_lower_bound_in_sparse_data(StringRef path, + const ColumnString& sparse_data_paths, + size_t start, size_t end); // Deserialize the i-th row of the column from the sparse column. - std::pair<Field, FieldInfo> deserialize_from_sparse_column(const ColumnString* value, - size_t row) const; + static std::pair<Field, FieldInfo> deserialize_from_sparse_column(const ColumnString* value, + size_t row); private: // May throw exception diff --git a/be/src/vec/common/schema_util.cpp b/be/src/vec/common/schema_util.cpp index 42f9240646f..77b3299c5b5 100644 --- a/be/src/vec/common/schema_util.cpp +++ b/be/src/vec/common/schema_util.cpp @@ -608,7 +608,7 @@ bool has_schema_index_diff(const TabletSchema* new_schema, const TabletSchema* o TabletColumn create_sparse_column(int32_t parent_unique_id) { TColumn tcolumn; - tcolumn.column_name = ".sparse"; + tcolumn.column_name = SPARSE_COLUMN_PATH; tcolumn.col_unique_id = parent_unique_id; tcolumn.column_type = TColumnType {}; tcolumn.column_type.type = TPrimitiveType::MAP; @@ -618,7 +618,9 @@ TabletColumn create_sparse_column(int32_t parent_unique_id) { tcolumn.column_type.type = TPrimitiveType::STRING; tcolumn.children_column.push_back(child_tcolumn); tcolumn.children_column.push_back(child_tcolumn); - return TabletColumn {tcolumn}; + auto res = TabletColumn {tcolumn}; + res.set_path_info(PathInData {SPARSE_COLUMN_PATH}); + return res; } } // namespace doris::vectorized::schema_util diff --git a/be/src/vec/common/schema_util.h b/be/src/vec/common/schema_util.h index 0507c9e2fe6..fee6e778325 100644 --- a/be/src/vec/common/schema_util.h +++ b/be/src/vec/common/schema_util.h @@ -49,6 +49,7 @@ struct ColumnWithTypeAndName; } // namespace vectorized } // namespace doris +const std::string SPARSE_COLUMN_PATH = "__DORIS_VARIANT_SPARSE__"; namespace 
doris::vectorized::schema_util { /// Returns number of dimensions in Array type. 0 if type is not array. size_t get_number_of_dimensions(const IDataType& type); diff --git a/be/src/vec/common/string_buffer.hpp b/be/src/vec/common/string_buffer.hpp index 8dca6f057a2..d297d465985 100644 --- a/be/src/vec/common/string_buffer.hpp +++ b/be/src/vec/common/string_buffer.hpp @@ -85,6 +85,92 @@ private: const char* _data; }; +inline void writeChar(char x, BufferWritable& buf) { + buf.write(x); +} + +/** Writes a C-string without creating a temporary object. If the string is a literal, then `strlen` is executed at the compilation stage. + * Use when the string is a literal. + */ +#define writeCString(s, buf) (buf).write((s), strlen(s)) + +inline void writeJSONString(const char* begin, const char* end, BufferWritable& buf) { + writeChar('"', buf); + for (const char* it = begin; it != end; ++it) { + switch (*it) { + case '\b': + writeChar('\\', buf); + writeChar('b', buf); + break; + case '\f': + writeChar('\\', buf); + writeChar('f', buf); + break; + case '\n': + writeChar('\\', buf); + writeChar('n', buf); + break; + case '\r': + writeChar('\\', buf); + writeChar('r', buf); + break; + case '\t': + writeChar('\\', buf); + writeChar('t', buf); + break; + case '\\': + writeChar('\\', buf); + writeChar('\\', buf); + break; + case '/': + writeChar('/', buf); + break; + case '"': + writeChar('\\', buf); + writeChar('"', buf); + break; + default: + UInt8 c = *it; + if (c <= 0x1F) { + /// Escaping of ASCII control characters. 
+ + UInt8 higher_half = c >> 4; + UInt8 lower_half = c & 0xF; + + writeCString("\\u00", buf); + writeChar('0' + higher_half, buf); + + if (lower_half <= 9) { + writeChar('0' + lower_half, buf); + } else { + writeChar('A' + lower_half - 10, buf); + } + } else if (end - it >= 3 && it[0] == '\xE2' && it[1] == '\x80' && + (it[2] == '\xA8' || it[2] == '\xA9')) { + /// This is for compatibility with JavaScript, because unescaped line separators are prohibited in string literals, + /// and these code points are alternative line separators. + + if (it[2] == '\xA8') { + writeCString("\\u2028", buf); + } + if (it[2] == '\xA9') { + writeCString("\\u2029", buf); + } + + /// Byte sequence is 3 bytes long. We have additional two bytes to skip. + it += 2; + } else { + writeChar(*it, buf); + } + } + } + writeChar('"', buf); +} + +inline void writeJSONString(std::string_view s, BufferWritable& buf) { + writeJSONString(s.data(), s.data() + s.size(), buf); +} + using VectorBufferReader = BufferReadable; using BufferReader = BufferReadable; diff --git a/be/src/vec/data_types/serde/data_type_object_serde.cpp b/be/src/vec/data_types/serde/data_type_object_serde.cpp index f6719437285..6c902b60589 100644 --- a/be/src/vec/data_types/serde/data_type_object_serde.cpp +++ b/be/src/vec/data_types/serde/data_type_object_serde.cpp @@ -58,15 +58,15 @@ Status DataTypeObjectSerDe::_write_column_to_mysql(const IColumn& column, root->get_finalized_column(), row_buffer, row_idx, col_const, options)); } else { // Serialize hierarchy types to json format - rapidjson::StringBuffer buffer; + std::string buffer; bool is_null = false; - if (!variant.serialize_one_row_to_json_format(row_idx, &buffer, &is_null)) { + if (!variant.serialize_one_row_to_string(row_idx, &buffer)) { return Status::InternalError("Invalid json format"); } if (is_null) { row_buffer.push_null(); } else { - row_buffer.push_string(buffer.GetString(), buffer.GetLength()); + row_buffer.push_string(buffer.data(), buffer.size()); } } return 
Status::OK(); diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index 37a4f0a70ee..dee4a81d3bb 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -161,7 +161,7 @@ message ColumnPathInfo { message VariantStatisticsPB { // in the order of subcolumns in variant - repeated uint32 subcolumn_non_null_size = 1; + map<string, uint32> subcolumn_non_null_size = 1; map<string, uint32> sparse_column_non_null_size = 2; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org