xiaokang commented on code in PR #34925:
URL: https://github.com/apache/doris/pull/34925#discussion_r1628718371


##########
be/src/olap/rowset/segment_creator.cpp:
##########
@@ -58,60 +59,38 @@ Status SegmentFlusher::flush_single_block(const 
vectorized::Block* block, int32_
     if (block->rows() == 0) {
         return Status::OK();
     }
-    // Expand variant columns
     vectorized::Block flush_block(*block);
-    TabletSchemaSPtr flush_schema;
     if (_context.write_type != DataWriteType::TYPE_COMPACTION &&
         _context.tablet_schema->num_variant_columns() > 0) {
-        RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, 
flush_schema));
+        RETURN_IF_ERROR(_parse_variant_columns(flush_block));
     }
     bool no_compression = flush_block.bytes() <= 
config::segment_compression_threshold_kb * 1024;
     if (config::enable_vertical_segment_writer &&
         _context.tablet_schema->cluster_key_idxes().empty()) {
         std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
-        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression, flush_schema));
+        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression));
         RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, 
flush_block.rows()));
-        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, 
flush_size));
+        RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), 
flush_size));
     } else {
         std::unique_ptr<segment_v2::SegmentWriter> writer;
-        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression, flush_schema));
+        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, 
no_compression));
         RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, 
flush_block.rows()));
-        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_schema, 
flush_size));
+        RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(), 
flush_size));
     }
     return Status::OK();
 }
 
-Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
-                                                     TabletSchemaSPtr& 
flush_schema) {
+Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
     size_t num_rows = block.rows();
     if (num_rows == 0) {
         return Status::OK();
     }
 
-    {
-        std::lock_guard<std::mutex> lock(*(_context.schema_lock));
-        // save original tablet schema, _context->tablet_schema maybe modified
-        if (_context.original_tablet_schema == nullptr) {
-            _context.original_tablet_schema = _context.tablet_schema;
-        }
-    }
-
     std::vector<int> variant_column_pos;
-    if (_context.partial_update_info && 
_context.partial_update_info->is_partial_update) {
-        // check columns that used to do partial updates should not include 
variant
-        for (int i : _context.partial_update_info->update_cids) {
-            const auto& col = *_context.original_tablet_schema->columns()[i];
-            if (!col.is_key() && col.name() != DELETE_SIGN) {
-                return Status::InvalidArgument(
-                        "Not implement partial update for variant only support 
delete currently");
-            }
-        }
-    } else {
-        // find positions of variant columns
-        for (int i = 0; i < _context.original_tablet_schema->columns().size(); 
++i) {
-            if 
(_context.original_tablet_schema->columns()[i]->is_variant_type()) {
-                variant_column_pos.push_back(i);
-            }
+    for (int i = 0; i < block.columns(); ++i) {
+        const auto& entry = block.get_by_position(i);
+        if (remove_nullable(entry.type)->get_type_id() == 
vectorized::TypeIndex::VARIANT) {

Review Comment:
   use is_variant_type()



##########
be/src/olap/base_tablet.cpp:
##########
@@ -651,6 +651,10 @@ Status 
BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 (std::find(including_cids.cbegin(), including_cids.cend(),
                            rowset_schema->sequence_col_idx()) != 
including_cids.cend());
     }
+    bool has_variants = rowset_schema->num_variant_columns() > 0;
+    if (has_variants) {
+        rowset_schema = rowset_schema->copy_without_extracted_columns();

Review Comment:
   Add a comment explaining why the schema without extracted columns is used here.



##########
be/src/olap/rowset/rowset_writer_context.h:
##########
@@ -52,7 +52,8 @@ struct RowsetWriterContext {
     io::FileSystemSPtr fs;
     std::string rowset_dir;
     TabletSchemaSPtr tablet_schema;
-    TabletSchemaSPtr original_tablet_schema;
+    // for variant schema update
+    TabletSchemaSPtr merged_tablet_schema;

Review Comment:
   remove merged_tablet_schema and just update tablet_schema



##########
be/src/vec/columns/column_object.cpp:
##########
@@ -1286,9 +1293,12 @@ Status ColumnObject::merge_sparse_to_root_column() {
                                        
parser.getWriter().getOutput()->getSize());
         result_column_nullable->get_null_map_data().push_back(0);
     }
-
-    // assign merged column
-    subcolumns.get_mutable_root()->data.get_finalized_column_ptr() = 
mresult->get_ptr();
+    subcolumns.get_mutable_root()->data.get_finalized_column().clear();
+    // assign merged column, do insert_range_from to make a copy, instead of 
replace the ptr itselft
+    // to make sure the root column ptr is not changed
+    
subcolumns.get_mutable_root()->data.get_finalized_column().insert_range_from(
+            *mresult->get_ptr(), 0, num_rows);
+    // subcolumns.get_mutable_root()->data.get_finalized_column_ptr() = 
mresult->get_ptr();

Review Comment:
   delete commented code



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -813,6 +920,18 @@ Status VerticalSegmentWriter::write_batch() {
         _num_rows_written += data.num_rows;
     }
 
+    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
+        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
+        for (auto& data : _batched_blocks) {
+            RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data));
+        }
+    }
+
+    for (auto& column_writer : _column_writers) {

Review Comment:
   Why was this moved here?



##########
be/src/olap/base_tablet.cpp:
##########
@@ -651,6 +651,10 @@ Status 
BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 (std::find(including_cids.cbegin(), including_cids.cend(),
                            rowset_schema->sequence_col_idx()) != 
including_cids.cend());
     }
+    bool has_variants = rowset_schema->num_variant_columns() > 0;

Review Comment:
   has_variants is only used once, so it can be removed and the condition inlined.



##########
be/src/olap/rowset/segment_v2/segment.cpp:
##########
@@ -522,24 +522,17 @@ Status Segment::new_column_iterator_with_path(const 
TabletColumn& tablet_column,
     auto sparse_node = tablet_column.has_path_info()
                                ? 
_sparse_column_tree.find_exact(*tablet_column.path_info_ptr())
                                : nullptr;
-    if (opt != nullptr && opt->io_ctx.reader_type == 
ReaderType::READER_ALTER_TABLE) {

Review Comment:
   Why doesn't READER_ALTER_TABLE need to be processed specially as before?



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -813,6 +920,18 @@ Status VerticalSegmentWriter::write_batch() {
         _num_rows_written += data.num_rows;
     }
 
+    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
+        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
+        for (auto& data : _batched_blocks) {
+            RETURN_IF_ERROR(_append_block_with_variant_subcolumns(data));

Review Comment:
   why do it once again?



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -213,6 +216,17 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& 
other) {
     if (rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) {
         set_rowset_state(RowsetStatePB::COMMITTED);
     }
+    // In partial update the rowset schema maybe updated when table contains 
variant type, so we need the newest schema to be updated

Review Comment:
   I think we should do the following before set_rowset_state()



##########
be/src/olap/rowset/segment_v2/hierarchical_data_reader.h:
##########
@@ -154,22 +152,9 @@ class HierarchicalDataReader : public ColumnIterator {
             return Status::OK();
         }));
 
-        if (_output_as_raw_json) {

Review Comment:
   Why delete the _output_as_raw_json-related code?



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -709,16 +707,127 @@ Status VerticalSegmentWriter::batch_block(const 
vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// for variant type, we should do following steps to fill content of block:
+// 1. set block data to data convertor, and get all flattened columns from 
variant subcolumns
+// 2. get sparse columns from previous sparse columns stripped in 
OlapColumnDataConvertorVariant
+// 3. merge current columns info(contains extracted columns) with previous 
merged_tablet_schema
+//    which will be used to contruct the new schema for rowset
+Status 
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) 
{
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." + 
entry->path.get_path();
+            const vectorized::DataTypePtr& final_data_type_from_object =
+                    entry->data.get_least_common_type();
+            vectorized::PathInDataBuilder full_path_builder;
+            auto full_path = 
full_path_builder.append(parent_column->name_lower_case(), false)
+                                     .append(entry->path.get_parts(), false)
+                                     .build();
+            return vectorized::schema_util::get_column_by_type(
+                    final_data_type_from_object, column_name,
+                    vectorized::schema_util::ExtraInfo {
+                            .unique_id = -1,
+                            .parent_unique_id = parent_column->unique_id(),
+                            .path_info = full_path});
+        };
+
+        CHECK(object_column.is_finalized());
+        // common extracted columns
+        for (const auto& entry :
+             
vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) 
{
+            if (entry->path.empty()) {
+                // already handled by parent column
+                continue;
+            }
+            CHECK(entry->data.is_finalized());
+            int current_column_id = column_id++;
+            TabletColumn tablet_column = generate_column_info(entry);
+            vectorized::schema_util::inherit_column_attributes(*parent_column, 
tablet_column,
+                                                               _flush_schema);
+            RETURN_IF_ERROR(_create_column_writer(current_column_id 
/*unused*/, tablet_column,
+                                                  _flush_schema));
+            _olap_data_convertor->set_source_content_with_specifid_column(
+                    {entry->data.get_finalized_column_ptr()->get_ptr(),
+                     entry->data.get_least_common_type(), 
tablet_column.name()},
+                    data.row_pos, data.num_rows, current_column_id);
+            // convert column data from engine format to storage layer format
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(current_column_id);
+            if (!status.ok()) {
+                return status;
+            }
+            RETURN_IF_ERROR(_column_writers[current_column_id]->append(
+                    column->get_nullmap(), column->get_data(), data.num_rows));
+            _flush_schema->append_column(tablet_column);
+            _olap_data_convertor->clear_source_content();
+        }
+        // sparse_columns
+        for (const auto& entry : 
vectorized::schema_util::get_sorted_subcolumns(
+                     object_column.get_sparse_subcolumns())) {
+            TabletColumn sparse_tablet_column = generate_column_info(entry);
+            _flush_schema->mutable_column_by_uid(parent_column->unique_id())
+                    .append_sparse_column(sparse_tablet_column);
+
+            // add sparse column to footer
+            auto* column_pb = _footer.mutable_columns(i);
+            _init_column_meta(column_pb->add_sparse_columns(), -1, 
sparse_tablet_column);
+        }
+    }
+
+    // Update rowset schema, tablet's tablet schema will be updated when build 
Rowset
+    // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
+    // ctx.tablet_schema:  A(bigint), B(double)
+    // => update_schema:   A(bigint), B(double), C(int), D(int)
+    std::lock_guard<std::mutex> lock(*(_opts.rowset_ctx->schema_lock));
+    if (_opts.rowset_ctx->merged_tablet_schema == nullptr) {
+        _opts.rowset_ctx->merged_tablet_schema = 
_opts.rowset_ctx->tablet_schema;
+    }
+    TabletSchemaSPtr update_schema;
+    RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
+            {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, 
update_schema));
+    CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns())
+            << "Rowset merge schema columns count is " << 
update_schema->num_columns()
+            << ", but flush_schema is larger " << _flush_schema->num_columns()
+            << " update_schema: " << update_schema->dump_structure()
+            << " flush_schema: " << _flush_schema->dump_structure();
+    _opts.rowset_ctx->merged_tablet_schema.swap(update_schema);
+    VLOG_DEBUG << "dump block " << data.block->dump_data();
+    VLOG_DEBUG << "dump rs schema: " << 
_opts.rowset_ctx->merged_tablet_schema->dump_full_schema();
+    VLOG_DEBUG << "rowset : " << _opts.rowset_ctx->rowset_id << ", seg id : " 
<< _segment_id;
+    return Status::OK();
+}
+
 Status VerticalSegmentWriter::write_batch() {
     if (_opts.rowset_ctx->partial_update_info &&
         _opts.rowset_ctx->partial_update_info->is_partial_update &&
         _opts.write_type == DataWriteType::TYPE_DIRECT &&
         !_opts.rowset_ctx->is_transient_rowset_writer) {
         for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
-            RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid)));
+            RETURN_IF_ERROR(
+                    _create_column_writer(cid, _tablet_schema->column(cid), 
_tablet_schema));
+        }
+        vectorized::Block full_block;
+        for (auto& data : _batched_blocks) {
+            RETURN_IF_ERROR(_append_block_with_partial_content(data, 
full_block));
         }
         for (auto& data : _batched_blocks) {
-            RETURN_IF_ERROR(_append_block_with_partial_content(data));
+            RowsInBlock full_rows_block {&full_block, data.row_pos, 
data.num_rows};
+            
RETURN_IF_ERROR(_append_block_with_variant_subcolumns(full_rows_block));

Review Comment:
   _append_block_with_variant_subcolumns does not change the block but changes 
_flush_schema, so the function name is misleading



##########
be/src/olap/base_tablet.cpp:
##########
@@ -651,6 +651,10 @@ Status 
BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                 (std::find(including_cids.cbegin(), including_cids.cend(),
                            rowset_schema->sequence_col_idx()) != 
including_cids.cend());
     }
