This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c0d9a8d53c6 [minor](pipelineX) refine error message for broadcast shuffle buffer (#26442)
c0d9a8d53c6 is described below

commit c0d9a8d53c66c40910d9b1b42486be5cd7ec13ea
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Nov 6 15:10:13 2023 +0800

    [minor](pipelineX) refine error message for broadcast shuffle buffer (#26442)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 5 ++++-
 be/src/pipeline/exec/exchange_sink_operator.h   | 6 ++++--
 be/src/vec/sink/vdata_stream_sender.h           | 3 +--
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 9e05d682864..a32fbe45c55 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -451,7 +451,10 @@ Status ExchangeSinkLocalState::get_next_available_buffer(
             return Status::OK();
         }
     }
-    return Status::InternalError("No broadcast buffer left!");
+    return Status::InternalError("No broadcast buffer left! Available blocks: 
" +
+                                 
std::to_string(_broadcast_dependency->available_blocks()) +
+                                 " and number of buffer is " +
+                                 std::to_string(_broadcast_pb_blocks.size()));
 }
 
 template <typename Channels, typename HashValueType>
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 05fa799e5cf..0d35f819442 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -68,7 +68,7 @@ class ExchangeSinkQueueDependency final : public 
WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency);
     ExchangeSinkQueueDependency(int id) : WriteDependency(id, 
"ResultQueueDependency") {}
-    ~ExchangeSinkQueueDependency() = default;
+    ~ExchangeSinkQueueDependency() override = default;
 
     void* shared_state() override { return nullptr; }
 };
@@ -77,7 +77,7 @@ class BroadcastDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastDependency);
     BroadcastDependency(int id) : WriteDependency(id, "BroadcastDependency"), 
_available_block(0) {}
-    virtual ~BroadcastDependency() = default;
+    ~BroadcastDependency() override = default;
 
     [[nodiscard]] WriteDependency* write_blocked_by() override {
         if (config::enable_fuzzy_mode && _available_block == 0 &&
@@ -107,6 +107,8 @@ public:
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not 
reach here!");
     }
 
+    int available_blocks() const { return _available_block; }
+
 private:
     std::atomic<int> _available_block;
 };
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 3a58514c8df..a09cb4b7d47 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -505,9 +505,8 @@ public:
         if (eos) {
             if (_eos_send) {
                 return Status::OK();
-            } else {
-                _eos_send = true;
             }
+            _eos_send = true;
         }
         if (eos || block->get_block()->column_metas_size()) {
             RETURN_IF_ERROR(_buffer->add_block({this, block, eos}, sent));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to