This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new db0a43bad2e [Chore](exchange) change LocalExchangeSharedState:mem_usage signed ty… (#37981) db0a43bad2e is described below commit db0a43bad2ecf46974f90d2c3906d258abab5f43 Author: Pxl <pxl...@qq.com> AuthorDate: Wed Jul 17 13:46:51 2024 +0800 [Chore](exchange) change LocalExchangeSharedState:mem_usage signed ty… (#37981) pick from #36682 --- be/src/pipeline/pipeline_x/dependency.h | 17 ++-- .../pipeline_x/local_exchange/local_exchanger.cpp | 95 ++++++++++------------ 2 files changed, 47 insertions(+), 65 deletions(-) diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index b60b3e9ae3b..c82104f7535 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -365,9 +365,8 @@ public: std::vector<size_t> make_nullable_keys; struct MemoryRecord { - MemoryRecord() : used_in_arena(0), used_in_state(0) {} - int64_t used_in_arena; - int64_t used_in_state; + int64_t used_in_arena {}; + int64_t used_in_state {}; }; MemoryRecord mem_usage_record; bool agg_data_created_without_key = false; @@ -754,14 +753,9 @@ struct DataDistribution { DataDistribution(ExchangeType type) : distribution_type(type) {} DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_) : distribution_type(type), partition_exprs(partition_exprs_) {} - DataDistribution(const DataDistribution& other) - : distribution_type(other.distribution_type), partition_exprs(other.partition_exprs) {} + DataDistribution(const DataDistribution& other) = default; bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; } - DataDistribution& operator=(const DataDistribution& other) { - distribution_type = other.distribution_type; - partition_exprs = other.partition_exprs; - return *this; - } + DataDistribution& operator=(const DataDistribution& other) = default; ExchangeType distribution_type; std::vector<TExpr> partition_exprs; }; @@ -774,7 +768,8 @@ public: LocalExchangeSharedState(int num_instances); std::unique_ptr<ExchangerBase> exchanger {}; std::vector<MemTracker*> mem_trackers; - std::atomic<size_t> mem_usage = 0; + std::atomic<int64_t> mem_usage = 0; + // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue. std::mutex le_lock; void create_source_dependencies(int operator_id, int node_id, QueryContext* ctx) { for (size_t i = 0; i < source_deps.size(); i++) { diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index f02fd0e5f04..7a044aaa77f 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -72,20 +72,14 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block return Status::OK(); }; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { - SCOPED_TIMER(local_state._copy_data_timer); - mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( - block, partitioned_block.first->data_block); - RETURN_IF_ERROR(get_data(block)); - } else { - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { SCOPED_TIMER(local_state._copy_data_timer); mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); RETURN_IF_ERROR(get_data(block)); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); @@ -142,6 +136,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->set_ready_to_read(it.second); } else { + local_state._shared_state->sub_mem_usage( + it.second, new_block_wrapper->data_block.allocated_bytes(), false); new_block_wrapper->unref(local_state._shared_state); } } else { @@ -160,6 +156,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest {new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->set_ready_to_read(i % _num_sources); } else { + local_state._shared_state->sub_mem_usage( + i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), + false); new_block_wrapper->unref(local_state._shared_state); } } else { @@ -179,6 +178,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->set_ready_to_read(map[i]); } else { + local_state._shared_state->sub_mem_usage( + map[i], new_block_wrapper->data_block.allocated_bytes(), false); new_block_wrapper->unref(local_state._shared_state); } } else { @@ -198,9 +199,12 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo } new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; - local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); + size_t memory_usage = new_block.allocated_bytes(); + local_state._shared_state->add_mem_usage(channel_id, memory_usage); if (_data_queue[channel_id].enqueue(std::move(new_block))) { local_state._shared_state->set_ready_to_read(channel_id); + } else { + local_state._shared_state->sub_mem_usage(channel_id, memory_usage); } return Status::OK(); @@ -218,25 +222,16 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - block->swap(next_block); - local_state._shared_state->sub_mem_usage(local_state._channel_id, - block->allocated_bytes()); - if (_free_block_limit == 0 || - _free_blocks.size_approx() < _free_block_limit * _num_sources) { - _free_blocks.enqueue(std::move(next_block)); - } - } else { - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); + local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); if (_free_block_limit == 0 || _free_blocks.size_approx() < _free_block_limit * _num_sources) { _free_blocks.enqueue(std::move(next_block)); } - local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); @@ -262,14 +257,11 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } vectorized::Block next_block; - if (_running_sink_operators == 0) { - if (_data_queue[0].try_dequeue(next_block)) { - *block = std::move(next_block); - } else { - *eos = true; - } - } else if (_data_queue[0].try_dequeue(next_block)) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[0].try_dequeue(next_block)) { *block = std::move(next_block); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); @@ -301,14 +293,11 @@ void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) { Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - *block = std::move(next_block); - } else { - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { *block = std::move(next_block); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); @@ -325,9 +314,12 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, } new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; - local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); + size_t memory_usage = new_block.allocated_bytes(); + local_state._shared_state->add_mem_usage(channel_id, memory_usage); if (_data_queue[channel_id].enqueue(std::move(new_block))) { local_state._shared_state->set_ready_to_read(channel_id); + } else { + local_state._shared_state->sub_mem_usage(channel_id, memory_usage); } return Status::OK(); @@ -383,9 +375,13 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, vectorized::MutableBlock::create_unique(block->clone_empty()); RETURN_IF_ERROR(mutable_block->add_rows(block, start, size)); auto new_block = mutable_block->to_block(); - local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes()); + + size_t memory_usage = new_block.allocated_bytes(); + local_state._shared_state->add_mem_usage(i, memory_usage); if (data_queue[i].enqueue(std::move(new_block))) { local_state._shared_state->set_ready_to_read(i); + } else { + local_state._shared_state->sub_mem_usage(i, memory_usage); } } } @@ -408,25 +404,16 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized:: bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - block->swap(next_block); - if (_free_block_limit == 0 || - _free_blocks.size_approx() < _free_block_limit * _num_sources) { - _free_blocks.enqueue(std::move(next_block)); - } - local_state._shared_state->sub_mem_usage(local_state._channel_id, - block->allocated_bytes()); - } else { - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); if (_free_block_limit == 0 || _free_blocks.size_approx() < _free_block_limit * _num_sources) { _free_blocks.enqueue(std::move(next_block)); } local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org