+    bool has_variants = rowset_schema->num_variant_columns() > 0;
+    if (has_variants) {
+        rowset_schema = rowset_schema->copy_without_extracted_columns();

Review Comment:
   BTW, we can rename extracted_columns to variant_extracted_columns to be more 
readable



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -709,16 +707,127 @@ Status VerticalSegmentWriter::batch_block(const 
vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// for variant type, we should do following steps to fill content of block:
+// 1. set block data to data convertor, and get all flattened columns from 
variant subcolumns
+// 2. get sparse columns from previous sparse columns stripped in 
OlapColumnDataConvertorVariant
+// 3. merge current columns info(contains extracted columns) with previous 
merged_tablet_schema
+//    which will be used to contruct the new schema for rowset
+Status 
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) 
{
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." + 
entry->path.get_path();
+            const vectorized::DataTypePtr& final_data_type_from_object =
+                    entry->data.get_least_common_type();
+            vectorized::PathInDataBuilder full_path_builder;
+            auto full_path = 
full_path_builder.append(parent_column->name_lower_case(), false)
+                                     .append(entry->path.get_parts(), false)
+                                     .build();
+            return vectorized::schema_util::get_column_by_type(
+                    final_data_type_from_object, column_name,
+                    vectorized::schema_util::ExtraInfo {
+                            .unique_id = -1,
+                            .parent_unique_id = parent_column->unique_id(),
+                            .path_info = full_path});
+        };
+
+        CHECK(object_column.is_finalized());
+        // common extracted columns
+        for (const auto& entry :
+             
vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) 
{
+            if (entry->path.empty()) {
+                // already handled by parent column
+                continue;
+            }
+            CHECK(entry->data.is_finalized());
+            int current_column_id = column_id++;
+            TabletColumn tablet_column = generate_column_info(entry);
+            vectorized::schema_util::inherit_column_attributes(*parent_column, 
tablet_column,
+                                                               _flush_schema);
+            RETURN_IF_ERROR(_create_column_writer(current_column_id 
/*unused*/, tablet_column,
+                                                  _flush_schema));
+            _olap_data_convertor->set_source_content_with_specifid_column(
+                    {entry->data.get_finalized_column_ptr()->get_ptr(),
+                     entry->data.get_least_common_type(), 
tablet_column.name()},
+                    data.row_pos, data.num_rows, current_column_id);
+            // convert column data from engine format to storage layer format
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(current_column_id);
+            if (!status.ok()) {
+                return status;
+            }
+            RETURN_IF_ERROR(_column_writers[current_column_id]->append(
+                    column->get_nullmap(), column->get_data(), data.num_rows));
+            _flush_schema->append_column(tablet_column);
+            _olap_data_convertor->clear_source_content();

Review Comment:
   OK



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -138,23 +138,20 @@ void 
VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t colum
     for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
         _init_column_meta(meta->add_children_columns(), column_id, 
column.get_sub_column(i));
     }
-    // add sparse column to footer

Review Comment:
   Why delete?



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -709,16 +707,127 @@ Status VerticalSegmentWriter::batch_block(const 
vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// for variant type, we should do following steps to fill content of block:
+// 1. set block data to data convertor, and get all flattened columns from 
variant subcolumns
+// 2. get sparse columns from previous sparse columns stripped in 
OlapColumnDataConvertorVariant
+// 3. merge current columns info(contains extracted columns) with previous 
merged_tablet_schema
+//    which will be used to contruct the new schema for rowset
+Status 
VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) 
{
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = 
assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." + 
entry->path.get_path();

Review Comment:
   I mean the column_name should be updated if the parent column is renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to