This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f03cee5e30d [enhancement](oom) add exception in olap data convertor when memory is not enough to prevent oom (#35761)
f03cee5e30d is described below

commit f03cee5e30d919833a70e7da8a8c5f22f20f9a28
Author: yiguolei <676222...@qq.com>
AuthorDate: Sun Jun 2 21:11:18 2024 +0800

    [enhancement](oom) add exception in olap data convertor when memory is not enough to prevent oom (#35761)
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/olap/rowset/segment_v2/plain_page.h            |  2 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp      |  8 ++++----
 .../rowset/segment_v2/vertical_segment_writer.cpp     | 12 ++++++------
 be/src/vec/core/block.cpp                             | 19 +++++++++++--------
 be/src/vec/core/block.h                               |  2 +-
 be/src/vec/olap/olap_data_convertor.cpp               | 19 +++++++++++++------
 be/src/vec/olap/olap_data_convertor.h                 |  4 ++--
 be/src/vec/sink/group_commit_block_sink.cpp           |  2 +-
 be/src/vec/sink/writer/vtablet_writer.cpp             |  3 ++-
 9 files changed, 41 insertions(+), 30 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/plain_page.h 
b/be/src/olap/rowset/segment_v2/plain_page.h
index cbcc96f31ba..af31275002a 100644
--- a/be/src/olap/rowset/segment_v2/plain_page.h
+++ b/be/src/olap/rowset/segment_v2/plain_page.h
@@ -39,7 +39,7 @@ public:
     Status init() override {
         // Reserve enough space for the page, plus a bit of slop since
         // we often overrun the page by a few values.
-        _buffer.reserve(_options.data_page_size + 1024);
+        RETURN_IF_CATCH_EXCEPTION(_buffer.reserve(_options.data_page_size + 
1024));
         return reset();
     }
 
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 83e93631ab1..ec3bb9c993e 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -385,8 +385,8 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     for (auto i : including_cids) {
         full_block.replace_by_position(i, 
block->get_by_position(input_id++).column);
     }
-    
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block, 
row_pos, num_rows,
-                                                                   
including_cids);
+    
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
+            &full_block, row_pos, num_rows, including_cids));
 
     bool have_input_seq_column = false;
     // write including columns
@@ -561,8 +561,8 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     }
 
     // convert missing columns and send to column writer
-    
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block, 
row_pos, num_rows,
-                                                                   
missing_cids);
+    
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
+            &full_block, row_pos, num_rows, missing_cids));
     for (auto cid : missing_cids) {
         auto converted_result = _olap_data_convertor->convert_column_data(cid);
         if (!converted_result.first.ok()) {
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 5d2ddedb204..48b892afc38 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -321,8 +321,8 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
     for (auto i : including_cids) {
         full_block.replace_by_position(i, 
data.block->get_by_position(input_id++).column);
     }
-    
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block, 
data.row_pos,
-                                                                   
data.num_rows, including_cids);
+    
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
+            &full_block, data.row_pos, data.num_rows, including_cids));
 
     bool have_input_seq_column = false;
     // write including columns
@@ -497,8 +497,8 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
 
     // convert missing columns and send to column writer
     const auto& missing_cids = 
