This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c0d9a8d53c6 [minor](pipelineX) refine error message for broadcast shuffle buffer (#26442) c0d9a8d53c6 is described below commit c0d9a8d53c66c40910d9b1b42486be5cd7ec13ea Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Nov 6 15:10:13 2023 +0800 [minor](pipelineX) refine error message for broadcast shuffle buffer (#26442) --- be/src/pipeline/exec/exchange_sink_operator.cpp | 5 ++++- be/src/pipeline/exec/exchange_sink_operator.h | 6 ++++-- be/src/vec/sink/vdata_stream_sender.h | 3 +-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 9e05d682864..a32fbe45c55 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -451,7 +451,10 @@ Status ExchangeSinkLocalState::get_next_available_buffer( return Status::OK(); } } - return Status::InternalError("No broadcast buffer left!"); + return Status::InternalError("No broadcast buffer left! Available blocks: " + + std::to_string(_broadcast_dependency->available_blocks()) + + " and number of buffer is " + + std::to_string(_broadcast_pb_blocks.size())); } template <typename Channels, typename HashValueType> diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 05fa799e5cf..0d35f819442 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -68,7 +68,7 @@ class ExchangeSinkQueueDependency final : public WriteDependency { public: ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency); ExchangeSinkQueueDependency(int id) : WriteDependency(id, "ResultQueueDependency") {} - ~ExchangeSinkQueueDependency() = default; + ~ExchangeSinkQueueDependency() override = default; void* shared_state() override { return nullptr; } }; @@ -77,7 +77,7 @@ class BroadcastDependency final : public WriteDependency { public: ENABLE_FACTORY_CREATOR(BroadcastDependency); BroadcastDependency(int id) : WriteDependency(id, "BroadcastDependency"), _available_block(0) {} - virtual ~BroadcastDependency() = default; + ~BroadcastDependency() override = default; [[nodiscard]] WriteDependency* write_blocked_by() override { if (config::enable_fuzzy_mode && _available_block == 0 && @@ -107,6 +107,8 @@ public: throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!"); } + int available_blocks() const { return _available_block; } + private: std::atomic<int> _available_block; }; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 3a58514c8df..a09cb4b7d47 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -505,9 +505,8 @@ public: if (eos) { if (_eos_send) { return Status::OK(); - } else { - _eos_send = true; } + _eos_send = true; } if (eos || block->get_block()->column_metas_size()) { RETURN_IF_ERROR(_buffer->add_block({this, block, eos}, sent)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org