This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d39f878cab [UT](local merge) Add case for local merge exchanger (#47838)
7d39f878cab is described below

commit 7d39f878cab229ef3f27b94508c555c31aaf90d7
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Mon Feb 17 09:54:27 2025 +0800

    [UT](local merge) Add case for local merge exchanger (#47838)
---
 be/src/pipeline/dependency.h                       |  46 +++---
 .../local_exchange_source_operator.h               |   5 +
 be/src/pipeline/local_exchange/local_exchanger.cpp |  35 +++-
 be/src/pipeline/local_exchange/local_exchanger.h   |  14 +-
 be/src/vec/core/sort_cursor.h                      |   9 +-
 be/test/pipeline/local_exchanger_test.cpp          | 183 ++++++++++++++++++++-
 6 files changed, 248 insertions(+), 44 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 489497897dc..67434e2c18d 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -757,23 +757,24 @@ public:
         dep->set_ready();
     }
 
-    void add_mem_usage(int channel_id, size_t delta) { mem_counters[channel_id]->update(delta); }
+    virtual void add_mem_usage(int channel_id, size_t delta) {
+        mem_counters[channel_id]->update(delta);
+    }
 
-    void sub_mem_usage(int channel_id, size_t delta) {
+    virtual void sub_mem_usage(int channel_id, size_t delta) {
         mem_counters[channel_id]->update(-(int64_t)delta);
     }
 
-    virtual void add_total_mem_usage(size_t delta, int channel_id) {
+    virtual void add_total_mem_usage(size_t delta) {
         if (cast_set<int64_t>(mem_usage.fetch_add(delta) + delta) >
             config::local_exchange_buffer_mem_limit) {
             sink_deps.front()->block();
         }
     }
 
-    virtual void sub_total_mem_usage(size_t delta, int channel_id) {
+    virtual void sub_total_mem_usage(size_t delta) {
         auto prev_usage = mem_usage.fetch_sub(delta);
-        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta
-                                         << " channel_id: " << channel_id;
+        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta;
         if (cast_set<int64_t>(prev_usage - delta) <= config::local_exchange_buffer_mem_limit) {
             sink_deps.front()->set_ready();
         }
@@ -784,12 +785,7 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
     ENABLE_FACTORY_CREATOR(LocalMergeExchangeSharedState);
     LocalMergeExchangeSharedState(int num_instances)
             : LocalExchangeSharedState(num_instances),
-              _queues_mem_usage(num_instances),
-              _each_queue_limit(config::local_exchange_buffer_mem_limit / num_instances) {
-        for (size_t i = 0; i < num_instances; i++) {
-            _queues_mem_usage[i] = 0;
-        }
-    }
+              _each_queue_limit(config::local_exchange_buffer_mem_limit / num_instances) {}
 
     void create_dependencies(int local_exchange_id) override {
         sink_deps.resize(source_deps.size());
@@ -805,22 +801,21 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
         }
     }
 
-    void sub_total_mem_usage(size_t delta, int channel_id) override {
-        auto prev_usage = _queues_mem_usage[channel_id].fetch_sub(delta);
-        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta
-                                         << " channel_id: " << channel_id;
-        if (prev_usage - delta <= _each_queue_limit) {
-            sink_deps[channel_id]->set_ready();
-        }
-        if (_queues_mem_usage[channel_id] == 0) {
-            source_deps[channel_id]->block();
+    void sub_total_mem_usage(size_t delta) override { mem_usage.fetch_sub(delta); }
+    void add_total_mem_usage(size_t delta) override { mem_usage.fetch_add(delta); }
+
+    void add_mem_usage(int channel_id, size_t delta) override {
+        LocalExchangeSharedState::add_mem_usage(channel_id, delta);
+        if (mem_counters[channel_id]->value() > _each_queue_limit) {
+            sink_deps[channel_id]->block();
         }
     }
-    void add_total_mem_usage(size_t delta, int channel_id) override {
-        if (_queues_mem_usage[channel_id].fetch_add(delta) + delta > _each_queue_limit) {
-            sink_deps[channel_id]->block();
+
+    void sub_mem_usage(int channel_id, size_t delta) override {
+        LocalExchangeSharedState::sub_mem_usage(channel_id, delta);
+        if (mem_counters[channel_id]->value() <= _each_queue_limit) {
+            sink_deps[channel_id]->set_ready();
         }
-        source_deps[channel_id]->set_ready();
     }
 
     Dependency* get_sink_dep_by_channel_id(int channel_id) override {
@@ -832,7 +827,6 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
     }
 
 private:
-    std::vector<std::atomic_int64_t> _queues_mem_usage;
     const int64_t _each_queue_limit;
 };
 #include "common/compile_check_end.h"
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index e0d1715afb1..d6c8cecfef3 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -40,6 +40,11 @@ public:
     std::string debug_string(int indentation_level) const override;
 
     std::vector<Dependency*> dependencies() const override;
+    Dependency* get_dependency(int id) {
+        return _exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT
+                       ? _local_merge_deps[id].get()
+                       : _dependency;
+    }
 
 private:
     friend class LocalExchangeSourceOperatorX;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 1fc47b2f55d..76a8a8e1274 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -82,7 +82,7 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_st
             return true;
         }
         COUNTER_UPDATE(local_state->_get_block_failed_counter, 1);
-        local_state->_dependency->block();
+        local_state->get_dependency(channel_id)->block();
     }
     return false;
 }
@@ -377,6 +377,7 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
                         *sink_info.channel_id));
     }
     if (eos && sink_info.local_state) {
+        _eos[*sink_info.channel_id]->store(true);
         sink_info.local_state->_shared_state->source_deps[*sink_info.channel_id]
                 ->set_always_ready();
     }
@@ -421,7 +422,37 @@ Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
         vectorized::BlockSupplier block_supplier = [&, local_state, id = channel_id](
                                                            vectorized::Block* block, bool* eos) {
             BlockWrapperSPtr next_block;
-            _dequeue_data(local_state, next_block, eos, block, id);
+            auto got = _dequeue_data(local_state, next_block, eos, block, id);
+            if (got) {
+                // If this block is the last block, we should block this pipeline task to wait for
+                // the next block.
+                // TODO: LocalMergeSortExchanger should be refactored.
+                if (_data_queue[id].data_queue.size_approx() == 0) {
+                    std::unique_lock l(*_m[id]);
+                    if (_data_queue[id].data_queue.size_approx() == 0) {
+                        local_state->get_dependency(id)->block();
+                    }
+                }
+            }
+#ifndef NDEBUG
+            if (*eos && !(*_eos[id])) {
+                return Status::InternalError(
+                        "LocalMergeSortExchanger{} meet error! _eos[id] should be true if no "
+                        "source operators are running",
+                        local_state->debug_string(0));
+            }
+#endif
+            // `eos` is true if all sink operators are closed and no block remains. `_eos[id]`
+            // being true means the sink operator of instance[id] has already sent its last block
+            // with the `eos` flag.
+            if (block->empty() && !(*_eos[id])) {
+                return Status::InternalError(
+                        "LocalMergeSortExchanger{} meet error! Block should not be empty when eos "
+                        "is false",
+                        local_state->debug_string(0));
+            } else if (block->empty()) {
+                *eos = *_eos[id];
+            }
             return Status::OK();
         };
         child_block_suppliers.push_back(block_supplier);
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index 25f090e4208..64562c9019e 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -81,7 +81,7 @@ public:
                   _shared_state(shared_state),
                   _allocated_bytes(_data_block.allocated_bytes()) {
             if (_shared_state) {
-                _shared_state->add_total_mem_usage(_allocated_bytes, channel_id);
+                _shared_state->add_total_mem_usage(_allocated_bytes);
             }
         }
         ~BlockWrapper() {
@@ -89,8 +89,7 @@ public:
                 DCHECK_GT(_allocated_bytes, 0);
-                // `_channel_ids` may be empty if exchanger is shuffled exchanger and channel id is
                 // not used by `sub_total_mem_usage`. So we just pass -1 here.
-                _shared_state->sub_total_mem_usage(
-                        _allocated_bytes, _channel_ids.empty() ? -1 : _channel_ids.front());
+                _shared_state->sub_total_mem_usage(_allocated_bytes);
                 if (_shared_state->exchanger->_free_block_limit == 0 ||
                     _shared_state->exchanger->_free_blocks.size_approx() <
                             _shared_state->exchanger->_free_block_limit *
@@ -348,7 +347,12 @@ public:
     LocalMergeSortExchanger(MergeInfo&& merge_info, int running_sink_operators, int num_partitions,
                             int free_block_limit)
             : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit),
-              _merge_info(std::move(merge_info)) {}
+              _merge_info(std::move(merge_info)) {
+        _eos.resize(num_partitions, nullptr);
+        for (size_t i = 0; i < num_partitions; i++) {
+            _eos[i] = std::make_shared<std::atomic_bool>(false);
+        }
+    }
     ~LocalMergeSortExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile,
                 SinkInfo&& sink_info) override;
@@ -365,7 +369,7 @@ public:
 private:
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
     MergeInfo _merge_info;
-    std::vector<std::atomic_int64_t> _queues_mem_usege;
+    std::vector<std::shared_ptr<std::atomic_bool>> _eos;
 };
 
 class BroadcastExchanger final : public Exchanger<BroadcastBlock> {
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index c7993e7fc95..68abe710110 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -192,12 +192,12 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
             desc[i].direction = is_asc_order[i] ? 1 : -1;
             desc[i].nulls_direction = nulls_first[i] ? -desc[i].direction : desc[i].direction;
         }
