xiaokang commented on code in PR #34925:
URL: https://github.com/apache/doris/pull/34925#discussion_r1615142726


##########
be/src/olap/tablet_schema.cpp:
##########
@@ -846,7 +846,8 @@ void TabletSchema::append_column(TabletColumn column, 
ColumnType col_type) {
     _cols.push_back(std::make_shared<TabletColumn>(std::move(column)));
     // The dropped column may have same name with exsiting column, so that
     // not add to name to index map, only for uid to index map
-    if (col_type == ColumnType::VARIANT || _cols.back()->is_variant_type()) {
+    if (col_type == ColumnType::VARIANT || _cols.back()->is_variant_type() ||
+        _cols.back()->is_extracted_column()) {

Review Comment:
   why delete it?



##########
be/src/olap/rowset/rowset.h:
##########
@@ -139,6 +139,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
     // publish rowset to make it visible to read
     void make_visible(Version version);
     void set_version(Version version);
+    void set_schema(TabletSchemaSPtr new_schema) { _schema = new_schema; }

Review Comment:
   What's the difference to RowsetMeta::set_tablet_schema ?



##########
be/src/olap/tablet.cpp:
##########
@@ -1816,7 +1816,8 @@ Result<std::unique_ptr<RowsetWriter>> 
Tablet::create_transient_rowset_writer(
     context.rowset_state = PREPARED;
     context.segments_overlap = OVERLAPPING;
     context.tablet_schema = std::make_shared<TabletSchema>();
-    context.tablet_schema->copy_from(*(rowset.tablet_schema()));
+    // context.tablet_schema->copy_from(*(rowset.tablet_schema()));

Review Comment:
   delete commented code and comment for why copy_without_extracted_columns



##########
be/src/olap/rowset/segment_creator.cpp:
##########
@@ -58,60 +59,38 @@ Status SegmentFlusher::flush_single_block(const 
vectorized::Block* block, int32_
     if (block->rows() == 0) {
         return Status::OK();
     }
-    // Expand variant columns
     vectorized::Block flush_block(*block);
-    TabletSchemaSPtr flush_schema;
     if (_context.write_type != DataWriteType::TYPE_COMPACTION &&
         _context.tablet_schema->num_variant_columns() > 0) {
-        RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, 
flush_schema));
+        RETURN_IF_ERROR(_parse_variant_columns(flush_block));
     }
     bool no_compression = flush_block.bytes() <= 
config::segment_compression_threshold_kb * 1024;
     if (config::enable_vertical_segment_writer &&
         _context.tablet_schema->cluster_key_idxes().empty()) {
         std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
-        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression, flush_schema));
+        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression));
         RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, 
flush_block.rows()));
-        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, 
flush_size));
+        RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), 
flush_size));
     } else {
         std::unique_ptr<segment_v2::SegmentWriter> writer;
-        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression, flush_schema));
+        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression));
         RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, 
flush_block.rows()));
-        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, 
flush_size));
+        RETURN_IF_ERROR(_flush_segment_writer(writer, nullptr /*TODO*/, 
flush_size));

