This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new c70d7d8423e [fix](sink) Do not block result sink on pipeline engine (#40094)
c70d7d8423e is described below

commit c70d7d8423ee61ee609902b11cf6493f0130b0fa
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Aug 29 14:04:25 2024 +0800

    [fix](sink) Do not block result sink on pipeline engine (#40094)

    ## Proposed changes

    Issue Number: close #xxx

    <!--Describe your changes.-->
---
 be/src/runtime/buffer_control_block.cpp | 10 ++++++----
 be/src/runtime/buffer_control_block.h   |  4 ++--
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index a10ce354325..6ac22212820 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -116,7 +116,7 @@ bool BufferControlBlock::can_sink() {
     return _get_batch_queue_empty() || _buffer_rows < _buffer_limit || _is_cancelled;
 }
 
-Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result) {
+Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result, bool is_pipeline) {
     std::unique_lock<std::mutex> l(_lock);
 
     if (_is_cancelled) {
@@ -125,7 +125,8 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
 
     int num_rows = result->result_batch.rows.size();
 
-    while ((!_fe_result_batch_queue.empty() && _buffer_rows > _buffer_limit) && !_is_cancelled) {
+    while (!is_pipeline && (!_fe_result_batch_queue.empty() && _buffer_rows > _buffer_limit) &&
+           !_is_cancelled) {
         _data_removal.wait_for(l, std::chrono::seconds(1));
     }
 
@@ -276,8 +277,9 @@ void BufferControlBlock::cancel() {
     _waiting_rpc.clear();
 }
 
-Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result) {
-    RETURN_IF_ERROR(BufferControlBlock::add_batch(result));
+Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result,
+                                        bool is_pipeline) {
+    RETURN_IF_ERROR(BufferControlBlock::add_batch(result, true));
     _update_dependency();
     return Status::OK();
 }
diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index 33fd1eed724..b8b3f3d163e 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -77,7 +77,7 @@ public:
     Status init();
     // Only one fragment is written, so can_sink returns true, then the sink must be executed
     virtual bool can_sink();
-    virtual Status add_batch(std::unique_ptr<TFetchDataResult>& result);
+    virtual Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool is_pipeline = false);
     virtual Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result);
 
     virtual void get_batch(GetResultBatchCtx* ctx);
@@ -144,7 +144,7 @@ public:
         return _get_batch_queue_empty() || _buffer_rows < _buffer_limit || _is_cancelled;
     }
 
-    Status add_batch(std::unique_ptr<TFetchDataResult>& result) override;
+    Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool is_pipeline = true) override;
 
     Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result) override;
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org