This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7e8b4064a29 [Improvement](queue) Return value of concurrent queue 
should be proce… (#45033)
7e8b4064a29 is described below

commit 7e8b4064a299f986039ca6d6fa8af198686dd23f
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Thu Dec 5 16:23:18 2024 +0800

    [Improvement](queue) Return value of concurrent queue should be proce… 
(#45033)
    
    …… (#44986)
    
    …ssed
    
    Push items into concurrent queue will return false due to some
    unexpected error (e.g. poor memory available).
---
 be/src/pipeline/local_exchange/local_exchanger.h | 14 ++++++++++++--
 be/src/vec/exec/scan/scanner_context.cpp         |  5 ++++-
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index af95e5348c8..274e7b404aa 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -109,7 +109,11 @@ struct BlockQueue {
             : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
     inline bool enqueue(BlockType const& item) {
         if (!eos) {
-            data_queue.enqueue(item);
+            if (!data_queue.enqueue(item)) [[unlikely]] {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Exception occurs in data queue [size = {}] of 
local exchange.",
+                                data_queue.size_approx());
+            }
             return true;
         }
         return false;
@@ -117,7 +121,11 @@ struct BlockQueue {
 
     inline bool enqueue(BlockType&& item) {
         if (!eos) {
-            data_queue.enqueue(std::move(item));
+            if (!data_queue.enqueue(std::move(item))) [[unlikely]] {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "Exception occurs in data queue [size = {}] of 
local exchange.",
+                                data_queue.size_approx());
+            }
             return true;
         }
         return false;
@@ -185,6 +193,8 @@ struct BlockWrapper {
                         shared_state->exchanger->_free_block_limit *
                                 shared_state->exchanger->_num_sources) {
                 data_block.clear_column_data();
+                // Free blocks is used to improve memory efficiency. Failure 
during pushing back
+                // free block will not incur any bad result so just ignore the 
return value.
                 
shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
             }
         }
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 451b3561dad..a3cafe86daa 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -65,7 +65,10 @@ ScannerContext::ScannerContext(
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
     ctx_id = UniqueId::gen_uid().to_string();
-    _scanners.enqueue_bulk(scanners.begin(), scanners.size());
+    if (!_scanners.enqueue_bulk(scanners.begin(), scanners.size())) 
[[unlikely]] {
+        throw Exception(ErrorCode::INTERNAL_ERROR,
+                        "Exception occurs during scanners initialization.");
+    };
     if (limit < 0) {
         limit = -1;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to