This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7e8b4064a29 [Improvement](queue) Return value of concurrent queue should be proce… (#45033) 7e8b4064a29 is described below commit 7e8b4064a299f986039ca6d6fa8af198686dd23f Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Thu Dec 5 16:23:18 2024 +0800 [Improvement](queue) Return value of concurrent queue should be proce… (#45033) …… (#44986) …ssed Push items into concurrent queue will return false due to some unexpected error (e.g. poor memory available). --- be/src/pipeline/local_exchange/local_exchanger.h | 14 ++++++++++++-- be/src/vec/exec/scan/scanner_context.cpp | 5 ++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index af95e5348c8..274e7b404aa 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -109,7 +109,11 @@ struct BlockQueue { : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {} inline bool enqueue(BlockType const& item) { if (!eos) { - data_queue.enqueue(item); + if (!data_queue.enqueue(item)) [[unlikely]] { + throw Exception(ErrorCode::INTERNAL_ERROR, + "Exception occurs in data queue [size = {}] of local exchange.", + data_queue.size_approx()); + } return true; } return false; @@ -117,7 +121,11 @@ struct BlockQueue { inline bool enqueue(BlockType&& item) { if (!eos) { - data_queue.enqueue(std::move(item)); + if (!data_queue.enqueue(std::move(item))) [[unlikely]] { + throw Exception(ErrorCode::INTERNAL_ERROR, + "Exception occurs in data queue [size = {}] of local exchange.", + data_queue.size_approx()); + } return true; } return false; @@ -185,6 +193,8 @@ struct BlockWrapper { shared_state->exchanger->_free_block_limit * shared_state->exchanger->_num_sources) { data_block.clear_column_data(); + // Free blocks is used to improve memory efficiency. Failure during pushing back + // free block will not incur any bad result so just ignore the return value. shared_state->exchanger->_free_blocks.enqueue(std::move(data_block)); } } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 451b3561dad..a3cafe86daa 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -65,7 +65,10 @@ ScannerContext::ScannerContext( _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); ctx_id = UniqueId::gen_uid().to_string(); - _scanners.enqueue_bulk(scanners.begin(), scanners.size()); + if (!_scanners.enqueue_bulk(scanners.begin(), scanners.size())) [[unlikely]] { + throw Exception(ErrorCode::INTERNAL_ERROR, + "Exception occurs during scanners initialization."); + }; if (limit < 0) { limit = -1; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org