This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f38ecd349c3 [enhancement](memory) return error if allocate memory 
failed during add rows method (#35085)
f38ecd349c3 is described below

commit f38ecd349c32e960ca20c29d5a3efde3cc5dabe6
Author: yiguolei <676222...@qq.com>
AuthorDate: Tue May 21 10:53:40 2024 +0800

    [enhancement](memory) return error if allocate memory failed during add 
rows method (#35085)
    
    * return error when add rows failed
    
    * f
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 .../schema_active_queries_scanner.cpp              |  2 +-
 .../schema_scanner/schema_backend_active_tasks.cpp |  2 +-
 .../exec/schema_scanner/schema_routine_scanner.cpp |  2 +-
 .../schema_workload_groups_scanner.cpp             |  2 +-
 .../schema_workload_sched_policy_scanner.cpp       |  2 +-
 be/src/olap/memtable.cpp                           | 28 ++++----
 be/src/olap/memtable.h                             |  8 +--
 be/src/olap/memtable_flush_executor.cpp            |  3 +-
 be/src/olap/memtable_writer.cpp                    |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  4 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   | 13 +++-
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 16 +++--
 be/src/vec/core/block.cpp                          | 75 ++++++++++++----------
 be/src/vec/core/block.h                            |  7 +-
 be/src/vec/exec/vaggregation_node.cpp              |  4 +-
 be/src/vec/olap/vcollect_iterator.cpp              |  2 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  2 +-
 be/src/vec/sink/vrow_distribution.cpp              |  2 +-
 18 files changed, 100 insertions(+), 76 deletions(-)

diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp 
b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
index 36cb145e3f5..2115a38a6eb 100644
--- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
@@ -158,7 +158,7 @@ Status 
SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool
 
     int current_batch_rows = std::min(_block_rows_limit, _total_rows - 
_row_idx);
     vectorized::MutableBlock mblock = 
vectorized::MutableBlock::build_mutable_block(block);
-    mblock.add_rows(_active_query_block.get(), _row_idx, current_batch_rows);
+    RETURN_IF_ERROR(mblock.add_rows(_active_query_block.get(), _row_idx, 
current_batch_rows));
     _row_idx += current_batch_rows;
 
     *eos = _row_idx == _total_rows;
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp 
b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
index c5f8825c2e4..aa84f0d68c3 100644
--- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
@@ -84,7 +84,7 @@ Status 
SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block,
 
     int current_batch_rows = std::min(_block_rows_limit, _total_rows - 
_row_idx);
     vectorized::MutableBlock mblock = 
vectorized::MutableBlock::build_mutable_block(block);
-    mblock.add_rows(_task_stats_block.get(), _row_idx, current_batch_rows);
+    RETURN_IF_ERROR(mblock.add_rows(_task_stats_block.get(), _row_idx, 
current_batch_rows));
     _row_idx += current_batch_rows;
 
     *eos = _row_idx == _total_rows;
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp 
b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
index 7db46ada650..3d55addee6c 100644
--- a/be/src/exec/schema_scanner/schema_routine_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
@@ -162,7 +162,7 @@ Status 
SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos
 
     int current_batch_rows = std::min(_block_rows_limit, _total_rows - 
_row_idx);
     vectorized::MutableBlock mblock = 
vectorized::MutableBlock::build_mutable_block(block);
-    mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows);
+    RETURN_IF_ERROR(mblock.add_rows(_routines_block.get(), _row_idx, 
current_batch_rows));
     _row_idx += current_batch_rows;
 
     *eos = _row_idx == _total_rows;
diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp 
b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
index 55cdfe9cf35..def52df531d 100644
--- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
@@ -135,7 +135,7 @@ Status 
SchemaWorkloadGroupsScanner::get_next_block(vectorized::Block* block, boo
 
     int current_batch_rows = std::min(_block_rows_limit, _total_rows - 
_row_idx);
     vectorized::MutableBlock mblock = 
vectorized::MutableBlock::build_mutable_block(block);
-    mblock.add_rows(_workload_groups_block.get(), _row_idx, 
current_batch_rows);
+    RETURN_IF_ERROR(mblock.add_rows(_workload_groups_block.get(), _row_idx, 
current_batch_rows));
     _row_idx += current_batch_rows;
 
     *eos = _row_idx == _total_rows;
diff --git 
a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp 
b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
index b1a717228c8..035d3bfe217 100644
--- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
@@ -127,7 +127,7 @@ Status 
SchemaWorkloadSchedulePolicyScanner::get_next_block(vectorized::Block* bl
 
     int current_batch_rows = std::min(_block_rows_limit, _total_rows - 
_row_idx);
     vectorized::MutableBlock mblock = 
vectorized::MutableBlock::build_mutable_block(block);
-    mblock.add_rows(_block.get(), _row_idx, current_batch_rows);
+    RETURN_IF_ERROR(mblock.add_rows(_block.get(), _row_idx, 
current_batch_rows));
     _row_idx += current_batch_rows;
 
     *eos = _row_idx == _total_rows;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2676bf7a32e..abd1c0a8314 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -178,7 +178,8 @@ int RowInBlockComparator::operator()(const RowInBlock* 
left, const RowInBlock* r
                                *_pblock, -1);
 }
 
-void MemTable::insert(const vectorized::Block* input_block, const 
std::vector<uint32_t>& row_idxs) {
+Status MemTable::insert(const vectorized::Block* input_block,
+                        const std::vector<uint32_t>& row_idxs) {
     vectorized::Block target_block = *input_block;
     target_block = input_block->copy_block(_column_offset);
     if (_is_first_insertion) {
@@ -209,7 +210,8 @@ void MemTable::insert(const vectorized::Block* input_block, 
const std::vector<ui
     auto num_rows = row_idxs.size();
     size_t cursor_in_mutableblock = _input_mutable_block.rows();
     auto block_size0 = _input_mutable_block.allocated_bytes();
-    _input_mutable_block.add_rows(&target_block, row_idxs.data(), 
row_idxs.data() + num_rows);
+    RETURN_IF_ERROR(_input_mutable_block.add_rows(&target_block, 
row_idxs.data(),
+                                                  row_idxs.data() + num_rows));
     auto block_size1 = _input_mutable_block.allocated_bytes();
     g_memtable_input_block_allocated_size << block_size1 - block_size0;
     auto input_size = size_t(target_block.bytes() * num_rows / 
target_block.rows() *
@@ -221,6 +223,7 @@ void MemTable::insert(const vectorized::Block* input_block, 
const std::vector<ui
     }
 
     _stat.raw_rows += num_rows;
+    return Status::OK();
 }
 
 void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& 
mutable_block,
@@ -245,7 +248,7 @@ void 
MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_blo
                                  src_row->_row_pos, _arena.get());
     }
 }
-void MemTable::_put_into_output(vectorized::Block& in_block) {
+Status MemTable::_put_into_output(vectorized::Block& in_block) {
     SCOPED_RAW_TIMER(&_stat.put_into_output_ns);
     std::vector<uint32_t> row_pos_vec;
     DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
@@ -253,8 +256,8 @@ void MemTable::_put_into_output(vectorized::Block& 
in_block) {
     for (int i = 0; i < _row_in_blocks.size(); i++) {
         row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos);
     }
-    _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
-                                   row_pos_vec.data() + in_block.rows());
+    return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
+                                          row_pos_vec.data() + 
in_block.rows());
 }
 
 size_t MemTable::_sort() {
@@ -298,7 +301,7 @@ size_t MemTable::_sort() {
     return same_keys_num;
 }
 
-void MemTable::_sort_by_cluster_keys() {
+Status MemTable::_sort_by_cluster_keys() {
     SCOPED_RAW_TIMER(&_stat.sort_ns);
     _stat.sort_times++;
     // sort all rows
@@ -344,8 +347,8 @@ void MemTable::_sort_by_cluster_keys() {
     for (int i = 0; i < row_in_blocks.size(); i++) {
         row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos);
     }
-    _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
-                                   row_pos_vec.data() + in_block.rows());
+    return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
+                                          row_pos_vec.data() + 
in_block.rows());
 }
 
 void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& 
tie,
@@ -502,27 +505,28 @@ bool MemTable::need_agg() const {
     return false;
 }
 
-std::unique_ptr<vectorized::Block> MemTable::to_block() {
+Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
     size_t same_keys_num = _sort();
     if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
         if (_keys_type == KeysType::DUP_KEYS && 
_tablet_schema->num_key_columns() == 0) {
             _output_mutable_block.swap(_input_mutable_block);
         } else {
             vectorized::Block in_block = _input_mutable_block.to_block();
-            _put_into_output(in_block);
+            RETURN_IF_ERROR(_put_into_output(in_block));
         }
     } else {
         _aggregate<true>();
     }
     if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow &&
         !_tablet_schema->cluster_key_idxes().empty()) {
-        _sort_by_cluster_keys();
+        RETURN_IF_ERROR(_sort_by_cluster_keys());
     }
     g_memtable_input_block_allocated_size << 
-_input_mutable_block.allocated_bytes();
     _input_mutable_block.clear();
     _insert_mem_tracker->release(_mem_usage);
     _mem_usage = 0;
-    return vectorized::Block::create_unique(_output_mutable_block.to_block());
+    *res = vectorized::Block::create_unique(_output_mutable_block.to_block());
+    return Status::OK();
 }
 
 } // namespace doris
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 8362c69222e..d2dfafd972a 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -181,7 +181,7 @@ public:
                _flush_mem_tracker->consumption();
     }
     // insert tuple from (row_pos) to (row_pos+num_rows)
