This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 6c8412d9275 remove stale codes 6c8412d9275 is described below commit 6c8412d9275c056702b558fede83c6fc10d810db Author: jacktengg <tengjianp...@selectdb.com> AuthorDate: Tue Jan 21 11:41:01 2025 +0800 remove stale codes --- be/src/pipeline/exec/operator.h | 36 ---------------------- .../exec/partitioned_aggregation_sink_operator.cpp | 2 -- .../partitioned_aggregation_source_operator.cpp | 2 -- .../exec/partitioned_hash_join_probe_operator.cpp | 2 -- .../exec/partitioned_hash_join_sink_operator.cpp | 2 -- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 2 -- .../pipeline/exec/spill_sort_source_operator.cpp | 2 -- be/src/runtime/query_context.h | 11 ------- 8 files changed, 59 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 7d1236b85b1..1ea06d54ea9 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -260,27 +260,9 @@ public: return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {}; } - void inc_running_big_mem_op_num(RuntimeState* state) { - if (!_big_mem_op_num_added) { - state->get_query_ctx()->inc_running_big_mem_op_num(); - _big_mem_op_num_added = true; - } - } - - void dec_running_big_mem_op_num(RuntimeState* state) { - if (_big_mem_op_num_added && !_big_mem_op_num_deced) { - state->get_query_ctx()->dec_running_big_mem_op_num(); - _big_mem_op_num_deced = true; - } - } - protected: Dependency* _dependency = nullptr; SharedStateArg* _shared_state = nullptr; - -private: - bool _big_mem_op_num_added = false; - bool _big_mem_op_num_deced = false; }; template <typename SharedStateArg> @@ -527,28 +509,10 @@ public: return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {}; } - void inc_running_big_mem_op_num(RuntimeState* state) { - if (!_big_mem_op_num_added) { - state->get_query_ctx()->inc_running_big_mem_op_num(); - _big_mem_op_num_added = true; - } - } - - void dec_running_big_mem_op_num(RuntimeState* state) { - if (_big_mem_op_num_added && !_big_mem_op_num_deced) { - state->get_query_ctx()->dec_running_big_mem_op_num(); - _big_mem_op_num_deced = true; - } - } - protected: Dependency* _dependency = nullptr; std::shared_ptr<Dependency> _spill_dependency; SharedStateType* _shared_state = nullptr; - -private: - bool _big_mem_op_num_added = false; - bool _big_mem_op_num_deced = false; }; class DataSinkOperatorXBase : public OperatorBase { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 982c13e60b5..e38f8090227 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -83,7 +83,6 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat if (Base::_closed) { return Status::OK(); } - dec_running_big_mem_op_num(state); return Base::close(state, exec_status); } @@ -171,7 +170,6 @@ Status PartitionedAggSinkOperatorX::open(RuntimeState* state) { Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); - local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); local_state._eos = eos; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index cd9a0a72bb1..edee6a66a41 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -111,7 +111,6 @@ Status PartitionedAggLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - dec_running_big_mem_op_num(state); return Base::close(state); } PartitionedAggSourceOperatorX::PartitionedAggSourceOperatorX(ObjectPool* pool, @@ -153,7 +152,6 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: } }}; - local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); if (local_state._shared_state->is_spilled && diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 565aeaa5fee..adb686ffa21 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -168,7 +168,6 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - dec_running_big_mem_op_num(state); RETURN_IF_ERROR(PipelineXSpillLocalState::close(state)); return Status::OK(); } @@ -546,7 +545,6 @@ Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) { Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* input_block, bool eos) const { auto& local_state = get_local_state(state); - local_state.inc_running_big_mem_op_num(state); const auto rows = input_block->rows(); auto& partitioned_blocks = local_state._partitioned_blocks; if (rows == 0) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 9ae263bc5c8..635b642cbf3 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -87,7 +87,6 @@ Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec if (PipelineXSpillSinkLocalState::_closed) { return Status::OK(); } - dec_running_big_mem_op_num(state); auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); if (!_shared_state->need_to_spill && _shared_state->inner_runtime_state) { RETURN_IF_ERROR(p._inner_sink_operator->close(_shared_state->inner_runtime_state.get(), @@ -571,7 +570,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B bool eos) { auto& local_state = get_local_state(state); CHECK_EQ(local_state._spill_dependency->is_blocked_by(nullptr), nullptr); - local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); local_state._child_eos = eos; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 943d9a7c8d2..759b88e28bc 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -77,7 +77,6 @@ void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) { } Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) { - dec_running_big_mem_op_num(state); return Base::close(state, execsink_status); } @@ -151,7 +150,6 @@ size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const { Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); - local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0) { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index d344b22e08b..d815bfb7d1b 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -61,7 +61,6 @@ Status SpillSortLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - dec_running_big_mem_op_num(state); return Base::close(state); } int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const { @@ -260,7 +259,6 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc local_state._current_merging_streams.clear(); } }}; - local_state.inc_running_big_mem_op_num(state); SCOPED_TIMER(local_state.exec_time_counter()); if (local_state._shared_state->is_spilled) { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 954369e29fa..68f64d26f21 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -222,16 +222,6 @@ public: WorkloadGroupPtr workload_group() const { return _workload_group; } - void inc_running_big_mem_op_num() { - _running_big_mem_op_num.fetch_add(1, std::memory_order_relaxed); - } - void dec_running_big_mem_op_num() { - _running_big_mem_op_num.fetch_sub(1, std::memory_order_relaxed); - } - int32_t get_running_big_mem_op_num() { - return _running_big_mem_op_num.load(std::memory_order_relaxed); - } - void increase_revoking_tasks_count() { _revoking_tasks_count.fetch_add(1); } void decrease_revoking_tasks_count(); @@ -351,7 +341,6 @@ private: ExecEnv* _exec_env = nullptr; MonotonicStopWatch _query_watcher; bool _is_nereids = false; - std::atomic<int> _running_big_mem_op_num = 0; std::mutex _revoking_tasks_mutex; std::atomic<int> _revoking_tasks_count = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org