This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch variant-sparse in repository https://gitbox.apache.org/repos/asf/doris.git
commit 25ca02009a3989656aaffae60d36fda2f3ede7a9 Author: eldenmoon <lihan...@selectdb.com> AuthorDate: Wed Nov 27 19:15:05 2024 +0800 refactor variant flush refactor writer --- be/src/olap/rowset/segment_v2/column_writer.cpp | 57 +++- be/src/olap/rowset/segment_v2/column_writer.h | 51 ++- be/src/olap/rowset/segment_v2/segment.cpp | 239 ++++++++------ be/src/olap/rowset/segment_v2/segment_writer.cpp | 17 +- .../segment_v2/variant_column_writer_impl.cpp | 355 +++++++++++++++++++++ .../rowset/segment_v2/variant_column_writer_impl.h | 68 ++++ .../rowset/segment_v2/vertical_segment_writer.cpp | 37 ++- be/src/vec/columns/column_object.cpp | 5 +- be/src/vec/olap/olap_data_convertor.cpp | 51 +-- be/src/vec/olap/olap_data_convertor.h | 5 +- 10 files changed, 723 insertions(+), 162 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index 2637017b78d..e3cd3b17144 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -38,6 +38,7 @@ #include "olap/rowset/segment_v2/page_builder.h" #include "olap/rowset/segment_v2/page_io.h" #include "olap/rowset/segment_v2/page_pointer.h" +#include "olap/rowset/segment_v2/variant_column_writer_impl.h" #include "olap/rowset/segment_v2/zone_map_index.h" #include "olap/tablet_schema.h" #include "olap/types.h" @@ -292,6 +293,14 @@ Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts, return Status::OK(); } +Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts, + const TabletColumn* column, io::FileWriter* file_writer, + std::unique_ptr<ColumnWriter>* writer) { + *writer = std::unique_ptr<ColumnWriter>(new VariantColumnWriter( + opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)))); + return Status::OK(); +} + //Todo(Amory): here should according nullable and offset and need sub to simply this function Status ColumnWriter::create(const 
ColumnWriterOptions& opts, const TabletColumn* column, io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) { @@ -320,10 +329,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* return Status::OK(); } case FieldType::OLAP_FIELD_TYPE_VARIANT: { - // Use ScalarColumnWriter to write it's only root data - std::unique_ptr<ColumnWriter> writer_local = std::unique_ptr<ColumnWriter>( - new ScalarColumnWriter(opts, std::move(field), file_writer)); - *writer = std::move(writer_local); + RETURN_IF_ERROR(create_variant_writer(opts, column, file_writer, writer)); return Status::OK(); } default: @@ -1158,4 +1164,47 @@ Status MapColumnWriter::write_inverted_index() { return Status::OK(); } +VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts, + const TabletColumn* column, std::unique_ptr<Field> field) + : ColumnWriter(std::move(field), opts.meta->is_nullable()) { + _impl = std::make_unique<VariantColumnWriterImpl>(opts, column); +}; + +Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { + _next_rowid += num_rows; + return _impl->append_data(ptr, num_rows); +} + +uint64_t VariantColumnWriter::estimate_buffer_size() { + return _impl->estimate_buffer_size(); +} + +Status VariantColumnWriter::finish() { + return _impl->finish(); +} +Status VariantColumnWriter::write_data() { + return _impl->write_data(); +} +Status VariantColumnWriter::write_ordinal_index() { + return _impl->write_ordinal_index(); +} + +Status VariantColumnWriter::write_zone_map() { + return _impl->write_zone_map(); +} + +Status VariantColumnWriter::write_bitmap_index() { + return _impl->write_bitmap_index(); +} +Status VariantColumnWriter::write_inverted_index() { + return _impl->write_inverted_index(); +} +Status VariantColumnWriter::write_bloom_filter_index() { + return _impl->write_bloom_filter_index(); +} +Status VariantColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr, + size_t num_rows) { 
+ return _impl->append_nullable(null_map, ptr, num_rows); +} + } // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index 2d66b940a38..b664332ea8e 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -29,7 +29,8 @@ #include <vector> #include "common/status.h" // for Status -#include "olap/field.h" // for Field +#include "exec/decompressor.h" +#include "olap/field.h" // for Field #include "olap/rowset/segment_v2/common.h" #include "olap/rowset/segment_v2/inverted_index_writer.h" #include "util/bitmap.h" // for BitmapChange @@ -40,6 +41,7 @@ namespace doris { class BlockCompressionCodec; class TabletColumn; class TabletIndex; +struct RowsetWriterContext; namespace io { class FileWriter; @@ -66,6 +68,11 @@ struct ColumnWriterOptions { std::vector<const TabletIndex*> indexes; // unused const TabletIndex* inverted_index = nullptr; InvertedIndexFileWriter* inverted_index_file_writer; + // variant column writer used + SegmentFooterPB* footer = nullptr; + io::FileWriter* file_writer = nullptr; + CompressionTypePB compression_type = UNKNOWN_COMPRESSION; + RowsetWriterContext* rowset_ctx = nullptr; std::string to_string() const { std::stringstream ss; ss << std::boolalpha << "meta=" << meta->DebugString() @@ -84,6 +91,7 @@ class OrdinalIndexWriter; class PageBuilder; class BloomFilterIndexWriter; class ZoneMapIndexWriter; +class VariantColumnWriterImpl; class ColumnWriter { public: @@ -98,6 +106,9 @@ public: static Status create_map_writer(const ColumnWriterOptions& opts, const TabletColumn* column, io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer); + static Status create_variant_writer(const ColumnWriterOptions& opts, const TabletColumn* column, + io::FileWriter* file_writer, + std::unique_ptr<ColumnWriter>* writer); static Status create_agg_state_writer(const ColumnWriterOptions& opts, const TabletColumn* 
column, io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer); @@ -462,5 +473,43 @@ private: ColumnWriterOptions _opts; }; +class VariantColumnWriter : public ColumnWriter { +public: + explicit VariantColumnWriter(const ColumnWriterOptions& opts, const TabletColumn* column, + std::unique_ptr<Field> field); + + ~VariantColumnWriter() override = default; + + Status init() override { return Status::OK(); } + + Status append_data(const uint8_t** ptr, size_t num_rows) override; + + uint64_t estimate_buffer_size() override; + + Status finish() override; + Status write_data() override; + Status write_ordinal_index() override; + + Status write_zone_map() override; + + Status write_bitmap_index() override; + Status write_inverted_index() override; + Status write_bloom_filter_index() override; + ordinal_t get_next_rowid() const override { return _next_rowid; } + + Status append_nulls(size_t num_rows) override { + return Status::NotSupported("variant writer can not append_nulls"); + } + Status append_nullable(const uint8_t* null_map, const uint8_t** ptr, size_t num_rows) override; + + Status finish_current_page() override { + return Status::NotSupported("variant writer has no data, can not finish_current_page"); + } + +private: + std::unique_ptr<VariantColumnWriterImpl> _impl; + ordinal_t _next_rowid = 0; +}; + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 0ad799683fc..a50ada112f9 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -627,19 +627,8 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) { _column_readers.emplace(column.unique_id(), std::move(reader)); } - // init by column path - for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns(); ++ordinal) { - const auto& column = _tablet_schema->column(ordinal); - if (!column.has_path_info()) { - continue; - } - auto path 
= column.has_path_info() ? *column.path_info_ptr() - : vectorized::PathInData(column.name_lower_case()); - auto iter = column_path_to_footer_ordinal.find(path); - if (iter == column_path_to_footer_ordinal.end()) { - continue; - } - const ColumnMetaPB& column_pb = footer.columns(iter->second); + for (const auto& [path, ordinal] : column_path_to_footer_ordinal) { + const ColumnMetaPB& column_pb = footer.columns(ordinal); ColumnReaderOptions opts { .kept_in_memory = _tablet_schema->is_in_memory(), .be_exec_version = _be_exec_version, @@ -647,41 +636,83 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) { std::unique_ptr<ColumnReader> reader; RETURN_IF_ERROR( ColumnReader::create(opts, column_pb, footer.num_rows(), _file_reader, &reader)); - // root column use unique id, leaf column use parent_unique_id - int32_t unique_id = - column.parent_unique_id() > 0 ? column.parent_unique_id() : column.unique_id(); + int32_t unique_id = column_pb.unique_id(); auto relative_path = path.copy_pop_front(); + if (_sub_column_tree[unique_id].get_root() == nullptr) { + _sub_column_tree[unique_id].create_root(SubcolumnReader {nullptr, nullptr}); + } if (relative_path.empty()) { // root column - _sub_column_tree[unique_id].create_root(SubcolumnReader { + _sub_column_tree[unique_id].get_mutable_root()->modify_to_scalar(SubcolumnReader { std::move(reader), vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); } else { // check the root is already a leaf node - DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty()); + // DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty()); _sub_column_tree[unique_id].add( relative_path, SubcolumnReader { std::move(reader), vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); } - - // init sparse columns paths and type info - for (uint32_t ordinal = 0; ordinal < column_pb.sparse_columns().size(); ++ordinal) { - const auto& spase_column_pb = column_pb.sparse_columns(ordinal); - 
if (spase_column_pb.has_column_path_info()) { - vectorized::PathInData path; - path.from_protobuf(spase_column_pb.column_path_info()); - // Read from root column, so reader is nullptr - _sparse_column_tree[unique_id].add( - path.copy_pop_front(), - SubcolumnReader {nullptr, - vectorized::DataTypeFactory::instance().create_data_type( - spase_column_pb)}); - } - } } + // compability reason use tablet schema + // init by column path + // for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns(); ++ordinal) { + // const auto& column = _tablet_schema->column(ordinal); + // if (!column.has_path_info()) { + // continue; + // } + // auto path = column.has_path_info() ? *column.path_info_ptr() + // : vectorized::PathInData(column.name_lower_case()); + // auto iter = column_path_to_footer_ordinal.find(path); + // if (iter == column_path_to_footer_ordinal.end()) { + // continue; + // } + // const ColumnMetaPB& column_pb = footer.columns(iter->second); + // ColumnReaderOptions opts { + // .kept_in_memory = _tablet_schema->is_in_memory(), + // .be_exec_version = _be_exec_version, + // }; + // std::unique_ptr<ColumnReader> reader; + // RETURN_IF_ERROR( + // ColumnReader::create(opts, column_pb, footer.num_rows(), _file_reader, &reader)); + // // root column use unique id, leaf column use parent_unique_id + // int32_t unique_id = + // column.parent_unique_id() > 0 ? 
column.parent_unique_id() : column.unique_id(); + // auto relative_path = path.copy_pop_front(); + // if (relative_path.empty()) { + // // root column + // _sub_column_tree[unique_id].create_root(SubcolumnReader { + // std::move(reader), + // vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); + // } else { + // // check the root is already a leaf node + // DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty()); + // _sub_column_tree[unique_id].add( + // relative_path, + // SubcolumnReader { + // std::move(reader), + // vectorized::DataTypeFactory::instance().create_data_type(column_pb)}); + // } + + // // init sparse columns paths and type info + // for (uint32_t ordinal = 0; ordinal < column_pb.sparse_columns().size(); ++ordinal) { + // const auto& spase_column_pb = column_pb.sparse_columns(ordinal); + // if (spase_column_pb.has_column_path_info()) { + // vectorized::PathInData path; + // path.from_protobuf(spase_column_pb.column_path_info()); + // // Read from root column, so reader is nullptr + // _sparse_column_tree[unique_id].add( + // path.copy_pop_front(), + // SubcolumnReader {nullptr, + // vectorized::DataTypeFactory::instance().create_data_type( + // spase_column_pb)}); + // } + // } + // } + return Status::OK(); } @@ -741,77 +772,77 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, tablet_column.has_path_info() && _sparse_column_tree.contains(unique_id) ? 
_sparse_column_tree[unique_id].find_exact(relative_path) : nullptr; - // Currently only compaction and checksum need to read flat leaves - // They both use tablet_schema_with_merged_max_schema_version as read schema - auto type_to_read_flat_leaves = [](ReaderType type) { - return type == ReaderType::READER_BASE_COMPACTION || - type == ReaderType::READER_CUMULATIVE_COMPACTION || - type == ReaderType::READER_COLD_DATA_COMPACTION || - type == ReaderType::READER_SEGMENT_COMPACTION || - type == ReaderType::READER_FULL_COMPACTION || type == ReaderType::READER_CHECKSUM; - }; - - // find the sibling of the nested column to fill the target nested column - auto new_default_iter_with_same_nested = [&](const TabletColumn& tablet_column, - std::unique_ptr<ColumnIterator>* iter) { - // We find node that represents the same Nested type as path. - const auto* parent = _sub_column_tree[unique_id].find_best_match(relative_path); - VLOG_DEBUG << "find with path " << tablet_column.path_info_ptr()->get_path() << " parent " - << (parent ? parent->path.get_path() : "nullptr") << ", type " - << ", parent is nested " << (parent ? parent->is_nested() : false) << ", " - << TabletColumn::get_string_by_field_type(tablet_column.type()); - // find it's common parent with nested part - // why not use parent->path->has_nested_part? because parent may not be a leaf node - // none leaf node may not contain path info - // Example: - // {"payload" : {"commits" : [{"issue" : {"id" : 123, "email" : "a@b"}}]}} - // nested node path : payload.commits(NESTED) - // tablet_column path_info : payload.commits.issue.id(SCALAR) - // parent path node : payload.commits.issue(TUPLE) - // leaf path_info : payload.commits.issue.email(SCALAR) - if (parent && SubcolumnColumnReaders::find_parent( - parent, [](const auto& node) { return node.is_nested(); })) { - /// Find any leaf of Nested subcolumn. 
- const auto* leaf = SubcolumnColumnReaders::find_leaf( - parent, [](const auto& node) { return node.path.has_nested_part(); }); - assert(leaf); - std::unique_ptr<ColumnIterator> sibling_iter; - ColumnIterator* sibling_iter_ptr; - RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr)); - sibling_iter.reset(sibling_iter_ptr); - *iter = std::make_unique<DefaultNestedColumnIterator>(std::move(sibling_iter), - leaf->data.file_column_type); - } else { - *iter = std::make_unique<DefaultNestedColumnIterator>(nullptr, nullptr); - } - return Status::OK(); - }; - - if (opt != nullptr && type_to_read_flat_leaves(opt->io_ctx.reader_type)) { - // compaction need to read flat leaves nodes data to prevent from amplification - const auto* node = tablet_column.has_path_info() - ? _sub_column_tree[unique_id].find_leaf(relative_path) - : nullptr; - if (!node) { - // sparse_columns have this path, read from root - if (sparse_node != nullptr && sparse_node->is_leaf_node()) { - RETURN_IF_ERROR(_new_iterator_with_variant_root( - tablet_column, iter, root, sparse_node->data.file_column_type)); - } else { - if (tablet_column.is_nested_subcolumn()) { - // using the sibling of the nested column to fill the target nested column - RETURN_IF_ERROR(new_default_iter_with_same_nested(tablet_column, iter)); - } else { - RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); - } - } - return Status::OK(); - } - ColumnIterator* it; - RETURN_IF_ERROR(node->data.reader->new_iterator(&it)); - iter->reset(it); - return Status::OK(); - } + // // Currently only compaction and checksum need to read flat leaves + // // They both use tablet_schema_with_merged_max_schema_version as read schema + // auto type_to_read_flat_leaves = [](ReaderType type) { + // return type == ReaderType::READER_BASE_COMPACTION || + // type == ReaderType::READER_CUMULATIVE_COMPACTION || + // type == ReaderType::READER_COLD_DATA_COMPACTION || + // type == ReaderType::READER_SEGMENT_COMPACTION || + // type == 
ReaderType::READER_FULL_COMPACTION || type == ReaderType::READER_CHECKSUM; + // }; + + // // find the sibling of the nested column to fill the target nested column + // auto new_default_iter_with_same_nested = [&](const TabletColumn& tablet_column, + // std::unique_ptr<ColumnIterator>* iter) { + // // We find node that represents the same Nested type as path. + // const auto* parent = _sub_column_tree[unique_id].find_best_match(relative_path); + // VLOG_DEBUG << "find with path " << tablet_column.path_info_ptr()->get_path() << " parent " + // << (parent ? parent->path.get_path() : "nullptr") << ", type " + // << ", parent is nested " << (parent ? parent->is_nested() : false) << ", " + // << TabletColumn::get_string_by_field_type(tablet_column.type()); + // // find it's common parent with nested part + // // why not use parent->path->has_nested_part? because parent may not be a leaf node + // // none leaf node may not contain path info + // // Example: + // // {"payload" : {"commits" : [{"issue" : {"id" : 123, "email" : "a@b"}}]}} + // // nested node path : payload.commits(NESTED) + // // tablet_column path_info : payload.commits.issue.id(SCALAR) + // // parent path node : payload.commits.issue(TUPLE) + // // leaf path_info : payload.commits.issue.email(SCALAR) + // if (parent && SubcolumnColumnReaders::find_parent( + // parent, [](const auto& node) { return node.is_nested(); })) { + // /// Find any leaf of Nested subcolumn. 
+ // const auto* leaf = SubcolumnColumnReaders::find_leaf( + // parent, [](const auto& node) { return node.path.has_nested_part(); }); + // assert(leaf); + // std::unique_ptr<ColumnIterator> sibling_iter; + // ColumnIterator* sibling_iter_ptr; + // RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr)); + // sibling_iter.reset(sibling_iter_ptr); + // *iter = std::make_unique<DefaultNestedColumnIterator>(std::move(sibling_iter), + // leaf->data.file_column_type); + // } else { + // *iter = std::make_unique<DefaultNestedColumnIterator>(nullptr, nullptr); + // } + // return Status::OK(); + // }; + + // if (opt != nullptr && type_to_read_flat_leaves(opt->io_ctx.reader_type)) { + // // compaction need to read flat leaves nodes data to prevent from amplification + // const auto* node = tablet_column.has_path_info() + // ? _sub_column_tree[unique_id].find_leaf(relative_path) + // : nullptr; + // if (!node) { + // // sparse_columns have this path, read from root + // if (sparse_node != nullptr && sparse_node->is_leaf_node()) { + // RETURN_IF_ERROR(_new_iterator_with_variant_root( + // tablet_column, iter, root, sparse_node->data.file_column_type)); + // } else { + // if (tablet_column.is_nested_subcolumn()) { + // // using the sibling of the nested column to fill the target nested column + // RETURN_IF_ERROR(new_default_iter_with_same_nested(tablet_column, iter)); + // } else { + // RETURN_IF_ERROR(new_default_iterator(tablet_column, iter)); + // } + // } + // return Status::OK(); + // } + // ColumnIterator* it; + // RETURN_IF_ERROR(node->data.reader->new_iterator(&it)); + // iter->reset(it); + // return Status::OK(); + // } if (node != nullptr) { if (node->is_leaf_node() && sparse_node == nullptr) { diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index fc22c3570e5..f9b3928298b 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ 
-281,6 +281,11 @@ Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& co (page_size > 0) ? page_size : segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE; } + opts.rowset_ctx = _opts.rowset_ctx; + opts.file_writer = _file_writer; + opts.compression_type = _opts.compression_type; + opts.footer = &_footer; + std::unique_ptr<ColumnWriter> writer; RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer)); RETURN_IF_ERROR(writer->init()); @@ -712,7 +717,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* << ") not equal to segment writer's num rows written(" << _num_rows_written << ")"; _olap_data_convertor->clear_source_content(); - RETURN_IF_ERROR(append_block_with_variant_subcolumns(full_block)); + // RETURN_IF_ERROR(append_block_with_variant_subcolumns(full_block)); return Status::OK(); } @@ -824,11 +829,11 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po } } - if (_opts.write_type == DataWriteType::TYPE_DIRECT || - _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) { - RETURN_IF_ERROR( - append_block_with_variant_subcolumns(*const_cast<vectorized::Block*>(block))); - } + // if (_opts.write_type == DataWriteType::TYPE_DIRECT || + // _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) { + // RETURN_IF_ERROR( + // append_block_with_variant_subcolumns(*const_cast<vectorized::Block*>(block))); + // } _num_rows_written += num_rows; _olap_data_convertor->clear_source_content(); diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp new file mode 100644 index 00000000000..72884ab775b --- /dev/null +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.cpp @@ -0,0 +1,355 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "olap/rowset/segment_v2/variant_column_writer_impl.h" + +#include "common/status.h" +#include "olap/rowset/rowset_writer_context.h" +#include "olap/rowset/segment_v2/column_writer.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_object.h" +#include "vec/columns/columns_number.h" +#include "vec/common/schema_util.h" +#include "vec/olap/olap_data_convertor.h" + +namespace doris::segment_v2 { + +VariantColumnWriterImpl::VariantColumnWriterImpl(const ColumnWriterOptions& opts, + const TabletColumn* column) { + _opts = opts; + _column = vectorized::ColumnObject::create(true, false); + if (column->is_nullable()) { + _null_column = vectorized::ColumnUInt8::create(0); + } + _tablet_column = column; +} + +Status VariantColumnWriterImpl::finalize() { + auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get()); + ptr->finalize(vectorized::ColumnObject::FinalizeMode::WRITE_MODE); + + // convert each subcolumns to storage format and add data to sub columns writers buffer + auto olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>(); + + DCHECK(ptr->is_finalized()); + + if (ptr->is_null_root()) { + auto root_type = vectorized::make_nullable( + 
std::make_shared<vectorized::ColumnObject::MostCommonType>()); + auto root_col = root_type->create_column(); + root_col->insert_many_defaults(ptr->rows()); + ptr->create_root(root_type, std::move(root_col)); + } + + // common extracted columns + const auto& parent_column = *_tablet_column; + + // generate column info by entry info + auto generate_column_info = [&](const auto& entry) { + const std::string& column_name = + parent_column.name_lower_case() + "." + entry->path.get_path(); + const vectorized::DataTypePtr& final_data_type_from_object = + entry->data.get_least_common_type(); + vectorized::PathInDataBuilder full_path_builder; + auto full_path = full_path_builder.append(parent_column.name_lower_case(), false) + .append(entry->path.get_parts(), false) + .build(); + // set unique_id and parent_unique_id, will use unique_id to get iterator correct + return vectorized::schema_util::get_column_by_type( + final_data_type_from_object, column_name, + vectorized::schema_util::ExtraInfo {.unique_id = parent_column.unique_id(), + .parent_unique_id = parent_column.unique_id(), + .path_info = full_path}); + }; + // root column + ColumnWriterOptions root_opts = _opts; + _root_writer = std::unique_ptr<ColumnWriter>(new ScalarColumnWriter( + _opts, std::unique_ptr<Field>(FieldFactory::create(parent_column)), _opts.file_writer)); + RETURN_IF_ERROR(_root_writer->init()); + + // subcolumn + size_t num_rows = _column->size(); + for (auto& subcolumn : _subcolumn_writers) { + RETURN_IF_ERROR(subcolumn->init()); + } + + // make sure the root type + auto expected_root_type = + vectorized::make_nullable(std::make_shared<vectorized::ColumnObject::MostCommonType>()); + ptr->ensure_root_node_type(expected_root_type); + + int column_id = 0; + // convert root column data from engine format to storage layer format + olap_data_convertor->add_column_data_convertor(parent_column); + RETURN_IF_ERROR(olap_data_convertor->set_source_content_with_specifid_column( + {ptr->get_root()->get_ptr(), 
nullptr, ""}, 0, num_rows, column_id));
+    auto [status, column] = olap_data_convertor->convert_column_data(column_id);
+    if (!status.ok()) {
+        return status;
+    }
+    // use real null data instead of root
+    // _null_column is only created by the constructor when the column is nullable, so
+    // guard the access: for a non-nullable variant column no null map is passed at all
+    // instead of dereferencing an empty column pointer.
+    const uint8_t* nullmap = nullptr;
+    if (_null_column.get() != nullptr) {
+        nullmap = vectorized::check_and_get_column<vectorized::ColumnUInt8>(_null_column.get())
+                          ->get_data()
+                          .data();
+    }
+    RETURN_IF_ERROR(_root_writer->append(nullmap, column->get_data(), num_rows));
+    ++column_id;
+    olap_data_convertor->clear_source_content();
+
+    // convert sub column data from engine format to storage layer format
+    for (const auto& entry :
+         vectorized::schema_util::get_sorted_subcolumns(ptr->get_subcolumns())) {
+        if (entry->path.empty()) {
+            // already handled
+            continue;
+        }
+        CHECK(entry->data.is_finalized());
+        int current_column_id = column_id++;
+        TabletColumn tablet_column = generate_column_info(entry);
+        RETURN_IF_ERROR(_create_column_writer(current_column_id, tablet_column, parent_column,
+                                              _opts.rowset_ctx->tablet_schema));
+        olap_data_convertor->add_column_data_convertor(tablet_column);
+        RETURN_IF_ERROR(olap_data_convertor->set_source_content_with_specifid_column(
+                {entry->data.get_finalized_column_ptr()->get_ptr(),
+                 entry->data.get_least_common_type(), tablet_column.name()},
+                0, num_rows, current_column_id));
+        auto [status, column] = olap_data_convertor->convert_column_data(current_column_id);
+        if (!status.ok()) {
+            return status;
+        }
+        const uint8_t* nullmap = column->get_nullmap();
+        // column id 0 was used for the root above, so subcolumn ids are offset by one
+        // relative to their position in _subcolumn_writers
+        RETURN_IF_ERROR(_subcolumn_writers[current_column_id - 1]->append(
+                nullmap, column->get_data(), num_rows));
+        olap_data_convertor->clear_source_content();
+    }
+    _is_finalized = true;
+    return Status::OK();
+}
+
+// True only once finalize() has run to completion and the buffered ColumnObject reports
+// itself as finalized.
+bool VariantColumnWriterImpl::is_finalized() const {
+    const auto* ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
+    return ptr->is_finalized() && _is_finalized;
+}
+
+// Buffers num_rows rows of the source variant column referenced by *ptr. Must not be
+// called after the writer has been finalized (checked in debug builds).
+Status VariantColumnWriterImpl::append_data(const uint8_t** ptr, size_t num_rows) {
+    DCHECK(!is_finalized());
+    const auto& src =
*reinterpret_cast<const vectorized::ColumnObject*>(*ptr);
+    auto* dst_ptr = assert_cast<vectorized::ColumnObject*>(_column.get());
+    // TODO: if direct write we could avoid copy
+    dst_ptr->insert_range_from(src, 0, num_rows);
+    return Status::OK();
+}
+
+// Estimated number of buffered bytes. Before finalize() this is the in-memory size of the
+// accumulated variant column; afterwards it is the sum of the estimates of the root
+// writer and every subcolumn writer.
+uint64_t VariantColumnWriterImpl::estimate_buffer_size() {
+    if (!is_finalized()) {
+        // not accurate
+        return _column->byte_size();
+    }
+    uint64_t size = 0;
+    for (auto& column_writer : _subcolumn_writers) {
+        size += column_writer->estimate_buffer_size();
+    }
+    size += _root_writer->estimate_buffer_size();
+    return size;
+}
+
+// Finalizes the buffered data if that has not happened yet, finishes the root writer and
+// all subcolumn writers, then records the root writer's row count in the root column
+// meta and in every subcolumn meta.
+Status VariantColumnWriterImpl::finish() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    RETURN_IF_ERROR(_root_writer->finish());
+    for (auto& column_writer : _subcolumn_writers) {
+        RETURN_IF_ERROR(column_writer->finish());
+    }
+    // every subcolumn is written with the same number of rows as the root
+    const auto num_rows = _root_writer->get_next_rowid();
+    _opts.meta->set_num_rows(num_rows);
+    for (auto& suboptions : _subcolumn_opts) {
+        suboptions.meta->set_num_rows(num_rows);
+    }
+    return Status::OK();
+}
+// Writes the data pages of the root column followed by those of every subcolumn.
+Status VariantColumnWriterImpl::write_data() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    RETURN_IF_ERROR(_root_writer->write_data());
+    for (auto& column_writer : _subcolumn_writers) {
+        RETURN_IF_ERROR(column_writer->write_data());
+    }
+    return Status::OK();
+}
+// Writes the ordinal index of the root column followed by those of every subcolumn.
+Status VariantColumnWriterImpl::write_ordinal_index() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    RETURN_IF_ERROR(_root_writer->write_ordinal_index());
+    for (auto& column_writer : _subcolumn_writers) {
+        RETURN_IF_ERROR(column_writer->write_ordinal_index());
+    }
+    return Status::OK();
+}
+
+// Writes the zone map of each subcolumn whose options request one. The root writer is
+// not asked for a zone map here.
+Status VariantColumnWriterImpl::write_zone_map() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    // _subcolumn_opts[i] holds the options used to create _subcolumn_writers[i]
+    for (size_t i = 0; i < _subcolumn_writers.size(); ++i) {
+        if (_subcolumn_opts[i].need_zone_map) {
+            RETURN_IF_ERROR(_subcolumn_writers[i]->write_zone_map());
+        }
+    }
+    return Status::OK();
+}
+
+Status
VariantColumnWriterImpl::write_bitmap_index() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    // _subcolumn_opts[i] holds the options used to create _subcolumn_writers[i]
+    for (size_t i = 0; i < _subcolumn_writers.size(); ++i) {
+        if (_subcolumn_opts[i].need_bitmap_index) {
+            RETURN_IF_ERROR(_subcolumn_writers[i]->write_bitmap_index());
+        }
+    }
+    return Status::OK();
+}
+// Writes the inverted index of each subcolumn whose options request one.
+Status VariantColumnWriterImpl::write_inverted_index() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    for (size_t i = 0; i < _subcolumn_writers.size(); ++i) {
+        if (_subcolumn_opts[i].need_inverted_index) {
+            RETURN_IF_ERROR(_subcolumn_writers[i]->write_inverted_index());
+        }
+    }
+    return Status::OK();
+}
+// Writes the bloom filter index of each subcolumn whose options request one.
+Status VariantColumnWriterImpl::write_bloom_filter_index() {
+    if (!is_finalized()) {
+        RETURN_IF_ERROR(finalize());
+    }
+    for (size_t i = 0; i < _subcolumn_writers.size(); ++i) {
+        if (_subcolumn_opts[i].need_bloom_filter) {
+            RETURN_IF_ERROR(_subcolumn_writers[i]->write_bloom_filter_index());
+        }
+    }
+    return Status::OK();
+}
+
+// Buffers num_rows rows together with their null map. When null_map is non-null its
+// num_rows bytes are appended to the buffered null column before the row data is
+// appended. Returns an error if a null map is supplied for a column that has no null
+// column (i.e. the writer was created for a non-nullable column).
+Status VariantColumnWriterImpl::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
+                                                size_t num_rows) {
+    if (null_map != nullptr) {
+        // _null_column is only allocated by the constructor for nullable columns;
+        // report the mismatch instead of dereferencing an empty pointer.
+        if (_null_column.get() == nullptr) {
+            return Status::InternalError(
+                    "variant writer received a null map for a non-nullable column");
+        }
+        _null_column->insert_many_raw_data(reinterpret_cast<const char*>(null_map), num_rows);
+    }
+    RETURN_IF_ERROR(append_data(ptr, num_rows));
+    return Status::OK();
+}
+
+void VariantColumnWriterImpl::_init_column_meta(ColumnMetaPB* meta, uint32_t column_id,
+                                                const TabletColumn& column) {
+    meta->set_column_id(column_id);
+    meta->set_type(int(column.type()));
+    meta->set_length(column.length());
+    meta->set_encoding(DEFAULT_ENCODING);
+    meta->set_compression(_opts.compression_type);
+    meta->set_is_nullable(column.is_nullable());
+    meta->set_default_value(column.default_value());
+    meta->set_precision(column.precision());
+    meta->set_frac(column.frac());
+    if (column.has_path_info()) {
+        column.path_info_ptr()->to_protobuf(meta->mutable_column_path_info(),
+                                            column.parent_unique_id());
+    }
+    meta->set_unique_id(column.unique_id());
+    for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
+
_init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i)); + } + // add sparse column to footer + for (uint32_t i = 0; i < column.num_sparse_columns(); i++) { + _init_column_meta(meta->add_sparse_columns(), -1, column.sparse_column_at(i)); + } +}; + +Status VariantColumnWriterImpl::_create_column_writer(uint32_t cid, const TabletColumn& column, + const TabletColumn& parent_column, + const TabletSchemaSPtr& tablet_schema) { + ColumnWriterOptions opts; + opts.meta = _opts.footer->add_columns(); + + _init_column_meta(opts.meta, cid, column); + + opts.need_zone_map = tablet_schema->keys_type() != KeysType::AGG_KEYS; + opts.need_bloom_filter = parent_column.is_bf_column(); + // const auto* tablet_index = tablet_schema->get_ngram_bf_index(parent_column.unique_id()); + // if (tablet_index) { + // opts.need_bloom_filter = true; + // opts.is_ngram_bf_index = true; + // //narrow convert from int32_t to uint8_t and uint16_t which is dangerous + // auto gram_size = tablet_index->get_gram_size(); + // auto gram_bf_size = tablet_index->get_gram_bf_size(); + // if (gram_size > 256 || gram_size < 1) { + // return Status::NotSupported("Do not support ngram bloom filter for ngram_size: ", + // gram_size); + // } + // if (gram_bf_size > 65535 || gram_bf_size < 64) { + // return Status::NotSupported("Do not support ngram bloom filter for bf_size: ", + // gram_bf_size); + // } + // opts.gram_size = gram_size; + // opts.gram_bf_size = gram_bf_size; + // } + + opts.need_bitmap_index = parent_column.has_bitmap_index(); + bool skip_inverted_index = false; + if (_opts.rowset_ctx != nullptr) { + // skip write inverted index for index compaction column + skip_inverted_index = _opts.rowset_ctx->columns_to_do_index_compaction.contains( + parent_column.unique_id()); + } + if (const auto& index = tablet_schema->inverted_index(parent_column); + index != nullptr && !skip_inverted_index) { + opts.inverted_index = index; + opts.need_inverted_index = true; + 
DCHECK(_opts.inverted_index_file_writer != nullptr); + opts.inverted_index_file_writer = _opts.inverted_index_file_writer; + // TODO support multiple inverted index + } + +#define DISABLE_INDEX_IF_FIELD_TYPE(TYPE, type_name) \ + if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \ + opts.need_zone_map = false; \ + opts.need_bloom_filter = false; \ + opts.need_bitmap_index = false; \ + } + + DISABLE_INDEX_IF_FIELD_TYPE(ARRAY, "array") + DISABLE_INDEX_IF_FIELD_TYPE(JSONB, "jsonb") + DISABLE_INDEX_IF_FIELD_TYPE(VARIANT, "variant") + +#undef DISABLE_INDEX_IF_FIELD_TYPE + +#undef CHECK_FIELD_TYPE + + std::unique_ptr<ColumnWriter> writer; + RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _opts.file_writer, &writer)); + RETURN_IF_ERROR(writer->init()); + _subcolumn_writers.push_back(std::move(writer)); + _subcolumn_opts.push_back(opts); + + return Status::OK(); +}; + +} // namespace doris::segment_v2 \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h new file mode 100644 index 00000000000..348dd1ab0cb --- /dev/null +++ b/be/src/olap/rowset/segment_v2/variant_column_writer_impl.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/segment_v2.pb.h>
+
+#include "common/status.h"
+#include "olap/rowset/segment_v2/column_writer.h"
+#include "olap/tablet_schema.h"
+#include "vec/columns/column.h"
+
+namespace doris::segment_v2 {
+
+class ColumnWriter;
+class ScalarColumnWriter;
+
+// Implementation object behind the variant column writer.
+// Appended rows are buffered in an in-memory column. Every finish()/write_*()
+// entry point first calls finalize() when is_finalized() is false, and then
+// forwards to the root writer and/or the per-subcolumn writers.
+class VariantColumnWriterImpl {
+public:
+    VariantColumnWriterImpl(const ColumnWriterOptions& opts, const TabletColumn* column);
+    // NOTE(review): presumably turns the buffered column into the root and
+    // subcolumn writers; the definition is not shown next to this declaration.
+    Status finalize();
+
+    bool is_finalized() const;
+
+    // Copies `num_rows` rows, starting at row 0, from the ColumnObject that
+    // `*ptr` points to into the buffered column.
+    Status append_data(const uint8_t** ptr, size_t num_rows);
+
+    // Finishes the root and subcolumn writers and stores the root writer's row
+    // count in the column meta and in every subcolumn meta.
+    Status finish();
+    // Writes data pages: root writer first, then every subcolumn writer.
+    Status write_data();
+    // Writes ordinal indexes: root writer first, then every subcolumn writer.
+    Status write_ordinal_index();
+    // The four index writers below only touch subcolumn writers whose options
+    // have the matching need_* flag set.
+    Status write_zone_map();
+    Status write_bitmap_index();
+    Status write_inverted_index();
+    Status write_bloom_filter_index();
+    // Byte size of the buffered column before finalization (not accurate), the
+    // sum of all writer estimates afterwards.
+    uint64_t estimate_buffer_size();
+    // Appends `num_rows` null-map bytes to the null column when `null_map` is not
+    // nullptr, then forwards to append_data().
+    Status append_nullable(const uint8_t* null_map, const uint8_t** ptr, size_t num_rows);
+
+private:
+    // Fills `meta` from `column`, recursing into its sub columns and sparse columns.
+    void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column);
+
+    // Adds a footer column meta for `column`, creates and initializes its writer
+    // and appends writer and options to _subcolumn_writers / _subcolumn_opts.
+    Status _create_column_writer(uint32_t cid, const TabletColumn& column,
+                                 const TabletColumn& parent_column,
+                                 const TabletSchemaSPtr& tablet_schema);
+    // prepare a column for finalize
+    doris::vectorized::MutableColumnPtr _column;
+    // receives the raw null-map bytes passed to append_nullable()
+    doris::vectorized::MutableColumnPtr _null_column;
+    ColumnWriterOptions _opts;
+    const TabletColumn* _tablet_column = nullptr;
+    bool _is_finalized = false;
+    // for sparse column and root column
+    std::unique_ptr<ColumnWriter> _root_writer;
+    // _subcolumn_opts[i] holds the options _subcolumn_writers[i] was created with
+    std::vector<std::unique_ptr<ColumnWriter>> _subcolumn_writers;
+    std::vector<ColumnWriterOptions> _subcolumn_opts;
+};
+} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index ce16e2d502b..5c72cd6384a 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -276,6 +276,11 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo (page_size > 0) ? page_size : segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE; } + opts.rowset_ctx = _opts.rowset_ctx; + opts.file_writer = _file_writer; + opts.compression_type = _opts.compression_type; + opts.footer = &_footer; + std::unique_ptr<ColumnWriter> writer; RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer)); RETURN_IF_ERROR(writer->init()); @@ -1106,10 +1111,10 @@ Status VerticalSegmentWriter::write_batch() { RETURN_IF_ERROR(_append_block_with_partial_content(data, full_block)); } } - for (auto& data : _batched_blocks) { - RowsInBlock full_rows_block {&full_block, data.row_pos, data.num_rows}; - RETURN_IF_ERROR(_append_block_with_variant_subcolumns(full_rows_block)); - } + // for (auto& data : _batched_blocks) { + // RowsInBlock full_rows_block {&full_block, data.row_pos, data.num_rows}; + // RETURN_IF_ERROR(_append_block_with_variant_subcolumns(full_rows_block)); + // } for (auto& column_writer : _column_writers) { RETURN_IF_ERROR(column_writer->finish()); RETURN_IF_ERROR(column_writer->write_data()); @@ -1174,18 +1179,18 @@ Status VerticalSegmentWriter::write_batch() { _num_rows_written += data.num_rows; } - if (_opts.write_type == DataWriteType::TYPE_DIRECT || - _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) { - size_t original_writers_cnt = _column_writers.size(); - // handle variant dynamic sub columns - for (auto& data : _batched_blocks) { - RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data)); - } - for (size_t i = original_writers_cnt; i < _column_writers.size(); ++i) { - RETURN_IF_ERROR(_column_writers[i]->finish()); - RETURN_IF_ERROR(_column_writers[i]->write_data()); - } - } + // if (_opts.write_type == DataWriteType::TYPE_DIRECT || + // _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) { + // size_t original_writers_cnt = 
_column_writers.size(); + // // handle variant dynamic sub columns + // for (auto& data : _batched_blocks) { + // RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data)); + // } + // for (size_t i = original_writers_cnt; i < _column_writers.size(); ++i) { + // RETURN_IF_ERROR(_column_writers[i]->finish()); + // RETURN_IF_ERROR(_column_writers[i]->write_data()); + // } + // } _batched_blocks.clear(); return Status::OK(); diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index d5e52d07bcf..0aef86e30e2 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -1194,8 +1194,9 @@ bool ColumnObject::add_sub_column(const PathInData& key, MutableColumnPtr&& subc num_rows = new_size; return true; } - if (key.empty() && ((!subcolumns.get_root()->is_scalar()) || - is_nothing(subcolumns.get_root()->data.get_least_common_type()))) { + if (key.empty() && + (!subcolumns.get_root()->is_scalar() || + (is_null_root() || is_nothing(subcolumns.get_root()->data.get_least_common_type())))) { bool root_it_scalar = subcolumns.get_root()->is_scalar(); // update root to scalar subcolumns.get_mutable_root()->modify_to_scalar( diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp index 64fa885780a..a35109d6575 100644 --- a/be/src/vec/olap/olap_data_convertor.cpp +++ b/be/src/vec/olap/olap_data_convertor.cpp @@ -1104,46 +1104,47 @@ void OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column( nullable_column = assert_cast<const ColumnNullable*>(typed_column.column.get()); _nullmap = nullable_column->get_null_map_data().data(); } - const auto& variant = + const auto* variant = nullable_column == nullptr - ? assert_cast<const vectorized::ColumnObject&>(*typed_column.column) - : assert_cast<const vectorized::ColumnObject&>( + ? 
check_and_get_column<const vectorized::ColumnObject>(*typed_column.column)
+                    : check_and_get_column<const vectorized::ColumnObject>(
                               nullable_column->get_nested_column());
