This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new b0fa458f2e6 [UT](local exchanger) Add UT case for local exchanger (#47630)

b0fa458f2e6 is described below

commit b0fa458f2e69c8d82d36304e7bf1f7d6f207c569
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Sat Feb 8 15:11:14 2025 +0800

    [UT](local exchanger) Add UT case for local exchanger (#47630)
---
 .../local_exchange_source_operator.cpp    |   7 +-
 be/test/pipeline/local_exchanger_test.cpp | 390 +++++++++++++++++++++
 2 files changed, 394 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 63e36cdfdb0..706d8bb9bf9 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -76,15 +76,16 @@ std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
         // If this is a local merge exchange, source operator is runnable only if all sink operators
         // set dependencies ready
         std::vector<Dependency*> deps;
-        auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
         DCHECK_GT(_local_merge_deps.size(), 1);
         // If this is a local merge exchange, we should use all dependencies here.
         for (auto& dep : _local_merge_deps) {
             deps.push_back(dep.get());
         }
         return deps;
-    } else if (_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT && _channel_id != 0) {
-        // If this is a local merge exchange and is not the first task, source operators always
+    } else if ((_exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT ||
+                _exchanger->get_type() == ExchangeType::PASS_TO_ONE) &&
+               _channel_id != 0) {
+        // If this is a LOCAL_MERGE_SORT/PASS_TO_ONE exchange and is not the first task, source operators always
         // return empty result so no dependencies here.
         return {};
     } else {
diff --git a/be/test/pipeline/local_exchanger_test.cpp b/be/test/pipeline/local_exchanger_test.cpp
index 3db2375866c..686b98627bf 100644
--- a/be/test/pipeline/local_exchanger_test.cpp
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -283,4 +283,394 @@ TEST_F(LocalExchangerTest, ShuffleExchanger) {
         }
     }
 }
