This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ff090ff641c [refactor](shuffle) (PART II) Split local exchange operators from loc… (#45280) ff090ff641c is described below commit ff090ff641ce8e96c3db918de84875632f453623 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Wed Dec 11 17:55:56 2024 +0800 [refactor](shuffle) (PART II) Split local exchange operators from loc… (#45280) …al exchanger This PR decouples the local exchanger from the local exchange operators. Since all types of exchangers behave similarly to the exchange sink, I plan to use the exchanger to split data in both the local exchange operators and the exchange sink operators. --- be/src/pipeline/dependency.cpp | 5 +- be/src/pipeline/dependency.h | 2 +- .../local_exchange_sink_operator.cpp | 5 +- .../local_exchange/local_exchange_sink_operator.h | 1 - .../local_exchange_source_operator.cpp | 8 +- be/src/pipeline/local_exchange/local_exchanger.cpp | 367 +++++++++++++-------- be/src/pipeline/local_exchange/local_exchanger.h | 127 ++++--- 7 files changed, 325 insertions(+), 190 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 5fef018423d..dcf5c7a0a81 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -179,12 +179,11 @@ void LocalExchangeSharedState::sub_running_sink_operators() { } } -void LocalExchangeSharedState::sub_running_source_operators( - LocalExchangeSourceLocalState& local_state) { +void LocalExchangeSharedState::sub_running_source_operators() { std::unique_lock<std::mutex> lc(le_lock); if (exchanger->_running_source_operators.fetch_sub(1) == 1) { _set_always_ready(); - exchanger->finalize(local_state); + exchanger->finalize(); } } diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index ad018c8b4f8..f1cfe2b0297 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -758,7 +758,7 @@ public: } } void sub_running_sink_operators(); - void
sub_running_source_operators(LocalExchangeSourceLocalState& local_state); + void sub_running_source_operators(); void _set_always_ready() { for (auto& dep : source_deps) { DCHECK(dep); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 22007a4b220..b22ee9fd77e 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -144,7 +144,10 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, eos, local_state)); + RETURN_IF_ERROR(local_state._exchanger->sink( + state, in_block, eos, + {local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr}, + {&local_state._channel_id, local_state._partitioner.get(), &local_state})); // If all exchange sources ended due to limit reached, current task should also finish if (local_state._exchanger->_running_source_operators == 0) { diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 435f7a410a4..c067f023c8d 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -65,7 +65,6 @@ private: RuntimeProfile::Counter* _compute_hash_value_timer = nullptr; RuntimeProfile::Counter* _distribute_timer = nullptr; std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr; - std::vector<uint32_t> _partition_rows_histogram; // Used by random passthrough exchanger int _channel_id = 0; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index c4832b9958c..63e36cdfdb0 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -61,10 +61,10 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) { } if (_exchanger) { - _exchanger->close(*this); + _exchanger->close({_channel_id, this}); } if (_shared_state) { - _shared_state->sub_running_source_operators(*this); + _shared_state->sub_running_source_operators(); } std::vector<DependencySPtr> {}.swap(_local_merge_deps); @@ -116,7 +116,9 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized:: bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - RETURN_IF_ERROR(local_state._exchanger->get_block(state, block, eos, local_state)); + RETURN_IF_ERROR(local_state._exchanger->get_block( + state, block, eos, {nullptr, nullptr, local_state._copy_data_timer}, + {local_state._channel_id, &local_state})); local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 647988f8b79..a963de8b684 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -29,8 +29,12 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" template <typename BlockType> void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, - LocalExchangeSinkLocalState& local_state, + LocalExchangeSinkLocalState* local_state, BlockType&& block) { + if (local_state == nullptr) { + _enqueue_data_and_set_ready(channel_id, std::move(block)); + return; + } size_t allocated_bytes = 0; // PartitionedBlock is used by shuffle exchanger. 
// PartitionedBlock will be push into multiple queues with different row ranges, so it will be @@ -44,47 +48,47 @@ void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, allocated_bytes = block->data_block.allocated_bytes(); } std::unique_lock l(_m); - local_state._shared_state->add_mem_usage(channel_id, allocated_bytes, - !std::is_same_v<PartitionedBlock, BlockType> && - !std::is_same_v<BroadcastBlock, BlockType>); + local_state->_shared_state->add_mem_usage(channel_id, allocated_bytes, + !std::is_same_v<PartitionedBlock, BlockType> && + !std::is_same_v<BroadcastBlock, BlockType>); if (_data_queue[channel_id].enqueue(std::move(block))) { - local_state._shared_state->set_ready_to_read(channel_id); + local_state->_shared_state->set_ready_to_read(channel_id); } else { - local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes); + local_state->_shared_state->sub_mem_usage(channel_id, allocated_bytes); // `enqueue(block)` return false iff this queue's source operator is already closed so we // just unref the block. 
if constexpr (std::is_same_v<PartitionedBlock, BlockType> || std::is_same_v<BroadcastBlock, BlockType>) { - block.first->unref(local_state._shared_state, allocated_bytes, channel_id); + block.first->unref(local_state->_shared_state, allocated_bytes, channel_id); } else { - block->unref(local_state._shared_state, allocated_bytes, channel_id); + block->unref(local_state->_shared_state, allocated_bytes, channel_id); DCHECK_EQ(block->ref_value(), 0); } } } template <typename BlockType> -bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_state, - BlockType& block, bool* eos, - vectorized::Block* data_block) { - return _dequeue_data(local_state, block, eos, data_block, local_state._channel_id); -} - -template <typename BlockType> -bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_state, +bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id) { + if (local_state == nullptr) { + if (!_dequeue_data(block, eos, data_block, channel_id)) { + throw Exception(ErrorCode::INTERNAL_ERROR, "Exchanger has no data: {}", + data_queue_debug_string(channel_id)); + } + return true; + } bool all_finished = _running_sink_operators == 0; if (_data_queue[channel_id].try_dequeue(block)) { if constexpr (std::is_same_v<PartitionedBlock, BlockType> || std::is_same_v<BroadcastBlock, BlockType>) { - local_state._shared_state->sub_mem_usage(channel_id, - block.first->data_block.allocated_bytes()); + local_state->_shared_state->sub_mem_usage(channel_id, + block.first->data_block.allocated_bytes()); } else { - local_state._shared_state->sub_mem_usage(channel_id, - block->data_block.allocated_bytes()); + local_state->_shared_state->sub_mem_usage(channel_id, + block->data_block.allocated_bytes()); data_block->swap(block->data_block); - block->unref(local_state._shared_state, data_block->allocated_bytes(), channel_id); + 
block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id); DCHECK_EQ(block->ref_value(), 0); } return true; @@ -95,54 +99,88 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st if (_data_queue[channel_id].try_dequeue(block)) { if constexpr (std::is_same_v<PartitionedBlock, BlockType> || std::is_same_v<BroadcastBlock, BlockType>) { - local_state._shared_state->sub_mem_usage(channel_id, - block.first->data_block.allocated_bytes()); + local_state->_shared_state->sub_mem_usage( + channel_id, block.first->data_block.allocated_bytes()); } else { - local_state._shared_state->sub_mem_usage(channel_id, - block->data_block.allocated_bytes()); + local_state->_shared_state->sub_mem_usage(channel_id, + block->data_block.allocated_bytes()); data_block->swap(block->data_block); - block->unref(local_state._shared_state, data_block->allocated_bytes(), channel_id); + block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id); DCHECK_EQ(block->ref_value(), 0); } return true; } - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); + COUNTER_UPDATE(local_state->_get_block_failed_counter, 1); + local_state->_dependency->block(); + } + return false; +} + +template <typename BlockType> +void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id, BlockType&& block) { + if constexpr (!std::is_same_v<PartitionedBlock, BlockType> && + !std::is_same_v<BroadcastBlock, BlockType>) { + block->ref(1); + } + if (!_data_queue[channel_id].enqueue(std::move(block))) { + if constexpr (std::is_same_v<PartitionedBlock, BlockType> || + std::is_same_v<BroadcastBlock, BlockType>) { + block.first->unref(); + } else { + block->unref(); + DCHECK_EQ(block->ref_value(), 0); + } + } +} + +template <typename BlockType> +bool Exchanger<BlockType>::_dequeue_data(BlockType& block, bool* eos, vectorized::Block* data_block, + int channel_id) { + if 
(_data_queue[channel_id].try_dequeue(block)) { + if constexpr (!std::is_same_v<PartitionedBlock, BlockType> && + !std::is_same_v<BroadcastBlock, BlockType>) { + data_block->swap(block->data_block); + block->unref(); + DCHECK_EQ(block->ref_value(), 0); + } + return true; } return false; } Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) { + Profile&& profile, SinkInfo&& sink_info) { if (in_block->empty()) { return Status::OK(); } { - SCOPED_TIMER(local_state._compute_hash_value_timer); - RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block)); + SCOPED_TIMER(profile.compute_hash_value_timer); + RETURN_IF_ERROR(sink_info.partitioner->do_partitioning(state, in_block)); } { - SCOPED_TIMER(local_state._distribute_timer); - RETURN_IF_ERROR(_split_rows(state, - local_state._partitioner->get_channel_ids().get<uint32_t>(), - in_block, local_state)); + SCOPED_TIMER(profile.distribute_timer); + RETURN_IF_ERROR(_split_rows(state, sink_info.partitioner->get_channel_ids().get<uint32_t>(), + in_block, *sink_info.channel_id, sink_info.local_state)); } return Status::OK(); } -void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) { +void ShuffleExchanger::close(SourceInfo&& source_info) { PartitionedBlock partitioned_block; bool eos; vectorized::Block block; - _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, partitioned_block, &eos, &block)) { - partitioned_block.first->unref(local_state._shared_state, local_state._channel_id); + _data_queue[source_info.channel_id].set_eos(); + while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, + source_info.channel_id)) { + partitioned_block.first->unref( + source_info.local_state ? 
source_info.local_state->_shared_state : nullptr, + source_info.channel_id); } } Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) { + Profile&& profile, SourceInfo&& source_info) { PartitionedBlock partitioned_block; vectorized::MutableBlock mutable_block; @@ -153,14 +191,18 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block auto block_wrapper = partitioned_block.first; RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, offset_start, offset_start + partitioned_block.second.length)); - block_wrapper->unref(local_state._shared_state, local_state._channel_id); + block_wrapper->unref( + source_info.local_state ? source_info.local_state->_shared_state : nullptr, + source_info.channel_id); } while (mutable_block.rows() < state->batch_size() && !*eos && - _dequeue_data(local_state, partitioned_block, eos, block)); + _dequeue_data(source_info.local_state, partitioned_block, eos, block, + source_info.channel_id)); return Status::OK(); }; - if (_dequeue_data(local_state, partitioned_block, eos, block)) { - SCOPED_TIMER(local_state._copy_data_timer); + if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, + source_info.channel_id)) { + SCOPED_TIMER(profile.copy_data_timer); mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); RETURN_IF_ERROR(get_data()); @@ -169,22 +211,25 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block } Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, - LocalExchangeSinkLocalState& local_state) { + vectorized::Block* block, int channel_id, + LocalExchangeSinkLocalState* local_state) { + if (local_state == nullptr) { + return _split_rows(state, channel_ids, block, channel_id); + } const auto rows = 
cast_set<int32_t>(block->rows()); auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows); + auto& partition_rows_histogram = _partition_rows_histogram[channel_id]; { - local_state._partition_rows_histogram.assign(_num_partitions + 1, 0); + partition_rows_histogram.assign(_num_partitions + 1, 0); for (int32_t i = 0; i < rows; ++i) { - local_state._partition_rows_histogram[channel_ids[i]]++; + partition_rows_histogram[channel_ids[i]]++; } for (int32_t i = 1; i <= _num_partitions; ++i) { - local_state._partition_rows_histogram[i] += - local_state._partition_rows_histogram[i - 1]; + partition_rows_histogram[i] += partition_rows_histogram[i - 1]; } for (int32_t i = rows - 1; i >= 0; --i) { - (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i; - local_state._partition_rows_histogram[channel_ids[i]]--; + (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; + partition_rows_histogram[channel_ids[i]]--; } } @@ -200,10 +245,10 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (new_block_wrapper->data_block.empty()) { return Status::OK(); } - local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(), - local_state._channel_id); + local_state->_shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(), + channel_id); auto bucket_seq_to_instance_idx = - local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx; + local_state->_parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx; if (get_type() == ExchangeType::HASH_SHUFFLE) { /** * If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed to all instances of @@ -211,32 +256,32 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest * For example, row 1 get a hash value 1 which means we should distribute to instance 1 on * BE 1 and row 2 get a hash value 2 which means we should distribute to instance 1 on 
BE 3. */ - const auto& map = local_state._parent->cast<LocalExchangeSinkOperatorX>() + const auto& map = local_state->_parent->cast<LocalExchangeSinkOperatorX>() ._shuffle_idx_to_instance_idx; new_block_wrapper->ref(cast_set<int>(map.size())); for (const auto& it : map) { DCHECK(it.second >= 0 && it.second < _num_partitions) << it.first << " : " << it.second << " " << _num_partitions; - uint32_t start = local_state._partition_rows_histogram[it.first]; - uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start; + uint32_t start = partition_rows_histogram[it.first]; + uint32_t size = partition_rows_histogram[it.first + 1] - start; if (size > 0) { _enqueue_data_and_set_ready(it.second, local_state, {new_block_wrapper, {row_idx, start, size}}); } else { - new_block_wrapper->unref(local_state._shared_state, local_state._channel_id); + new_block_wrapper->unref(local_state->_shared_state, channel_id); } } } else { DCHECK(!bucket_seq_to_instance_idx.empty()); new_block_wrapper->ref(_num_partitions); for (int i = 0; i < _num_partitions; i++) { - uint32_t start = local_state._partition_rows_histogram[i]; - uint32_t size = local_state._partition_rows_histogram[i + 1] - start; + uint32_t start = partition_rows_histogram[i]; + uint32_t size = partition_rows_histogram[i + 1] - start; if (size > 0) { _enqueue_data_and_set_ready(bucket_seq_to_instance_idx[i], local_state, {new_block_wrapper, {row_idx, start, size}}); } else { - new_block_wrapper->unref(local_state._shared_state, local_state._channel_id); + new_block_wrapper->unref(local_state->_shared_state, channel_id); } } } @@ -244,8 +289,53 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest return Status::OK(); } +Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, + vectorized::Block* block, int channel_id) { + const auto rows = cast_set<int32_t>(block->rows()); + auto row_idx = 
std::make_shared<vectorized::PODArray<uint32_t>>(rows); + auto& partition_rows_histogram = _partition_rows_histogram[channel_id]; + { + partition_rows_histogram.assign(_num_partitions + 1, 0); + for (int32_t i = 0; i < rows; ++i) { + partition_rows_histogram[channel_ids[i]]++; + } + for (int32_t i = 1; i <= _num_partitions; ++i) { + partition_rows_histogram[i] += partition_rows_histogram[i - 1]; + } + for (int32_t i = rows - 1; i >= 0; --i) { + (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; + partition_rows_histogram[channel_ids[i]]--; + } + } + + vectorized::Block data_block; + std::shared_ptr<BlockWrapper> new_block_wrapper; + if (_free_blocks.try_dequeue(data_block)) { + new_block_wrapper = BlockWrapper::create_shared(std::move(data_block)); + } else { + new_block_wrapper = BlockWrapper::create_shared(block->clone_empty()); + } + + new_block_wrapper->data_block.swap(*block); + if (new_block_wrapper->data_block.empty()) { + return Status::OK(); + } + new_block_wrapper->ref(cast_set<int>(_num_partitions)); + for (int i = 0; i < _num_partitions; i++) { + uint32_t start = partition_rows_histogram[i]; + uint32_t size = partition_rows_histogram[i + 1] - start; + if (size > 0) { + _enqueue_data_and_set_ready(i, {new_block_wrapper, {row_idx, start, size}}); + } else { + new_block_wrapper->unref(); + } + } + + return Status::OK(); +} + Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) { + Profile&& profile, SinkInfo&& sink_info) { if (in_block->empty()) { return Status::OK(); } @@ -256,41 +346,43 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo } new_block.swap(*in_block); wrapper = BlockWrapper::create_shared(std::move(new_block)); - auto channel_id = (local_state._channel_id++) % _num_partitions; - _enqueue_data_and_set_ready(channel_id, local_state, std::move(wrapper)); + auto channel_id = ((*sink_info.channel_id)++) % 
_num_partitions; + _enqueue_data_and_set_ready(channel_id, sink_info.local_state, std::move(wrapper)); return Status::OK(); } -void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { +void PassthroughExchanger::close(SourceInfo&& source_info) { vectorized::Block next_block; BlockWrapperSPtr wrapper; bool eos; - _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, wrapper, &eos, &next_block)) { + _data_queue[source_info.channel_id].set_eos(); + while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, + source_info.channel_id)) { // do nothing } } -void PassToOneExchanger::close(LocalExchangeSourceLocalState& local_state) { +void PassToOneExchanger::close(SourceInfo&& source_info) { vectorized::Block next_block; BlockWrapperSPtr wrapper; bool eos; - _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, wrapper, &eos, &next_block)) { + _data_queue[source_info.channel_id].set_eos(); + while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, + source_info.channel_id)) { // do nothing } } Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) { + Profile&& profile, SourceInfo&& source_info) { BlockWrapperSPtr next_block; - _dequeue_data(local_state, next_block, eos, block); + _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); return Status::OK(); } Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) { + Profile&& profile, SinkInfo&& sink_info) { if (in_block->empty()) { return Status::OK(); } @@ -301,70 +393,72 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block new_block.swap(*in_block); BlockWrapperSPtr wrapper = BlockWrapper::create_shared(std::move(new_block)); - _enqueue_data_and_set_ready(0, local_state, 
std::move(wrapper)); + _enqueue_data_and_set_ready(0, sink_info.local_state, std::move(wrapper)); return Status::OK(); } Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) { - if (local_state._channel_id != 0) { + Profile&& profile, SourceInfo&& source_info) { + if (source_info.channel_id != 0) { *eos = true; return Status::OK(); } BlockWrapperSPtr next_block; - _dequeue_data(local_state, next_block, eos, block); + _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); return Status::OK(); } Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) { + Profile&& profile, SinkInfo&& sink_info) { if (!in_block->empty()) { vectorized::Block new_block; if (!_free_blocks.try_dequeue(new_block)) { new_block = {in_block->clone_empty()}; } - DCHECK_LE(local_state._channel_id, _data_queue.size()); + DCHECK_LE(*sink_info.channel_id, _data_queue.size()); new_block.swap(*in_block); - _enqueue_data_and_set_ready(local_state._channel_id, local_state, + _enqueue_data_and_set_ready(*sink_info.channel_id, sink_info.local_state, BlockWrapper::create_shared(std::move(new_block))); } - if (eos) { - local_state._shared_state->source_deps[local_state._channel_id]->set_always_ready(); + if (eos && sink_info.local_state) { + sink_info.local_state->_shared_state->source_deps[*sink_info.channel_id] + ->set_always_ready(); } return Status::OK(); } -void ExchangerBase::finalize(LocalExchangeSourceLocalState& local_state) { +void ExchangerBase::finalize() { DCHECK(_running_source_operators == 0); vectorized::Block block; while (_free_blocks.try_dequeue(block)) { // do nothing } } -void LocalMergeSortExchanger::finalize(LocalExchangeSourceLocalState& local_state) { + +void LocalMergeSortExchanger::finalize() { BlockWrapperSPtr next_block; vectorized::Block block; bool eos; int id = 0; for 
(auto& data_queue : _data_queue) { data_queue.set_eos(); - while (_dequeue_data(local_state, next_block, &eos, &block, id)) { + while (_dequeue_data(next_block, &eos, &block, id)) { block = vectorized::Block(); } id++; } - ExchangerBase::finalize(local_state); + ExchangerBase::finalize(); } Status LocalMergeSortExchanger::build_merger(RuntimeState* state, - LocalExchangeSourceLocalState& local_state) { - RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, local_state.profile())); + LocalExchangeSourceLocalState* local_state) { + RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, local_state->profile())); std::vector<vectorized::BlockSupplier> child_block_suppliers; for (int channel_id = 0; channel_id < _num_partitions; channel_id++) { - vectorized::BlockSupplier block_supplier = [&, id = channel_id](vectorized::Block* block, - bool* eos) { + vectorized::BlockSupplier block_supplier = [&, local_state, id = channel_id]( + vectorized::Block* block, bool* eos) { BlockWrapperSPtr next_block; _dequeue_data(local_state, next_block, eos, block, id); return Status::OK(); @@ -388,20 +482,21 @@ now sort(8) --> local merge(1) ---> datasink(1) [2] ----> */ Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) { - if (local_state._channel_id != 0) { + Profile&& profile, SourceInfo&& source_info) { + if (source_info.channel_id != 0) { *eos = true; return Status::OK(); } if (!_merger) { - RETURN_IF_ERROR(build_merger(state, local_state)); + DCHECK(source_info.local_state); + RETURN_IF_ERROR(build_merger(state, source_info.local_state)); } RETURN_IF_ERROR(_merger->get_next(block, eos)); return Status::OK(); } Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) { + Profile&& profile, SinkInfo&& sink_info) { if (in_block->empty()) { return Status::OK(); } @@ -411,32 +506,40 @@ Status 
BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block } new_block.swap(*in_block); auto wrapper = BlockWrapper::create_shared(std::move(new_block)); - local_state._shared_state->add_total_mem_usage(wrapper->data_block.allocated_bytes(), - local_state._channel_id); + if (sink_info.local_state) { + sink_info.local_state->_shared_state->add_total_mem_usage( + wrapper->data_block.allocated_bytes(), *sink_info.channel_id); + } + wrapper->ref(_num_partitions); for (int i = 0; i < _num_partitions; i++) { - _enqueue_data_and_set_ready(i, local_state, {wrapper, {0, wrapper->data_block.rows()}}); + _enqueue_data_and_set_ready(i, sink_info.local_state, + {wrapper, {0, wrapper->data_block.rows()}}); } return Status::OK(); } -void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) { +void BroadcastExchanger::close(SourceInfo&& source_info) { BroadcastBlock partitioned_block; bool eos; vectorized::Block block; - _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, partitioned_block, &eos, &block)) { - partitioned_block.first->unref(local_state._shared_state, local_state._channel_id); + _data_queue[source_info.channel_id].set_eos(); + while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, + source_info.channel_id)) { + partitioned_block.first->unref( + source_info.local_state ? 
source_info.local_state->_shared_state : nullptr, + source_info.channel_id); } } Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) { + Profile&& profile, SourceInfo&& source_info) { BroadcastBlock partitioned_block; - if (_dequeue_data(local_state, partitioned_block, eos, block)) { - SCOPED_TIMER(local_state._copy_data_timer); + if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, + source_info.channel_id)) { + SCOPED_TIMER(profile.copy_data_timer); vectorized::MutableBlock mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); @@ -444,7 +547,9 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, partitioned_block.second.offset_start, partitioned_block.second.length)); - block_wrapper->unref(local_state._shared_state, local_state._channel_id); + block_wrapper->unref( + source_info.local_state ? 
source_info.local_state->_shared_state : nullptr, + source_info.channel_id); } return Status::OK(); @@ -452,21 +557,21 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, vectorized::Block* in_block, - LocalExchangeSinkLocalState& local_state) { + SinkInfo&& sink_info) { vectorized::Block new_block; if (!_free_blocks.try_dequeue(new_block)) { new_block = {in_block->clone_empty()}; } new_block.swap(*in_block); - auto channel_id = (local_state._channel_id++) % _num_partitions; - _enqueue_data_and_set_ready(channel_id, local_state, + auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; + _enqueue_data_and_set_ready(channel_id, sink_info.local_state, BlockWrapper::create_shared(std::move(new_block))); return Status::OK(); } Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectorized::Block* block, - LocalExchangeSinkLocalState& local_state) { + SinkInfo&& sink_info) { std::vector<uint32_t> channel_ids; const auto num_rows = block->rows(); channel_ids.resize(num_rows, 0); @@ -481,40 +586,39 @@ Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectoriz std::iota(channel_ids.begin() + i, channel_ids.end(), 0); } } - return _split_rows(state, channel_ids.data(), block, local_state); + return _split_rows(state, channel_ids.data(), block, std::move(sink_info)); } Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, - LocalExchangeSinkLocalState& local_state) { + vectorized::Block* block, SinkInfo&& sink_info) { const auto rows = cast_set<int32_t>(block->rows()); auto row_idx = std::make_shared<std::vector<uint32_t>>(rows); + auto& partition_rows_histogram = _partition_rows_histogram[*sink_info.channel_id]; { - local_state._partition_rows_histogram.assign(_num_partitions + 1, 0); + partition_rows_histogram.assign(_num_partitions + 
1, 0); for (int32_t i = 0; i < rows; ++i) { - local_state._partition_rows_histogram[channel_ids[i]]++; + partition_rows_histogram[channel_ids[i]]++; } for (int32_t i = 1; i <= _num_partitions; ++i) { - local_state._partition_rows_histogram[i] += - local_state._partition_rows_histogram[i - 1]; + partition_rows_histogram[i] += partition_rows_histogram[i - 1]; } for (int32_t i = rows - 1; i >= 0; --i) { - (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i; - local_state._partition_rows_histogram[channel_ids[i]]--; + (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; + partition_rows_histogram[channel_ids[i]]--; } } for (int32_t i = 0; i < _num_partitions; i++) { - const size_t start = local_state._partition_rows_histogram[i]; - const size_t size = local_state._partition_rows_histogram[i + 1] - start; + const size_t start = partition_rows_histogram[i]; + const size_t size = partition_rows_histogram[i + 1] - start; if (size > 0) { std::unique_ptr<vectorized::MutableBlock> mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); RETURN_IF_ERROR(mutable_block->add_rows(block, start, size)); auto new_block = mutable_block->to_block(); - _enqueue_data_and_set_ready(i, local_state, + _enqueue_data_and_set_ready(i, sink_info.local_state, BlockWrapper::create_shared(std::move(new_block))); } } @@ -522,34 +626,35 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, } Status AdaptivePassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, - bool eos, LocalExchangeSinkLocalState& local_state) { + bool eos, Profile&& profile, SinkInfo&& sink_info) { if (in_block->empty()) { return Status::OK(); } if (_is_pass_through) { - return _passthrough_sink(state, in_block, local_state); + return _passthrough_sink(state, in_block, std::move(sink_info)); } else { if (_total_block++ > _num_partitions) { _is_pass_through = true; } - return _shuffle_sink(state, in_block, local_state); + return 
_shuffle_sink(state, in_block, std::move(sink_info)); } } Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, - bool* eos, - LocalExchangeSourceLocalState& local_state) { + bool* eos, Profile&& profile, + SourceInfo&& source_info) { BlockWrapperSPtr next_block; - _dequeue_data(local_state, next_block, eos, block); + _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); return Status::OK(); } -void AdaptivePassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { +void AdaptivePassthroughExchanger::close(SourceInfo&& source_info) { vectorized::Block next_block; bool eos; BlockWrapperSPtr wrapper; - _data_queue[local_state._channel_id].set_eos(); - while (_dequeue_data(local_state, wrapper, &eos, &next_block)) { + _data_queue[source_info.channel_id].set_eos(); + while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, + source_info.channel_id)) { // do nothing } } diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 4d699baa52f..d6871b2ba97 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -20,14 +20,33 @@ #include "pipeline/dependency.h" #include "pipeline/exec/operator.h" -namespace doris::pipeline { +namespace doris { #include "common/compile_check_begin.h" - +namespace vectorized { +class PartitionerBase; +} +namespace pipeline { class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; struct BlockWrapper; class SortSourceOperatorX; +struct Profile { + RuntimeProfile::Counter* compute_hash_value_timer = nullptr; + RuntimeProfile::Counter* distribute_timer = nullptr; + RuntimeProfile::Counter* copy_data_timer = nullptr; +}; + +struct SinkInfo { + int* channel_id; + vectorized::PartitionerBase* partitioner; + LocalExchangeSinkLocalState* local_state; +}; + +struct SourceInfo { + int channel_id; + 
LocalExchangeSourceLocalState* local_state; +}; /** * One exchanger is hold by one `LocalExchangeSharedState`. And one `LocalExchangeSharedState` is * shared by all local exchange sink operators and source operators with the same id. @@ -60,15 +79,15 @@ public: _free_block_limit(free_block_limit) {} virtual ~ExchangerBase() = default; virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) = 0; + Profile&& profile, SourceInfo&& source_info) = 0; virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) = 0; + Profile&& profile, SinkInfo&& sink_info) = 0; virtual ExchangeType get_type() const = 0; // Called if a local exchanger source operator are closed. Free the unused data block in data_queue. - virtual void close(LocalExchangeSourceLocalState& local_state) = 0; + virtual void close(SourceInfo&& source_info) = 0; // Called if all local exchanger source operators are closed. We free the memory in // `_free_blocks` here. - virtual void finalize(LocalExchangeSourceLocalState& local_state); + virtual void finalize(); virtual std::string data_queue_debug_string(int i) = 0; @@ -155,12 +174,13 @@ public: protected: // Enqueue data block and set downstream source operator to read. 
- void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState& local_state, + void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState* local_state, BlockType&& block); - bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos, - vectorized::Block* data_block); - bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos, + bool _dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id); + + void _enqueue_data_and_set_ready(int channel_id, BlockType&& block); + bool _dequeue_data(BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id); std::vector<BlockQueue<BlockType>> _data_queue; private: @@ -186,7 +206,7 @@ struct BlockWrapper { ~BlockWrapper() { DCHECK_EQ(ref_count.load(), 0); } void ref(int delta) { ref_count += delta; } void unref(LocalExchangeSharedState* shared_state, size_t allocated_bytes, int channel_id) { - if (ref_count.fetch_sub(1) == 1) { + if (ref_count.fetch_sub(1) == 1 && shared_state != nullptr) { DCHECK_GT(allocated_bytes, 0); shared_state->sub_total_mem_usage(allocated_bytes, channel_id); if (shared_state->exchanger->_free_block_limit == 0 || @@ -201,7 +221,7 @@ struct BlockWrapper { } } - void unref(LocalExchangeSharedState* shared_state, int channel_id) { + void unref(LocalExchangeSharedState* shared_state = nullptr, int channel_id = 0) { unref(shared_state, data_block.allocated_bytes(), channel_id); } int ref_value() const { return ref_count.load(); } @@ -219,19 +239,24 @@ public: DCHECK_GT(num_partitions, 0); DCHECK_GT(num_sources, 0); _data_queue.resize(num_sources); + _partition_rows_histogram.resize(running_sink_operators); } ~ShuffleExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, 
vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; - void close(LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; + void close(SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } protected: Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, LocalExchangeSinkLocalState& local_state); + vectorized::Block* block, int channel_id, + LocalExchangeSinkLocalState* local_state); + Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, + vectorized::Block* block, int channel_id); + std::vector<std::vector<uint32_t>> _partition_rows_histogram; }; class BucketShuffleExchanger final : public ShuffleExchanger { @@ -255,13 +280,13 @@ public: _data_queue.resize(num_partitions); } ~PassthroughExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } - void close(LocalExchangeSourceLocalState& local_state) override; + void close(SourceInfo&& source_info) override; }; class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> { @@ -273,13 +298,13 @@ public: 
_data_queue.resize(num_partitions); } ~PassToOneExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; } - void close(LocalExchangeSourceLocalState& local_state) override; + void close(SourceInfo&& source_info) override; }; class LocalMergeSortExchanger final : public Exchanger<BlockWrapperSPtr> { @@ -292,17 +317,17 @@ public: _data_queue.resize(num_partitions); } ~LocalMergeSortExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::LOCAL_MERGE_SORT; } - Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState& local_state); + Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState* local_state); - void close(LocalExchangeSourceLocalState& local_state) override {} - void finalize(LocalExchangeSourceLocalState& local_state) override; + void close(SourceInfo&& source_info) override {} + void finalize() override; private: 
std::unique_ptr<vectorized::VSortedRunMerger> _merger; @@ -318,13 +343,13 @@ public: _data_queue.resize(num_partitions); } ~BroadcastExchanger() override = default; - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::BROADCAST; } - void close(LocalExchangeSourceLocalState& local_state) override; + void close(SourceInfo&& source_info) override; }; //The code in AdaptivePassthroughExchanger is essentially @@ -337,26 +362,28 @@ public: : Exchanger<BlockWrapperSPtr>(running_sink_operators, num_partitions, free_block_limit) { _data_queue.resize(num_partitions); + _partition_rows_histogram.resize(running_sink_operators); } - Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, - LocalExchangeSinkLocalState& local_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, Profile&& profile, + SinkInfo&& sink_info) override; - Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, - LocalExchangeSourceLocalState& local_state) override; + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, + SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; } - void close(LocalExchangeSourceLocalState& local_state) override; + void close(SourceInfo&& source_info) override; private: Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, - LocalExchangeSinkLocalState& 
local_state); - Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block, - LocalExchangeSinkLocalState& local_state); + SinkInfo&& sink_info); + Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block, SinkInfo&& sink_info); Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, LocalExchangeSinkLocalState& local_state); + vectorized::Block* block, SinkInfo&& sink_info); std::atomic_bool _is_pass_through = false; std::atomic_int32_t _total_block = 0; + std::vector<std::vector<uint32_t>> _partition_rows_histogram; }; #include "common/compile_check_end.h" -} // namespace doris::pipeline +} // namespace pipeline +} // namespace doris \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org