_opts.rowset_ctx->partial_update_info->missing_cids;
-    
_olap_data_convertor->set_source_content_with_specifid_columns(&full_block, 
data.row_pos,
-                                                                   
data.num_rows, missing_cids);
+    
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
+            &full_block, data.row_pos, data.num_rows, missing_cids));
     for (auto cid : missing_cids) {
         auto [status, column] = _olap_data_convertor->convert_column_data(cid);
         if (!status.ok()) {
@@ -747,8 +747,8 @@ Status VerticalSegmentWriter::write_batch() {
     for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
         RETURN_IF_ERROR(_create_column_writer(cid, 
_tablet_schema->column(cid)));
         for (auto& data : _batched_blocks) {
-            _olap_data_convertor->set_source_content_with_specifid_columns(
-                    data.block, data.row_pos, data.num_rows, 
std::vector<uint32_t> {cid});
+            
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
+                    data.block, data.row_pos, data.num_rows, 
std::vector<uint32_t> {cid}));
 
             // convert column data from engine format to storage layer format
             auto [status, column] = 
_olap_data_convertor->convert_column_data(cid);
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 466c9b3b559..e6bedd6c78e 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -786,15 +786,18 @@ Block Block::copy_block(const std::vector<int>& 
column_offset) const {
     return columns_with_type_and_name;
 }
 
-void Block::append_to_block_by_selector(MutableBlock* dst,
-                                        const IColumn::Selector& selector) 
const {
-    DCHECK_EQ(data.size(), dst->mutable_columns().size());
-    for (size_t i = 0; i < data.size(); i++) {
-        // FIXME: this is a quickfix. we assume that only partition functions 
make there some
-        if (!is_column_const(*data[i].column)) {
-            data[i].column->append_data_by_selector(dst->mutable_columns()[i], 
selector);
+Status Block::append_to_block_by_selector(MutableBlock* dst,
+                                          const IColumn::Selector& selector) 
const {
+    RETURN_IF_CATCH_EXCEPTION({
+        DCHECK_EQ(data.size(), dst->mutable_columns().size());
+        for (size_t i = 0; i < data.size(); i++) {
+            // FIXME: this is a quickfix. we assume that only partition 
functions make there some
+            if (!is_column_const(*data[i].column)) {
+                
data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector);
+            }
         }
-    }
+    });
+    return Status::OK();
 }
 
 Status Block::filter_block(Block* block, const std::vector<uint32_t>& 
columns_to_filter,
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 89f8e99b66a..c9b3f2d5b5e 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -274,7 +274,7 @@ public:
     // copy a new block by the offset column
     Block copy_block(const std::vector<int>& column_offset) const;
 
-    void append_to_block_by_selector(MutableBlock* dst, const 
IColumn::Selector& selector) const;
+    Status append_to_block_by_selector(MutableBlock* dst, const 
IColumn::Selector& selector) const;
 
     // need exception safety
     static void filter_block_internal(Block* block, const 
std::vector<uint32_t>& columns_to_filter,
diff --git a/be/src/vec/olap/olap_data_convertor.cpp 
b/be/src/vec/olap/olap_data_convertor.cpp
index 3da1f7c8678..86c1d2d6669 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -214,16 +214,19 @@ void OlapBlockDataConvertor::set_source_content(const 
vectorized::Block* block,
     }
 }
 
-void OlapBlockDataConvertor::set_source_content_with_specifid_columns(
+Status OlapBlockDataConvertor::set_source_content_with_specifid_columns(
         const vectorized::Block* block, size_t row_pos, size_t num_rows,
         std::vector<uint32_t> cids) {
     DCHECK(block != nullptr);
     DCHECK(num_rows > 0);
     DCHECK(row_pos + num_rows <= block->rows());
-    for (auto i : cids) {
-        DCHECK(i < _convertors.size());
-        _convertors[i]->set_source_column(block->get_by_position(i), row_pos, 
num_rows);
-    }
+    RETURN_IF_CATCH_EXCEPTION({
+        for (auto i : cids) {
+            DCHECK(i < _convertors.size());
+            _convertors[i]->set_source_column(block->get_by_position(i), 
row_pos, num_rows);
+        }
+    });
+    return Status::OK();
 }
 
 void OlapBlockDataConvertor::clear_source_content() {
@@ -235,7 +238,11 @@ void OlapBlockDataConvertor::clear_source_content() {
 std::pair<Status, IOlapColumnDataAccessor*> 
OlapBlockDataConvertor::convert_column_data(
         size_t cid) {
     assert(cid < _convertors.size());
-    auto status = _convertors[cid]->convert_to_olap();
+    auto convert_func = [&]() -> Status {
+        
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_convertors[cid]->convert_to_olap());
+        return Status::OK();
+    };
+    auto status = convert_func();
     return {status, _convertors[cid].get()};
 }
 
diff --git a/be/src/vec/olap/olap_data_convertor.h 
b/be/src/vec/olap/olap_data_convertor.h
index d6a721f9792..0ec720fcdc1 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -75,8 +75,8 @@ public:
     OlapBlockDataConvertor(const TabletSchema* tablet_schema);
     OlapBlockDataConvertor(const TabletSchema* tablet_schema, const 
std::vector<uint32_t>& col_ids);
     void set_source_content(const vectorized::Block* block, size_t row_pos, 
size_t num_rows);
-    void set_source_content_with_specifid_columns(const vectorized::Block* 
block, size_t row_pos,
-                                                  size_t num_rows, 
std::vector<uint32_t> cids);
+    Status set_source_content_with_specifid_columns(const vectorized::Block* 
block, size_t row_pos,
+                                                    size_t num_rows, 
std::vector<uint32_t> cids);
     void clear_source_content();
     std::pair<Status, IOlapColumnDataAccessor*> convert_column_data(size_t 
cid);
     void add_column_data_convertor(const TabletColumn& column);
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp 
b/be/src/vec/sink/group_commit_block_sink.cpp
index 8aa60bb3f22..97ab60a8801 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -217,7 +217,7 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
         for (auto i = 0; i < block->rows(); i++) {
             selector.emplace_back(i);
         }
-        block->append_to_block_by_selector(cur_mutable_block.get(), selector);
+        
RETURN_IF_ERROR(block->append_to_block_by_selector(cur_mutable_block.get(), 
selector));
     }
     std::shared_ptr<vectorized::Block> output_block = 
vectorized::Block::create_shared();
     output_block->swap(cur_mutable_block->to_block());
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 70d1c05b453..818bff422f9 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -516,7 +516,8 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload)
     }
 
     SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
-    block->append_to_block_by_selector(_cur_mutable_block.get(), 
*(payload->first));
+    RETURN_IF_ERROR(
+            block->append_to_block_by_selector(_cur_mutable_block.get(), 
*(payload->first)));
     for (auto tablet_id : payload->second) {
         _cur_add_block_request->add_tablet_ids(tablet_id);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to