-    if (variant.is_null_root()) {
-        auto root_type = make_nullable(std::make_shared<ColumnObject::MostCommonType>());
-        auto root_col = root_type->create_column();
-        root_col->insert_many_defaults(variant.rows());
-        const_cast<ColumnObject&>(variant).create_root(root_type, std::move(root_col));
-        variant.check_consistency();
-    }
-    // ensure data finalized
-    _source_column_ptr = &const_cast<ColumnObject&>(variant);
-    _source_column_ptr->finalize(ColumnObject::FinalizeMode::WRITE_MODE);
-    _root_data_convertor = std::make_unique<OlapColumnDataConvertorVarChar>(true);
-    _root_data_convertor->set_source_column(
-            {_source_column_ptr->get_root()->get_ptr(), nullptr, ""}, row_pos, num_rows);
     OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos,
                                                                            num_rows);
+
+    // _value_ptr stays null when the source column is not a ColumnObject
+    // (presumably check_and_get_column yields nullptr on a type mismatch - confirm).
+    _value_ptr = variant;
+    // Convert root data, since the root data is a jsonb column, we treat it as a jsonb convertor
+    if (!_value_ptr) {
+        _root_data_convertor = std::make_unique<OlapColumnDataConvertorVarChar>(true);
+        _root_data_convertor->set_source_column(typed_column, row_pos, num_rows);
+    }
 }
 
 // convert root data
