eldenmoon commented on code in PR #26749:
URL: https://github.com/apache/doris/pull/26749#discussion_r1405549745
##########
be/src/olap/rowset/segment_creator.cpp:
##########

@@ -40,32 +49,178 @@ SegmentFlusher::SegmentFlusher() = default;
 
 SegmentFlusher::~SegmentFlusher() = default;
 
-Status SegmentFlusher::init(const RowsetWriterContext& rowset_writer_context) {
-    _context = rowset_writer_context;
+Status SegmentFlusher::init(RowsetWriterContext& rowset_writer_context) {
+    _context = &rowset_writer_context;
     return Status::OK();
 }
 
 Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_t segment_id,
-                                          int64_t* flush_size, TabletSchemaSPtr flush_schema) {
+                                          int64_t* flush_size) {
     if (block->rows() == 0) {
         return Status::OK();
     }
-    bool no_compression = block->bytes() <= config::segment_compression_threshold_kb * 1024;
+    TabletSchemaSPtr flush_schema = nullptr;
+    // Expand variant columns
+    vectorized::Block flush_block(*block);
+    if (_context->write_type != DataWriteType::TYPE_COMPACTION &&
+        _context->tablet_schema->num_variant_columns() > 0) {
+        RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, flush_schema));
+    }
+    bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
     if (config::enable_vertical_segment_writer &&
-        _context.tablet_schema->cluster_key_idxes().empty()) {
+        _context->tablet_schema->cluster_key_idxes().empty()) {
         std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
         RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
-        RETURN_IF_ERROR(_add_rows(writer, block, 0, block->rows()));
+        RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
         RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
     } else {
         std::unique_ptr<segment_v2::SegmentWriter> writer;
         RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
-        RETURN_IF_ERROR(_add_rows(writer, block, 0, block->rows()));
+        RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
         RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
     }
     return Status::OK();
 }
 
+Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
+                                                     TabletSchemaSPtr& flush_schema) {
+    size_t num_rows = block.rows();
+    if (num_rows == 0) {
+        return Status::OK();
+    }
+
+    std::vector<int> variant_column_pos;
+    if (_context->partial_update_info && _context->partial_update_info->is_partial_update) {
+        // Columns used for partial update must not include variant columns
+        for (int i : _context->partial_update_info->update_cids) {
+            const auto& col = _context->tablet_schema->columns()[i];
+            if (!col.is_key() && col.name() != DELETE_SIGN) {
+                return Status::InvalidArgument(
+                        "Partial update on variant columns is not implemented; only delete is supported currently");
+            }
+        }
+    } else {
+        for (int i = 0; i < _context->tablet_schema->columns().size(); ++i) {
+            if (_context->tablet_schema->columns()[i].is_variant_type()) {
+                variant_column_pos.push_back(i);
+            }
+        }
+    }
+
+    if (variant_column_pos.empty()) {
+        return Status::OK();
+    }
+
+    try {
+        // Parse each variant column from its raw string column
+        vectorized::schema_util::parse_variant_columns(block, variant_column_pos);
+        vectorized::schema_util::finalize_variant_columns(block, variant_column_pos,
+                                                          false /*do not ignore sparse*/);
+        vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_column_pos);
+    } catch (const doris::Exception& e) {
+        // TODO: handle more gracefully, e.g. respect max_filter_ratio
+        LOG(WARNING) << "encountered exception " << e.to_string();
+        return Status::InternalError(e.to_string());
+    }
+
+    // A dynamic block consists of two parts: the static columns and the extracted (dynamic) columns
+    //      static       extracted
+    // | --------- | ----------- |
+    // The static ones are the original _tablet_schema columns
+    flush_schema = std::make_shared<TabletSchema>();
+    flush_schema->copy_from(*_context->original_tablet_schema);
+
+    vectorized::Block flush_block(std::move(block));
+    // If a column already exists in the original tablet schema, pick the common type,
+    // cast the column to that type, and modify the tablet column accordingly;
+    // otherwise it is a new column that should be added to the frontend

Review Comment:
   modified
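For readers following the variant-expansion flow quoted above, here is a minimal, self-contained C++ sketch of the underlying idea, not the Doris implementation: a block of parsed variant values is split into dense subcolumns for paths that occur in every row, while the remaining paths are packed into a sparse remainder, loosely mirroring the parse / finalize / encode steps. All names in the sketch are made up for illustration.

```cpp
// Conceptual sketch only -- not Doris code. Models variant rows as
// path->value maps and splits them into dense subcolumns (paths present in
// every row) plus an encoded sparse remainder per row.
#include <iostream>
#include <map>
#include <string>
#include <vector>

using VariantRow = std::map<std::string, std::string>; // parsed path -> value

struct ExpandedColumns {
    std::map<std::string, std::vector<std::string>> dense; // one subcolumn per common path
    std::vector<std::string> sparse;                       // encoded leftovers, one per row
};

ExpandedColumns expand_variant(const std::vector<VariantRow>& rows) {
    // "finalize": paths that appear in every row become real subcolumns
    std::map<std::string, size_t> path_count;
    for (const auto& row : rows) {
        for (const auto& kv : row) ++path_count[kv.first];
    }
    ExpandedColumns out;
    for (const auto& row : rows) {
        std::string encoded; // "encode sparse subcolumns": keep rare paths packed per row
        for (const auto& kv : row) {
            if (path_count[kv.first] == rows.size()) {
                out.dense[kv.first].push_back(kv.second);
            } else {
                encoded += kv.first + "=" + kv.second + ";";
            }
        }
        out.sparse.push_back(encoded);
    }
    return out;
}

int main() {
    std::vector<VariantRow> rows = {{{"a", "1"}, {"b", "x"}},
                                    {{"a", "2"}, {"c", "y"}}};
    ExpandedColumns cols = expand_variant(rows);
    for (const auto& col : cols.dense) {
        std::cout << "dense subcolumn " << col.first << " with " << col.second.size() << " rows\n";
    }
    for (const auto& s : cols.sparse) std::cout << "sparse: " << s << "\n";
}
```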
##########
be/src/olap/rowset/segment_creator.cpp:
##########

@@ -280,6 +441,17 @@ Status SegmentCreator::add_block(const vectorized::Block* block) {
     size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
     size_t row_offset = 0;
 
+    if (_segment_flusher.need_buffering()) {
+        const static int MAX_BUFFER_SIZE = config::flushing_block_buffer_size_bytes; // 400M

Review Comment:
   done
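The `need_buffering()` branch above accumulates incoming rows until the buffer reaches `config::flushing_block_buffer_size_bytes` (400 MB per the inline comment) and only then flushes a segment. A rough standalone sketch of that "buffer until a byte threshold, then flush" pattern follows; the class, names, and the tiny threshold are hypothetical and chosen only so the example is easy to run.

```cpp
// Standalone sketch of a size-threshold buffering writer. Illustrative only;
// the real writer buffers vectorized::Blocks and uses
// config::flushing_block_buffer_size_bytes as the limit.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

class BufferingWriter {
public:
    explicit BufferingWriter(size_t max_buffer_bytes) : _max_buffer_bytes(max_buffer_bytes) {}

    void add_row(const std::string& row) {
        _buffer.push_back(row);
        _buffered_bytes += row.size();
        if (_buffered_bytes >= _max_buffer_bytes) {
            flush(); // threshold reached: write out a segment
        }
    }

    void flush() {
        if (_buffer.empty()) return;
        std::cout << "flushing segment " << _segment_id++ << " with " << _buffer.size()
                  << " rows, " << _buffered_bytes << " bytes\n";
        _buffer.clear();
        _buffered_bytes = 0;
    }

private:
    size_t _max_buffer_bytes;
    size_t _buffered_bytes = 0;
    int _segment_id = 0;
    std::vector<std::string> _buffer;
};

int main() {
    BufferingWriter writer(32); // tiny threshold for demonstration
    for (int i = 0; i < 10; ++i) {
        writer.add_row("row-" + std::to_string(i) + "-payload");
    }
    writer.flush(); // flush any buffered tail
}
```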
##########
be/src/olap/rowset/segment_creator.cpp:
##########

@@ -40,32 +49,178 @@ SegmentFlusher::SegmentFlusher() = default;
 
 SegmentFlusher::~SegmentFlusher() = default;
 
-Status SegmentFlusher::init(const RowsetWriterContext& rowset_writer_context) {
-    _context = rowset_writer_context;
+Status SegmentFlusher::init(RowsetWriterContext& rowset_writer_context) {
+    _context = &rowset_writer_context;
     return Status::OK();
 }
 
 Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_t segment_id,
-                                          int64_t* flush_size, TabletSchemaSPtr flush_schema) {
+                                          int64_t* flush_size) {
     if (block->rows() == 0) {
         return Status::OK();
    }
-    bool no_compression = block->bytes() <= config::segment_compression_threshold_kb * 1024;
+    TabletSchemaSPtr flush_schema = nullptr;
+    // Expand variant columns
+    vectorized::Block flush_block(*block);
+    if (_context->write_type != DataWriteType::TYPE_COMPACTION &&
+        _context->tablet_schema->num_variant_columns() > 0) {
+        RETURN_IF_ERROR(_expand_variant_to_subcolumns(flush_block, flush_schema));
+    }
+    bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
     if (config::enable_vertical_segment_writer &&
-        _context.tablet_schema->cluster_key_idxes().empty()) {
+        _context->tablet_schema->cluster_key_idxes().empty()) {
         std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
         RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
-        RETURN_IF_ERROR(_add_rows(writer, block, 0, block->rows()));
+        RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
         RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
     } else {
         std::unique_ptr<segment_v2::SegmentWriter> writer;
         RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression, flush_schema));
-        RETURN_IF_ERROR(_add_rows(writer, block, 0, block->rows()));
+        RETURN_IF_ERROR(_add_rows(writer, &flush_block, 0, flush_block.rows()));
         RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
     }
     return Status::OK();
 }
 
+Status SegmentFlusher::_expand_variant_to_subcolumns(vectorized::Block& block,
+                                                     TabletSchemaSPtr& flush_schema) {
+    size_t num_rows = block.rows();
+    if (num_rows == 0) {
+        return Status::OK();
+    }
+
+    std::vector<int> variant_column_pos;
+    if (_context->partial_update_info && _context->partial_update_info->is_partial_update) {
+        // Columns used for partial update must not include variant columns
+        for (int i : _context->partial_update_info->update_cids) {
+            const auto& col = _context->tablet_schema->columns()[i];
+            if (!col.is_key() && col.name() != DELETE_SIGN) {
+                return Status::InvalidArgument(
+                        "Partial update on variant columns is not implemented; only delete is supported currently");
+            }
+        }
+    } else {
+        for (int i = 0; i < _context->tablet_schema->columns().size(); ++i) {
+            if (_context->tablet_schema->columns()[i].is_variant_type()) {
+                variant_column_pos.push_back(i);
+            }
+        }
+    }
+
+    if (variant_column_pos.empty()) {
+        return Status::OK();
+    }
+
+    try {
+        // Parse each variant column from its raw string column
+        vectorized::schema_util::parse_variant_columns(block, variant_column_pos);
+        vectorized::schema_util::finalize_variant_columns(block, variant_column_pos,
+                                                          false /*do not ignore sparse*/);
+        vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_column_pos);

Review Comment:
   done
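The code location of this last comment is the try block that converts a thrown `doris::Exception` into an error `Status` instead of letting it escape the flush path. Below is a minimal sketch of that "catch and map to Status" pattern with stand-in types; `Status`, the parser, and the error text here are illustrative, not the Doris classes.

```cpp
// Minimal sketch of catching a parse exception and returning an error Status,
// mirroring the pattern in _expand_variant_to_subcolumns. Stand-in types only.
#include <iostream>
#include <stdexcept>
#include <string>

struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {true, ""}; }
    static Status InternalError(std::string m) { return {false, std::move(m)}; }
};

void parse_variant_or_throw(const std::string& raw) {
    if (raw.empty() || raw.front() != '{') {
        throw std::runtime_error("malformed variant value: " + raw);
    }
    // ... real parsing would populate subcolumns here ...
}

Status expand_variant(const std::string& raw) {
    try {
        parse_variant_or_throw(raw);
    } catch (const std::exception& e) {
        // TODO in the PR: handle more gracefully, e.g. honor max_filter_ratio
        return Status::InternalError(e.what());
    }
    return Status::OK();
}

int main() {
    std::cout << expand_variant("{\"a\": 1}").ok << "\n"; // prints 1
    std::cout << expand_variant("not json").msg << "\n";  // prints the error message
}
```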