eldenmoon commented on code in PR #34925:
URL: https://github.com/apache/doris/pull/34925#discussion_r1619006328


##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -709,16 +707,127 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t
     return Status::OK();
 }
 
+// for variant type, we should do the following steps to fill content of block:
+// 1. set block data to data convertor, and get all flattened columns from variant subcolumns
+// 2. get sparse columns from previous sparse columns stripped in OlapColumnDataConvertorVariant
+// 3. merge current columns info(contains extracted columns) with previous merged_tablet_schema
+//    which will be used to construct the new schema for rowset
+Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) {
+    if (_tablet_schema->num_variant_columns() == 0) {
+        return Status::OK();
+    }
+    size_t column_id = _tablet_schema->num_columns();
+    for (int i = 0; i < _tablet_schema->columns().size(); ++i) {
+        if (!_tablet_schema->columns()[i]->is_variant_type()) {
+            continue;
+        }
+        if (_flush_schema == nullptr) {
+            _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema);
+        }
+        auto column_ref = data.block->get_by_position(i).column;
+        const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
+                remove_nullable(column_ref)->assume_mutable_ref());
+        const TabletColumnPtr& parent_column = _tablet_schema->columns()[i];
+
+        // generate column info by entry info
+        auto generate_column_info = [&](const auto& entry) {
+            const std::string& column_name =
+                    parent_column->name_lower_case() + "." + entry->path.get_path();
+            const vectorized::DataTypePtr& final_data_type_from_object =
+                    entry->data.get_least_common_type();
+            vectorized::PathInDataBuilder full_path_builder;
+            auto full_path = full_path_builder.append(parent_column->name_lower_case(), false)
+                                     .append(entry->path.get_parts(), false)
+                                     .build();
+            return vectorized::schema_util::get_column_by_type(
+                    final_data_type_from_object, column_name,
+                    vectorized::schema_util::ExtraInfo {
+                            .unique_id = -1,
+                            .parent_unique_id = parent_column->unique_id(),
+                            .path_info = full_path});
+        };
+
+        CHECK(object_column.is_finalized());
+        // common extracted columns
+        for (const auto& entry :
+             vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) {
+            if (entry->path.empty()) {
+                // already handled by parent column
+                continue;
+            }
+            CHECK(entry->data.is_finalized());
+            int current_column_id = column_id++;
+            TabletColumn tablet_column = generate_column_info(entry);
+            vectorized::schema_util::inherit_column_attributes(*parent_column, tablet_column,
+                                                               _flush_schema);
+            RETURN_IF_ERROR(_create_column_writer(current_column_id /*unused*/, tablet_column,
+                                                  _flush_schema));
+            _olap_data_convertor->set_source_content_with_specifid_column(
+                    {entry->data.get_finalized_column_ptr()->get_ptr(),
+                     entry->data.get_least_common_type(), tablet_column.name()},
+                    data.row_pos, data.num_rows, current_column_id);
+            // convert column data from engine format to storage layer format
+            auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id);
+            if (!status.ok()) {
+                return status;
+            }
+            RETURN_IF_ERROR(_column_writers[current_column_id]->append(
+                    column->get_nullmap(), column->get_data(), data.num_rows));
+            _flush_schema->append_column(tablet_column);
+            _olap_data_convertor->clear_source_content();
+        }
+        // sparse_columns
+        for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns(
+                     object_column.get_sparse_subcolumns())) {
+            TabletColumn sparse_tablet_column = generate_column_info(entry);
+            _flush_schema->mutable_column_by_uid(parent_column->unique_id())
+                    .append_sparse_column(sparse_tablet_column);
+
+            // add sparse column to footer
+            auto* column_pb = _footer.mutable_columns(i);
+            _init_column_meta(column_pb->add_sparse_columns(), -1, sparse_tablet_column);
+        }
+    }
+
+    // Update rowset schema, tablet's tablet schema will be updated when build Rowset
+    // Eg. flush schema:    A(int),    B(float),  C(int), D(int)
+    // ctx.tablet_schema:  A(bigint), B(double)
+    // => update_schema:   A(bigint), B(double), C(int), D(int)
+    std::lock_guard<std::mutex> lock(*(_opts.rowset_ctx->schema_lock));
+    if (_opts.rowset_ctx->merged_tablet_schema == nullptr) {
+        _opts.rowset_ctx->merged_tablet_schema = _opts.rowset_ctx->tablet_schema;
+    }
+    TabletSchemaSPtr update_schema;
+    RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
+            {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, update_schema));
+    CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns())
+            << "Rowset merge schema columns count is " << update_schema->num_columns()
+            << ", but flush_schema is larger " << _flush_schema->num_columns()
+            << " update_schema: " << update_schema->dump_structure()
+            << " flush_schema: " << _flush_schema->dump_structure();
+    _opts.rowset_ctx->merged_tablet_schema.swap(update_schema);
+    VLOG_DEBUG << "dump block " << data.block->dump_data();
+    VLOG_DEBUG << "dump rs schema: " << _opts.rowset_ctx->merged_tablet_schema->dump_full_schema();
+    VLOG_DEBUG << "rowset : " << _opts.rowset_ctx->rowset_id << ", seg id : " << _segment_id;
+    return Status::OK();
+}
+
 Status VerticalSegmentWriter::write_batch() {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to