+// Only the non-ColumnObject case (_value_ptr == nullptr) converts anything here:
+// the nested string column of the nullable source column is handed to the root
+// convertor. For a ColumnObject source this function returns OK without work.
 Status OlapBlockDataConvertor::OlapColumnDataConvertorVariant::convert_to_olap() {
-    RETURN_IF_ERROR(vectorized::schema_util::encode_variant_sparse_subcolumns(*_source_column_ptr));
-#ifndef NDEBUG
-    _source_column_ptr->check_consistency();
-#endif
-    const auto* nullable = assert_cast<const ColumnNullable*>(_source_column_ptr->get_root().get());
-    const auto* root_column = assert_cast<const ColumnString*>(&nullable->get_nested_column());
-    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, root_column));
+    // Convert root data, since the root data is a jsonb column, we treat it as a jsonb convertor
+    if (!_value_ptr) {
+        const auto* nullable = assert_cast<const ColumnNullable*>(_typed_column.column.get());
+        const auto* root_column = assert_cast<const ColumnString*>(&nullable->get_nested_column());
+        RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, root_column));
+        return Status::OK();
+    }
+    // Do nothing, the column writer will finally do finalize and write subcolumns one by one
+    // since we are not sure the final column(type and columns) until the end of the last block
     return Status::OK();
 }
 
