This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 6c8412d9275 remove stale codes
6c8412d9275 is described below

commit 6c8412d9275c056702b558fede83c6fc10d810db
Author: jacktengg <tengjianp...@selectdb.com>
AuthorDate: Tue Jan 21 11:41:01 2025 +0800

    remove stale codes
---
 be/src/pipeline/exec/operator.h                    | 36 ----------------------
 .../exec/partitioned_aggregation_sink_operator.cpp |  2 --
 .../partitioned_aggregation_source_operator.cpp    |  2 --
 .../exec/partitioned_hash_join_probe_operator.cpp  |  2 --
 .../exec/partitioned_hash_join_sink_operator.cpp   |  2 --
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  2 --
 .../pipeline/exec/spill_sort_source_operator.cpp   |  2 --
 be/src/runtime/query_context.h                     | 11 -------
 8 files changed, 59 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 7d1236b85b1..1ea06d54ea9 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -260,27 +260,9 @@ public:
         return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {};
     }
 
-    void inc_running_big_mem_op_num(RuntimeState* state) {
-        if (!_big_mem_op_num_added) {
-            state->get_query_ctx()->inc_running_big_mem_op_num();
-            _big_mem_op_num_added = true;
-        }
-    }
-
-    void dec_running_big_mem_op_num(RuntimeState* state) {
-        if (_big_mem_op_num_added && !_big_mem_op_num_deced) {
-            state->get_query_ctx()->dec_running_big_mem_op_num();
-            _big_mem_op_num_deced = true;
-        }
-    }
-
 protected:
     Dependency* _dependency = nullptr;
     SharedStateArg* _shared_state = nullptr;
-
-private:
-    bool _big_mem_op_num_added = false;
-    bool _big_mem_op_num_deced = false;
 };
 
 template <typename SharedStateArg>
@@ -527,28 +509,10 @@ public:
         return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {};
     }
 
-    void inc_running_big_mem_op_num(RuntimeState* state) {
-        if (!_big_mem_op_num_added) {
-            state->get_query_ctx()->inc_running_big_mem_op_num();
-            _big_mem_op_num_added = true;
-        }
-    }
-
-    void dec_running_big_mem_op_num(RuntimeState* state) {
-        if (_big_mem_op_num_added && !_big_mem_op_num_deced) {
-            state->get_query_ctx()->dec_running_big_mem_op_num();
-            _big_mem_op_num_deced = true;
-        }
-    }
-
 protected:
     Dependency* _dependency = nullptr;
     std::shared_ptr<Dependency> _spill_dependency;
     SharedStateType* _shared_state = nullptr;
-
-private:
-    bool _big_mem_op_num_added = false;
-    bool _big_mem_op_num_deced = false;
 };
 
 class DataSinkOperatorXBase : public OperatorBase {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 982c13e60b5..e38f8090227 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -83,7 +83,6 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat
     if (Base::_closed) {
         return Status::OK();
     }
-    dec_running_big_mem_op_num(state);
     return Base::close(state, exec_status);
 }
 
@@ -171,7 +170,6 @@ Status PartitionedAggSinkOperatorX::open(RuntimeState* state) {
 Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
                                          bool eos) {
     auto& local_state = get_local_state(state);
-    local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
     local_state._eos = eos;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index cd9a0a72bb1..edee6a66a41 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -111,7 +111,6 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    dec_running_big_mem_op_num(state);
     return Base::close(state);
 }
 PartitionedAggSourceOperatorX::PartitionedAggSourceOperatorX(ObjectPool* pool,
@@ -153,7 +152,6 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
         }
     }};
 
-    local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
 
     if (local_state._shared_state->is_spilled &&
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 565aeaa5fee..adb686ffa21 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -168,7 +168,6 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    dec_running_big_mem_op_num(state);
     RETURN_IF_ERROR(PipelineXSpillLocalState::close(state));
     return Status::OK();
 }
@@ -546,7 +545,6 @@ Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
 Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* input_block,
                                                bool eos) const {
     auto& local_state = get_local_state(state);
-    local_state.inc_running_big_mem_op_num(state);
     const auto rows = input_block->rows();
     auto& partitioned_blocks = local_state._partitioned_blocks;
     if (rows == 0) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 9ae263bc5c8..635b642cbf3 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -87,7 +87,6 @@ Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec
     if (PipelineXSpillSinkLocalState::_closed) {
         return Status::OK();
     }
-    dec_running_big_mem_op_num(state);
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     if (!_shared_state->need_to_spill && _shared_state->inner_runtime_state) {
         RETURN_IF_ERROR(p._inner_sink_operator->close(_shared_state->inner_runtime_state.get(),
@@ -571,7 +570,6 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                                               bool eos) {
     auto& local_state = get_local_state(state);
     CHECK_EQ(local_state._spill_dependency->is_blocked_by(nullptr), nullptr);
-    local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
 
     local_state._child_eos = eos;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 943d9a7c8d2..759b88e28bc 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -77,7 +77,6 @@ void SpillSortSinkLocalState::update_profile(RuntimeProfile* child_profile) {
 }
 
 Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) {
-    dec_running_big_mem_op_num(state);
     return Base::close(state, execsink_status);
 }
 
@@ -151,7 +150,6 @@ size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
 Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
                                     bool eos) {
     auto& local_state = get_local_state(state);
-    local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
     if (in_block->rows() > 0) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index d344b22e08b..d815bfb7d1b 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -61,7 +61,6 @@ Status SpillSortLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    dec_running_big_mem_op_num(state);
     return Base::close(state);
 }
 int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const {
@@ -260,7 +259,6 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc
             local_state._current_merging_streams.clear();
         }
     }};
-    local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
 
     if (local_state._shared_state->is_spilled) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 954369e29fa..68f64d26f21 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -222,16 +222,6 @@ public:
 
     WorkloadGroupPtr workload_group() const { return _workload_group; }
 
-    void inc_running_big_mem_op_num() {
-        _running_big_mem_op_num.fetch_add(1, std::memory_order_relaxed);
-    }
-    void dec_running_big_mem_op_num() {
-        _running_big_mem_op_num.fetch_sub(1, std::memory_order_relaxed);
-    }
-    int32_t get_running_big_mem_op_num() {
-        return _running_big_mem_op_num.load(std::memory_order_relaxed);
-    }
-
     void increase_revoking_tasks_count() { _revoking_tasks_count.fetch_add(1); }
 
     void decrease_revoking_tasks_count();
@@ -351,7 +341,6 @@ private:
     ExecEnv* _exec_env = nullptr;
     MonotonicStopWatch _query_watcher;
     bool _is_nereids = false;
-    std::atomic<int> _running_big_mem_op_num = 0;
 
     std::mutex _revoking_tasks_mutex;
     std::atomic<int> _revoking_tasks_count = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to