Review Comment:
   Why not writer->flush_schema()?



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -709,16 +707,127 @@ Status VerticalSegmentWriter::batch_block(const 
vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// for variant type, we should do following steps to fill content of block:
+// 1. set block data to data convertor, and get all flattened columns from 
variant subcolumns
+// 2. get sparse columns from previous sparse columns stripped in 
OlapColumnDataConvertorVariant
+// 3. merge current columns info(contains extracted columns) with previous 
merged_tablet_schema
+//    which will be used to contruct the new schema for rowset
+Status 
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) 
{
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." + 
entry->path.get_path();
+            const vectorized::DataTypePtr& final_data_type_from_object =
+                    entry->data.get_least_common_type();
+            vectorized::PathInDataBuilder full_path_builder;
+            auto full_path = 
full_path_builder.append(parent_column->name_lower_case(), false)
+                                     .append(entry->path.get_parts(), false)
+                                     .build();
+            return vectorized::schema_util::get_column_by_type(
+                    final_data_type_from_object, column_name,
+                    vectorized::schema_util::ExtraInfo {
+                            .unique_id = -1,
+                            .parent_unique_id = parent_column->unique_id(),
+                            .path_info = full_path});
+        };
+
+        CHECK(object_column.is_finalized());
+        // common extracted columns
+        for (const auto& entry :
+             
vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) 
{
+            if (entry->path.empty()) {
+                // already handled by parent column
+                continue;
+            }
+            CHECK(entry->data.is_finalized());
+            int current_column_id = column_id++;
+            TabletColumn tablet_column = generate_column_info(entry);
+            vectorized::schema_util::inherit_column_attributes(*parent_column, 
tablet_column,
+                                                               _flush_schema);
+            RETURN_IF_ERROR(_create_column_writer(current_column_id 
/*unused*/, tablet_column,
+                                                  _flush_schema));
+            _olap_data_convertor->set_source_content_with_specifid_column(
+                    {entry->data.get_finalized_column_ptr()->get_ptr(),
+                     entry->data.get_least_common_type(), 
tablet_column.name()},
+                    data.row_pos, data.num_rows, current_column_id);
+            // convert column data from engine format to storage layer format
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(current_column_id);
+            if (!status.ok()) {
+                return status;
+            }
+            RETURN_IF_ERROR(_column_writers[current_column_id]->append(
+                    column->get_nullmap(), column->get_data(), data.num_rows));
+            _flush_schema->append_column(tablet_column);
+            _olap_data_convertor->clear_source_content();
+        }
+        // sparse_columns
+        for (const auto& entry : 
vectorized::schema_util::get_sorted_subcolumns(
+                     object_column.get_sparse_subcolumns())) {
+            TabletColumn sparse_tablet_column = generate_column_info(entry);
+            _flush_schema->mutable_column_by_uid(parent_column->unique_id())
+                    .append_sparse_column(sparse_tablet_column);
+
+            // add sparse column to footer
+            auto* column_pb = _footer.mutable_columns(i);
+            _init_column_meta(column_pb->add_sparse_columns(), -1, 
sparse_tablet_column);
+        }
+    }
+
+    // Update rowset schema, tablet's tablet schema will be updated when build 
Rowset
+    // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
+    // ctx.tablet_schema:  A(bigint), B(double)
+    // => update_schema:   A(bigint), B(double), C(int), D(int)
+    std::lock_guard<std::mutex> lock(*(_opts.rowset_ctx->schema_lock));
+    if (_opts.rowset_ctx->merged_tablet_schema == nullptr) {
+        _opts.rowset_ctx->merged_tablet_schema = 
_opts.rowset_ctx->tablet_schema;
+    }
+    TabletSchemaSPtr update_schema;
+    RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
+            {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, 
update_schema));
+    CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns())
+            << "Rowset merge schema columns count is " << 
update_schema->num_columns()
+            << ", but flush_schema is larger " << _flush_schema->num_columns()
+            << " update_schema: " << update_schema->dump_structure()
+            << " flush_schema: " << _flush_schema->dump_structure();
+    _opts.rowset_ctx->merged_tablet_schema.swap(update_schema);
+    VLOG_DEBUG << "dump block " << data.block->dump_data();
+    VLOG_DEBUG << "dump rs schema: " << 
_opts.rowset_ctx->merged_tablet_schema->dump_full_schema();
+    VLOG_DEBUG << "rowset : " << _opts.rowset_ctx->rowset_id << ", seg id : " 
<< _segment_id;
+    return Status::OK();
+}
+
 Status VerticalSegmentWriter::write_batch() {

Review Comment:
   Do you also need to modify SegmentWriter?



##########
be/src/olap/rowset_builder.cpp:
##########
@@ -300,14 +300,18 @@ Status RowsetBuilder::commit_txn() {
     }
     std::lock_guard<std::mutex> l(_lock);
     SCOPED_TIMER(_commit_txn_timer);
-    if (tablet()->tablet_schema()->num_variant_columns() > 0) {
+
+    const RowsetWriterContext& rw_ctx = _rowset_writer->context();
+    if (rw_ctx.tablet_schema->num_variant_columns() > 0) {
         // update tablet schema when meet variant columns, before commit_txn
         // Eg. rowset schema:       A(int),    B(float),  C(int), D(int)
         // _tabelt->tablet_schema:  A(bigint), B(double)
         //  => update_schema:       A(bigint), B(double), C(int), D(int)
-        const RowsetWriterContext& rw_ctx = _rowset_writer->context();
         
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
     }
+    if (rw_ctx.merged_tablet_schema != nullptr) {
+        
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.merged_tablet_schema));

Review Comment:
   add comment



##########
be/src/olap/rowset_builder.cpp:
##########
@@ -300,14 +300,18 @@ Status RowsetBuilder::commit_txn() {
     }
     std::lock_guard<std::mutex> l(_lock);
     SCOPED_TIMER(_commit_txn_timer);
-    if (tablet()->tablet_schema()->num_variant_columns() > 0) {
+
+    const RowsetWriterContext& rw_ctx = _rowset_writer->context();
+    if (rw_ctx.tablet_schema->num_variant_columns() > 0) {
         // update tablet schema when meet variant columns, before commit_txn
         // Eg. rowset schema:       A(int),    B(float),  C(int), D(int)
         // _tabelt->tablet_schema:  A(bigint), B(double)
         //  => update_schema:       A(bigint), B(double), C(int), D(int)
-        const RowsetWriterContext& rw_ctx = _rowset_writer->context();
         
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
     }
+    if (rw_ctx.merged_tablet_schema != nullptr) {

Review Comment:
   Is `rw_ctx.merged_tablet_schema != nullptr` duplicated with last branch 
`rw_ctx.tablet_schema->num_variant_columns() > 0` ?



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -213,6 +216,17 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& 
other) {
     if (rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) {
         set_rowset_state(RowsetStatePB::COMMITTED);
     }
+    // In partial update the rowset schema maybe updated when table contains 
variant type, so we need the newest schema to be updated
+    // Otherwise the schema is stale and lead to wrong data read
+    if (tablet_schema()->num_variant_columns() > 0) {

Review Comment:
   What about tablet_schema()->num_variant_columns() == 0 but 
other->tablet_schema()->num_variant_columns() > 0 ?



##########
be/src/vec/columns/column_object.cpp:
##########
@@ -749,8 +749,13 @@ void ColumnObject::insert_from(const IColumn& src, size_t 
n) {
 void ColumnObject::try_insert(const Field& field) {
     if (field.get_type() != Field::Types::VariantMap) {
         auto* root = get_subcolumn({});
-        if (!root) {
-            doris::Exception(doris::ErrorCode::INVALID_ARGUMENT, "Failed to 
find root column_path");
+        if (root == nullptr) {
+            bool succ = add_sub_column({}, num_rows);

Review Comment:
   add comment to explain this change.



##########
be/src/olap/rowset/rowset.h:
##########
@@ -139,6 +139,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
     // publish rowset to make it visible to read
     void make_visible(Version version);
     void set_version(Version version);
+    void set_schema(TabletSchemaSPtr new_schema) { _schema = new_schema; }

Review Comment:
   What's the difference between Rowset::_schema and 
Rowset::_rowset_meta::_schema?



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -709,16 +707,127 @@ Status VerticalSegmentWriter::batch_block(const 
vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// for variant type, we should do following steps to fill content of block:
+// 1. set block data to data convertor, and get all flattened columns from 
variant subcolumns
+// 2. get sparse columns from previous sparse columns stripped in 
OlapColumnDataConvertorVariant
+// 3. merge current columns info(contains extracted columns) with previous 
merged_tablet_schema
+//    which will be used to contruct the new schema for rowset
+Status 
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) 
{
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." + 
entry->path.get_path();

Review Comment:
   What will happen if parent column changed?



##########
be/src/vec/olap/olap_data_convertor.cpp:
##########
@@ -1071,28 +1081,29 @@ void 
OlapBlockDataConvertor::OlapColumnDataConvertorVariant::set_source_column(
                     ? assert_cast<const 
vectorized::ColumnObject&>(*typed_column.column)
                     : assert_cast<const vectorized::ColumnObject&>(
                               nullable_column->get_nested_column());
-
-    const_cast<ColumnObject&>(variant).finalize_if_not();
     if (variant.is_null_root()) {
         auto root_type = 
make_nullable(std::make_shared<ColumnObject::MostCommonType>());
         auto root_col = root_type->create_column();
         root_col->insert_many_defaults(variant.rows());
         const_cast<ColumnObject&>(variant).create_root(root_type, 
std::move(root_col));
         variant.check_consistency();
     }
-    auto root_of_variant = variant.get_root();
-    auto nullable = assert_cast<const ColumnNullable*>(root_of_variant.get());
-    CHECK(nullable);
-    _root_data_column = assert_cast<const 
ColumnString*>(&nullable->get_nested_column());
-    _root_data_convertor->set_source_column({root_of_variant->get_ptr(), 
nullptr, ""}, row_pos,
-                                            num_rows);
+    // ensure data finalized
+    _source_column_ptr = &const_cast<ColumnObject&>(variant);
+    _source_column_ptr->finalize(false);
+    _root_data_convertor = 
std::make_unique<OlapColumnDataConvertorVarChar>(true);
     
OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column,
 row_pos,
                                                                            
num_rows);
 }
 
 // convert root data
 Status 
OlapBlockDataConvertor::OlapColumnDataConvertorVariant::convert_to_olap() {
-    RETURN_IF_ERROR(_root_data_convertor->convert_to_olap(_nullmap, 
_root_data_column));
+    
RETURN_IF_ERROR(vectorized::schema_util::encode_variant_sparse_subcolumns(*_source_column_ptr));
+    _root_data_convertor->set_source_column(

Review Comment:
   Why move set_source_column from set_source_column function to 
convert_to_olap?



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -709,16 +707,127 @@ Status VerticalSegmentWriter::batch_block(const 
vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// for variant type, we should do following steps to fill content of block:
+// 1. set block data to data convertor, and get all flattened columns from 
variant subcolumns
+// 2. get sparse columns from previous sparse columns stripped in 
OlapColumnDataConvertorVariant
+// 3. merge current columns info(contains extracted columns) with previous 
merged_tablet_schema
+//    which will be used to contruct the new schema for rowset
+Status 
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) 
{
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." + 
entry->path.get_path();
+            const vectorized::DataTypePtr& final_data_type_from_object =
+                    entry->data.get_least_common_type();
+            vectorized::PathInDataBuilder full_path_builder;
+            auto full_path = 
full_path_builder.append(parent_column->name_lower_case(), false)
+                                     .append(entry->path.get_parts(), false)
+                                     .build();
+            return vectorized::schema_util::get_column_by_type(
+                    final_data_type_from_object, column_name,
+                    vectorized::schema_util::ExtraInfo {
+                            .unique_id = -1,
+                            .parent_unique_id = parent_column->unique_id(),
+                            .path_info = full_path});
+        };
+
+        CHECK(object_column.is_finalized());
+        // common extracted columns
+        for (const auto& entry :
+             
vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) 
{
+            if (entry->path.empty()) {
+                // already handled by parent column
+                continue;
+            }
+            CHECK(entry->data.is_finalized());
+            int current_column_id = column_id++;
+            TabletColumn tablet_column = generate_column_info(entry);
+            vectorized::schema_util::inherit_column_attributes(*parent_column, 
tablet_column,
+                                                               _flush_schema);
+            RETURN_IF_ERROR(_create_column_writer(current_column_id 
/*unused*/, tablet_column,
+                                                  _flush_schema));
+            _olap_data_convertor->set_source_content_with_specifid_column(
+                    {entry->data.get_finalized_column_ptr()->get_ptr(),
+                     entry->data.get_least_common_type(), 
tablet_column.name()},
+                    data.row_pos, data.num_rows, current_column_id);
+            // convert column data from engine format to storage layer format
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(current_column_id);
+            if (!status.ok()) {
+                return status;
+            }
+            RETURN_IF_ERROR(_column_writers[current_column_id]->append(
+                    column->get_nullmap(), column->get_data(), data.num_rows));
+            _flush_schema->append_column(tablet_column);
+            _olap_data_convertor->clear_source_content();

Review Comment:
   It's better to add a function to just clear the specific column.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to