This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new f38ecd349c3 [enhancement](memory) return error if allocate memory failed during add rows method (#35085) f38ecd349c3 is described below commit f38ecd349c32e960ca20c29d5a3efde3cc5dabe6 Author: yiguolei <676222...@qq.com> AuthorDate: Tue May 21 10:53:40 2024 +0800 [enhancement](memory) return error if allocate memory failed during add rows method (#35085) * return error when add rows failed * f --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- .../schema_active_queries_scanner.cpp | 2 +- .../schema_scanner/schema_backend_active_tasks.cpp | 2 +- .../exec/schema_scanner/schema_routine_scanner.cpp | 2 +- .../schema_workload_groups_scanner.cpp | 2 +- .../schema_workload_sched_policy_scanner.cpp | 2 +- be/src/olap/memtable.cpp | 28 ++++---- be/src/olap/memtable.h | 8 +-- be/src/olap/memtable_flush_executor.cpp | 3 +- be/src/olap/memtable_writer.cpp | 2 +- .../exec/partitioned_hash_join_probe_operator.cpp | 4 +- .../exec/partitioned_hash_join_sink_operator.cpp | 13 +++- .../pipeline_x/local_exchange/local_exchanger.cpp | 16 +++-- be/src/vec/core/block.cpp | 75 ++++++++++++---------- be/src/vec/core/block.h | 7 +- be/src/vec/exec/vaggregation_node.cpp | 4 +- be/src/vec/olap/vcollect_iterator.cpp | 2 +- be/src/vec/sink/vdata_stream_sender.cpp | 2 +- be/src/vec/sink/vrow_distribution.cpp | 2 +- 18 files changed, 100 insertions(+), 76 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp index 36cb145e3f5..2115a38a6eb 100644 --- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp @@ -158,7 +158,7 @@ Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); - mblock.add_rows(_active_query_block.get(), _row_idx, current_batch_rows); + RETURN_IF_ERROR(mblock.add_rows(_active_query_block.get(), _row_idx, current_batch_rows)); _row_idx += current_batch_rows; *eos = _row_idx == _total_rows; diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp index c5f8825c2e4..aa84f0d68c3 100644 --- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp +++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp @@ -84,7 +84,7 @@ Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block, int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); - mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows); + RETURN_IF_ERROR(mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows)); _row_idx += current_batch_rows; *eos = _row_idx == _total_rows; diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_scanner.cpp index 7db46ada650..3d55addee6c 100644 --- a/be/src/exec/schema_scanner/schema_routine_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp @@ -162,7 +162,7 @@ Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); - mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows); + RETURN_IF_ERROR(mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows)); _row_idx += current_batch_rows; *eos = _row_idx == _total_rows; diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp index 55cdfe9cf35..def52df531d 100644 --- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp @@ -135,7 +135,7 @@ Status SchemaWorkloadGroupsScanner::get_next_block(vectorized::Block* block, boo int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); - mblock.add_rows(_workload_groups_block.get(), _row_idx, current_batch_rows); + RETURN_IF_ERROR(mblock.add_rows(_workload_groups_block.get(), _row_idx, current_batch_rows)); _row_idx += current_batch_rows; *eos = _row_idx == _total_rows; diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp index b1a717228c8..035d3bfe217 100644 --- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp @@ -127,7 +127,7 @@ Status SchemaWorkloadSchedulePolicyScanner::get_next_block(vectorized::Block* bl int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); - mblock.add_rows(_block.get(), _row_idx, current_batch_rows); + RETURN_IF_ERROR(mblock.add_rows(_block.get(), _row_idx, current_batch_rows)); _row_idx += current_batch_rows; *eos = _row_idx == _total_rows; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 2676bf7a32e..abd1c0a8314 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -178,7 +178,8 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r *_pblock, -1); } -void MemTable::insert(const vectorized::Block* input_block, const std::vector<uint32_t>& row_idxs) { +Status MemTable::insert(const vectorized::Block* input_block, + const std::vector<uint32_t>& row_idxs) { vectorized::Block target_block = *input_block; target_block = input_block->copy_block(_column_offset); if (_is_first_insertion) { @@ -209,7 +210,8 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<ui auto num_rows = row_idxs.size(); size_t cursor_in_mutableblock = _input_mutable_block.rows(); auto block_size0 = _input_mutable_block.allocated_bytes(); - _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows); + RETURN_IF_ERROR(_input_mutable_block.add_rows(&target_block, row_idxs.data(), + row_idxs.data() + num_rows)); auto block_size1 = _input_mutable_block.allocated_bytes(); g_memtable_input_block_allocated_size << block_size1 - block_size0; auto input_size = size_t(target_block.bytes() * num_rows / target_block.rows() * @@ -221,6 +223,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<ui } _stat.raw_rows += num_rows; + return Status::OK(); } void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, @@ -245,7 +248,7 @@ void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_blo src_row->_row_pos, _arena.get()); } } -void MemTable::_put_into_output(vectorized::Block& in_block) { +Status MemTable::_put_into_output(vectorized::Block& in_block) { SCOPED_RAW_TIMER(&_stat.put_into_output_ns); std::vector<uint32_t> row_pos_vec; DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); @@ -253,8 +256,8 @@ void MemTable::_put_into_output(vectorized::Block& in_block) { for (int i = 0; i < _row_in_blocks.size(); i++) { row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos); } - _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), - row_pos_vec.data() + in_block.rows()); + return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), + row_pos_vec.data() + in_block.rows()); } size_t MemTable::_sort() { @@ -298,7 +301,7 @@ size_t MemTable::_sort() { return same_keys_num; } -void MemTable::_sort_by_cluster_keys() { +Status MemTable::_sort_by_cluster_keys() { SCOPED_RAW_TIMER(&_stat.sort_ns); _stat.sort_times++; // sort all rows @@ -344,8 +347,8 @@ void MemTable::_sort_by_cluster_keys() { for (int i = 0; i < row_in_blocks.size(); i++) { row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos); } - _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), - row_pos_vec.data() + in_block.rows()); + return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), + row_pos_vec.data() + in_block.rows()); } void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie, @@ -502,27 +505,28 @@ bool MemTable::need_agg() const { return false; } -std::unique_ptr<vectorized::Block> MemTable::to_block() { +Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) { size_t same_keys_num = _sort(); if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) { if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) { _output_mutable_block.swap(_input_mutable_block); } else { vectorized::Block in_block = _input_mutable_block.to_block(); - _put_into_output(in_block); + RETURN_IF_ERROR(_put_into_output(in_block)); } } else { _aggregate<true>(); } if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow && !_tablet_schema->cluster_key_idxes().empty()) { - _sort_by_cluster_keys(); + RETURN_IF_ERROR(_sort_by_cluster_keys()); } g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes(); _input_mutable_block.clear(); _insert_mem_tracker->release(_mem_usage); _mem_usage = 0; - return vectorized::Block::create_unique(_output_mutable_block.to_block()); + *res = vectorized::Block::create_unique(_output_mutable_block.to_block()); + return Status::OK(); } } // namespace doris diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 8362c69222e..d2dfafd972a 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -181,7 +181,7 @@ public: _flush_mem_tracker->consumption(); } // insert tuple from (row_pos) to (row_pos+num_rows) - void insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs); + Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs); void shrink_memtable_by_agg(); @@ -189,7 +189,7 @@ public: bool need_agg() const; - std::unique_ptr<vectorized::Block> to_block(); + Status to_block(std::unique_ptr<vectorized::Block>* res); bool empty() const { return _input_mutable_block.rows() == 0; } @@ -244,7 +244,7 @@ private: //return number of same keys size_t _sort(); - void _sort_by_cluster_keys(); + Status _sort_by_cluster_keys(); void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie, std::function<int(const RowInBlock*, const RowInBlock*)> cmp); template <bool is_final> @@ -252,7 +252,7 @@ private: int row_pos); template <bool is_final> void _aggregate(); - void _put_into_output(vectorized::Block& in_block); + Status _put_into_output(vectorized::Block& in_block); bool _is_first_insertion; void _init_agg_functions(const vectorized::Block* block); diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index ece6930362d..1ebe44aabf2 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -141,7 +141,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in signal::set_signal_task_id(_rowset_writer->load_id()); { SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker()); - std::unique_ptr<vectorized::Block> block = memtable->to_block(); + std::unique_ptr<vectorized::Block> block; + RETURN_IF_ERROR(memtable->to_block(&block)); RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size)); } _memtable_stat += memtable->stat(); diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index c7ad1590a89..da6af4ffb80 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -109,7 +109,7 @@ Status MemTableWriter::write(const vectorized::Block* block, } _total_received_rows += row_idxs.size(); - _mem_table->insert(block, row_idxs); + RETURN_IF_ERROR(_mem_table->insert(block, row_idxs)); if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) { _mem_table->shrink_memtable_by_agg(); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 21134487c2e..10fa2effcc0 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -602,8 +602,8 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized:: partitioned_blocks[i] = vectorized::MutableBlock::create_unique(input_block->clone_empty()); } - partitioned_blocks[i]->add_rows(input_block, &(partition_indexes[i][0]), - &(partition_indexes[i][count])); + RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(input_block, &(partition_indexes[i][0]), + &(partition_indexes[i][count]))); if (partitioned_blocks[i]->rows() > 2 * 1024 * 1024 || (eos && partitioned_blocks[i]->rows() > 0)) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 97d5d145604..d253a519b0c 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -203,7 +203,14 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta { SCOPED_TIMER(_partition_shuffle_timer); - partition_block->add_rows(&build_block, begin, end); + Status st = partition_block->add_rows(&build_block, begin, end); + if (!st.ok()) { + std::unique_lock<std::mutex> lock(_spill_lock); + _spill_status = st; + _spill_status_ok = false; + _dependency->set_ready(); + return; + } partitions_indexes[partition_idx].clear(); } @@ -336,8 +343,8 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state, partitioned_blocks[i] = vectorized::MutableBlock::create_unique(in_block->clone_empty()); } - partitioned_blocks[i]->add_rows(in_block, &(partition_indexes[i][0]), - &(partition_indexes[i][count])); + RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(in_block, &(partition_indexes[i][0]), + &(partition_indexes[i][count]))); } return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 71e427f8b25..ce1f05a22bf 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -44,26 +44,28 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block PartitionedBlock partitioned_block; std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr; - auto get_data = [&](vectorized::Block* result_block) { + auto get_data = [&](vectorized::Block* result_block) -> Status { do { const auto* offset_start = &(( *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]); auto block_wrapper = partitioned_block.first; local_state._shared_state->sub_mem_usage( local_state._channel_id, block_wrapper->data_block.allocated_bytes(), false); - mutable_block->add_rows(&block_wrapper->data_block, offset_start, - offset_start + std::get<2>(partitioned_block.second)); + RETURN_IF_ERROR( + mutable_block->add_rows(&block_wrapper->data_block, offset_start, + offset_start + std::get<2>(partitioned_block.second))); block_wrapper->unref(local_state._shared_state); } while (mutable_block->rows() < state->batch_size() && _data_queue[local_state._channel_id].try_dequeue(partitioned_block)); *result_block = mutable_block->to_block(); + return Status::OK(); }; if (_running_sink_operators == 0) { if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { SCOPED_TIMER(local_state._copy_data_timer); mutable_block = vectorized::MutableBlock::create_unique( partitioned_block.first->data_block.clone_empty()); - get_data(block); + RETURN_IF_ERROR(get_data(block)); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); *eos = true; @@ -72,7 +74,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block SCOPED_TIMER(local_state._copy_data_timer); mutable_block = vectorized::MutableBlock::create_unique( partitioned_block.first->data_block.clone_empty()); - get_data(block); + RETURN_IF_ERROR(get_data(block)); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); @@ -244,7 +246,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block LocalExchangeSinkLocalState& local_state) { for (size_t i = 0; i < _num_partitions; i++) { auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty()); - mutable_block->add_rows(in_block, 0, in_block->rows()); + RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows())); _data_queue[i].enqueue(mutable_block->to_block()); local_state._shared_state->set_ready_to_read(i); } @@ -335,7 +337,7 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, if (size > 0) { std::unique_ptr<vectorized::MutableBlock> mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); - mutable_block->add_rows(block, start, size); + RETURN_IF_ERROR(mutable_block->add_rows(block, start, size)); auto new_block = mutable_block->to_block(); local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes()); data_queue[i].enqueue(std::move(new_block)); diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 83ecf568d6f..466c9b3b559 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -970,44 +970,53 @@ void MutableBlock::add_row(const Block* block, int row) { } } -void MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, - const uint32_t* row_end) { - DCHECK_LE(columns(), block->columns()); - const auto& block_data = block->get_columns_with_type_and_name(); - for (size_t i = 0; i < _columns.size(); ++i) { - DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); - auto& dst = _columns[i]; - const auto& src = *block_data[i].column.get(); - DCHECK_GE(src.size(), row_end - row_begin); - dst->insert_indices_from(src, row_begin, row_end); - } +Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, + const uint32_t* row_end) { + RETURN_IF_CATCH_EXCEPTION({ + DCHECK_LE(columns(), block->columns()); + const auto& block_data = block->get_columns_with_type_and_name(); + for (size_t i = 0; i < _columns.size(); ++i) { + DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); + auto& dst = _columns[i]; + const auto& src = *block_data[i].column.get(); + DCHECK_GE(src.size(), row_end - row_begin); + dst->insert_indices_from(src, row_begin, row_end); + } + }); + return Status::OK(); } -void MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) { - DCHECK_LE(columns(), block->columns()); - const auto& block_data = block->get_columns_with_type_and_name(); - for (size_t i = 0; i < _columns.size(); ++i) { - DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); - auto& dst = _columns[i]; - const auto& src = *block_data[i].column.get(); - dst->insert_range_from(src, row_begin, length); - } +Status MutableBlock::add_rows(const Block* block, size_t row_begin, size_t length) { + RETURN_IF_CATCH_EXCEPTION({ + DCHECK_LE(columns(), block->columns()); + const auto& block_data = block->get_columns_with_type_and_name(); + for (size_t i = 0; i < _columns.size(); ++i) { + DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); + auto& dst = _columns[i]; + const auto& src = *block_data[i].column.get(); + dst->insert_range_from(src, row_begin, length); + } + }); + return Status::OK(); } -void MutableBlock::add_rows(const Block* block, std::vector<int64_t> rows) { - DCHECK_LE(columns(), block->columns()); - const auto& block_data = block->get_columns_with_type_and_name(); - const size_t length = std::ranges::distance(rows); - for (size_t i = 0; i < _columns.size(); ++i) { - DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); - auto& dst = _columns[i]; - const auto& src = *block_data[i].column.get(); - dst->reserve(dst->size() + length); - for (size_t row : rows) { - // we can introduce a new function like `insert_assume_reserved` for IColumn. - dst->insert_from(src, row); +Status MutableBlock::add_rows(const Block* block, std::vector<int64_t> rows) { + RETURN_IF_CATCH_EXCEPTION({ + DCHECK_LE(columns(), block->columns()); + const auto& block_data = block->get_columns_with_type_and_name(); + const size_t length = std::ranges::distance(rows); + for (size_t i = 0; i < _columns.size(); ++i) { + DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); + auto& dst = _columns[i]; + const auto& src = *block_data[i].column.get(); + dst->reserve(dst->size() + length); + for (size_t row : rows) { + // we can introduce a new function like `insert_assume_reserved` for IColumn. + dst->insert_from(src, row); + } } - } + }); + return Status::OK(); } void MutableBlock::erase(const String& name) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 9fea5242d9b..89f8e99b66a 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -603,9 +603,10 @@ public: void swap(MutableBlock&& other) noexcept; void add_row(const Block* block, int row); - void add_rows(const Block* block, const uint32_t* row_begin, const uint32_t* row_end); - void add_rows(const Block* block, size_t row_begin, size_t length); - void add_rows(const Block* block, std::vector<int64_t> rows); + // Batch add row should return error status if allocate memory failed. + Status add_rows(const Block* block, const uint32_t* row_begin, const uint32_t* row_end); + Status add_rows(const Block* block, size_t row_begin, size_t length); + Status add_rows(const Block* block, std::vector<int64_t> rows); /// remove the column with the specified name void erase(const String& name); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 67072e2f60f..8bfa7e90b4a 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -1132,7 +1132,7 @@ Status AggregationNode::_spill_hash_table(HashTableCtxType& agg_method, HashTabl for (size_t j = 0; j < partitioned_indices.size(); ++j) { if (partitioned_indices[j] != i) { if (length > 0) { - mutable_block.add_rows(&block, begin, length); + RETURN_IF_ERROR(mutable_block.add_rows(&block, begin, length)); } length = 0; continue; @@ -1145,7 +1145,7 @@ Status AggregationNode::_spill_hash_table(HashTableCtxType& agg_method, HashTabl } if (length > 0) { - mutable_block.add_rows(&block, begin, length); + RETURN_IF_ERROR(mutable_block.add_rows(&block, begin, length)); } CHECK_EQ(mutable_block.rows(), blocks_rows[i]); diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 0a6bafca49c..3ce1869546c 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -377,7 +377,7 @@ Status VCollectIterator::_topn_next(Block* block) { size_t base = mutable_block.rows(); // append block to mutable_block - mutable_block.add_rows(block, 0, rows_to_copy); + RETURN_IF_ERROR(mutable_block.add_rows(block, 0, rows_to_copy)); // insert appended rows pos in mutable_block to sorted_row_pos and sort it for (size_t i = 0; i < rows_to_copy; i++) { sorted_row_pos.insert(base + i); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 69b7054f500..24f92bf2aae 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -885,7 +885,7 @@ Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest if (!rows->empty()) { SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer()); const auto* begin = rows->data(); - _mutable_block->add_rows(block, begin, begin + rows->size()); + RETURN_IF_ERROR(_mutable_block->add_rows(block, begin, begin + rows->size())); } } else if (!block->empty()) { SCOPED_TIMER(_parent->merge_block_timer()); diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 3bef7d0117e..f740e8f5767 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -51,7 +51,7 @@ Status VRowDistribution::_save_missing_values( int col_size, Block* block, std::vector<int64_t> filter, const std::vector<const NullMap*>& col_null_maps) { // de-duplication for new partitions but save all rows. - _batching_block->add_rows(block, filter); + RETURN_IF_ERROR(_batching_block->add_rows(block, filter)); std::vector<TNullableStringLiteral> cur_row_values; for (int row = 0; row < col_strs[0].size(); ++row) { cur_row_values.clear(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org