This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new c70d7d8423e [fix](sink) Do not block result sink on pipeline engine (#40094)
c70d7d8423e is described below

commit c70d7d8423ee61ee609902b11cf6493f0130b0fa
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Aug 29 14:04:25 2024 +0800

    [fix](sink) Do not block result sink on pipeline engine (#40094)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
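    <!--Describe your changes.-->
    Add an `is_pipeline` flag to `BufferControlBlock::add_batch`. When it is
    true, the loop that waits on `_data_removal` while the buffer is over
    `_buffer_limit` is skipped. `PipBufferControlBlock::add_batch` always
    passes `true` to the base implementation.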
---
 be/src/runtime/buffer_control_block.cpp | 10 ++++++----
 be/src/runtime/buffer_control_block.h   |  4 ++--
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index a10ce354325..6ac22212820 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -116,7 +116,7 @@ bool BufferControlBlock::can_sink() {
     return _get_batch_queue_empty() || _buffer_rows < _buffer_limit || 
_is_cancelled;
 }
 
-Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& 
result) {
+Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& 
result, bool is_pipeline) {
     std::unique_lock<std::mutex> l(_lock);
 
     if (_is_cancelled) {
@@ -125,7 +125,8 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
 
     int num_rows = result->result_batch.rows.size();
 
-    while ((!_fe_result_batch_queue.empty() && _buffer_rows > _buffer_limit) 
&& !_is_cancelled) {
+    while (!is_pipeline && (!_fe_result_batch_queue.empty() && _buffer_rows > 
_buffer_limit) &&
+           !_is_cancelled) {
         _data_removal.wait_for(l, std::chrono::seconds(1));
     }
 
@@ -276,8 +277,9 @@ void BufferControlBlock::cancel() {
     _waiting_rpc.clear();
 }
 
-Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& 
result) {
-    RETURN_IF_ERROR(BufferControlBlock::add_batch(result));
+Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& 
result,
+                                        bool is_pipeline) {
+    RETURN_IF_ERROR(BufferControlBlock::add_batch(result, true));
     _update_dependency();
     return Status::OK();
 }
diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index 33fd1eed724..b8b3f3d163e 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -77,7 +77,7 @@ public:
     Status init();
     // Only one fragment is written, so can_sink returns true, then the sink 
must be executed
     virtual bool can_sink();
-    virtual Status add_batch(std::unique_ptr<TFetchDataResult>& result);
+    virtual Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool 
is_pipeline = false);
     virtual Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& 
result);
 
     virtual void get_batch(GetResultBatchCtx* ctx);
@@ -144,7 +144,7 @@ public:
         return _get_batch_queue_empty() || _buffer_rows < _buffer_limit || 
_is_cancelled;
     }
 
-    Status add_batch(std::unique_ptr<TFetchDataResult>& result) override;
+    Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool 
is_pipeline = true) override;
 
     Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result) 
override;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to