github-actions[bot] commented on code in PR #41701: URL: https://github.com/apache/doris/pull/41701#discussion_r1806067141
########## be/src/olap/partial_update_info.cpp: ########## @@ -658,4 +666,336 @@ return Status::OK(); } +BlockAggregator::BlockAggregator(segment_v2::VerticalSegmentWriter& vertical_segment_writer) + : _writer(vertical_segment_writer), _tablet_schema(*_writer._tablet_schema) {} + +Status BlockAggregator::aggregate_for_sequence_column( + vectorized::Block* block, size_t num_rows, + const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, + vectorized::IOlapColumnDataAccessor* seq_column, + const std::vector<RowsetSharedPtr>& specified_rowsets, + std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) { + DCHECK_EQ(block->columns(), _tablet_schema.num_columns()); + VLOG_DEBUG << fmt::format("BlockAggregator::aggregate_for_sequence_column enter: block:{}\n", + block->dump_data()); + // the process logic here is the same as MemTable::_aggregate_for_flexible_partial_update_without_seq_col() + // after this function, there will be at most 2 rows for a specified key + + std::vector<BitmapValue>* skip_bitmaps = + &(assert_cast<vectorized::ColumnBitmap*>( + block->get_by_position(_tablet_schema.skip_bitmap_col_idx()) + .column->assume_mutable() + .get()) + ->get_data()); + const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows); + + auto seq_col_unique_id = _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id(); + auto delete_sign_col_unique_id = + _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id(); + + auto filtered_block = _tablet_schema.create_block(); + vectorized::MutableBlock output_block = + vectorized::MutableBlock::build_mutable_block(&filtered_block); + int cur_rows {0}; + bool have_rows_with_delete_sign {false}; + + auto append_row = [&](int64_t rid, BitmapValue& skip_bitmap, bool have_delete_sign) { + if (have_delete_sign) { + // remove all the preious batched rows + if (cur_rows > 0) { + for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) { + DCHECK_GE(output_block.mutable_columns()[cid]->size(), cur_rows); + output_block.mutable_columns()[cid]->pop_back(cur_rows); + } + } + output_block.add_row(block, rid); + cur_rows = 1; + have_rows_with_delete_sign = true; + VLOG_DEBUG << fmt::format( + "append a row with delete sign, after append, cur_rows={}, " + "output_block.rows()={}", + cur_rows, output_block.rows()); + } else { + if ((cur_rows == 1 && !have_rows_with_delete_sign) || cur_rows == 2) { + for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) { + if (cid == _tablet_schema.skip_bitmap_col_idx()) { + auto& cur_skip_bitmap = assert_cast<vectorized::ColumnBitmap*>( + output_block.mutable_columns()[cid].get()) + ->get_data() + .back(); + const auto& new_row_skip_bitmap = + assert_cast<vectorized::ColumnBitmap*>( + block->get_by_position(cid).column->assume_mutable().get()) + ->get_data()[rid]; + cur_skip_bitmap &= new_row_skip_bitmap; + continue; + } + if (!skip_bitmap.contains(_tablet_schema.column(cid).unique_id())) { + output_block.mutable_columns()[cid]->pop_back(1); + output_block.mutable_columns()[cid]->insert_from( + *block->get_by_position(cid).column, rid); + } + } + VLOG_DEBUG << fmt::format( + "merge a row, after merge, cur_rows={}, output_block.rows()={}", cur_rows, + output_block.rows()); + } else { + output_block.add_row(block, rid); + cur_rows++; + VLOG_DEBUG << fmt::format( + "append a new row, after append, cur_rows={}, output_block.rows()={}", + cur_rows, output_block.rows()); + } + } + }; + + // aggregate rows with same keys in block range [start, end) + auto aggregate_rows = [&](std::string key, int64_t start, int64_t end) -> Status { + VLOG_DEBUG << fmt::format("merge rows in range=({}-{})", start, end); + if (end - start == 1) { + output_block.add_row(block, start); + VLOG_DEBUG << fmt::format("append a row directly, rid={}", start); + return Status::OK(); + } + cur_rows = 0; + have_rows_with_delete_sign = false; + + RowLocation loc; + RowsetSharedPtr rowset; + std::string previous_encoded_seq_value {}; + Status st = _writer._tablet->lookup_row_key(key, &_tablet_schema, false, specified_rowsets, + &loc, _writer._mow_context->max_version, + segment_caches, &rowset, true, + &previous_encoded_seq_value); + int64_t pos = start; + DCHECK(st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok()); + + std::string cur_seq_val; + if (st.ok()) { + for (int64_t i {start}; i < end; i++) { + auto& skip_bitmap = skip_bitmaps->at(i); + bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id)); + // Find the first row that has larger sequence value than the existing row's + // or the first row that doesn't specify the sequence column. + // Discard all the rows before and begin to do aggregation from that row + if (!row_has_sequence_col) { + cur_seq_val = std::move(previous_encoded_seq_value); + pos = i; + break; + } else { + std::string seq_val {}; + _writer._encode_seq_column(seq_column, i, &seq_val); + if (Slice {seq_val}.compare(Slice {previous_encoded_seq_value}) >= 0) { + cur_seq_val = std::move(seq_val); + pos = i; + break; + } + } + } + } else { + pos = start; + auto& skip_bitmap = skip_bitmaps->at(pos); + bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id)); + if (row_has_sequence_col) { + std::string seq_val {}; + // for rows that don't specify seqeunce col, seq_val will be encoded to minial value + _writer._encode_seq_column(seq_column, pos, &seq_val); + cur_seq_val = std::move(seq_val); + } else { + cur_seq_val.clear(); + RETURN_IF_ERROR(_writer._generate_encoded_default_seq_value( + _tablet_schema, *_writer._opts.rowset_ctx->partial_update_info, + &cur_seq_val)); + } + } + cur_rows = 0; + VLOG_DEBUG << fmt::format("start pos={}, output_block.rows()={}", pos, output_block.rows()); + + for (int64_t rid {pos}; rid < end; rid++) { + auto& skip_bitmap = skip_bitmaps->at(rid); + bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id)); + bool have_delete_sign = + (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[rid] != 0); + if (!row_has_sequence_col) { + append_row(rid, skip_bitmap, have_delete_sign); + } else { + std::string seq_val {}; + _writer._encode_seq_column(seq_column, rid, &seq_val); + if (Slice {seq_val}.compare(Slice {cur_seq_val}) >= 0) { + append_row(rid, skip_bitmap, have_delete_sign); + cur_seq_val = std::move(seq_val); + } else { + VLOG_DEBUG << fmt::format( + "skip rid={} becasue its seq value is lower than the previous", rid); + } + } + } + return Status::OK(); + }; + + int same_key_rows {0}; + std::string previous_key {}; + for (size_t block_pos {0}; block_pos < num_rows; block_pos++) { + std::string key = _writer._full_encode_keys(key_columns, block_pos); + if (block_pos > 0 && previous_key == key) { + same_key_rows++; + } else { + if (same_key_rows > 0) { + RETURN_IF_ERROR(aggregate_rows(std::move(previous_key), block_pos - same_key_rows, + block_pos)); + } + same_key_rows = 1; + } + previous_key = std::move(key); + } + if (same_key_rows > 0) { + RETURN_IF_ERROR( + aggregate_rows(std::move(previous_key), num_rows - same_key_rows, num_rows)); + } + + block->swap(output_block.to_block()); + VLOG_DEBUG << fmt::format( + "BlockAggregator::aggregate_for_sequence_column after: data.block:{}\n", + block->dump_data()); + return Status::OK(); +} + +Status BlockAggregator::aggregate_for_insert_after_delete( Review Comment: warning: function 'aggregate_for_insert_after_delete' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status BlockAggregator::aggregate_for_insert_after_delete( ^ ``` <details> <summary>Additional context</summary> **be/src/olap/partial_update_info.cpp:862:** 109 lines including whitespace and comments (threshold 80) ```cpp Status BlockAggregator::aggregate_for_insert_after_delete( ^ ``` </details> ########## be/src/olap/partial_update_info.cpp: ########## @@ -658,4 +666,336 @@ Status FlexibleReadPlan::fill_non_primary_key_columns_for_row_store( return Status::OK(); } +BlockAggregator::BlockAggregator(segment_v2::VerticalSegmentWriter& vertical_segment_writer) + : _writer(vertical_segment_writer), _tablet_schema(*_writer._tablet_schema) {} + +Status BlockAggregator::aggregate_for_sequence_column( Review Comment: warning: function 'aggregate_for_sequence_column' has cognitive complexity of 97 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status BlockAggregator::aggregate_for_sequence_column( ^ ``` <details> <summary>Additional context</summary> **be/src/olap/partial_update_info.cpp:701:** nesting level increased to 1 ```cpp auto append_row = [&](int64_t rid, BitmapValue& skip_bitmap, bool have_delete_sign) { ^ ``` **be/src/olap/partial_update_info.cpp:702:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (have_delete_sign) { ^ ``` **be/src/olap/partial_update_info.cpp:704:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (cur_rows > 0) { ^ ``` **be/src/olap/partial_update_info.cpp:705:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) { ^ ``` **be/src/olap/partial_update_info.cpp:717:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/olap/partial_update_info.cpp:718:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if ((cur_rows == 1 && !have_rows_with_delete_sign) || cur_rows == 2) { ^ ``` **be/src/olap/partial_update_info.cpp:718:** +1 ```cpp if ((cur_rows == 1 && !have_rows_with_delete_sign) || cur_rows == 2) { ^ ``` **be/src/olap/partial_update_info.cpp:718:** +1 ```cpp if ((cur_rows == 1 && !have_rows_with_delete_sign) || cur_rows == 2) { ^ ``` **be/src/olap/partial_update_info.cpp:719:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) { ^ ``` **be/src/olap/partial_update_info.cpp:720:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp if (cid == _tablet_schema.skip_bitmap_col_idx()) { ^ ``` **be/src/olap/partial_update_info.cpp:732:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp if (!skip_bitmap.contains(_tablet_schema.column(cid).unique_id())) { ^ ``` **be/src/olap/partial_update_info.cpp:741:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/olap/partial_update_info.cpp:752:** nesting level increased to 1 ```cpp auto aggregate_rows = [&](std::string key, int64_t start, int64_t end) -> Status { ^ ``` **be/src/olap/partial_update_info.cpp:754:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (end - start == 1) { ^ ``` **be/src/olap/partial_update_info.cpp:770:** +1 ```cpp DCHECK(st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok()); ^ ``` **be/src/olap/partial_update_info.cpp:773:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (st.ok()) { ^ ``` **be/src/olap/partial_update_info.cpp:774:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp for (int64_t i {start}; i < end; i++) { ^ ``` **be/src/olap/partial_update_info.cpp:780:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!row_has_sequence_col) { ^ ``` **be/src/olap/partial_update_info.cpp:784:** +1, nesting level increased to 4 ```cpp } else { ^ ``` **be/src/olap/partial_update_info.cpp:787:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp if (Slice {seq_val}.compare(Slice {previous_encoded_seq_value}) >= 0) { ^ ``` **be/src/olap/partial_update_info.cpp:794:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/olap/partial_update_info.cpp:798:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (row_has_sequence_col) { ^ ``` **be/src/olap/partial_update_info.cpp:803:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/olap/partial_update_info.cpp:805:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(_writer._generate_encoded_default_seq_value( ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/partial_update_info.cpp:805:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(_writer._generate_encoded_default_seq_value( ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/partial_update_info.cpp:813:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp for (int64_t rid {pos}; rid < end; rid++) { ^ ``` **be/src/olap/partial_update_info.cpp:817:** +1 ```cpp (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[rid] != 0); ^ ``` **be/src/olap/partial_update_info.cpp:818:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (!row_has_sequence_col) { ^ ``` **be/src/olap/partial_update_info.cpp:820:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/olap/partial_update_info.cpp:823:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (Slice {seq_val}.compare(Slice {cur_seq_val}) >= 0) { ^ ``` **be/src/olap/partial_update_info.cpp:826:** +1, nesting level increased to 4 ```cpp } else { ^ ``` **be/src/olap/partial_update_info.cpp:837:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp for (size_t block_pos {0}; block_pos < num_rows; block_pos++) { ^ ``` **be/src/olap/partial_update_info.cpp:839:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (block_pos > 0 && previous_key == key) { ^ ``` **be/src/olap/partial_update_info.cpp:839:** +1 ```cpp if (block_pos > 0 && previous_key == key) { ^ ``` **be/src/olap/partial_update_info.cpp:841:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/olap/partial_update_info.cpp:842:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (same_key_rows > 0) { ^ ``` **be/src/olap/partial_update_info.cpp:843:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(aggregate_rows(std::move(previous_key), block_pos - same_key_rows, ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/partial_update_info.cpp:843:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(aggregate_rows(std::move(previous_key), block_pos - same_key_rows, ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/olap/partial_update_info.cpp:850:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (same_key_rows > 0) { ^ ``` **be/src/olap/partial_update_info.cpp:851:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR( ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/olap/partial_update_info.cpp:851:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR( ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` </details> ########## be/src/olap/partial_update_info.cpp: ########## @@ -658,4 +666,336 @@ return Status::OK(); } +BlockAggregator::BlockAggregator(segment_v2::VerticalSegmentWriter& vertical_segment_writer) + : _writer(vertical_segment_writer), _tablet_schema(*_writer._tablet_schema) {} + +Status BlockAggregator::aggregate_for_sequence_column( Review Comment: warning: function 'aggregate_for_sequence_column' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status BlockAggregator::aggregate_for_sequence_column( ^ ``` <details> <summary>Additional context</summary> **be/src/olap/partial_update_info.cpp:671:** 184 lines including whitespace and comments (threshold 80) ```cpp Status BlockAggregator::aggregate_for_sequence_column( ^ ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org