-    void insert(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
+    Status insert(const vectorized::Block* block, const std::vector<uint32_t>& 
row_idxs);
 
     void shrink_memtable_by_agg();
 
@@ -189,7 +189,7 @@ public:
 
     bool need_agg() const;
 
-    std::unique_ptr<vectorized::Block> to_block();
+    Status to_block(std::unique_ptr<vectorized::Block>* res);
 
     bool empty() const { return _input_mutable_block.rows() == 0; }
 
@@ -244,7 +244,7 @@ private:
 
     //return number of same keys
     size_t _sort();
-    void _sort_by_cluster_keys();
+    Status _sort_by_cluster_keys();
     void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie,
                           std::function<int(const RowInBlock*, const 
RowInBlock*)> cmp);
     template <bool is_final>
@@ -252,7 +252,7 @@ private:
                            int row_pos);
     template <bool is_final>
     void _aggregate();
-    void _put_into_output(vectorized::Block& in_block);
+    Status _put_into_output(vectorized::Block& in_block);
     bool _is_first_insertion;
 
     void _init_agg_functions(const vectorized::Block* block);
diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index ece6930362d..1ebe44aabf2 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -141,7 +141,8 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, 
int32_t segment_id, in
     signal::set_signal_task_id(_rowset_writer->load_id());
     {
         SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker());