+TEST_F(LocalExchangerTest, PassthroughExchanger) {
+    int num_sink = 4;
+    int num_sources = 4;
+    int free_block_limit = 0;
+
+    std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> _sink_local_states;
+    std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+    _sink_local_states.resize(num_sink);
+    _local_states.resize(num_sources);
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto shared_state = LocalExchangeSharedState::create_shared(num_sources);
+    shared_state->exchanger =
+            PassthroughExchanger::create_unique(num_sink, num_sources, free_block_limit);
+    auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    shared_state->create_dependencies(0);
+
+    auto* exchanger = static_cast<PassthroughExchanger*>(shared_state->exchanger.get());
+    for (size_t i = 0; i < num_sink; i++) {
+        auto compute_hash_value_timer =
+                ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+        auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + std::to_string(i));
+        _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, nullptr));
+        _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+        _sink_local_states[i]->_compute_hash_value_timer = compute_hash_value_timer;
+        _sink_local_states[i]->_distribute_timer = distribute_timer;
+        _sink_local_states[i]->_channel_id = i;
+        _sink_local_states[i]->_shared_state = shared_state.get();
+        _sink_local_states[i]->_dependency = sink_dep.get();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        auto get_block_failed_counter =
+                ADD_TIMER(profile, "_get_block_failed_counter" + std::to_string(i));
+        auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i));
+        _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, nullptr));
+        _local_states[i]->_exchanger = shared_state->exchanger.get();
+        _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+        _local_states[i]->_copy_data_timer = copy_data_timer;
+        _local_states[i]->_channel_id = i;
+        _local_states[i]->_shared_state = shared_state.get();
+        _local_states[i]->_dependency = shared_state->get_dep_by_channel_id(i).front().get();
+        _local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter(
+                "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+        shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+    }
+
+    const auto expect_block_bytes = 128;
+    const auto num_blocks = num_sources + 1;
+    config::local_exchange_buffer_mem_limit = (num_sources - 1) * num_blocks * expect_block_bytes;
+    {
+        // Each sink pushes `num_blocks` 10-row blocks; each data queue ends up with `num_blocks`.
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j < num_blocks; j++) {
+                vectorized::Block in_block;
+                vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+                auto int_col0 = vectorized::ColumnInt32::create();
+                int_col0->insert_many_vals(i, 10);
+                in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+                EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes());
+                bool in_eos = false;
+                EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                          {_sink_local_states[i]->_compute_hash_value_timer,
+                                           _sink_local_states[i]->_distribute_timer, nullptr},
+                                          {&_sink_local_states[i]->_channel_id,
+                                           _sink_local_states[i]->_partitioner.get(),
+                                           _sink_local_states[i].get(), nullptr}),
+                          Status::OK());
+                EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), i < num_sources - 1);
+                EXPECT_EQ(_sink_local_states[i]->_channel_id, i + 1 + j);
+            }
+        }
+    }
+
+    {
+        int64_t mem_usage = 0;
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_GT(shared_state->mem_counters[i]->value(), 0);
+            mem_usage += shared_state->mem_counters[i]->value();
+            EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+        }
+        EXPECT_EQ(shared_state->mem_usage, mem_usage);
+        // Drain each queue: one 10-row block per call, then an empty block with eos == false.
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j <= num_blocks; j++) {
+                bool eos = false;
+                vectorized::Block block;
+                EXPECT_EQ(
+                        exchanger->get_block(_runtime_state.get(), &block, &eos,
+                                             {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                             {cast_set<int>(_local_states[i]->_channel_id),
+                                              _local_states[i].get()}),
+                        Status::OK());
+                EXPECT_EQ(block.rows(), j == num_blocks ? 0 : 10);
+                EXPECT_EQ(eos, false);
+                EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_blocks);
+            }
+        }
+        EXPECT_EQ(shared_state->mem_usage, 0);
+    }
+    {
+        // Push one more block per sink; every source dependency becomes ready again.
+        for (size_t i = 0; i < num_sink; i++) {
+            EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), true);
+            vectorized::Block in_block;
+            vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+            auto int_col0 = vectorized::ColumnInt32::create();
+            int_col0->insert_many_vals(i, 10);
+            in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+            bool in_eos = false;
+            EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                      {_sink_local_states[i]->_compute_hash_value_timer,
+                                       _sink_local_states[i]->_distribute_timer, nullptr},
+                                      {&_sink_local_states[i]->_channel_id,
+                                       _sink_local_states[i]->_partitioner.get(),
+                                       _sink_local_states[i].get(), nullptr}),
+                      Status::OK());
+            EXPECT_EQ(_sink_local_states[i]->_channel_id, i + 1 + num_blocks);
+        }
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+            for (size_t j = 0; j <= 1; j++) {
+                bool eos = false;
+                vectorized::Block block;
+                EXPECT_EQ(
+                        exchanger->get_block(_runtime_state.get(), &block, &eos,
+                                             {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                             {cast_set<int>(_local_states[i]->_channel_id),
+                                              _local_states[i].get()}),
+                        Status::OK());
+                EXPECT_EQ(block.rows(), j == 1 ? 0 : 10);
+                EXPECT_EQ(eos, false);
+                EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1);
+            }
+        }
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(exchanger->_data_queue[i].eos, false);
+        EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+    }
+    for (size_t i = 0; i < num_sink; i++) {
+        shared_state->sub_running_sink_operators();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        bool eos = false;
+        vectorized::Block block;
+        EXPECT_EQ(exchanger->get_block(
+                          _runtime_state.get(), &block, &eos,
+                          {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                          {cast_set<int>(_local_states[i]->_channel_id), _local_states[i].get()}),
+                  Status::OK());
+        EXPECT_EQ(block.rows(), 0);
+        EXPECT_EQ(eos, true);
+        EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        exchanger->close({cast_set<int>(i), nullptr});
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        shared_state->sub_running_source_operators();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+        EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+    }
+
+    {
+        // After the exchanger is closed, sunk blocks are no longer pushed into any data queue.
+        for (size_t i = 0; i < num_sink; i++) {
+            vectorized::Block in_block;
+            vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+            auto int_col0 = vectorized::ColumnInt32::create();
+            int_col0->insert_many_vals(i, 10);
+            in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+            bool in_eos = false;
+            EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                      {_sink_local_states[i]->_compute_hash_value_timer,
+                                       _sink_local_states[i]->_distribute_timer, nullptr},
+                                      {&_sink_local_states[i]->_channel_id,
+                                       _sink_local_states[i]->_partitioner.get(),
+                                       _sink_local_states[i].get(), nullptr}),
+                      Status::OK());
+            EXPECT_EQ(_sink_local_states[i]->_channel_id, i + 2 + num_blocks);
+        }
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+            EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+        }
+    }
+}
+
+TEST_F(LocalExchangerTest, PassToOneExchanger) {
+    int num_sink = 4;
+    int num_sources = 4;
+    int free_block_limit = 0;
+
+    std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> _sink_local_states;
+    std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+    _sink_local_states.resize(num_sink);
+    _local_states.resize(num_sources);
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto shared_state = LocalExchangeSharedState::create_shared(num_sources);
+    shared_state->exchanger =
+            PassToOneExchanger::create_unique(num_sink, num_sources, free_block_limit);
+    auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    shared_state->create_dependencies(0);
+
+    auto* exchanger = static_cast<PassToOneExchanger*>(shared_state->exchanger.get());
+    for (size_t i = 0; i < num_sink; i++) {
+        auto compute_hash_value_timer =
+                ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+        auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + std::to_string(i));
+        _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, nullptr));
+        _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+        _sink_local_states[i]->_compute_hash_value_timer = compute_hash_value_timer;
+        _sink_local_states[i]->_distribute_timer = distribute_timer;
+        _sink_local_states[i]->_channel_id = i;
+        _sink_local_states[i]->_shared_state = shared_state.get();
+        _sink_local_states[i]->_dependency = sink_dep.get();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        auto get_block_failed_counter =
+                ADD_TIMER(profile, "_get_block_failed_counter" + std::to_string(i));
+        auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i));
+        _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, nullptr));
+        _local_states[i]->_exchanger = shared_state->exchanger.get();
+        _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+        _local_states[i]->_copy_data_timer = copy_data_timer;
+        _local_states[i]->_channel_id = i;
+        _local_states[i]->_shared_state = shared_state.get();
+        _local_states[i]->_dependency = shared_state->get_dep_by_channel_id(i).front().get();
+        _local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter(
+                "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+        shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+    }
+
+    const auto expect_block_bytes = 128;
+    const auto num_blocks = 2;
+    config::local_exchange_buffer_mem_limit = (num_sources - 1) * num_blocks * expect_block_bytes;
+    {
+        // Each sink pushes `num_blocks` 10-row blocks; PASS_TO_ONE routes them all to queue 0.
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j < num_blocks; j++) {
+                vectorized::Block in_block;
+                vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+                auto int_col0 = vectorized::ColumnInt32::create();
+                int_col0->insert_many_vals(i, 10);
+                in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+                EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes());
+                bool in_eos = false;
+                EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                          {_sink_local_states[i]->_compute_hash_value_timer,
+                                           _sink_local_states[i]->_distribute_timer, nullptr},
+                                          {&_sink_local_states[i]->_channel_id,
+                                           _sink_local_states[i]->_partitioner.get(),
+                                           _sink_local_states[i].get(), nullptr}),
+                          Status::OK());
+                EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), i < num_sources - 1);
+                EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+            }
+        }
+        for (size_t i = 1; i < num_sources; i++) {
+            EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+        }
+    }
+
+    {
+        int64_t mem_usage = 0;
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(shared_state->mem_counters[i]->value(),
+                      i == 0 ? expect_block_bytes * num_blocks * num_sink : 0);
+            mem_usage += shared_state->mem_counters[i]->value();
+            if (i == 0) {
+                EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+            }
+        }
+        EXPECT_EQ(shared_state->mem_usage, mem_usage);
+        // Source 0 drains all blocks; the other sources get an empty block with eos == true.
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j <= (i == 0 ? num_blocks * num_sink : 0); j++) {
+                bool eos = false;
+                vectorized::Block block;
+                EXPECT_EQ(
+                        exchanger->get_block(_runtime_state.get(), &block, &eos,
+                                             {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                             {cast_set<int>(_local_states[i]->_channel_id),
+                                              _local_states[i].get()}),
+                        Status::OK());
+                EXPECT_EQ(block.rows(), i == 0 && j < num_blocks * num_sink ? 10 : 0);
+                EXPECT_EQ(eos, i != 0);
+                if (i == 0) {
+                    EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_blocks * num_sink);
+                }
+            }
+        }
+        EXPECT_EQ(shared_state->mem_usage, 0);
+    }
+    {
+        // Push one more block via sink 0; source 0's dependency becomes ready again.
+        for (size_t i = 0; i < 1; i++) {
+            EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), true);
+            vectorized::Block in_block;
+            vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+            auto int_col0 = vectorized::ColumnInt32::create();
+            int_col0->insert_many_vals(i, 10);
+            in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+            bool in_eos = false;
+            EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                      {_sink_local_states[i]->_compute_hash_value_timer,
+                                       _sink_local_states[i]->_distribute_timer, nullptr},
+                                      {&_sink_local_states[i]->_channel_id,
+                                       _sink_local_states[i]->_partitioner.get(),
+                                       _sink_local_states[i].get(), nullptr}),
+                      Status::OK());
+            EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+        }
+        for (size_t i = 0; i < 1; i++) {
+            EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+            for (size_t j = 0; j <= 1; j++) {
+                bool eos = false;
+                vectorized::Block block;
+                EXPECT_EQ(
+                        exchanger->get_block(_runtime_state.get(), &block, &eos,
+                                             {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                             {cast_set<int>(_local_states[i]->_channel_id),
+                                              _local_states[i].get()}),
+                        Status::OK());
+                EXPECT_EQ(block.rows(), j == 1 ? 0 : 10);
+                EXPECT_EQ(eos, false);
+                EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1);
+            }
+        }
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(exchanger->_data_queue[i].eos, false);
+        EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+    }
+    for (size_t i = 0; i < num_sink; i++) {
+        shared_state->sub_running_sink_operators();
+    }
+    for (size_t i = 0; i < 1; i++) {
+        bool eos = false;
+        vectorized::Block block;
+        EXPECT_EQ(exchanger->get_block(
+                          _runtime_state.get(), &block, &eos,
+                          {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                          {cast_set<int>(_local_states[i]->_channel_id), _local_states[i].get()}),
+                  Status::OK());
+        EXPECT_EQ(block.rows(), 0);
+        EXPECT_EQ(eos, true);
+        EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        exchanger->close({cast_set<int>(i), nullptr});
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        shared_state->sub_running_source_operators();
+    }
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+        EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+    }
+
+    {
+        // After the exchanger is closed, sunk blocks are no longer pushed into any data queue.
+        for (size_t i = 0; i < num_sink; i++) {
+            vectorized::Block in_block;
+            vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+            auto int_col0 = vectorized::ColumnInt32::create();
+            int_col0->insert_many_vals(i, 10);
+            in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+            bool in_eos = false;
+            EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                      {_sink_local_states[i]->_compute_hash_value_timer,
+                                       _sink_local_states[i]->_distribute_timer, nullptr},
+                                      {&_sink_local_states[i]->_channel_id,
+                                       _sink_local_states[i]->_partitioner.get(),
+                                       _sink_local_states[i].get(), nullptr}),
+                      Status::OK());
+            EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+        }
+        for (size_t i = 0; i < num_sources; i++) {
+            EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+            EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+        }
+    }
+}
+
 } // namespace doris::pipeline

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org