This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 7d39f878cab [UT](local merge) Add case for local merge exchanger (#47838)
7d39f878cab is described below

commit 7d39f878cab229ef3f27b94508c555c31aaf90d7
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Mon Feb 17 09:54:27 2025 +0800

    [UT](local merge) Add case for local merge exchanger (#47838)
---
 be/src/pipeline/dependency.h                       |  46 +++---
 .../local_exchange_source_operator.h               |   5 +
 be/src/pipeline/local_exchange/local_exchanger.cpp |  35 +++-
 be/src/pipeline/local_exchange/local_exchanger.h   |  14 +-
 be/src/vec/core/sort_cursor.h                      |   9 +-
 be/test/pipeline/local_exchanger_test.cpp          | 183 ++++++++++++++++++++-
 6 files changed, 248 insertions(+), 44 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 489497897dc..67434e2c18d 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -757,23 +757,24 @@ public:
         dep->set_ready();
     }
 
-    void add_mem_usage(int channel_id, size_t delta) { mem_counters[channel_id]->update(delta); }
+    virtual void add_mem_usage(int channel_id, size_t delta) {
+        mem_counters[channel_id]->update(delta);
+    }
 
-    void sub_mem_usage(int channel_id, size_t delta) {
+    virtual void sub_mem_usage(int channel_id, size_t delta) {
         mem_counters[channel_id]->update(-(int64_t)delta);
     }
 
-    virtual void add_total_mem_usage(size_t delta, int channel_id) {
+    virtual void add_total_mem_usage(size_t delta) {
         if (cast_set<int64_t>(mem_usage.fetch_add(delta) + delta) >
             config::local_exchange_buffer_mem_limit) {
             sink_deps.front()->block();
         }
     }
 
-    virtual void sub_total_mem_usage(size_t delta, int channel_id) {
+    virtual void sub_total_mem_usage(size_t delta) {
         auto prev_usage = mem_usage.fetch_sub(delta);
-        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta
-                                         << " channel_id: " << channel_id;
+        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta;
         if (cast_set<int64_t>(prev_usage - delta) <= config::local_exchange_buffer_mem_limit) {
             sink_deps.front()->set_ready();
         }
@@ -784,12 +785,7 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
     ENABLE_FACTORY_CREATOR(LocalMergeExchangeSharedState);
     LocalMergeExchangeSharedState(int num_instances)
             : LocalExchangeSharedState(num_instances),
-              _queues_mem_usage(num_instances),
-              _each_queue_limit(config::local_exchange_buffer_mem_limit / num_instances) {
-        for (size_t i = 0; i < num_instances; i++) {
-            _queues_mem_usage[i] = 0;
-        }
-    }
+              _each_queue_limit(config::local_exchange_buffer_mem_limit / num_instances) {}
 
     void create_dependencies(int local_exchange_id) override {
         sink_deps.resize(source_deps.size());
@@ -805,22 +801,21 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
         }
     }
 
-    void sub_total_mem_usage(size_t delta, int channel_id) override {
-        auto prev_usage = _queues_mem_usage[channel_id].fetch_sub(delta);
-        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta
-                                         << " channel_id: " << channel_id;
-        if (prev_usage - delta <= _each_queue_limit) {
-            sink_deps[channel_id]->set_ready();
-        }
-        if (_queues_mem_usage[channel_id] == 0) {
-            source_deps[channel_id]->block();
+    void sub_total_mem_usage(size_t delta) override { mem_usage.fetch_sub(delta); }
+    void add_total_mem_usage(size_t delta) override { mem_usage.fetch_add(delta); }
+
+    void add_mem_usage(int channel_id, size_t delta) override {
+        LocalExchangeSharedState::add_mem_usage(channel_id, delta);
+        if (mem_counters[channel_id]->value() > _each_queue_limit) {
+            sink_deps[channel_id]->block();
         }
     }
-    void add_total_mem_usage(size_t delta, int channel_id) override {
-        if (_queues_mem_usage[channel_id].fetch_add(delta) + delta > _each_queue_limit) {
-            sink_deps[channel_id]->block();
+
+    void sub_mem_usage(int channel_id, size_t delta) override {
+        LocalExchangeSharedState::sub_mem_usage(channel_id, delta);
+        if (mem_counters[channel_id]->value() <= _each_queue_limit) {
+            sink_deps[channel_id]->set_ready();
         }
-        source_deps[channel_id]->set_ready();
     }
 
     Dependency* get_sink_dep_by_channel_id(int channel_id) override {
@@ -832,7 +827,6 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
     }
 
 private:
-    std::vector<std::atomic_int64_t> _queues_mem_usage;
    const int64_t _each_queue_limit;
 };
 #include "common/compile_check_end.h"
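
The net effect of the dependency.h change is that LocalMergeExchangeSharedState now drives per-channel back-pressure off the mem_counters it already shares with the source local states, instead of a separate _queues_mem_usage vector: a channel's sink dependency blocks once that channel's counter exceeds _each_queue_limit and is released once it drops back. A minimal, self-contained sketch of that rule, assuming only the blocking condition from the diff; FakeDependency, FakeLocalMergeState and the main() driver are illustrative stand-ins, not Doris classes:

#include <atomic>
#include <cstdint>
#include <iostream>
#include <vector>

// Illustrative stand-in for pipeline::Dependency: a sink is either runnable or blocked.
struct FakeDependency {
    std::atomic<bool> ready {true};
    void block() { ready = false; }
    void set_ready() { ready = true; }
};

// Sketch of the per-channel accounting: each channel tracks its own bytes and owns a sink
// dependency; crossing the per-queue limit blocks that sink, dropping back releases it.
struct FakeLocalMergeState {
    FakeLocalMergeState(int num_instances, int64_t buffer_limit)
            : each_queue_limit(buffer_limit / num_instances),
              mem_bytes(num_instances),
              sink_deps(num_instances) {}

    void add_mem_usage(int channel_id, int64_t delta) {
        if ((mem_bytes[channel_id] += delta) > each_queue_limit) {
            sink_deps[channel_id].block();
        }
    }
    void sub_mem_usage(int channel_id, int64_t delta) {
        if ((mem_bytes[channel_id] -= delta) <= each_queue_limit) {
            sink_deps[channel_id].set_ready();
        }
    }

    const int64_t each_queue_limit;
    std::vector<std::atomic<int64_t>> mem_bytes;
    std::vector<FakeDependency> sink_deps;
};

int main() {
    // 4 channels sharing a 512-byte budget -> 128 bytes per queue, matching the new test below.
    FakeLocalMergeState state(4, 512);
    state.add_mem_usage(0, 128); // exactly at the limit, sink still ready
    std::cout << "after 1 block: ready=" << state.sink_deps[0].ready.load() << '\n';
    state.add_mem_usage(0, 128); // over the limit, channel 0's sink blocks
    std::cout << "after 2 blocks: ready=" << state.sink_deps[0].ready.load() << '\n';
    state.sub_mem_usage(0, 128); // back at the limit, sink is released
    std::cout << "after dequeue: ready=" << state.sink_deps[0].ready.load() << '\n';
    return 0;
}
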
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index e0d1715afb1..d6c8cecfef3 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -40,6 +40,11 @@ public:
     std::string debug_string(int indentation_level) const override;
     std::vector<Dependency*> dependencies() const override;
+    Dependency* get_dependency(int id) {
+        return _exchanger->get_type() == ExchangeType::LOCAL_MERGE_SORT
+                       ? _local_merge_deps[id].get()
+                       : _dependency;
+    }
 
 private:
     friend class LocalExchangeSourceOperatorX;
 
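
The new get_dependency(int id) accessor is what lets the shared _dequeue_data() path (see the local_exchanger.cpp hunk below) block a per-channel dependency for LOCAL_MERGE_SORT while every other exchange type keeps using the single source dependency. A rough sketch of that dispatch, with toy Dep/ExchangeType stand-ins rather than the real pipeline types:

#include <cassert>
#include <memory>
#include <vector>

enum class ExchangeType { PASSTHROUGH, LOCAL_MERGE_SORT };

struct Dep {}; // stand-in for pipeline::Dependency

// Sketch of LocalExchangeSourceLocalState::get_dependency(): LOCAL_MERGE_SORT keeps one
// dependency per sink channel, every other exchanger shares a single source dependency.
struct SourceLocalState {
    ExchangeType type = ExchangeType::PASSTHROUGH;
    Dep* dependency = nullptr;                           // the shared source dependency
    std::vector<std::shared_ptr<Dep>> local_merge_deps;  // one per channel, LOCAL_MERGE_SORT only

    Dep* get_dependency(int id) {
        return type == ExchangeType::LOCAL_MERGE_SORT ? local_merge_deps[id].get() : dependency;
    }
};

int main() {
    Dep shared;
    SourceLocalState s;
    s.dependency = &shared;
    assert(s.get_dependency(0) == &shared); // non-merge exchangers ignore the channel id

    s.type = ExchangeType::LOCAL_MERGE_SORT;
    s.local_merge_deps = {std::make_shared<Dep>(), std::make_shared<Dep>()};
    assert(s.get_dependency(1) == s.local_merge_deps[1].get()); // merge sort routes per channel
    return 0;
}
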
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 1fc47b2f55d..76a8a8e1274 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -82,7 +82,7 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_st
             return true;
         }
         COUNTER_UPDATE(local_state->_get_block_failed_counter, 1);
-        local_state->_dependency->block();
+        local_state->get_dependency(channel_id)->block();
     }
     return false;
 }
@@ -377,6 +377,7 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
                                      *sink_info.channel_id));
     }
     if (eos && sink_info.local_state) {
+        _eos[*sink_info.channel_id]->store(true);
         sink_info.local_state->_shared_state->source_deps[*sink_info.channel_id]
                 ->set_always_ready();
     }
@@ -421,7 +422,37 @@ Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
         vectorized::BlockSupplier block_supplier = [&, local_state, id = channel_id](
                                                            vectorized::Block* block, bool* eos) {
             BlockWrapperSPtr next_block;
-            _dequeue_data(local_state, next_block, eos, block, id);
+            auto got = _dequeue_data(local_state, next_block, eos, block, id);
+            if (got) {
+                // If this block is the last block, we should block this pipeline task to wait for
+                // the next block.
+                // TODO: LocalMergeSortExchanger should be refactored.
+                if (_data_queue[id].data_queue.size_approx() == 0) {
+                    std::unique_lock l(*_m[id]);
+                    if (_data_queue[id].data_queue.size_approx() == 0) {
+                        local_state->get_dependency(id)->block();
+                    }
+                }
+            }
+#ifndef NDEBUG
+            if (*eos && !(*_eos[id])) {
+                return Status::InternalError(
+                        "LocalMergeSortExchanger{} meet error! _eos[id] should be true if no "
+                        "source operators are running",
+                        local_state->debug_string(0));
+            }
+#endif
+            // `eos` is true if all sink operators are closed and no block remains. `_eos[id]` is
+            // true means sink operator of instance[id] has already sent the last block with `eos`
+            // flag.
+            if (block->empty() && !(*_eos[id])) {
+                return Status::InternalError(
+                        "LocalMergeSortExchanger{} meet error! Block should not be empty when eos "
+                        "is false",
+                        local_state->debug_string(0));
+            } else if (block->empty()) {
+                *eos = *_eos[id];
+            }
             return Status::OK();
         };
         child_block_suppliers.push_back(block_supplier);
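
The _eos flags introduced in this file give the merger's block supplier a way to tell a legitimately drained queue from a bug: each sink stores its channel's flag when it delivers its final block, and the supplier only treats an empty dequeue as end-of-stream if that flag is already set; otherwise it reports an internal error. A simplified, single-threaded model of that handshake, with a plain std::deque in place of the lock-free data queue and a bool in place of Status:

#include <atomic>
#include <deque>
#include <iostream>
#include <memory>
#include <string>

// Toy per-channel state: a data queue plus the eos flag the sink publishes on its last push.
struct Channel {
    std::deque<std::string> data_queue;
    std::shared_ptr<std::atomic_bool> eos = std::make_shared<std::atomic_bool>(false);
};

// Sink side: push a block; the final push also publishes eos for this channel.
void sink(Channel& ch, std::string block, bool last) {
    ch.data_queue.push_back(std::move(block));
    if (last) ch.eos->store(true);
}

// Supplier side: returns false (an "internal error") if the queue is empty but the sink has
// not declared eos yet -- the situation the new DCHECK/InternalError paths guard against.
bool supply(Channel& ch, std::string* block, bool* eos) {
    if (!ch.data_queue.empty()) {
        *block = std::move(ch.data_queue.front());
        ch.data_queue.pop_front();
        *eos = false;
        return true;
    }
    if (!ch.eos->load()) {
        std::cout << "error: empty block while eos is false\n";
        return false;
    }
    block->clear();
    *eos = true; // drained, and the sink already sent its last block
    return true;
}

int main() {
    Channel ch;
    sink(ch, "block-1", /*last=*/false);
    sink(ch, "block-2", /*last=*/true);

    std::string block;
    bool eos = false;
    while (supply(ch, &block, &eos) && !eos) {
        std::cout << "got " << block << '\n';
    }
    std::cout << "eos=" << eos << '\n';
    return 0;
}
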
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index 25f090e4208..64562c9019e 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -81,7 +81,7 @@ public:
               _shared_state(shared_state),
               _allocated_bytes(_data_block.allocated_bytes()) {
         if (_shared_state) {
-            _shared_state->add_total_mem_usage(_allocated_bytes, channel_id);
+            _shared_state->add_total_mem_usage(_allocated_bytes);
         }
     }
     ~BlockWrapper() {
@@ -89,8 +89,7 @@ public:
             DCHECK_GT(_allocated_bytes, 0);
             // `_channel_ids` may be empty if exchanger is shuffled exchanger and channel id is
             // not used by `sub_total_mem_usage`. So we just pass -1 here.
-            _shared_state->sub_total_mem_usage(
-                    _allocated_bytes, _channel_ids.empty() ? -1 : _channel_ids.front());
+            _shared_state->sub_total_mem_usage(_allocated_bytes);
             if (_shared_state->exchanger->_free_block_limit == 0 ||
                 _shared_state->exchanger->_free_blocks.size_approx() <
                         _shared_state->exchanger->_free_block_limit *
@@ -348,7 +347,12 @@ public:
     LocalMergeSortExchanger(MergeInfo&& merge_info, int running_sink_operators, int num_partitions,
                             int free_block_limit)
             : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit),
-              _merge_info(std::move(merge_info)) {}
+              _merge_info(std::move(merge_info)) {
+        _eos.resize(num_partitions, nullptr);
+        for (size_t i = 0; i < num_partitions; i++) {
+            _eos[i] = std::make_shared<std::atomic_bool>(false);
+        }
+    }
     ~LocalMergeSortExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile,
                 SinkInfo&& sink_info) override;
@@ -365,7 +369,7 @@ public:
 private:
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
     MergeInfo _merge_info;
-    std::vector<std::atomic_int64_t> _queues_mem_usege;
+    std::vector<std::shared_ptr<std::atomic_bool>> _eos;
 };
 
 class BroadcastExchanger final : public Exchanger<BroadcastBlock> {
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index c7993e7fc95..68abe710110 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -192,12 +192,12 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
             desc[i].direction = is_asc_order[i] ? 1 : -1;
             desc[i].nulls_direction = nulls_first[i] ? -desc[i].direction : desc[i].direction;
         }
-        _is_eof = !has_next_block();
+        has_next_block();
     }
 
     BlockSupplierSortCursorImpl(BlockSupplier block_supplier, const SortDescription& desc_)
            : MergeSortCursorImpl(desc_), _block_supplier(std::move(block_supplier)) {
-        _is_eof = !has_next_block();
+        has_next_block();
     }
 
     bool has_next_block() override {
@@ -205,9 +205,8 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
             return false;
         }
         block->clear();
-        do {
-            THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof));
-        } while (block->empty() && !_is_eof);
+        THROW_IF_ERROR(_block_supplier(block.get(), &_is_eof));
+        DCHECK(!block->empty() || _is_eof);
         if (!block->empty()) {
             DCHECK_EQ(_ordering_expr.size(), desc.size());
             for (int i = 0; i < desc.size(); ++i) {
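
With the retry loop removed from BlockSupplierSortCursorImpl::has_next_block(), the supplier is now invoked exactly once per refill and is expected either to return a non-empty block or to set eos in that same call, which is what the new DCHECK asserts. A small model of that contract, using assert in place of DCHECK and a vector of ints standing in for vectorized::Block:

#include <cassert>
#include <functional>
#include <iostream>
#include <vector>

using Block = std::vector<int>;
// Mirrors the shape of vectorized::BlockSupplier: fill `block`, set `*eos`, report success.
using BlockSupplier = std::function<bool(Block* block, bool* eos)>;

// Sketch of the tightened has_next_block(): one supplier call, then the invariant that an
// empty block is only legal together with eos.
struct CursorSketch {
    explicit CursorSketch(BlockSupplier s) : supplier(std::move(s)) { has_next_block(); }

    bool has_next_block() {
        if (is_eof) return false;
        block.clear();
        bool ok = supplier(&block, &is_eof); // called once -- no empty-block retry loop
        assert(ok);
        assert(!block.empty() || is_eof);    // supplier must not return "empty but not eos"
        return !block.empty();
    }

    BlockSupplier supplier;
    Block block;
    bool is_eof = false;
};

int main() {
    // A well-behaved supplier: two non-empty blocks, then empty + eos in the same call.
    int calls = 0;
    CursorSketch cursor([&](Block* block, bool* eos) {
        if (++calls <= 2) {
            *block = {calls, calls * 10};
        } else {
            *eos = true;
        }
        return true;
    });
    do {
        std::cout << "rows=" << cursor.block.size() << " eof=" << cursor.is_eof << '\n';
    } while (cursor.has_next_block());
    return 0;
}
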
diff --git a/be/test/pipeline/local_exchanger_test.cpp b/be/test/pipeline/local_exchanger_test.cpp
index b0e91f0018f..da19e6ef8d6 100644
--- a/be/test/pipeline/local_exchanger_test.cpp
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -446,7 +446,7 @@ TEST_F(LocalExchangerTest, PassthroughExchanger) {
                                       _local_states[i].get()}),
                       Status::OK());
             EXPECT_EQ(block.rows(), j == 1 ? 0 : 10);
-            EXPECT_EQ(eos, false);
+            EXPECT_FALSE(eos);
             EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1);
         }
     }
@@ -645,7 +645,7 @@ TEST_F(LocalExchangerTest, PassToOneExchanger) {
                                       _local_states[i].get()}),
                       Status::OK());
             EXPECT_EQ(block.rows(), j == 1 ? 0 : 10);
-            EXPECT_EQ(eos, false);
+            EXPECT_FALSE(eos);
             EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1);
         }
     }
@@ -799,7 +799,7 @@ TEST_F(LocalExchangerTest, BroadcastExchanger) {
                                       _local_states[i].get()}),
                       Status::OK());
             EXPECT_EQ(block.rows(), j == num_blocks * num_sources ? 0 : 10);
-            EXPECT_EQ(eos, false);
+            EXPECT_FALSE(eos);
             EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_blocks * num_sources);
         }
     }
@@ -836,7 +836,7 @@ TEST_F(LocalExchangerTest, BroadcastExchanger) {
                                       _local_states[i].get()}),
                       Status::OK());
             EXPECT_EQ(block.rows(), j == num_sources ? 0 : 10);
-            EXPECT_EQ(eos, false);
+            EXPECT_FALSE(eos);
             EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_sources);
         }
     }
@@ -1038,7 +1038,7 @@ TEST_F(LocalExchangerTest, AdaptivePassthroughExchanger) {
                                       _local_states[i].get()}),
                       Status::OK());
             EXPECT_EQ(block.rows(), j == 1 ? 0 : num_rows_per_block);
-            EXPECT_EQ(eos, false);
+            EXPECT_FALSE(eos);
             EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1);
         }
     }
@@ -1324,6 +1324,7 @@ TEST_F(LocalExchangerTest, LocalMergeSortExchanger) {
                 "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
         shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
     }
+    _local_states[0]->_local_merge_deps = shared_state->source_deps;
     {
         // Enqueue 2 blocks with 10 rows for each data queue.
         for (size_t i = 0; i < num_partitions; i++) {
@@ -1354,7 +1355,7 @@ TEST_F(LocalExchangerTest, LocalMergeSortExchanger) {
 
     {
         for (size_t i = 0; i < num_sources; i++) {
-            EXPECT_EQ(shared_state->mem_counters[i]->value(), shared_state->_queues_mem_usage[i]);
+            EXPECT_EQ(shared_state->mem_counters[i]->value(), expect_block_bytes * num_blocks);
             EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
         }
         // Dequeue from data queue and accumulate rows if rows is smaller than batch_size.
@@ -1382,4 +1383,174 @@
     }
 }
 
+TEST_F(LocalExchangerTest, LocalMergeSortExchangerWithEmptyBlock) {
+    int num_sink = 4;
+    int num_sources = 4;
+    int num_partitions = 4;
+    int free_block_limit = 0;
+    const auto expect_block_bytes = 128;
+    const auto num_blocks = 2;
+    std::vector<bool> is_asc_order;
+    std::vector<bool> nulls_first;
+    vectorized::VExprContextSPtrs ordering_expr_ctxs;
+    auto texpr = TExprNodeBuilder(TExprNodeType::SLOT_REF,
+                                  TTypeDescBuilder()
+                                          .set_types(TTypeNodeBuilder()
+                                                             .set_type(TTypeNodeType::SCALAR)
+                                                             .set_scalar_type(TPrimitiveType::INT)
+                                                             .build())
+                                          .build(),
+                                  0)
+                         .set_slot_ref(TSlotRefBuilder(0, 0).build())
+                         .build();
+    auto slot = doris::vectorized::VSlotRef::create_shared(texpr);
+    slot->_column_id = 0;
+    slot->_prepare_finished = true;
+    ordering_expr_ctxs.push_back(std::make_shared<doris::vectorized::VExprContext>(slot));
+    is_asc_order.push_back(true);
+    nulls_first.push_back(true);
+    ordering_expr_ctxs[0]->_prepared = true;
+    ordering_expr_ctxs[0]->_opened = true;
+
+    std::vector<std::unique_ptr<LocalExchangeSinkLocalState>> _sink_local_states;
+    std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+    _sink_local_states.resize(num_sink);
+    _local_states.resize(num_sources);
+    config::local_exchange_buffer_mem_limit = expect_block_bytes * num_partitions;
+    auto profile = std::make_shared<RuntimeProfile>("");
+    auto shared_state = LocalMergeExchangeSharedState::create_shared(num_partitions);
+    shared_state->exchanger = LocalMergeSortExchanger::create_unique(
+            LocalMergeSortExchanger::MergeInfo {is_asc_order, nulls_first, -1, 0,
+                                                ordering_expr_ctxs},
+            num_sink, num_partitions, free_block_limit);
+    auto sink_dep = std::make_shared<Dependency>(0, 0, "LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    shared_state->create_dependencies(0);
+    auto& sink_deps = shared_state->sink_deps;
+    EXPECT_EQ(sink_deps.size(), num_sink);
+
+    auto* exchanger = (LocalMergeSortExchanger*)shared_state->exchanger.get();
+    for (size_t i = 0; i < num_sink; i++) {
+        auto compute_hash_value_timer =
+                ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+        auto distribute_timer = ADD_TIMER(profile, "distribute_timer" + std::to_string(i));
+        _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr, nullptr));
+        _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+        _sink_local_states[i]->_compute_hash_value_timer = compute_hash_value_timer;
+        _sink_local_states[i]->_distribute_timer = distribute_timer;
+        _sink_local_states[i]->_channel_id = i;
+        _sink_local_states[i]->_shared_state = shared_state.get();
+        _sink_local_states[i]->_dependency = sink_deps[i].get();
+    }
+    LocalExchangeSourceOperatorX op;
+    for (size_t i = 0; i < num_sources; i++) {
+        auto get_block_failed_counter =
+                ADD_TIMER(profile, "_get_block_failed_counter" + std::to_string(i));
+        auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" + std::to_string(i));
+        _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr, nullptr));
+        _local_states[i]->_runtime_profile =
+                std::make_unique<RuntimeProfile>("source_profile " + std::to_string(i));
+        _local_states[i]->_exchanger = shared_state->exchanger.get();
+        _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+        _local_states[i]->_copy_data_timer = copy_data_timer;
+        _local_states[i]->_channel_id = i;
+        _local_states[i]->_shared_state = shared_state.get();
+        _local_states[i]->_dependency = shared_state->get_dep_by_channel_id(i).front().get();
+        _local_states[i]->_memory_used_counter = profile->AddHighWaterMarkCounter(
+                "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+        shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+        _local_states[i]->_parent = &op;
+    }
+    _local_states[0]->_local_merge_deps = shared_state->source_deps;
+    {
+        // Enqueue 2 blocks with 10 rows for each data queue.
+        for (size_t i = 0; i < num_partitions; i++) {
+            for (size_t j = 0; j < num_blocks; j++) {
+                vectorized::Block in_block;
+                vectorized::DataTypePtr int_type = std::make_shared<vectorized::DataTypeInt32>();
+                auto int_col0 = vectorized::ColumnInt32::create();
+                int_col0->insert_many_vals(i, 10);
+
+                in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+                EXPECT_EQ(expect_block_bytes, in_block.allocated_bytes());
+                bool in_eos = false;
+                EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+                                          {_sink_local_states[i]->_compute_hash_value_timer,
+                                           _sink_local_states[i]->_distribute_timer, nullptr},
+                                          {&_sink_local_states[i]->_channel_id,
+                                           _sink_local_states[i]->_partitioner.get(),
+                                           _sink_local_states[i].get(), nullptr}),
+                          Status::OK());
+                EXPECT_EQ(_sink_local_states[i]->_dependency->ready(), j == 0);
+                EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+            }
+        }
+    }
+
+    for (size_t i = 0; i < num_sources; i++) {
+        EXPECT_EQ(shared_state->mem_counters[i]->value(), expect_block_bytes * num_blocks);
+        EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+    }
+    // Dequeue from data queue and accumulate rows if rows is smaller than batch_size.
+    EXPECT_EQ(exchanger->_merger, nullptr);
+    for (size_t i = 0; i < num_sources; i++) {
+        for (size_t j = 0; j < num_blocks; j++) {
+            bool eos = false;
+            vectorized::Block block;
+            EXPECT_EQ(exchanger->get_block(_runtime_state.get(), &block, &eos,
+                                           {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                           {cast_set<int>(_local_states[i]->_channel_id),
+                                            _local_states[i].get()}),
+                      Status::OK());
+        }
+    }
+
+    bool error = false;
+    try {
+        vectorized::Block block;
+        bool eos = false;
+        EXPECT_TRUE(exchanger
+                            ->get_block(_runtime_state.get(), &block, &eos,
+                                        {nullptr, nullptr, _local_states[0]->_copy_data_timer},
+                                        {cast_set<int>(_local_states[0]->_channel_id),
+                                         _local_states[0].get()})
+                            .is<ErrorCode::INTERNAL_ERROR>());
+    } catch (const Exception& e) {
+        error = true;
+        EXPECT_TRUE(e.to_status().is<ErrorCode::INTERNAL_ERROR>());
+    }
+    EXPECT_TRUE(error);
+    for (size_t i = 0; i < num_sink; i++) {
+        shared_state->sub_running_sink_operators();
+    }
+
+    {
+        for (size_t i = 0; i < num_sources; i++) {
+            for (size_t j = 0; j < num_sink * num_blocks - num_blocks; j++) {
+                bool eos = false;
+                error = false;
+                vectorized::Block block;
+                try {
+                    EXPECT_EQ(exchanger->get_block(
+                                      _runtime_state.get(), &block, &eos,
+                                      {nullptr, nullptr, _local_states[i]->_copy_data_timer},
+                                      {cast_set<int>(_local_states[i]->_channel_id),
+                                       _local_states[i].get()}),
+                              Status::OK());
+                } catch (const Exception& e) {
+                    error = true;
+                    EXPECT_TRUE(e.to_status().is<ErrorCode::INTERNAL_ERROR>());
+                }
+                // error is true when we get another block after all blocks in one queue are exhausted.
+                EXPECT_EQ(error, i == 0 && (j % (num_blocks + 1) == 0)) << i << " " << j;
+                EXPECT_TRUE(exchanger->_merger != nullptr);
+                EXPECT_EQ(block.rows(), i > 0 || (j % (num_blocks + 1) == 0) ? 0 : 10)
+                        << i << " " << j;
+                EXPECT_EQ(eos, i > 0 || j == num_sink * num_blocks - num_blocks) << i << " " << j;
+            }
+        }
+    }
+}
+
 } // namespace doris::pipeline
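
The readiness expectations in the new test all follow from one piece of arithmetic: the test sets config::local_exchange_buffer_mem_limit to expect_block_bytes * num_partitions, so each of the four merge queues gets a 128-byte budget (_each_queue_limit) and the second 128-byte block pushes a channel over it, blocking that channel's sink dependency. A tiny standalone check of that arithmetic; the constants copy the test, everything else is plain C++:

#include <cassert>
#include <iostream>

int main() {
    // Constants copied from LocalMergeSortExchangerWithEmptyBlock.
    const int num_partitions = 4;
    const long expect_block_bytes = 128;
    const int num_blocks = 2;

    // config::local_exchange_buffer_mem_limit as set by the test.
    const long buffer_mem_limit = expect_block_bytes * num_partitions; // 512
    // LocalMergeExchangeSharedState::_each_queue_limit.
    const long each_queue_limit = buffer_mem_limit / num_partitions;   // 128

    // After the first block a channel sits exactly at its limit (not over), so the sink
    // dependency is still ready (j == 0); after the second it exceeds the limit and blocks.
    assert(1 * expect_block_bytes <= each_queue_limit);
    assert(2 * expect_block_bytes > each_queue_limit);

    // The per-channel memory counter checked right after enqueueing: two blocks per channel.
    std::cout << "per-channel bytes after enqueue: " << num_blocks * expect_block_bytes << '\n';
    std::cout << "each_queue_limit: " << each_queue_limit << '\n';
    return 0;
}
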
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org