-        std::unique_ptr<vectorized::Block> block = memtable->to_block();
+        std::unique_ptr<vectorized::Block> block;
+        RETURN_IF_ERROR(memtable->to_block(&block));
         RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), 
segment_id, flush_size));
     }
     _memtable_stat += memtable->stat();
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index c7ad1590a89..da6af4ffb80 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -109,7 +109,7 @@ Status MemTableWriter::write(const vectorized::Block* block,
     }
 
     _total_received_rows += row_idxs.size();
-    _mem_table->insert(block, row_idxs);
+    RETURN_IF_ERROR(_mem_table->insert(block, row_idxs));
 
     if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
         _mem_table->shrink_memtable_by_agg();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 21134487c2e..10fa2effcc0 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -602,8 +602,8 @@ Status 
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
             partitioned_blocks[i] =
                     
vectorized::MutableBlock::create_unique(input_block->clone_empty());
         }
-        partitioned_blocks[i]->add_rows(input_block, 
&(partition_indexes[i][0]),
-                                        &(partition_indexes[i][count]));
+        RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(input_block, 
&(partition_indexes[i][0]),
+                                                        
&(partition_indexes[i][count])));
 
         if (partitioned_blocks[i]->rows() > 2 * 1024 * 1024 ||
             (eos && partitioned_blocks[i]->rows() > 0)) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 97d5d145604..d253a519b0c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -203,7 +203,14 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
 
                 {
                     SCOPED_TIMER(_partition_shuffle_timer);
-                    partition_block->add_rows(&build_block, begin, end);
+                    Status st = partition_block->add_rows(&build_block, begin, 
end);
+                    if (!st.ok()) {
+                        std::unique_lock<std::mutex> lock(_spill_lock);
+                        _spill_status = st;
+                        _spill_status_ok = false;
+                        _dependency->set_ready();
+                        return;
+                    }
                     partitions_indexes[partition_idx].clear();
                 }
 
