This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new f03cee5e30d [enhancement](oom) add exception in olap data convertor when memory is not enough to prevent oom (#35761) f03cee5e30d is described below commit f03cee5e30d919833a70e7da8a8c5f22f20f9a28 Author: yiguolei <676222...@qq.com> AuthorDate: Sun Jun 2 21:11:18 2024 +0800 [enhancement](oom) add exception in olap data convertor when memory is not enough to prevent oom (#35761) Issue Number: close #xxx <!--Describe your changes.--> --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/olap/rowset/segment_v2/plain_page.h | 2 +- be/src/olap/rowset/segment_v2/segment_writer.cpp | 8 ++++---- .../rowset/segment_v2/vertical_segment_writer.cpp | 12 ++++++------ be/src/vec/core/block.cpp | 19 +++++++++++-------- be/src/vec/core/block.h | 2 +- be/src/vec/olap/olap_data_convertor.cpp | 19 +++++++++++++------ be/src/vec/olap/olap_data_convertor.h | 4 ++-- be/src/vec/sink/group_commit_block_sink.cpp | 2 +- be/src/vec/sink/writer/vtablet_writer.cpp | 3 ++- 9 files changed, 41 insertions(+), 30 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/plain_page.h b/be/src/olap/rowset/segment_v2/plain_page.h index cbcc96f31ba..af31275002a 100644 --- a/be/src/olap/rowset/segment_v2/plain_page.h +++ b/be/src/olap/rowset/segment_v2/plain_page.h @@ -39,7 +39,7 @@ public: Status init() override { // Reserve enough space for the page, plus a bit of slop since // we often overrun the page by a few values. - _buffer.reserve(_options.data_page_size + 1024); + RETURN_IF_CATCH_EXCEPTION(_buffer.reserve(_options.data_page_size + 1024)); return reset(); } diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 83e93631ab1..ec3bb9c993e 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -385,8 +385,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* for (auto i : including_cids) { full_block.replace_by_position(i, block->get_by_position(input_id++).column); } - _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows, - including_cids); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns( + &full_block, row_pos, num_rows, including_cids)); bool have_input_seq_column = false; // write including columns @@ -561,8 +561,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } // convert missing columns and send to column writer - _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows, - missing_cids); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns( + &full_block, row_pos, num_rows, missing_cids)); for (auto cid : missing_cids) { auto converted_result = _olap_data_convertor->convert_column_data(cid); if (!converted_result.first.ok()) { diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 5d2ddedb204..48b892afc38 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -321,8 +321,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da for (auto i : including_cids) { full_block.replace_by_position(i, data.block->get_by_position(input_id++).column); } - _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, data.row_pos, - data.num_rows, including_cids); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns( + &full_block, data.row_pos, data.num_rows, including_cids)); bool have_input_seq_column = false; // write including columns @@ -497,8 +497,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da // convert missing columns and send to column writer const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids; - _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, data.row_pos, - data.num_rows, missing_cids); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns( + &full_block, data.row_pos, data.num_rows, missing_cids)); for (auto cid : missing_cids) { auto [status, column] = _olap_data_convertor->convert_column_data(cid); if (!status.ok()) { @@ -747,8 +747,8 @@ Status VerticalSegmentWriter::write_batch() { for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) { RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid))); for (auto& data : _batched_blocks) { - _olap_data_convertor->set_source_content_with_specifid_columns( - data.block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid}); + RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns( + data.block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid})); // convert column data from engine format to storage layer format auto [status, column] = _olap_data_convertor->convert_column_data(cid); diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 466c9b3b559..e6bedd6c78e 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -786,15 +786,18 @@ Block Block::copy_block(const std::vector<int>& column_offset) const { return columns_with_type_and_name; } -void Block::append_to_block_by_selector(MutableBlock* dst, - const IColumn::Selector& selector) const { - DCHECK_EQ(data.size(), dst->mutable_columns().size()); - for (size_t i = 0; i < data.size(); i++) { - // FIXME: this is a quickfix. we assume that only partition functions make there some - if (!is_column_const(*data[i].column)) { - data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector); +Status Block::append_to_block_by_selector(MutableBlock* dst, + const IColumn::Selector& selector) const { + RETURN_IF_CATCH_EXCEPTION({ + DCHECK_EQ(data.size(), dst->mutable_columns().size()); + for (size_t i = 0; i < data.size(); i++) { + // FIXME: this is a quickfix. we assume that only partition functions make there some + if (!is_column_const(*data[i].column)) { + data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector); + } } - } + }); + return Status::OK(); } Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter, diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 89f8e99b66a..c9b3f2d5b5e 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -274,7 +274,7 @@ public: // copy a new block by the offset column Block copy_block(const std::vector<int>& column_offset) const; - void append_to_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const; + Status append_to_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const; // need exception safety static void filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter, diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp index 3da1f7c8678..86c1d2d6669 100644 --- a/be/src/vec/olap/olap_data_convertor.cpp +++ b/be/src/vec/olap/olap_data_convertor.cpp @@ -214,16 +214,19 @@ void OlapBlockDataConvertor::set_source_content(const vectorized::Block* block, } } -void OlapBlockDataConvertor::set_source_content_with_specifid_columns( +Status OlapBlockDataConvertor::set_source_content_with_specifid_columns( const vectorized::Block* block, size_t row_pos, size_t num_rows, std::vector<uint32_t> cids) { DCHECK(block != nullptr); DCHECK(num_rows > 0); DCHECK(row_pos + num_rows <= block->rows()); - for (auto i : cids) { - DCHECK(i < _convertors.size()); - _convertors[i]->set_source_column(block->get_by_position(i), row_pos, num_rows); - } + RETURN_IF_CATCH_EXCEPTION({ + for (auto i : cids) { + DCHECK(i < _convertors.size()); + _convertors[i]->set_source_column(block->get_by_position(i), row_pos, num_rows); + } + }); + return Status::OK(); } void OlapBlockDataConvertor::clear_source_content() { @@ -235,7 +238,11 @@ void OlapBlockDataConvertor::clear_source_content() { std::pair<Status, IOlapColumnDataAccessor*> OlapBlockDataConvertor::convert_column_data( size_t cid) { assert(cid < _convertors.size()); - auto status = _convertors[cid]->convert_to_olap(); + auto convert_func = [&]() -> Status { + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_convertors[cid]->convert_to_olap()); + return Status::OK(); + }; + auto status = convert_func(); return {status, _convertors[cid].get()}; } diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h index d6a721f9792..0ec720fcdc1 100644 --- a/be/src/vec/olap/olap_data_convertor.h +++ b/be/src/vec/olap/olap_data_convertor.h @@ -75,8 +75,8 @@ public: OlapBlockDataConvertor(const TabletSchema* tablet_schema); OlapBlockDataConvertor(const TabletSchema* tablet_schema, const std::vector<uint32_t>& col_ids); void set_source_content(const vectorized::Block* block, size_t row_pos, size_t num_rows); - void set_source_content_with_specifid_columns(const vectorized::Block* block, size_t row_pos, - size_t num_rows, std::vector<uint32_t> cids); + Status set_source_content_with_specifid_columns(const vectorized::Block* block, size_t row_pos, + size_t num_rows, std::vector<uint32_t> cids); void clear_source_content(); std::pair<Status, IOlapColumnDataAccessor*> convert_column_data(size_t cid); void add_column_data_convertor(const TabletColumn& column); diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 8aa60bb3f22..97ab60a8801 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -217,7 +217,7 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, for (auto i = 0; i < block->rows(); i++) { selector.emplace_back(i); } - block->append_to_block_by_selector(cur_mutable_block.get(), selector); + RETURN_IF_ERROR(block->append_to_block_by_selector(cur_mutable_block.get(), selector)); } std::shared_ptr<vectorized::Block> output_block = vectorized::Block::create_shared(); output_block->swap(cur_mutable_block->to_block()); diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 70d1c05b453..818bff422f9 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -516,7 +516,8 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload) } SCOPED_RAW_TIMER(&_stat.append_node_channel_ns); - block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first)); + RETURN_IF_ERROR( + block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first))); for (auto tablet_id : payload->second) { _cur_add_block_request->add_tablet_ids(tablet_id); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org