This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1738a56dc9d [UT](exchanger) Add UT case to test local merge exchanger (#47787) 1738a56dc9d is described below commit 1738a56dc9d316431b63b8d7ab0134c722128533 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Wed Feb 12 10:15:32 2025 +0800 [UT](exchanger) Add UT case to test local merge exchanger (#47787) --- be/src/pipeline/exec/operator.h | 6 + .../local_exchange/local_exchange_sink_operator.h | 9 + .../local_exchange_source_operator.h | 3 + be/src/pipeline/local_exchange/local_exchanger.cpp | 1 - be/src/pipeline/local_exchange/local_exchanger.h | 5 +- be/src/vec/core/sort_cursor.h | 21 +- be/test/pipeline/local_exchanger_test.cpp | 284 +++++++++++++++++++++ 7 files changed, 313 insertions(+), 16 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index e0f11049415..241fd42b5c7 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -435,6 +435,9 @@ public: DataSinkOperatorXBase(const int operator_id, const int node_id, std::vector<int>& sources) : OperatorBase(), _operator_id(operator_id), _node_id(node_id), _dests_id(sources) {} +#ifdef BE_TEST + DataSinkOperatorXBase() : _operator_id(-1), _node_id(0), _dests_id({-1}) {}; +#endif ~DataSinkOperatorXBase() override = default; @@ -537,6 +540,9 @@ public: DataSinkOperatorX(const int id, const int node_id, std::vector<int> sources) : DataSinkOperatorXBase(id, node_id, sources) {} +#ifdef BE_TEST + DataSinkOperatorX() = default; +#endif ~DataSinkOperatorX() override = default; Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 7ef053af6f7..1620a68d33e 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -92,6 +92,15 @@ public: _texprs(texprs), _partitioned_exprs_num(texprs.size()), _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {} +#ifdef BE_TEST + LocalExchangeSinkOperatorX(const std::vector<TExpr>& texprs, + const std::map<int, int>& bucket_seq_to_instance_idx) + : Base(), + _num_partitions(0), + _texprs(texprs), + _partitioned_exprs_num(texprs.size()), + _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {} +#endif Status init(const TPlanNode& tnode, RuntimeState* state) override { return Status::InternalError("{} should not init with TPlanNode", Base::_name); diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index 3c706d50182..e0d1715afb1 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -65,6 +65,9 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL public: using Base = OperatorX<LocalExchangeSourceLocalState>; LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, id, id) {} +#ifdef BE_TEST + LocalExchangeSourceOperatorX() = default; +#endif Status init(ExchangeType type) override { _op_name = "LOCAL_EXCHANGE_OPERATOR (" + get_exchange_type_name(type) + ")"; _exchange_type = type; diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 0fb4db625a5..1fc47b2f55d 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -19,7 +19,6 @@ #include "common/cast_set.h" #include "common/status.h" -#include "pipeline/exec/sort_sink_operator.h" #include "pipeline/local_exchange/local_exchange_sink_operator.h" #include "pipeline/local_exchange/local_exchange_source_operator.h" #include "vec/runtime/partitioner.h" diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 7f87289e413..25f090e4208 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -87,7 +87,10 @@ public: ~BlockWrapper() { if (_shared_state != nullptr) { DCHECK_GT(_allocated_bytes, 0); - _shared_state->sub_total_mem_usage(_allocated_bytes, _channel_ids.front()); + // `_channel_ids` may be empty if exchanger is shuffled exchanger and channel id is + // not used by `sub_total_mem_usage`. So we just pass -1 here. + _shared_state->sub_total_mem_usage( + _allocated_bytes, _channel_ids.empty() ? -1 : _channel_ids.front()); if (_shared_state->exchanger->_free_block_limit == 0 || _shared_state->exchanger->_free_blocks.size_approx() < _shared_state->exchanger->_free_block_limit * diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index a37b6feb21e..ee4ce22f42d 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -205,23 +205,16 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { return false; } block->clear(); - Status status; do { - status = _block_supplier(block.get(), &_is_eof); - } while (block->empty() && !_is_eof && status.ok()); - // If status not ok, upper callers could not detect whether it is eof or error. - // So that fatal here, and should throw exception in the future. - if (status.ok() && !block->empty()) { - if (!_ordering_expr.empty()) { - for (int i = 0; status.ok() && i < desc.size(); ++i) { - // TODO yiguolei: throw exception if status not ok in the future - status = _ordering_expr[i]->execute(block.get(), &desc[i].column_number); - } + THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof)); + } while (block->empty() && !_is_eof); + if (!block->empty()) { + DCHECK_EQ(_ordering_expr.size(), desc.size()); + for (int i = 0; i < desc.size(); ++i) { + THROW_IF_ERROR(_ordering_expr[i]->execute(block.get(), &desc[i].column_number)); } MergeSortCursorImpl::reset(); - return status.ok(); - } else if (!status.ok()) { - throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, status.msg()); + return true; } return false; } diff --git a/be/test/pipeline/local_exchanger_test.cpp b/be/test/pipeline/local_exchanger_test.cpp index c9a10eb32b5..b0e91f0018f 100644 --- a/be/test/pipeline/local_exchanger_test.cpp +++ b/be/test/pipeline/local_exchanger_test.cpp @@ -1098,4 +1098,288 @@ TEST_F(LocalExchangerTest, AdaptivePassthroughExchanger) { } } +TEST_F(LocalExchangerTest, TestShuffleExchangerWrongMap) { + int num_sink = 1; + int num_sources = 4; + int num_partitions = 4; + int free_block_limit = 0; + std::map<int, int> shuffle_idx_to_instance_idx; + for (int i = 0; i < num_partitions; i++) { + shuffle_idx_to_instance_idx[i] = i; + } + // Wrong map lost (0 -> 0) mapping + std::map<int, int> wrong_shuffle_idx_to_instance_idx; + for (int i = 1; i < num_partitions; i++) { + wrong_shuffle_idx_to_instance_idx[i] = i; + } + + std::vector<std::pair<std::vector<uint32_t>, int>> hash_vals_and_value; + std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> _sink_local_states; + std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states; + _sink_local_states.resize(num_sink); + _local_states.resize(num_sources); + auto profile = std::make_shared<RuntimeProfile>(""); + auto shared_state = LocalExchangeSharedState::create_shared(num_partitions); + shared_state->exchanger = ShuffleExchanger::create_unique(num_sink, num_sources, num_partitions, + free_block_limit); + auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); + sink_dep->set_shared_state(shared_state.get()); + shared_state->sink_deps.push_back(sink_dep); + shared_state->create_dependencies(0); + + auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get(); + auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF, + TTypeDescBuilder() + .set_types(TTypeNodeBuilder() + .set_type(TTypeNodeType::SCALAR) + .set_scalar_type(TPrimitiveType::INT) + .build()) + .build(), + 0) + .set_slot_ref(TSlotRefBuilder(0, 0).build()) + .build(); + std::vector<TExpr> texprs; + texprs.push_back(TExpr {}); + for (size_t i = 0; i < num_sink; i++) { + auto compute_hash_value_timer = + ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i)); + auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + std::to_string(i)); + _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, nullptr)); + _sink_local_states[i]->_exchanger = shared_state->exchanger.get(); + _sink_local_states[i]->_compute_hash_value_timer = compute_hash_value_timer; + _sink_local_states[i]->_distribute_timer = distribute_timer; + _sink_local_states[i]->_partitioner.reset( + new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>( + num_partitions)); + auto slot = doris::vectorized::VSlotRef::create_shared(texpr); + slot->_column_id = 0; + ((vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>*)_sink_local_states[i] + ->_partitioner.get()) + ->_partition_expr_ctxs.push_back( + std::make_shared<doris::vectorized::VExprContext>(slot)); + _sink_local_states[i]->_channel_id = i; + _sink_local_states[i]->_shared_state = shared_state.get(); + _sink_local_states[i]->_dependency = sink_dep.get(); + } + for (size_t i = 0; i < num_sources; i++) { + auto get_block_failed_counter = + ADD_TIMER(profile, "_get_block_failed_counter" + std::to_string(i)); + auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i)); + _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, nullptr)); + _local_states[i]->_exchanger = shared_state->exchanger.get(); + _local_states[i]->_get_block_failed_counter = get_block_failed_counter; + _local_states[i]->_copy_data_timer = copy_data_timer; + _local_states[i]->_channel_id = i; + _local_states[i]->_shared_state = shared_state.get(); + _local_states[i]->_dependency = shared_state->get_dep_by_channel_id(i).front().get(); + _local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter( + "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1); + shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter; + } + const auto num_blocks = 1; + { + for (size_t i = 0; i < num_partitions; i++) { + hash_vals_and_value.push_back({std::vector<uint32_t> {}, i}); + for (size_t j = 0; j < num_blocks; j++) { + vectorized::Block in_block; + vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>(); + auto int_col0 = vectorized::ColumnInt32::create(); + int_col0->insert_many_vals(hash_vals_and_value.back().second, 10); + + auto pre_size = hash_vals_and_value.back().first.size(); + hash_vals_and_value.back().first.resize(pre_size + 10); + std::fill(hash_vals_and_value.back().first.begin() + pre_size, + hash_vals_and_value.back().first.end(), 0); + int_col0->update_crcs_with_value(hash_vals_and_value.back().first.data() + pre_size, + PrimitiveType::TYPE_INT, + cast_set<uint32_t>(int_col0->size()), 0, nullptr); + in_block.insert({std::move(int_col0), int_type, "test_int_col0"}); + } + } + } + { + // Enqueue 2 blocks with 10 rows for each data queue. + for (size_t i = 0; i < num_partitions; i++) { + hash_vals_and_value.push_back({std::vector<uint32_t> {}, i}); + for (size_t j = 0; j < num_blocks; j++) { + vectorized::Block in_block; + vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>(); + auto int_col0 = vectorized::ColumnInt32::create(); + int_col0->insert_many_vals(hash_vals_and_value[i].second, 10); + in_block.insert({std::move(int_col0), int_type, "test_int_col0"}); + bool in_eos = false; + EXPECT_EQ(exchanger->sink( + _runtime_state.get(), &in_block, in_eos, + {_sink_local_states[0]->_compute_hash_value_timer, + _sink_local_states[0]->_distribute_timer, nullptr}, + {&_sink_local_states[0]->_channel_id, + _sink_local_states[0]->_partitioner.get(), + _sink_local_states[0].get(), &shuffle_idx_to_instance_idx}), + Status::OK()); + } + } + } + { + for (size_t i = 0; i < num_sources; i++) { + EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 1); + } + } + + { + LocalExchangeSinkOperatorX op(texprs, wrong_shuffle_idx_to_instance_idx); + _sink_local_states[0]->_parent = &op; + EXPECT_EQ(hash_vals_and_value[0].first.front() % num_partitions, 0); + vectorized::Block in_block; + vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>(); + auto int_col0 = vectorized::ColumnInt32::create(); + int_col0->insert_many_vals(hash_vals_and_value[0].second, 10); + in_block.insert({std::move(int_col0), int_type, "test_int_col0"}); + bool in_eos = false; + EXPECT_TRUE( + exchanger + ->sink(_runtime_state.get(), &in_block, in_eos, + {_sink_local_states[0]->_compute_hash_value_timer, + _sink_local_states[0]->_distribute_timer, nullptr}, + {&_sink_local_states[0]->_channel_id, + _sink_local_states[0]->_partitioner.get(), + _sink_local_states[0].get(), &wrong_shuffle_idx_to_instance_idx}) + .is<ErrorCode::INTERNAL_ERROR>()); + } +} + +TEST_F(LocalExchangerTest, LocalMergeSortExchanger) { + int num_sink = 4; + int num_sources = 4; + int num_partitions = 4; + int free_block_limit = 0; + const auto expect_block_bytes = 128; + const auto num_blocks = 2; + std::vector<bool> is_asc_order; + std::vector<bool> nulls_first; + vectorized::VExprContextSPtrs ordering_expr_ctxs; + auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF, + TTypeDescBuilder() + .set_types(TTypeNodeBuilder() + .set_type(TTypeNodeType::SCALAR) + .set_scalar_type(TPrimitiveType::INT) + .build()) + .build(), + 0) + .set_slot_ref(TSlotRefBuilder(0, 0).build()) + .build(); + auto slot = doris::vectorized::VSlotRef::create_shared(texpr); + slot->_column_id = 0; + slot->_prepare_finished = true; + ordering_expr_ctxs.push_back(std::make_shared<doris::vectorized::VExprContext>(slot)); + is_asc_order.push_back(true); + nulls_first.push_back(true); + ordering_expr_ctxs[0]->_prepared = true; + ordering_expr_ctxs[0]->_opened = true; + + std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> _sink_local_states; + std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states; + _sink_local_states.resize(num_sink); + _local_states.resize(num_sources); + config::local_exchange_buffer_mem_limit = expect_block_bytes * num_partitions; + auto profile = std::make_shared<RuntimeProfile>(""); + auto shared_state = LocalMergeExchangeSharedState::create_shared(num_partitions); + shared_state->exchanger = LocalMergeSortExchanger::create_unique( + LocalMergeSortExchanger::MergeInfo {is_asc_order, nulls_first, -1, 0, + ordering_expr_ctxs}, + num_sink, num_partitions, free_block_limit); + auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true); + sink_dep->set_shared_state(shared_state.get()); + shared_state->sink_deps.push_back(sink_dep); + shared_state->create_dependencies(0); + auto& sink_deps = shared_state->sink_deps; + EXPECT_EQ(sink_deps.size(), num_sink); + + auto* exchanger = (LocalMergeSortExchanger*)shared_state->exchanger.get(); + for (size_t i = 0; i < num_sink; i++) { + auto compute_hash_value_timer = + ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i)); + auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + std::to_string(i)); + _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, nullptr)); + _sink_local_states[i]->_exchanger = shared_state->exchanger.get(); + _sink_local_states[i]->_compute_hash_value_timer = compute_hash_value_timer; + _sink_local_states[i]->_distribute_timer = distribute_timer; + _sink_local_states[i]->_channel_id = i; + _sink_local_states[i]->_shared_state = shared_state.get(); + _sink_local_states[i]->_dependency = sink_deps[i].get(); + } + for (size_t i = 0; i < num_sources; i++) { + auto get_block_failed_counter = + ADD_TIMER(profile, "_get_block_failed_counter" + std::to_string(i)); + auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i)); + _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, nullptr)); + _local_states[i]->_runtime_profile = + std::make_unique<RuntimeProfile>("source_profile " + std::to_string(i)); + _local_states[i]->_exchanger = shared_state->exchanger.get(); + _local_states[i]->_get_block_failed_counter = get_block_failed_counter; + _local_states[i]->_copy_data_timer = copy_data_timer; + _local_states[i]->_channel_id = i; + _local_states[i]->_shared_state = shared_state.get(); + _local_states[i]->_dependency = shared_state->get_dep_by_channel_id(i).front().get(); + _local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter( + "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1); + shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter; + } + { + // Enqueue 2 blocks with 10 rows for each data queue. + for (size_t i = 0; i < num_partitions; i++) { + for (size_t j = 0; j < num_blocks; j++) { + vectorized::Block in_block; + vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>(); + auto int_col0 = vectorized::ColumnInt32::create(); + int_col0->insert_many_vals(i, 10); + + in_block.insert({std::move(int_col0), int_type, "test_int_col0"}); + EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes()); + bool in_eos = j == num_blocks - 1; + EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos, + {_sink_local_states[i]->_compute_hash_value_timer, + _sink_local_states[i]->_distribute_timer, nullptr}, + {&_sink_local_states[i]->_channel_id, + _sink_local_states[i]->_partitioner.get(), + _sink_local_states[i].get(), nullptr}), + Status::OK()); + EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), j == 0); + EXPECT_EQ(_sink_local_states[i]->_channel_id, i); + } + } + } + for (size_t i = 0; i < num_sink; i++) { + shared_state->sub_running_sink_operators(); + } + + { + for (size_t i = 0; i < num_sources; i++) { + EXPECT_EQ(shared_state->mem_counters[i]->value(), shared_state->_queues_mem_usage[i]); + EXPECT_EQ(_local_states[i]->_dependency->ready(), true); + } + // Dequeue from data queue and accumulate rows if rows is smaller than batch_size. + EXPECT_EQ(exchanger->_merger, nullptr); + for (size_t i = 0; i < num_sources; i++) { + for (size_t j = 0; j < num_sink * num_blocks + 1; j++) { + bool eos = false; + vectorized::Block block; + EXPECT_EQ( + exchanger->get_block(_runtime_state.get(), &block, &eos, + {nullptr, nullptr, _local_states[i]->_copy_data_timer}, + {cast_set<int>(_local_states[i]->_channel_id), + _local_states[i].get()}), + Status::OK()); + EXPECT_TRUE(exchanger->_merger != nullptr); + EXPECT_EQ(block.rows(), i > 0 || j == num_sink * num_blocks ? 0 : 10); + EXPECT_EQ(eos, i > 0 || j == num_sink * num_blocks); + EXPECT_EQ(_local_states[i]->_dependency->ready(), true); + } + } + } + + for (size_t i = 0; i < num_sink; i++) { + EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), true); + } +} + } // namespace doris::pipeline --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org