@@ -336,8 +343,8 @@ Status 
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
             partitioned_blocks[i] =
                     
vectorized::MutableBlock::create_unique(in_block->clone_empty());
         }
-        partitioned_blocks[i]->add_rows(in_block, &(partition_indexes[i][0]),
-                                        &(partition_indexes[i][count]));
+        RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(in_block, 
&(partition_indexes[i][0]),
+                                                        
&(partition_indexes[i][count])));
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 71e427f8b25..ce1f05a22bf 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -44,26 +44,28 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
     PartitionedBlock partitioned_block;
     std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
 
-    auto get_data = [&](vectorized::Block* result_block) {
+    auto get_data = [&](vectorized::Block* result_block) -> Status {
         do {
             const auto* offset_start = &((
                     
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
             auto block_wrapper = partitioned_block.first;
             local_state._shared_state->sub_mem_usage(
                     local_state._channel_id, 
block_wrapper->data_block.allocated_bytes(), false);
-            mutable_block->add_rows(&block_wrapper->data_block, offset_start,
-                                    offset_start + 
std::get<2>(partitioned_block.second));
+            RETURN_IF_ERROR(
+                    mutable_block->add_rows(&block_wrapper->data_block, 
offset_start,
+                                            offset_start + 
std::get<2>(partitioned_block.second)));
             block_wrapper->unref(local_state._shared_state);
         } while (mutable_block->rows() < state->batch_size() &&
                  
_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
         *result_block = mutable_block->to_block();
+        return Status::OK();
     };
     if (_running_sink_operators == 0) {
         if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
             SCOPED_TIMER(local_state._copy_data_timer);
             mutable_block = vectorized::MutableBlock::create_unique(
                     partitioned_block.first->data_block.clone_empty());
-            get_data(block);
+            RETURN_IF_ERROR(get_data(block));
         } else {
             COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
@@ -72,7 +74,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
         SCOPED_TIMER(local_state._copy_data_timer);
         mutable_block = vectorized::MutableBlock::create_unique(
                 partitioned_block.first->data_block.clone_empty());
-        get_data(block);
+        RETURN_IF_ERROR(get_data(block));
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
@@ -244,7 +246,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block
                                 LocalExchangeSinkLocalState& local_state) {
     for (size_t i = 0; i < _num_partitions; i++) {
         auto mutable_block = 
vectorized::MutableBlock::create_unique(in_block->clone_empty());
-        mutable_block->add_rows(in_block, 0, in_block->rows());
+        RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, 
in_block->rows()));
         _data_queue[i].enqueue(mutable_block->to_block());
         local_state._shared_state->set_ready_to_read(i);
     }
@@ -335,7 +337,7 @@ Status 
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
         if (size > 0) {
             std::unique_ptr<vectorized::MutableBlock> mutable_block =
                     
vectorized::MutableBlock::create_unique(block->clone_empty());
-            mutable_block->add_rows(block, start, size);
+            RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
             auto new_block = mutable_block->to_block();
             local_state._shared_state->add_mem_usage(i, 
new_block.allocated_bytes());
             data_queue[i].enqueue(std::move(new_block));
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 83ecf568d6f..466c9b3b559 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -970,44 +970,53 @@ void MutableBlock::add_row(const Block* block, int row) {
     }
 }
 
-void MutableBlock::add_rows(const Block* block, const uint32_t* row_begin,
-                            const uint32_t* row_end) {
-    DCHECK_LE(columns(), block->columns());
-    const auto& block_data = block->get_columns_with_type_and_name();
-    for (size_t i = 0; i < _columns.size(); ++i) {
-        DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
-        auto& dst = _columns[i];
-        const auto& src = *block_data[i].column.get();
-        DCHECK_GE(src.size(), row_end - row_begin);
-        dst->insert_indices_from(src, row_begin, row_end);
-    }
+Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin,
+                              const uint32_t* row_end) {
+    RETURN_IF_CATCH_EXCEPTION({
+        DCHECK_LE(columns(), block->columns());
+        const auto& block_data = block->get_columns_with_type_and_name();
+        for (size_t i = 0; i < _columns.size(); ++i) {
+            DCHECK_EQ(_data_types[i]->get_name(), 
block_data[i].type->get_name());
+            auto& dst = _columns[i];
+            const auto& src = *block_data[i].column.get();
+            DCHECK_GE(src.size(), row_end - row_begin);
+            dst->insert_indices_from(src, row_begin, row_end);
+        }
+    });
+    return Status::OK();
 }
 
-void MutableBlock::add_rows(const Block* block, size_t row_begin, size_t 
length) {
-    DCHECK_LE(columns(), block->columns());
-    const auto& block_data = block->get_columns_with_type_and_name();
-    for (size_t i = 0; i < _columns.size(); ++i) {
-        DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
-        auto& dst = _columns[i];
-        const auto& src = *block_data[i].column.get();
-        dst->insert_range_from(src, row_begin, length);
-    }
+Status MutableBlock::add_rows(const Block* block, size_t row_begin, size_t 
length) {
+    RETURN_IF_CATCH_EXCEPTION({
+        DCHECK_LE(columns(), block->columns());
+        const auto& block_data = block->get_columns_with_type_and_name();
+        for (size_t i = 0; i < _columns.size(); ++i) {
+            DCHECK_EQ(_data_types[i]->get_name(), 
block_data[i].type->get_name());
+            auto& dst = _columns[i];
+            const auto& src = *block_data[i].column.get();
+            dst->insert_range_from(src, row_begin, length);
+        }
+    });
+    return Status::OK();
 }
 
-void MutableBlock::add_rows(const Block* block, std::vector<int64_t> rows) {
-    DCHECK_LE(columns(), block->columns());
-    const auto& block_data = block->get_columns_with_type_and_name();
-    const size_t length = std::ranges::distance(rows);
-    for (size_t i = 0; i < _columns.size(); ++i) {
-        DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
-        auto& dst = _columns[i];
-        const auto& src = *block_data[i].column.get();
-        dst->reserve(dst->size() + length);
-        for (size_t row : rows) {
-            // we can introduce a new function like `insert_assume_reserved` 
for IColumn.
-            dst->insert_from(src, row);
+Status MutableBlock::add_rows(const Block* block, std::vector<int64_t> rows) {
+    RETURN_IF_CATCH_EXCEPTION({
+        DCHECK_LE(columns(), block->columns());
+        const auto& block_data = block->get_columns_with_type_and_name();
+        const size_t length = std::ranges::distance(rows);
+        for (size_t i = 0; i < _columns.size(); ++i) {
+            DCHECK_EQ(_data_types[i]->get_name(), 
block_data[i].type->get_name());
+            auto& dst = _columns[i];
+            const auto& src = *block_data[i].column.get();
+            dst->reserve(dst->size() + length);
+            for (size_t row : rows) {
+                // we can introduce a new function like 
`insert_assume_reserved` for IColumn.
+                dst->insert_from(src, row);
+            }
         }
-    }
+    });
+    return Status::OK();
 }
 
 void MutableBlock::erase(const String& name) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 9fea5242d9b..89f8e99b66a 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -603,9 +603,10 @@ public:
     void swap(MutableBlock&& other) noexcept;
 
     void add_row(const Block* block, int row);
-    void add_rows(const Block* block, const uint32_t* row_begin, const 
uint32_t* row_end);
-    void add_rows(const Block* block, size_t row_begin, size_t length);
-    void add_rows(const Block* block, std::vector<int64_t> rows);
+    // Batch add row should return error status if allocate memory failed.
+    Status add_rows(const Block* block, const uint32_t* row_begin, const 
uint32_t* row_end);
+    Status add_rows(const Block* block, size_t row_begin, size_t length);
+    Status add_rows(const Block* block, std::vector<int64_t> rows);
 
     /// remove the column with the specified name
     void erase(const String& name);
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index 67072e2f60f..8bfa7e90b4a 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -1132,7 +1132,7 @@ Status 
AggregationNode::_spill_hash_table(HashTableCtxType& agg_method, HashTabl
         for (size_t j = 0; j < partitioned_indices.size(); ++j) {
             if (partitioned_indices[j] != i) {
                 if (length > 0) {
-                    mutable_block.add_rows(&block, begin, length);
+                    RETURN_IF_ERROR(mutable_block.add_rows(&block, begin, 
length));
                 }
                 length = 0;
                 continue;
@@ -1145,7 +1145,7 @@ Status 
AggregationNode::_spill_hash_table(HashTableCtxType& agg_method, HashTabl
         }
 
         if (length > 0) {
-            mutable_block.add_rows(&block, begin, length);
+            RETURN_IF_ERROR(mutable_block.add_rows(&block, begin, length));
         }
 
         CHECK_EQ(mutable_block.rows(), blocks_rows[i]);
diff --git a/be/src/vec/olap/vcollect_iterator.cpp 
b/be/src/vec/olap/vcollect_iterator.cpp
index 0a6bafca49c..3ce1869546c 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -377,7 +377,7 @@ Status VCollectIterator::_topn_next(Block* block) {
 
                 size_t base = mutable_block.rows();
                 // append block to mutable_block
-                mutable_block.add_rows(block, 0, rows_to_copy);
+                RETURN_IF_ERROR(mutable_block.add_rows(block, 0, 
rows_to_copy));
                 // insert appended rows pos in mutable_block to sorted_row_pos 
and sort it
                 for (size_t i = 0; i < rows_to_copy; i++) {
                     sorted_row_pos.insert(base + i);
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 69b7054f500..24f92bf2aae 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -885,7 +885,7 @@ Status 
BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
             if (!rows->empty()) {
                 
SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
                 const auto* begin = rows->data();
-                _mutable_block->add_rows(block, begin, begin + rows->size());
+                RETURN_IF_ERROR(_mutable_block->add_rows(block, begin, begin + 
rows->size()));
             }
         } else if (!block->empty()) {
             SCOPED_TIMER(_parent->merge_block_timer());
diff --git a/be/src/vec/sink/vrow_distribution.cpp 
b/be/src/vec/sink/vrow_distribution.cpp
index 3bef7d0117e..f740e8f5767 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -51,7 +51,7 @@ Status VRowDistribution::_save_missing_values(
         int col_size, Block* block, std::vector<int64_t> filter,
         const std::vector<const NullMap*>& col_null_maps) {
     // de-duplication for new partitions but save all rows.
-    _batching_block->add_rows(block, filter);
+    RETURN_IF_ERROR(_batching_block->add_rows(block, filter));
     std::vector<TNullableStringLiteral> cur_row_values;
     for (int row = 0; row < col_strs[0].size(); ++row) {
         cur_row_values.clear();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to