-        _is_eof = !has_next_block();
+        has_next_block();
     }
 
     BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const SortDescription& desc_)
             : MergeSortCursorImpl(desc_), _block_supplier(std::move(block_supplier)) {
-        _is_eof = !has_next_block();
+        has_next_block();
     }
 
     bool has_next_block() override {
@@ -205,9 +205,8 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
             return false;
         }
         block->clear();
-        do {
-            THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof));
-        } while (block->empty() && !_is_eof);
+        THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof));
+        DCHECK(!block->empty() || _is_eof);
         if (!block->empty()) {
             DCHECK_EQ(_ordering_expr.size(), desc.size());
             for (int i = 0; i < desc.size(); ++i) {
diff --git a/be/test/pipeline/local_exchanger_test.cpp b/be/test/pipeline/local_exchanger_test.cpp
index b0e91f0018f..da19e6ef8d6 100644
--- a/be/test/pipeline/local_exchanger_test.cpp
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -446,7 +446,7 @@ TEST_F(LocalExchangerTest, PassthroughExchanger) {
                                               _local_states[i].get()}),
                         Status::OK());
                 EXPECT_EQ(block.rows(), j == 1 ? 0 : 10);
-                EXPECT_EQ(eos, false);
+                EXPECT_FALSE(eos);
                 EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1);
             }
         }