+// Returns the root convertor's data when the source is not a ColumnObject,
+// otherwise the pointer to the source ColumnObject itself.
 const void* OlapBlockDataConvertor::OlapColumnDataConvertorVariant::get_data() const {
-    return _root_data_convertor->get_data();
+    if (!_value_ptr) {
+        return _root_data_convertor->get_data();
+    }
+    // return the ptr of original column, see VariantColumnWriterImpl::append_data
+    // which will cast to ColumnObject
+    return _value_ptr;
 }
+// Offset-based access is not supported for variant columns: always throws
+// doris::Exception with NOT_IMPLEMENTED_ERROR, `offset` is ignored.
 const void* OlapBlockDataConvertor::OlapColumnDataConvertorVariant::get_data_at(
         size_t offset) const {
-    return _root_data_convertor->get_data_at(offset);
+    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "not implemented");
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index 3473d9d26b5..3c21eb4fc51 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -510,11 +510,8 @@ private:
         const void* get_data_at(size_t offset) const override;
 
     private:
-        // // encodes sparsed columns
-        // const ColumnString* _root_data_column;
-        // // _nullmap contains null info for this variant
+        const void* _value_ptr;
         std::unique_ptr<OlapColumnDataConvertorVarChar> _root_data_convertor;
-        ColumnObject* _source_column_ptr;
     };
 
 private:
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org