eldenmoon commented on code in PR #34925: URL: https://github.com/apache/doris/pull/34925#discussion_r1619006328
########## be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp: ########## @@ -709,16 +707,127 @@ Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t return Status::OK(); } +// for variant type, we should do following steps to fill content of block: +// 1. set block data to data convertor, and get all flattened columns from variant subcolumns +// 2. get sparse columns from previous sparse columns stripped in OlapColumnDataConvertorVariant +// 3. merge current columns info(contains extracted columns) with previous merged_tablet_schema +// which will be used to construct the new schema for rowset +Status VerticalSegmentWriter::_append_block_with_variant_subcolumns(RowsInBlock& data) { + if (_tablet_schema->num_variant_columns() == 0) { + return Status::OK(); + } + size_t column_id = _tablet_schema->num_columns(); + for (int i = 0; i < _tablet_schema->columns().size(); ++i) { + if (!_tablet_schema->columns()[i]->is_variant_type()) { + continue; + } + if (_flush_schema == nullptr) { + _flush_schema = std::make_shared<TabletSchema>(*_tablet_schema); + } + auto column_ref = data.block->get_by_position(i).column; + const vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>( + remove_nullable(column_ref)->assume_mutable_ref()); + const TabletColumnPtr& parent_column = _tablet_schema->columns()[i]; + + // generate column info by entry info + auto generate_column_info = [&](const auto& entry) { + const std::string& column_name = + parent_column->name_lower_case() + "." 
+ entry->path.get_path(); + const vectorized::DataTypePtr& final_data_type_from_object = + entry->data.get_least_common_type(); + vectorized::PathInDataBuilder full_path_builder; + auto full_path = full_path_builder.append(parent_column->name_lower_case(), false) + .append(entry->path.get_parts(), false) + .build(); + return vectorized::schema_util::get_column_by_type( + final_data_type_from_object, column_name, + vectorized::schema_util::ExtraInfo { + .unique_id = -1, + .parent_unique_id = parent_column->unique_id(), + .path_info = full_path}); + }; + + CHECK(object_column.is_finalized()); + // common extracted columns + for (const auto& entry : + vectorized::schema_util::get_sorted_subcolumns(object_column.get_subcolumns())) { + if (entry->path.empty()) { + // already handled by parent column + continue; + } + CHECK(entry->data.is_finalized()); + int current_column_id = column_id++; + TabletColumn tablet_column = generate_column_info(entry); + vectorized::schema_util::inherit_column_attributes(*parent_column, tablet_column, + _flush_schema); + RETURN_IF_ERROR(_create_column_writer(current_column_id /*unused*/, tablet_column, + _flush_schema)); + _olap_data_convertor->set_source_content_with_specifid_column( + {entry->data.get_finalized_column_ptr()->get_ptr(), + entry->data.get_least_common_type(), tablet_column.name()}, + data.row_pos, data.num_rows, current_column_id); + // convert column data from engine format to storage layer format + auto [status, column] = _olap_data_convertor->convert_column_data(current_column_id); + if (!status.ok()) { + return status; + } + RETURN_IF_ERROR(_column_writers[current_column_id]->append( + column->get_nullmap(), column->get_data(), data.num_rows)); + _flush_schema->append_column(tablet_column); + _olap_data_convertor->clear_source_content(); + } + // sparse_columns + for (const auto& entry : vectorized::schema_util::get_sorted_subcolumns( + object_column.get_sparse_subcolumns())) { + TabletColumn sparse_tablet_column = 
generate_column_info(entry); + _flush_schema->mutable_column_by_uid(parent_column->unique_id()) + .append_sparse_column(sparse_tablet_column); + + // add sparse column to footer + auto* column_pb = _footer.mutable_columns(i); + _init_column_meta(column_pb->add_sparse_columns(), -1, sparse_tablet_column); + } + } + + // Update rowset schema, tablet's tablet schema will be updated when build Rowset + // Eg. flush schema: A(int), B(float), C(int), D(int) + // ctx.tablet_schema: A(bigint), B(double) + // => update_schema: A(bigint), B(double), C(int), D(int) + std::lock_guard<std::mutex> lock(*(_opts.rowset_ctx->schema_lock)); + if (_opts.rowset_ctx->merged_tablet_schema == nullptr) { + _opts.rowset_ctx->merged_tablet_schema = _opts.rowset_ctx->tablet_schema; + } + TabletSchemaSPtr update_schema; + RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema( + {_opts.rowset_ctx->merged_tablet_schema, _flush_schema}, nullptr, update_schema)); + CHECK_GE(update_schema->num_columns(), _flush_schema->num_columns()) + << "Rowset merge schema columns count is " << update_schema->num_columns() + << ", but flush_schema is larger " << _flush_schema->num_columns() + << " update_schema: " << update_schema->dump_structure() + << " flush_schema: " << _flush_schema->dump_structure(); + _opts.rowset_ctx->merged_tablet_schema.swap(update_schema); + VLOG_DEBUG << "dump block " << data.block->dump_data(); + VLOG_DEBUG << "dump rs schema: " << _opts.rowset_ctx->merged_tablet_schema->dump_full_schema(); + VLOG_DEBUG << "rowset : " << _opts.rowset_ctx->rowset_id << ", seg id : " << _segment_id; + return Status::OK(); +} + Status VerticalSegmentWriter::write_batch() { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org