@@ -645,7 +645,7 @@ TEST_F(LocalExchangerTest, PassToOneExchanger) {
                                               _local_states[i].get()}),
                         Status::OK());
                 EXPECT_EQ(block.rows(), j == 1 ? 0 : 10);
-                EXPECT_EQ(eos, false);
+                EXPECT_FALSE(eos);
                 EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1);
             }
         }
@@ -799,7 +799,7 @@ TEST_F(LocalExchangerTest, BroadcastExchanger) {
                                               _local_states[i].get()}),
                         Status::OK());
-                EXPECT_EQ(block.rows(), j == num_blocks * num_sources ? 0 : 10);
-                EXPECT_EQ(eos, false);
+                EXPECT_FALSE(eos);
                 EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_blocks * num_sources);
             }
         }
@@ -836,7 +836,7 @@ TEST_F(LocalExchangerTest, BroadcastExchanger) {
                                               _local_states[i].get()}),
                         Status::OK());
                 EXPECT_EQ(block.rows(), j == num_sources ? 0 : 10);
-                EXPECT_EQ(eos, false);
+                EXPECT_FALSE(eos);
                 EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_sources);
             }
         }
@@ -1038,7 +1038,7 @@ TEST_F(LocalExchangerTest, AdaptivePassthroughExchanger) {
                                               _local_states[i].get()}),
                         Status::OK());
                 EXPECT_EQ(block.rows(), j == 1 ? 0 : num_rows_per_block);
-                EXPECT_EQ(eos, false);
+                EXPECT_FALSE(eos);
                 EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1);
             }
         }
