This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b0fa458f2e6 [UT](local exchanger) Add UT case for local exchanger (#47630)
b0fa458f2e6 is described below

commit b0fa458f2e69c8d82d36304e7bf1f7d6f207c569
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Sat Feb 8 15:11:14 2025 +0800

    [UT](local exchanger) Add UT case for local exchanger (#47630)
---
 .../local_exchange_source_operator.cpp             |   7 +-
 be/test/pipeline/local_exchanger_test.cpp          | 390 +++++++++++++++++++++
 2 files changed, 394 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 63e36cdfdb0..706d8bb9bf9 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -76,15 +76,16 @@ std::vector<Dependency*> 
LocalExchangeSourceLocalState::dependencies() const {
         // If this is a local merge exchange, source operator is runnable only 
if all sink operators
         // set dependencies ready
         std::vector<Dependency*> deps;
-        auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
         DCHECK_GT(_local_merge_deps.size(), 1);
         // If this is a local merge exchange, we should use all dependencies 
here.
         for (auto& dep : _local_merge_deps) {
             deps.push_back(dep.get());
         }
         return deps;
-    } else if (_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT && 
_channel_id != 0) {
-        // If this is a local merge exchange and is not the first task, source 
operators always
+    } else if ((_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT ||
+                _exchanger->get_type() == ExchangeType::PASS_TO_ONE) &&
+               _channel_id != 0) {
+        // If this is a LOCAL_MERGE_SORT/PASS_TO_ONE exchange and is not the 
first task, source operators always
         // return empty result so no dependencies here.
         return {};
     } else {
diff --git a/be/test/pipeline/local_exchanger_test.cpp 
b/be/test/pipeline/local_exchanger_test.cpp
index 3db2375866c..686b98627bf 100644
--- a/be/test/pipeline/local_exchanger_test.cpp
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -283,4 +283,394 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) {
     }
 }
 
+// Drives a PassthroughExchanger with 4 sinks and 4 sources through four phases:
+//   1. every sink pushes `num_blocks` blocks and the sink-side back-pressure is checked;
+//   2. every source drains its queue and the shared memory accounting returns to zero;
+//   3. one more block per sink re-arms the source dependencies;
+//   4. sinks finish, sources observe eos, and blocks sunk after close() are not queued.
+TEST_F(LocalExchangerTest, PassthroughExchanger) {
+    int num_sink = 4;
+    int num_sources = 4;
+    int free_block_limit = 0;
+
+    std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> _sink_local_states;
+    std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+    _sink_local_states.resize(num_sink);
+    _local_states.resize(num_sources);
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto shared_state = LocalExchangeSharedState::create_shared(num_sources);
+    shared_state->exchanger =
+            PassthroughExchanger::create_unique(num_sink, num_sources, free_block_limit);
+    // A single sink dependency is shared by all sink local states below.
+    auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    shared_state->create_dependencies(0);
+
+    // The exchanger was created as a PassthroughExchanger just above, so downcast to that
+    // exact type. Casting to an unrelated exchanger class would be undefined behaviour.
+    auto* exchanger = static_cast<PassthroughExchanger*>(shared_state->exchanger.get());
+    // Wire up the sink local states by hand (no operator / runtime state is attached).
+    for (size_t i = 0; i < num_sink; i++) {
+        auto compute_hash_value_timer =
+                ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+        auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + std::to_string(i));
+        _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, nullptr));
+        _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+        _sink_local_states[i]->_compute_hash_value_timer = compute_hash_value_timer;
+        _sink_local_states[i]->_distribute_timer = distribute_timer;
+        _sink_local_states[i]->_channel_id = i;
+        _sink_local_states[i]->_shared_state = shared_state.get();
+        _sink_local_states[i]->_dependency = sink_dep.get();
+    }
+    // Wire up the source local states; each one owns a memory counter that is also
+    // registered in the shared state so the exchanger can account per-channel usage.
+    for (size_t i = 0; i < num_sources; i++) {
+        auto get_block_failed_counter =
+                ADD_TIMER(profile, "_get_block_failed_counter" + std::to_string(i));
+        auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i));
+        _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, nullptr));
+        _local_states[i]->_exchanger = shared_state->exchanger.get();
+        _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+        _local_states[i]->_copy_data_timer = copy_data_timer;
+        _local_states[i]->_channel_id = i;
+        _local_states[i]->_shared_state = shared_state.get();
+        _local_states[i]->_dependency = shared_state->get_dep_by_channel_id(i).front().get();
+        _local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter(
+                "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+        shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+    }
+
+    const auto expect_block_bytes = 128;
+    const auto num_blocks = num_sources + 1;
+    // Budget for exactly (num_sources - 1) sinks' worth of blocks, so the shared sink
+    // dependency is expected to block only while the last sink is writing.
+    config::local_exchange_buffer_mem_limit = (num_sources - 1) * num_blocks * expect_block_bytes;
+    {
+        // Phase 1: every sink pushes `num_blocks` blocks of 10 rows each.
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j < num_blocks; j++) {
+                vectorized::Block in_block;
+                vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+                auto int_col0 = vectorized::ColumnInt32::create();
+                int_col0->insert_many_vals(i, 10);
+                in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+                EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes());
+                bool in_eos = false;
+                EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                          {_sink_local_states[i]->_compute_hash_value_timer,
+                                           _sink_local_states[i]->_distribute_timer, nullptr},
+                                          {&_sink_local_states[i]->_channel_id,
+                                           _sink_local_states[i]->_partitioner.get(),
+                                           _sink_local_states[i].get(), nullptr}),
+                          Status::OK());
+                EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), i < num_sources - 1);
+                // The sink's channel id is expected to advance by one per sunk block.
+                EXPECT_EQ(_sink_local_states[i]->_channel_id, i + 1 + j);
+            }
+        }
+    }
+
+    {
+        // Phase 2: the per-channel counters must add up to the shared total, and every
+        // source must be runnable before anything is dequeued.
+        int64_t mem_usage = 0;
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_GT(shared_state->mem_counters[i]->value(), 0);
+            mem_usage += shared_state->mem_counters[i]->value();
+            EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+        }
+        EXPECT_EQ(shared_state->mem_usage, mem_usage);
+        // Each source reads `num_blocks` 10-row blocks; one extra read returns an empty
+        // block (without eos) and leaves the source dependency blocked.
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j <= num_blocks; j++) {
+                bool eos = false;
+                vectorized::Block block;
+                EXPECT_EQ(
+                        exchanger->get_block(_runtime_state.get(), &block, &eos,
+                                             {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                             {cast_set<int>(_local_states[i]->_channel_id),
+                                              _local_states[i].get()}),
+                        Status::OK());
+                EXPECT_EQ(block.rows(), j == num_blocks ? 0 : 10);
+                EXPECT_EQ(eos, false);
+                EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_blocks);
+            }
+        }
+        EXPECT_EQ(shared_state->mem_usage, 0);
+    }
+    {
+        // Phase 3: add one new block per sink; source dependencies become ready again.
+        for (size_t i = 0; i < num_sink; i++) {
+            EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), true);
+            vectorized::Block in_block;
+            vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+            auto int_col0 = vectorized::ColumnInt32::create();
+            int_col0->insert_many_vals(i, 10);
+            in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+            bool in_eos = false;
+            EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                      {_sink_local_states[i]->_compute_hash_value_timer,
+                                       _sink_local_states[i]->_distribute_timer, nullptr},
+                                      {&_sink_local_states[i]->_channel_id,
+                                       _sink_local_states[i]->_partitioner.get(),
+                                       _sink_local_states[i].get(), nullptr}),
+                      Status::OK());
+            EXPECT_EQ(_sink_local_states[i]->_channel_id, i + 1 + num_blocks);
+        }
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+            // First read returns the new block, second read drains to empty and blocks.
+            for (size_t j = 0; j <= 1; j++) {
+                bool eos = false;
+                vectorized::Block block;
+                EXPECT_EQ(
+                        exchanger->get_block(_runtime_state.get(), &block, &eos,
+                                             {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                             {cast_set<int>(_local_states[i]->_channel_id),
+                                              _local_states[i].get()}),
+                        Status::OK());
+                EXPECT_EQ(block.rows(), j == 1 ? 0 : 10);
+                EXPECT_EQ(eos, false);
+                EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1);
+            }
+        }
+    }
+    // Phase 4: all queues are empty but not yet eos while sinks are still running.
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(exchanger->_data_queue[i].eos, false);
+        EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+    }
+    // Mark every sink operator as finished.
+    for (size_t i = 0; i < num_sink; i++) {
+        shared_state->sub_running_sink_operators();
+    }
+    // With no running sinks left, each source gets an empty block with eos set.
+    for (size_t i = 0; i < num_sources; i++) {
+        bool eos = false;
+        vectorized::Block block;
+        EXPECT_EQ(exchanger->get_block(
+                          _runtime_state.get(), &block, &eos,
+                          {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                          {cast_set<int>(_local_states[i]->_channel_id), _local_states[i].get()}),
+                  Status::OK());
+        EXPECT_EQ(block.rows(), 0);
+        EXPECT_EQ(eos, true);
+        EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        exchanger->close({cast_set<int>(i), nullptr});
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        shared_state->sub_running_source_operators();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+        EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+    }
+
+    {
+        // After the exchanger is closed, sink() still returns OK and still advances the
+        // channel id, but no data may reach the queues.
+        for (size_t i = 0; i < num_sink; i++) {
+            vectorized::Block in_block;
+            vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+            auto int_col0 = vectorized::ColumnInt32::create();
+            int_col0->insert_many_vals(i, 10);
+            in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+            bool in_eos = false;
+            EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                      {_sink_local_states[i]->_compute_hash_value_timer,
+                                       _sink_local_states[i]->_distribute_timer, nullptr},
+                                      {&_sink_local_states[i]->_channel_id,
+                                       _sink_local_states[i]->_partitioner.get(),
+                                       _sink_local_states[i].get(), nullptr}),
+                      Status::OK());
+            EXPECT_EQ(_sink_local_states[i]->_channel_id, i + 2 + num_blocks);
+        }
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+            EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+        }
+    }
+}
+
+// Drives a PassToOneExchanger with 4 sinks and 4 sources. The assertions below pin that
+// only data queue 0 ever receives blocks: sources 1..3 see eos immediately, while
+// source 0 drains everything the sinks produced.
+TEST_F(LocalExchangerTest, PassToOneExchanger) {
+    int num_sink = 4;
+    int num_sources = 4;
+    int free_block_limit = 0;
+
+    std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> _sink_local_states;
+    std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+    _sink_local_states.resize(num_sink);
+    _local_states.resize(num_sources);
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto shared_state = LocalExchangeSharedState::create_shared(num_sources);
+    shared_state->exchanger =
+            PassToOneExchanger::create_unique(num_sink, num_sources, free_block_limit);
+    // A single sink dependency is shared by all sink local states below.
+    auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    shared_state->create_dependencies(0);
+
+    // The exchanger was created as a PassToOneExchanger just above, so downcast to that
+    // exact type. Casting to an unrelated exchanger class would be undefined behaviour.
+    auto* exchanger = static_cast<PassToOneExchanger*>(shared_state->exchanger.get());
+    // Wire up the sink local states by hand (no operator / runtime state is attached).
+    for (size_t i = 0; i < num_sink; i++) {
+        auto compute_hash_value_timer =
+                ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+        auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + std::to_string(i));
+        _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, nullptr));
+        _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+        _sink_local_states[i]->_compute_hash_value_timer = compute_hash_value_timer;
+        _sink_local_states[i]->_distribute_timer = distribute_timer;
+        _sink_local_states[i]->_channel_id = i;
+        _sink_local_states[i]->_shared_state = shared_state.get();
+        _sink_local_states[i]->_dependency = sink_dep.get();
+    }
+    // Wire up the source local states; each one owns a memory counter that is also
+    // registered in the shared state so the exchanger can account per-channel usage.
+    for (size_t i = 0; i < num_sources; i++) {
+        auto get_block_failed_counter =
+                ADD_TIMER(profile, "_get_block_failed_counter" + std::to_string(i));
+        auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i));
+        _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, nullptr));
+        _local_states[i]->_exchanger = shared_state->exchanger.get();
+        _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+        _local_states[i]->_copy_data_timer = copy_data_timer;
+        _local_states[i]->_channel_id = i;
+        _local_states[i]->_shared_state = shared_state.get();
+        _local_states[i]->_dependency = shared_state->get_dep_by_channel_id(i).front().get();
+        _local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter(
+                "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+        shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+    }
+
+    const auto expect_block_bytes = 128;
+    const auto num_blocks = 2;
+    // Budget for exactly (num_sources - 1) sinks' worth of blocks, so the shared sink
+    // dependency is expected to block only while the last sink is writing.
+    config::local_exchange_buffer_mem_limit = (num_sources - 1) * num_blocks * expect_block_bytes;
+    {
+        // Phase 1: every sink pushes `num_blocks` blocks of 10 rows each.
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j < num_blocks; j++) {
+                vectorized::Block in_block;
+                vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+                auto int_col0 = vectorized::ColumnInt32::create();
+                int_col0->insert_many_vals(i, 10);
+                in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+                EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes());
+                bool in_eos = false;
+                EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                          {_sink_local_states[i]->_compute_hash_value_timer,
+                                           _sink_local_states[i]->_distribute_timer, nullptr},
+                                          {&_sink_local_states[i]->_channel_id,
+                                           _sink_local_states[i]->_partitioner.get(),
+                                           _sink_local_states[i].get(), nullptr}),
+                          Status::OK());
+                EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), i < num_sources - 1);
+                // Unlike the passthrough case, the sink's channel id must not move.
+                EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+            }
+        }
+        // Queues other than queue 0 must have received nothing.
+        for (size_t i = 1; i < num_sources; i++) {
+            EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+        }
+    }
+
+    {
+        // Phase 2: all memory is accounted to channel 0, and the per-channel counters add
+        // up to the shared total.
+        int64_t mem_usage = 0;
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(shared_state->mem_counters[i]->value(),
+                      i == 0 ? expect_block_bytes * num_blocks * num_sink : 0);
+            mem_usage += shared_state->mem_counters[i]->value();
+            if (i == 0) {
+                EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+            }
+        }
+        EXPECT_EQ(shared_state->mem_usage, mem_usage);
+        // Source 0 reads `num_blocks * num_sink` 10-row blocks, then one empty block that
+        // leaves its dependency blocked. Every other source performs a single read that
+        // returns an empty block with eos set.
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j <= (i == 0 ? num_blocks * num_sink : 0); j++) {
+                bool eos = false;
+                vectorized::Block block;
+                EXPECT_EQ(
+                        exchanger->get_block(_runtime_state.get(), &block, &eos,
+                                             {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                             {cast_set<int>(_local_states[i]->_channel_id),
+                                              _local_states[i].get()}),
+                        Status::OK());
+                EXPECT_EQ(block.rows(), i == 0 && j < num_blocks * num_sink ? 10 : 0);
+                EXPECT_EQ(eos, i != 0);
+                if (i == 0) {
+                    EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_blocks * num_sink);
+                }
+            }
+        }
+        EXPECT_EQ(shared_state->mem_usage, 0);
+    }
+    {
+        // Phase 3: add one new block through sink 0; source 0's dependency becomes ready
+        // again.
+        for (size_t i = 0; i < 1; i++) {
+            EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), true);
+            vectorized::Block in_block;
+            vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+            auto int_col0 = vectorized::ColumnInt32::create();
+            int_col0->insert_many_vals(i, 10);
+            in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+            bool in_eos = false;
+            EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                      {_sink_local_states[i]->_compute_hash_value_timer,
+                                       _sink_local_states[i]->_distribute_timer, nullptr},
+                                      {&_sink_local_states[i]->_channel_id,
+                                       _sink_local_states[i]->_partitioner.get(),
+                                       _sink_local_states[i].get(), nullptr}),
+                      Status::OK());
+            EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+        }
+        for (size_t i = 0; i < 1; i++) {
+            EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+            // First read returns the new block, second read drains to empty and blocks.
+            for (size_t j = 0; j <= 1; j++) {
+                bool eos = false;
+                vectorized::Block block;
+                EXPECT_EQ(
+                        exchanger->get_block(_runtime_state.get(), &block, &eos,
+                                             {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                             {cast_set<int>(_local_states[i]->_channel_id),
+                                              _local_states[i].get()}),
+                        Status::OK());
+                EXPECT_EQ(block.rows(), j == 1 ? 0 : 10);
+                EXPECT_EQ(eos, false);
+                EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1);
+            }
+        }
+    }
+    // Phase 4: all queues are empty but not yet eos while sinks are still running.
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(exchanger->_data_queue[i].eos, false);
+        EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+    }
+    // Mark every sink operator as finished.
+    for (size_t i = 0; i < num_sink; i++) {
+        shared_state->sub_running_sink_operators();
+    }
+    // With no running sinks left, source 0 gets an empty block with eos set.
+    for (size_t i = 0; i < 1; i++) {
+        bool eos = false;
+        vectorized::Block block;
+        EXPECT_EQ(exchanger->get_block(
+                          _runtime_state.get(), &block, &eos,
+                          {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                          {cast_set<int>(_local_states[i]->_channel_id), _local_states[i].get()}),
+                  Status::OK());
+        EXPECT_EQ(block.rows(), 0);
+        EXPECT_EQ(eos, true);
+        EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        exchanger->close({cast_set<int>(i), nullptr});
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        shared_state->sub_running_source_operators();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+        EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+    }
+
+    {
+        // After the exchanger is closed, sink() still returns OK but no data may reach
+        // the queues.
+        for (size_t i = 0; i < num_sink; i++) {
+            vectorized::Block in_block;
+            vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+            auto int_col0 = vectorized::ColumnInt32::create();
+            int_col0->insert_many_vals(i, 10);
+            in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+            bool in_eos = false;
+            EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                      {_sink_local_states[i]->_compute_hash_value_timer,
+                                       _sink_local_states[i]->_distribute_timer, nullptr},
+                                      {&_sink_local_states[i]->_channel_id,
+                                       _sink_local_states[i]->_partitioner.get(),
+                                       _sink_local_states[i].get(), nullptr}),
+                      Status::OK());
+            EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+        }
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+            EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+        }
+    }
+}
+
 } // namespace doris::pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to