github-actions[bot] commented on code in PR #41839: URL: https://github.com/apache/doris/pull/41839#discussion_r1800406205
########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -616,142 +587,345 @@ _olap_data_convertor->clear_source_content(); return Status::OK(); } +Status SegmentWriter::append_block_with_flexible_partial_content(const vectorized::Block* block, Review Comment: warning: function 'append_block_with_flexible_partial_content' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status SegmentWriter::append_block_with_flexible_partial_content(const vectorized::Block* block, ^ ``` <details> <summary>Additional context</summary> **be/src/olap/rowset/segment_v2/segment_writer.cpp:589:** 181 lines including whitespace and comments (threshold 80) ```cpp Status SegmentWriter::append_block_with_flexible_partial_content(const vectorized::Block* block, ^ ``` </details> ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -616,142 +587,345 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* _olap_data_convertor->clear_source_content(); return Status::OK(); } +Status SegmentWriter::append_block_with_flexible_partial_content(const vectorized::Block* block, Review Comment: warning: function 'append_block_with_flexible_partial_content' has cognitive complexity of 72 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status SegmentWriter::append_block_with_flexible_partial_content(const vectorized::Block* block, ^ ``` <details> <summary>Additional context</summary> **be/src/olap/rowset/segment_v2/segment_writer.cpp:606:** nesting level increased to 1 ```cpp auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* block) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:619:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep", ^ ``` **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF' ```cpp if 
(UNLIKELY(config::enable_debug_points)) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:619:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep", ^ ``` **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF' ```cpp if (dp) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:628:** nesting level increased to 1 ```cpp [&full_block, &block, &row_pos, &num_rows, ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:631:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp for (std::size_t cid {0}; cid < _num_key_columns; cid++) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:633:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:633:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:636:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (!status.ok()) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:644:** nesting level increased to 1 ```cpp auto encode_seq_column = [&block, &row_pos, &num_rows, ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:647:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (_tablet_schema->has_sequence_col()) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:649:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp 
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:649:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:652:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (!status.ok()) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:663:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp RETURN_IF_ERROR(encode_key_columns(key_columns)); ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:663:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(encode_key_columns(key_columns)); ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:670:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp RETURN_IF_ERROR(encode_seq_column(seq_column)); ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:670:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(encode_seq_column(seq_column)); ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:675:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_tablet_schema->has_sequence_col()) { ^ ``` 
**be/src/olap/rowset/segment_v2/segment_writer.cpp:677:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, num_rows, skip_bitmaps, ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:677:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, num_rows, skip_bitmaps, ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:680:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (origin_rows != num_rows) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:683:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(encode_key_columns(key_columns)); ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:683:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(encode_key_columns(key_columns)); ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:684:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(encode_seq_column(seq_column)); ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:684:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(encode_seq_column(seq_column)); ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` 
**be/src/olap/rowset/segment_v2/segment_writer.cpp:694:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp for (std::size_t cid {0}; cid < _num_key_columns; cid++) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:697:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR( ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:697:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR( ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:705:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp RETURN_IF_ERROR(generate_flexible_read_plan( ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:705:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(generate_flexible_read_plan( ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:712:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (config::enable_merge_on_write_correctness_check) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:719:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns( ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:719:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns( ^ ``` **be/src/common/status.h:621:** expanded from macro 
'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:753:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_num_rows_written != row_pos || ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:762:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp RETURN_IF_ERROR( ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:762:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR( ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` </details> ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -616,142 +587,345 @@ _olap_data_convertor->clear_source_content(); return Status::OK(); } +Status SegmentWriter::append_block_with_flexible_partial_content(const vectorized::Block* block, + size_t row_pos, size_t num_rows) { + DCHECK(_is_mow()); + DCHECK(_opts.rowset_ctx->partial_update_info != nullptr); + DCHECK(_opts.rowset_ctx->partial_update_info->is_flexible_partial_update()); + DCHECK_EQ(row_pos, 0); -Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, - const size_t& segment_start_pos, - const vectorized::Block* block) { - if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) { - // TODO(plat1ko): cloud mode - return Status::NotSupported("fill_missing_columns"); - } - auto tablet = static_cast<Tablet*>(_tablet.get()); - // create old value columns - const auto& cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids; - auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing); - CHECK_EQ(cids_missing.size(), old_value_block.columns()); - bool has_row_column = 
_tablet_schema->store_row_column(); - // record real pos, key is input line num, value is old_block line num - std::map<uint32_t, uint32_t> read_index; - size_t read_idx = 0; - for (auto rs_it : _rssid_to_rid) { - for (auto seg_it : rs_it.second) { - auto rowset = _rsid_to_rowset[rs_it.first]; - CHECK(rowset); - std::vector<uint32_t> rids; - for (auto [rid, pos] : seg_it.second) { - rids.emplace_back(rid); - read_index[pos] = read_idx++; - } - if (has_row_column) { - auto st = tablet->fetch_value_through_row_column( - rowset, *_tablet_schema, seg_it.first, rids, cids_missing, old_value_block); - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value through row column"; - return st; - } - continue; + // block has the same schema with full_block + DCHECK(block->columns() == _tablet_schema->num_columns()); + + // create full block and fill with sort key columns + auto full_block = _tablet_schema->create_block(); + + auto segment_start_pos = _column_writers.front()->get_next_rowid(); + + DCHECK(_tablet_schema->has_skip_bitmap_col()); + auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx(); + auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* block) { + return &(assert_cast<vectorized::ColumnBitmap*>( + block->get_by_position(skip_bitmap_col_idx).column->assume_mutable().get()) + ->get_data()); + }; + std::vector<BitmapValue>* skip_bitmaps = get_skip_bitmaps(block); + + bool has_default_or_nullable = false; + std::vector<bool> use_default_or_null_flag; + use_default_or_null_flag.reserve(num_rows); + + int32_t seq_map_col_unique_id = _opts.rowset_ctx->partial_update_info->sequence_map_col_uid(); + + DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep", + { sleep(60); }) + const std::vector<RowsetSharedPtr>& specified_rowsets = _mow_context->rowset_ptrs; + std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size()); + + std::vector<vectorized::IOlapColumnDataAccessor*> 
key_columns {}; + vectorized::IOlapColumnDataAccessor* seq_column {nullptr}; + + auto encode_key_columns = + [&full_block, &block, &row_pos, &num_rows, + this](std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns) -> Status { + key_columns.clear(); + for (std::size_t cid {0}; cid < _num_key_columns; cid++) { + full_block.replace_by_position(cid, block->get_by_position(cid).column); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( + full_block.get_by_position(cid), row_pos, num_rows, cid)); + auto [status, column] = _olap_data_convertor->convert_column_data(cid); + if (!status.ok()) { + return status; } - auto mutable_old_columns = old_value_block.mutate_columns(); - for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { - TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]); - auto st = tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, - mutable_old_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value by rowids"; - return st; - } + key_columns.push_back(column); + } + return Status::OK(); + }; + + auto encode_seq_column = [&block, &row_pos, &num_rows, + this](vectorized::IOlapColumnDataAccessor*& seq_column) -> Status { + seq_column = nullptr; + if (_tablet_schema->has_sequence_col()) { + auto seq_col_idx = _tablet_schema->sequence_col_idx(); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( + block->get_by_position(seq_col_idx), row_pos, num_rows, seq_col_idx)); + auto [status, column] = _olap_data_convertor->convert_column_data(seq_col_idx); + if (!status.ok()) { + return status; } - old_value_block.set_columns(std::move(mutable_old_columns)); + seq_column = column; + } + return Status::OK(); + }; + + // 1. 
encode key columns + // we can only encode sort key columns currently because all non-key columns in flexible partial update + // can have missing cells + RETURN_IF_ERROR(encode_key_columns(key_columns)); + + // 2. encode sequence column + // We encode the sequence column even though it may have invalid values in some rows because we need to + // encode the value of sequence column in key for rows that have a valid value in sequence column during + // lookup_raw_key. We will encode the sequence column again at the end of this method. At that time, we have + // a valid sequence column to encode the key with seq col. + RETURN_IF_ERROR(encode_seq_column(seq_column)); + + // 3. merge duplicate rows when table has sequence column + // When there are multiple rows with the same keys in memtable, some of them specify the sequence column, + // some of them don't. We can't do the de-duplication in memtable. We must de-duplicate them here. + if (_tablet_schema->has_sequence_col()) { + std::size_t origin_rows = num_rows; + RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, num_rows, skip_bitmaps, + key_columns, seq_column, specified_rowsets, + segment_caches)); + if (origin_rows != num_rows) { + // data in block has changed, should re-encode key columns, sequence column and re-get skip_bitmaps + _olap_data_convertor->clear_source_content(); + RETURN_IF_ERROR(encode_key_columns(key_columns)); + RETURN_IF_ERROR(encode_seq_column(seq_column)); + skip_bitmaps = get_skip_bitmaps(block); + } + } + + const auto* delete_sign_column_data = + Tablet::get_delete_sign_column_data(*block, row_pos + num_rows); + DCHECK(delete_sign_column_data != nullptr); + + // 4. 
write key columns data + for (std::size_t cid {0}; cid < _num_key_columns; cid++) { + const auto& column = key_columns[cid]; + DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written); + RETURN_IF_ERROR( + _column_writers[cid]->append(column->get_nullmap(), column->get_data(), num_rows)); + DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + num_rows); + } + + // 5. generate read plan + FlexibleReadPlan read_plan {_tablet_schema->store_row_column()}; + PartialUpdateStats stats; + RETURN_IF_ERROR(generate_flexible_read_plan( + read_plan, row_pos, num_rows, segment_start_pos, _tablet_schema->has_sequence_col(), + seq_map_col_unique_id, skip_bitmaps, key_columns, seq_column, delete_sign_column_data, + specified_rowsets, segment_caches, has_default_or_nullable, use_default_or_null_flag, + stats)); + CHECK_EQ(use_default_or_null_flag.size(), num_rows); + + if (config::enable_merge_on_write_correctness_check) { + static_cast<Tablet*>(_tablet.get()) + ->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(), + _mow_context->rowset_ids); + } + + // 6. read according to the plan to fill full_block + RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns( + _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block, + use_default_or_null_flag, has_default_or_nullable, segment_start_pos, row_pos, block, + skip_bitmaps)); + + // TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps to reduce storage occupation? + // this column is not needed in read path for merge-on-write table + + // 7. fill row store column + _serialize_block_to_row_column(full_block); + + // 8. 
encode and write all non-primary key columns(including sequence column if exists) + for (auto cid = _num_key_columns; cid < _tablet_schema->num_columns(); cid++) { + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( + full_block.get_by_position(cid), row_pos, num_rows, cid)); + auto [status, column] = _olap_data_convertor->convert_column_data(cid); + if (!status.ok()) { + return status; } + if (cid == _tablet_schema->sequence_col_idx()) { + // should use the latest encoded sequence column to build the primary index + seq_column = column; + } + DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written); + RETURN_IF_ERROR( + _column_writers[cid]->append(column->get_nullmap(), column->get_data(), num_rows)); + DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + num_rows); + } + + _num_rows_updated += stats.num_rows_updated; + _num_rows_deleted += stats.num_rows_deleted; + _num_rows_new_added += stats.num_rows_new_added; + _num_rows_filtered += stats.num_rows_filtered; + + if (_num_rows_written != row_pos || + _primary_key_index_builder->num_rows() != _num_rows_written) { + return Status::InternalError( + "Correctness check failed, _num_rows_written: {}, row_pos: {}, primary key " + "index builder num rows: {}", + _num_rows_written, row_pos, _primary_key_index_builder->num_rows()); } - // build default value columns - auto default_value_block = old_value_block.clone_empty(); - auto mutable_default_value_columns = default_value_block.mutate_columns(); - - const vectorized::Int8* delete_sign_column_data = nullptr; - if (const vectorized::ColumnWithTypeAndName* delete_sign_column = - old_value_block.try_get_by_name(DELETE_SIGN); - delete_sign_column != nullptr) { - auto& delete_sign_col = - reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column)); - delete_sign_column_data = delete_sign_col.get_data().data(); - } - - if (has_default_or_nullable || delete_sign_column_data != nullptr) { - for (auto 
i = 0; i < cids_missing.size(); ++i) { - const auto& column = _tablet_schema->column(cids_missing[i]); - if (column.has_default_value()) { - const auto& default_value = - _opts.rowset_ctx->partial_update_info->default_values[i]; - vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()), - default_value.size()); - RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string( - rb, mutable_default_value_columns[i].get())); + + // 9. build primary key index + RETURN_IF_ERROR( + _generate_primary_key_index(_key_coders, key_columns, seq_column, num_rows, false)); + + _num_rows_written += num_rows; + DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written) + << "primary key index builder num rows(" << _primary_key_index_builder->num_rows() + << ") not equal to segment writer's num rows written(" << _num_rows_written << ")"; + _olap_data_convertor->clear_source_content(); + return Status::OK(); +} + +Status SegmentWriter::generate_flexible_read_plan( + FlexibleReadPlan& read_plan, size_t row_pos, size_t num_rows, size_t segment_start_pos, + bool schema_has_sequence_col, int32_t seq_map_col_unique_id, + std::vector<BitmapValue>* skip_bitmaps, + const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_sign_column_data, + const std::vector<RowsetSharedPtr>& specified_rowsets, + std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches, + bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag, + PartialUpdateStats& stats) { + int32_t delete_sign_col_unique_id = + _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id(); + int32_t seq_col_unique_id = + (_tablet_schema->has_sequence_col() + ? 
_tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id() + : -1); + for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) { + size_t delta_pos = block_pos - row_pos; + size_t segment_pos = segment_start_pos + delta_pos; + auto& skip_bitmap = skip_bitmaps->at(block_pos); + + // the hidden sequence column should have the same mark with sequence map column + if (seq_map_col_unique_id != -1) { + DCHECK(schema_has_sequence_col); + if (skip_bitmap.contains(seq_map_col_unique_id)) { + skip_bitmap.add(seq_col_unique_id); } } + + std::string key = _full_encode_keys(key_columns, delta_pos); + _maybe_invalid_row_cache(key); + bool row_has_sequence_col = + (schema_has_sequence_col && !skip_bitmap.contains(seq_col_unique_id)); + if (row_has_sequence_col) { + _encode_seq_column(seq_column, delta_pos, &key); + } + + // mark key with delete sign as deleted. + bool have_delete_sign = (!skip_bitmap.contains(delete_sign_col_unique_id) && + delete_sign_column_data[block_pos] != 0); + + auto not_found_cb = [&]() { + return _opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error( + *_tablet_schema, &skip_bitmap); + }; + auto update_read_plan = [&](const RowLocation& loc) { + read_plan.prepare_to_read(loc, segment_pos, skip_bitmap); + }; + + RETURN_IF_ERROR(probe_key_for_mow(std::move(key), segment_pos, row_has_sequence_col, + have_delete_sign, specified_rowsets, segment_caches, + has_default_or_nullable, use_default_or_null_flag, + update_read_plan, not_found_cb, stats)); } + return Status::OK(); +} - // fill all missing value from mutable_old_columns, need to consider default value and null value - for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { - // `use_default_or_null_flag[idx] == false` doesn't mean that we should read values from the old row - // for the missing columns. 
For example, if a table has sequence column, the rows with DELETE_SIGN column - // marked will not be marked in delete bitmap(see https://github.com/apache/doris/pull/24011), so it will - // be found in Tablet::lookup_row_key() and `use_default_or_null_flag[idx]` will be false. But we should not - // read values from old rows for missing values in this occasion. So we should read the DELETE_SIGN column - // to check if a row REALLY exists in the table. - if (use_default_or_null_flag[idx] || - (delete_sign_column_data != nullptr && - delete_sign_column_data[read_index[idx + segment_start_pos]] != 0)) { - for (auto i = 0; i < cids_missing.size(); ++i) { - // if the column has default value, fill it with default value - // otherwise, if the column is nullable, fill it with null value - const auto& tablet_column = _tablet_schema->column(cids_missing[i]); - if (tablet_column.has_default_value()) { - mutable_full_columns[cids_missing[i]]->insert_from( - *mutable_default_value_columns[i].get(), 0); - } else if (tablet_column.is_nullable()) { - auto nullable_column = assert_cast<vectorized::ColumnNullable*>( - mutable_full_columns[cids_missing[i]].get()); - nullable_column->insert_null_elements(1); - } else if (_tablet_schema->auto_increment_column() == tablet_column.name()) { - DCHECK(_opts.rowset_ctx->tablet_schema->column(tablet_column.name()).type() == - FieldType::OLAP_FIELD_TYPE_BIGINT); - auto auto_inc_column = assert_cast<vectorized::ColumnInt64*>( - mutable_full_columns[cids_missing[i]].get()); - auto_inc_column->insert( - (assert_cast<const vectorized::ColumnInt64*>( - block->get_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL) - .column.get())) - ->get_element(idx)); - } else { - // If the control flow reaches this branch, the column neither has default value - // nor is nullable. It means that the row's delete sign is marked, and the value - // columns are useless and won't be read. 
So we can just put arbitary values in the cells - mutable_full_columns[cids_missing[i]]->insert_default(); - } +Status SegmentWriter::merge_rows_for_sequence_column( + const vectorized::Block* block, size_t row_pos, size_t& num_rows, + std::vector<BitmapValue>* skip_bitmaps, + const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, + const std::vector<RowsetSharedPtr>& specified_rowsets, + std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) { + auto* tablet = static_cast<Tablet*>(_tablet.get()); + auto seq_col_unique_id = _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id(); + std::string previous_key {}; + bool previous_has_seq_col {false}; + int duplicate_keys {0}; + + auto filter_column = vectorized::ColumnUInt8::create(num_rows, 1); + auto* __restrict filter_map = filter_column->get_data().data(); + + for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) { + size_t delta_pos = block_pos - row_pos; + auto& skip_bitmap = skip_bitmaps->at(block_pos); + std::string key = _full_encode_keys(key_columns, delta_pos); + bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id)); + Status st; + if (delta_pos > 0 && previous_key == key) { + DCHECK(previous_has_seq_col == !row_has_sequence_col); + ++duplicate_keys; + RowLocation loc; + RowsetSharedPtr rowset; + size_t rid_missing_seq {}; + size_t rid_with_seq {}; + if (row_has_sequence_col) { + rid_missing_seq = block_pos - 1; + rid_with_seq = block_pos; + } else { + rid_missing_seq = block_pos; + rid_with_seq = block_pos - 1; + } + std::string previous_encoded_seq_value {}; + + st = tablet->lookup_row_key(key, _tablet_schema.get(), false, specified_rowsets, &loc, + _mow_context->max_version, segment_caches, &rowset, true, + &previous_encoded_seq_value); + + Slice previous_seq_slice {}; + if (st.is<KEY_NOT_FOUND>()) { + // TODO: handle default value + } else { + 
_rsid_to_rowset.emplace(rowset->rowset_id(), rowset); + } + std::string cur_encoded_seq_value {}; + _encode_seq_column(seq_column, rid_with_seq, &cur_encoded_seq_value); + int res = Slice {previous_encoded_seq_value}.compare(Slice {cur_encoded_seq_value}); + VLOG_DEBUG << fmt::format( + "SegmentWriter::merge_rows_for_sequence_column: rid_with_seq={}, " + "rid_missing_seq={}, res={}", + rid_with_seq, rid_missing_seq, res); + if (res > 0) { + filter_map[rid_with_seq] = 0; + } else if (res < 0) { + filter_map[rid_missing_seq] = 0; + } else { + filter_map[std::min(rid_with_seq, rid_missing_seq)] = 0; } - continue; } - auto pos_in_old_block = read_index[idx + segment_start_pos]; - for (auto i = 0; i < cids_missing.size(); ++i) { - mutable_full_columns[cids_missing[i]]->insert_from( - *old_value_block.get_columns_with_type_and_name()[i].column.get(), - pos_in_old_block); + previous_key = std::move(key); + previous_has_seq_col = row_has_sequence_col; + } + if (duplicate_keys > 0) { + auto num_cols = block->columns(); + auto* mutable_block = const_cast<vectorized::Block*>(block); + mutable_block->insert({std::move(filter_column), + std::make_shared<vectorized::DataTypeUInt8>(), + "__dup_key_filter_col__"}); + RETURN_IF_ERROR(vectorized::Block::filter_block(mutable_block, num_cols, num_cols)); + int merged_rows = num_rows - mutable_block->rows(); + if (duplicate_keys != merged_rows) { + auto msg = fmt::format( + "duplicate_keys != merged_rows, duplicate_keys={}, merged_rows={}, " + "num_rows={}, mutable_block->rows()={}", + duplicate_keys, merged_rows, num_rows, block->rows()); + DCHECK(false) << msg; + return Status::InternalError<false>(msg); } + num_rows = mutable_block->rows(); } return Status::OK(); } Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos, Review Comment: warning: function 'append_block' has cognitive complexity of 62 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status SegmentWriter::append_block(const 
vectorized::Block* block, size_t row_pos, ^ ``` <details> <summary>Additional context</summary> **be/src/olap/rowset/segment_v2/segment_writer.cpp:919:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_opts.rowset_ctx->partial_update_info && ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:921:** +1 ```cpp _opts.write_type == DataWriteType::TYPE_DIRECT && ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:923:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (_opts.rowset_ctx->partial_update_info->is_fixed_partial_update()) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:924:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows)); ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:924:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows)); ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:925:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:926:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(append_block_with_flexible_partial_content(block, row_pos, num_rows)); ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:926:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(append_block_with_flexible_partial_content(block, row_pos, num_rows)); ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) 
{ \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:935:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_tablet_schema->store_row_column() && ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:945:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_has_key) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:950:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (UNLIKELY(_short_key_row_pos == 0 && _num_rows_written == 0)) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:950:** +1 ```cpp if (UNLIKELY(_short_key_row_pos == 0 && _num_rows_written == 0)) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:953:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp while (_short_key_row_pos + _opts.num_rows_per_block < _num_rows_written + num_rows) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:978:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_has_key) { ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:986:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (need_primary_key_indexes && !need_short_key_indexes) { // mow table without cluster keys ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:986:** +1 ```cpp if (need_primary_key_indexes && !need_short_key_indexes) { // mow table without cluster keys ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:987:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column, ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:987:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column, ^ ``` 
**be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:989:** +1, nesting level increased to 2 ```cpp } else if (!need_primary_key_indexes && need_short_key_indexes) { // other tables ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:989:** +1 ```cpp } else if (!need_primary_key_indexes && need_short_key_indexes) { // other tables ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:990:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos)); ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:990:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos)); ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:991:** +1, nesting level increased to 2 ```cpp } else if (need_primary_key_indexes && need_short_key_indexes) { // mow with cluster keys ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:991:** +1 ```cpp } else if (need_primary_key_indexes && need_short_key_indexes) { // mow with cluster keys ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:993:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns, ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:993:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns, ^ ``` 
**be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:1010:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos)); ^ ``` **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/rowset/segment_v2/segment_writer.cpp:1010:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos)); ^ ``` **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org