@@ -1324,6 +1324,7 @@ TEST_F(LocalExchangerTest, LocalMergeSortExchanger) {
                 "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
         shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
     }
+    _local_states[0]->_local_merge_deps = shared_state->source_deps;
     {
         // Enqueue 2 blocks with 10 rows for each data queue.
         for (size_t i = 0; i < num_partitions; i++) {
@@ -1354,7 +1355,7 @@ TEST_F(LocalExchangerTest, LocalMergeSortExchanger) {
 
     {
         for (size_t i = 0; i < num_sources; i++) {
-            EXPECT_EQ(shared_state->mem_counters[i]->value(), shared_state->_queues_mem_usage[i]);
+            EXPECT_EQ(shared_state->mem_counters[i]->value(), expect_block_bytes * num_blocks);
             EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
         }
         // Dequeue from data queue and accumulate rows if rows is smaller than batch_size.
@@ -1382,4 +1383,174 @@ TEST_F(LocalExchangerTest, LocalMergeSortExchanger) {
     }
 }
 
+TEST_F(LocalExchangerTest, LocalMergeSortExchangerWithEmptyBlock) {
+    int num_sink = 4;
+    int num_sources = 4;
+    int num_partitions = 4;
+    int free_block_limit = 0;
+    const auto expect_block_bytes = 128;
+    const auto num_blocks = 2;
+    std::vector<bool> is_asc_order;
+    std::vector<bool> nulls_first;
+    vectorized::VExprContextSPtrs ordering_expr_ctxs;
+    auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF,
+                                  TTypeDescBuilder()
+                                          .set_types(TTypeNodeBuilder()
+                                                             .set_type(TTypeNodeType::SCALAR)
+                                                             .set_scalar_type(TPrimitiveType::INT)
+                                                             .build())
+                                          .build(),
+                                  0)
+                         .set_slot_ref(TSlotRefBuilder(0, 0).build())
+                         .build();
+    auto slot = doris::vectorized::VSlotRef::create_shared(texpr);
+    slot->_column_id = 0;
+    slot->_prepare_finished = true;
+    ordering_expr_ctxs.push_back(std::make_shared<doris::vectorized::VExprContext>(slot));
+    is_asc_order.push_back(true);
+    nulls_first.push_back(true);
+    ordering_expr_ctxs[0]->_prepared = true;
+    ordering_expr_ctxs[0]->_opened = true;
+
+    std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> _sink_local_states;
+    std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+    _sink_local_states.resize(num_sink);
+    _local_states.resize(num_sources);
+    config::local_exchange_buffer_mem_limit = expect_block_bytes * num_partitions;
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto shared_state = LocalMergeExchangeSharedState::create_shared(num_partitions);
+    shared_state->exchanger = LocalMergeSortExchanger::create_unique(
+            LocalMergeSortExchanger::MergeInfo {is_asc_order, nulls_first, -1, 0,
+                                                ordering_expr_ctxs},
+            num_sink, num_partitions, free_block_limit);
+    auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    shared_state->create_dependencies(0);
+    auto& sink_deps = shared_state->sink_deps;
+    EXPECT_EQ(sink_deps.size(), num_sink);
+
+    auto* exchanger = (LocalMergeSortExchanger*)shared_state->exchanger.get();
+    for (size_t i = 0; i < num_sink; i++) {
+        auto compute_hash_value_timer =
+                ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+        auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + std::to_string(i));
+        _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, nullptr));
+        _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+        _sink_local_states[i]->_compute_hash_value_timer = compute_hash_value_timer;
+        _sink_local_states[i]->_distribute_timer = distribute_timer;
+        _sink_local_states[i]->_channel_id = i;
+        _sink_local_states[i]->_shared_state = shared_state.get();
+        _sink_local_states[i]->_dependency = sink_deps[i].get();
+    }
+    LocalExchangeSourceOperatorX op;
+    for (size_t i = 0; i < num_sources; i++) {
+        auto get_block_failed_counter =
+                ADD_TIMER(profile, "_get_block_failed_counter" + std::to_string(i));
+        auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i));
+        _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, nullptr));
+        _local_states[i]->_runtime_profile =
+                std::make_unique<RuntimeProfile>("source_profile " + std::to_string(i));
+        _local_states[i]->_exchanger = shared_state->exchanger.get();
+        _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+        _local_states[i]->_copy_data_timer = copy_data_timer;
+        _local_states[i]->_channel_id = i;
+        _local_states[i]->_shared_state = shared_state.get();
+        _local_states[i]->_dependency = shared_state->get_dep_by_channel_id(i).front().get();
+        _local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter(
+                "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+        shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+        _local_states[i]->_parent = &op;
+    }
+    _local_states[0]->_local_merge_deps = shared_state->source_deps;
+    {
+        // Enqueue 2 blocks with 10 rows for each data queue.
+        for (size_t i = 0; i < num_partitions; i++) {
+            for (size_t j = 0; j < num_blocks; j++) {
+                vectorized::Block in_block;
+                vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+                auto int_col0 = vectorized::ColumnInt32::create();
+                int_col0->insert_many_vals(i, 10);
+
+                in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+                EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes());
+                bool in_eos = false;
+                EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                          {_sink_local_states[i]->_compute_hash_value_timer,
+                                           _sink_local_states[i]->_distribute_timer, nullptr},
+                                          {&_sink_local_states[i]->_channel_id,
+                                           _sink_local_states[i]->_partitioner.get(),
+                                           _sink_local_states[i].get(), nullptr}),
+                          Status::OK());
+                EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), j == 0);
+                EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+            }
+        }
+    }
+
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(shared_state->mem_counters[i]->value(), expect_block_bytes * num_blocks);
+        EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+    }
+    // Dequeue from data queue and accumulate rows if rows is smaller than batch_size.
+    EXPECT_EQ(exchanger->_merger, nullptr);
+    for (size_t i = 0; i < num_sources; i++) {
+        for (size_t j = 0; j < num_blocks; j++) {
+            bool eos = false;
+            vectorized::Block block;
+            EXPECT_EQ(exchanger->get_block(_runtime_state.get(), &block, &eos,
+                                           {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                           {cast_set<int>(_local_states[i]->_channel_id),
+                                            _local_states[i].get()}),
+                      Status::OK());
+        }
+    }
+
+    bool error = false;
+    try {
+        vectorized::Block block;
+        bool eos = false;
+        EXPECT_TRUE(exchanger
+                            ->get_block(_runtime_state.get(), &block, &eos,
+                                        {nullptr, nullptr, _local_states[0]->_copy_data_timer},
+                                        {cast_set<int>(_local_states[0]->_channel_id),
+                                         _local_states[0].get()})
+                            .is<ErrorCode::INTERNAL_ERROR>());
+    } catch (const Exception& e) {
+        error = true;
+        EXPECT_TRUE(e.to_status().is<ErrorCode::INTERNAL_ERROR>());
+    }
+    EXPECT_TRUE(error);
+    for (size_t i = 0; i < num_sink; i++) {
+        shared_state->sub_running_sink_operators();
+    }
+
+    {
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j < num_sink * num_blocks - num_blocks; j++) {
+                bool eos = false;
+                error = false;
+                vectorized::Block block;
+                try {
+                    EXPECT_EQ(exchanger->get_block(
+                                      _runtime_state.get(), &block, &eos,
+                                      {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                      {cast_set<int>(_local_states[i]->_channel_id),
+                                       _local_states[i].get()}),
+                              Status::OK());
+                } catch (const Exception& e) {
+                    error = true;
+                    EXPECT_TRUE(e.to_status().is<ErrorCode::INTERNAL_ERROR>());
+                }
+                // error is true when we get another block after all blocks in one queue are exhausted.
+                EXPECT_EQ(error, i == 0 && (j % (num_blocks + 1) == 0)) << i << " " << j;
+                EXPECT_TRUE(exchanger->_merger != nullptr);
+                EXPECT_EQ(block.rows(), i > 0 || (j % (num_blocks + 1) == 0) ? 0 : 10)
+                        << i << " " << j;
+                EXPECT_EQ(eos, i > 0 || j == num_sink * num_blocks - num_blocks) << i << " " << j;
+            }
+        }
+    }
+}
+
 } // namespace doris::pipeline

