This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a95d58b7d1a [Chore](exchange) change LocalExchangeSharedState:mem_usage signed type to avoid query … (#36682) a95d58b7d1a is described below commit a95d58b7d1a5d74813e0fa6adb7e7d4b4db7e845 Author: Pxl <pxl...@qq.com> AuthorDate: Mon Jun 24 10:00:59 2024 +0800 [Chore](exchange) change LocalExchangeSharedState:mem_usage signed type to avoid query … (#36682) …blocked when negative mem_usage ## Proposed changes Change LocalExchangeSharedState::mem_usage to a signed type so that a query is not blocked when mem_usage becomes negative. --- be/src/pipeline/dependency.h | 40 +++----- be/src/pipeline/local_exchange/local_exchanger.cpp | 114 +++++++++++---------- be/src/pipeline/local_exchange/local_exchanger.h | 1 + 3 files changed, 75 insertions(+), 80 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 0f9c698a82e..5214022db13 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -336,9 +336,8 @@ public: std::vector<size_t> make_nullable_keys; struct MemoryRecord { - MemoryRecord() : used_in_arena(0), used_in_state(0) {} - int64_t used_in_arena; - int64_t used_in_state; + int64_t used_in_arena {}; + int64_t used_in_state {}; }; MemoryRecord mem_usage_record; bool agg_data_created_without_key = false; @@ -362,11 +361,7 @@ public: _order_directions(order_directions), _null_directions(null_directions) {} - HeapLimitCursor(const HeapLimitCursor& other) noexcept - : _row_id(other._row_id), - _limit_columns(other._limit_columns), - _order_directions(other._order_directions), - _null_directions(other._null_directions) {} + HeapLimitCursor(const HeapLimitCursor& other) = default; HeapLimitCursor(HeapLimitCursor&& other) noexcept : _row_id(other._row_id), @@ -567,11 +562,10 @@ public: }; struct BlockRowPos { - BlockRowPos() : block_num(0), row_num(0), pos(0) {} - int64_t block_num; //the pos at which block - int64_t row_num; //the pos at which row - int64_t pos; 
//pos = all blocks size + row_num - std::string debug_string() { + int64_t block_num {}; //the pos at which block + int64_t row_num {}; //the pos at which row + int64_t pos {}; //pos = all blocks size + row_num + std::string debug_string() const { std::string res = "\t block_num: "; res += std::to_string(block_num); res += "\t row_num: "; @@ -823,14 +817,9 @@ struct DataDistribution { DataDistribution(ExchangeType type) : distribution_type(type) {} DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_) : distribution_type(type), partition_exprs(partition_exprs_) {} - DataDistribution(const DataDistribution& other) - : distribution_type(other.distribution_type), partition_exprs(other.partition_exprs) {} + DataDistribution(const DataDistribution& other) = default; bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; } - DataDistribution& operator=(const DataDistribution& other) { - distribution_type = other.distribution_type; - partition_exprs = other.partition_exprs; - return *this; - } + DataDistribution& operator=(const DataDistribution& other) = default; ExchangeType distribution_type; std::vector<TExpr> partition_exprs; }; @@ -843,13 +832,14 @@ public: LocalExchangeSharedState(int num_instances); std::unique_ptr<ExchangerBase> exchanger {}; std::vector<MemTracker*> mem_trackers; - std::atomic<size_t> mem_usage = 0; + std::atomic<int64_t> mem_usage = 0; + // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue. 
std::mutex le_lock; void create_source_dependencies(int operator_id, int node_id) { - for (size_t i = 0; i < source_deps.size(); i++) { - source_deps[i] = std::make_shared<Dependency>(operator_id, node_id, - "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY"); - source_deps[i]->set_shared_state(this); + for (auto& source_dep : source_deps) { + source_dep = std::make_shared<Dependency>(operator_id, node_id, + "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY"); + source_dep->set_shared_state(this); } }; void sub_running_sink_operators(); diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 51d2c8268e7..27b7fc7e7fd 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -74,20 +74,14 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block return Status::OK(); }; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { - SCOPED_TIMER(local_state._copy_data_timer); - mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( - block, partitioned_block.first->data_block); - RETURN_IF_ERROR(get_data(block)); - } else { - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { SCOPED_TIMER(local_state._copy_data_timer); mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); RETURN_IF_ERROR(get_data(block)); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); @@ -144,6 +138,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) 
{ local_state._shared_state->set_ready_to_read(it.second); } else { + local_state._shared_state->sub_mem_usage( + it.second, new_block_wrapper->data_block.allocated_bytes(), false); new_block_wrapper->unref(local_state._shared_state); } } else { @@ -162,6 +158,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest {new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->set_ready_to_read(i % _num_sources); } else { + local_state._shared_state->sub_mem_usage( + i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), + false); new_block_wrapper->unref(local_state._shared_state); } } else { @@ -181,6 +180,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->set_ready_to_read(map[i]); } else { + local_state._shared_state->sub_mem_usage( + map[i], new_block_wrapper->data_block.allocated_bytes(), false); new_block_wrapper->unref(local_state._shared_state); } } else { @@ -200,9 +201,12 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo } new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; - local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); + size_t memory_usage = new_block.allocated_bytes(); + local_state._shared_state->add_mem_usage(channel_id, memory_usage); if (_data_queue[channel_id].enqueue(std::move(new_block))) { local_state._shared_state->set_ready_to_read(channel_id); + } else { + local_state._shared_state->sub_mem_usage(channel_id, memory_usage); } return Status::OK(); @@ -220,25 +224,16 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - if 
(_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - block->swap(next_block); - local_state._shared_state->sub_mem_usage(local_state._channel_id, - block->allocated_bytes()); - if (_free_block_limit == 0 || - _free_blocks.size_approx() < _free_block_limit * _num_sources) { - _free_blocks.enqueue(std::move(next_block)); - } - } else { - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); + local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); if (_free_block_limit == 0 || _free_blocks.size_approx() < _free_block_limit * _num_sources) { _free_blocks.enqueue(std::move(next_block)); } - local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); @@ -264,14 +259,11 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } vectorized::Block next_block; - if (_running_sink_operators == 0) { - if (_data_queue[0].try_dequeue(next_block)) { - *block = std::move(next_block); - } else { - *eos = true; - } - } else if (_data_queue[0].try_dequeue(next_block)) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[0].try_dequeue(next_block)) { *block = std::move(next_block); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); @@ -287,10 +279,14 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_ } new_block.swap(*in_block); DCHECK_LE(local_state._channel_id, _data_queue.size()); - add_mem_usage(local_state, new_block.allocated_bytes()); + + 
size_t memory_usage = new_block.allocated_bytes(); + add_mem_usage(local_state, memory_usage); if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) { local_state._shared_state->set_ready_to_read(0); + } else { + sub_mem_usage(local_state, memory_usage); } if (eos) { _queue_deps[local_state._channel_id]->set_always_ready(); @@ -350,6 +346,19 @@ Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block return Status::OK(); } +void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSinkLocalState& local_state, + int64_t delta) { + const auto channel_id = local_state._channel_id; + local_state._shared_state->mem_trackers[channel_id]->release(delta); + if (_queues_mem_usege[channel_id].fetch_sub(delta) > _each_queue_limit) { + _sink_deps[channel_id]->set_ready(); + } + // if queue empty , block this queue + if (_queues_mem_usege[channel_id] == 0) { + _queue_deps[channel_id]->block(); + } +} + void LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t delta) { const auto channel_id = local_state._channel_id; @@ -412,14 +421,11 @@ void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) { Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - *block = std::move(next_block); - } else { - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { *block = std::move(next_block); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); @@ -436,9 +442,12 @@ Status 
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, } new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; - local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); + size_t memory_usage = new_block.allocated_bytes(); + local_state._shared_state->add_mem_usage(channel_id, memory_usage); if (_data_queue[channel_id].enqueue(std::move(new_block))) { local_state._shared_state->set_ready_to_read(channel_id); + } else { + local_state._shared_state->sub_mem_usage(channel_id, memory_usage); } return Status::OK(); @@ -494,9 +503,13 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, vectorized::MutableBlock::create_unique(block->clone_empty()); RETURN_IF_ERROR(mutable_block->add_rows(block, start, size)); auto new_block = mutable_block->to_block(); - local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes()); + + size_t memory_usage = new_block.allocated_bytes(); + local_state._shared_state->add_mem_usage(i, memory_usage); if (data_queue[i].enqueue(std::move(new_block))) { local_state._shared_state->set_ready_to_read(i); + } else { + local_state._shared_state->sub_mem_usage(i, memory_usage); } } } @@ -519,25 +532,16 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized:: bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - if (_running_sink_operators == 0) { - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - block->swap(next_block); - if (_free_block_limit == 0 || - _free_blocks.size_approx() < _free_block_limit * _num_sources) { - _free_blocks.enqueue(std::move(next_block)); - } - local_state._shared_state->sub_mem_usage(local_state._channel_id, - block->allocated_bytes()); - } else { - *eos = true; - } - } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + bool all_finished = _running_sink_operators == 0; + if 
(_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); if (_free_block_limit == 0 || _free_blocks.size_approx() < _free_block_limit * _num_sources) { _free_blocks.enqueue(std::move(next_block)); } local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); + } else if (all_finished) { + *eos = true; } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 741b86aa8bb..2c4f8f5b785 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -256,6 +256,7 @@ public: std::vector<Dependency*> local_state_dependency(int channel_id) override; void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t delta); + void sub_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t delta); void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int channel_id, int64_t delta); void close(LocalExchangeSourceLocalState& local_state) override {} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org