This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 637e124f0fd734dde6f3e661419f9bcc5b9586cf
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Thu Aug 22 18:51:49 2024 +0800

    [refine](pipeline) refine some operator close function (#39397)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/dependency.h                       |  1 -
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  5 ++-
 be/src/pipeline/exec/aggregation_sink_operator.h   |  1 +
 .../pipeline/exec/aggregation_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/analytic_source_operator.cpp  |  4 +--
 be/src/pipeline/exec/file_scan_operator.cpp        |  1 +
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  2 +-
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  6 ++--
 be/src/pipeline/exec/hive_table_sink_operator.cpp  | 15 +--------
 be/src/pipeline/exec/hive_table_sink_operator.h    |  4 ---
 .../pipeline/exec/iceberg_table_sink_operator.cpp  | 15 +--------
 be/src/pipeline/exec/iceberg_table_sink_operator.h |  5 ---
 .../exec/nested_loop_join_build_operator.cpp       |  2 +-
 be/src/pipeline/exec/olap_table_sink_operator.cpp  | 37 ----------------------
 be/src/pipeline/exec/olap_table_sink_operator.h    |  4 ---
 .../pipeline/exec/olap_table_sink_v2_operator.cpp  | 37 ----------------------
 be/src/pipeline/exec/olap_table_sink_v2_operator.h |  4 ---
 .../pipeline/exec/partition_sort_sink_operator.cpp |  4 +--
 .../pipeline/exec/partition_sort_sink_operator.h   |  4 ++-
 .../exec/partition_sort_source_operator.cpp        |  2 +-
 .../partitioned_aggregation_source_operator.cpp    |  6 ++--
 be/src/pipeline/exec/result_file_sink_operator.cpp |  2 +-
 22 files changed, 24 insertions(+), 139 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 957a6ca8bd3..8def7be6147 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -316,7 +316,6 @@ public:
     vectorized::VExprContextSPtrs probe_expr_ctxs;
     size_t input_num_rows = 0;
     std::vector<vectorized::AggregateDataPtr> values;
-    std::unique_ptr<vectorized::Arena> agg_profile_arena;
     /// The total size of the row from the aggregate functions.
     size_t total_size_of_aggregate_states = 0;
     size_t align_aggregate_states = 1;
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index ba93602cb81..a287d7fb278 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -48,7 +48,7 @@ namespace doris::pipeline {
 /// is in a random order. This means that we assume that the reduction factor 
will
 /// increase over time.
 AggSinkLocalState::AggSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state)
-        : Base(parent, state) {}
+        : Base(parent, state), 
_agg_profile_arena(std::make_unique<vectorized::Arena>()) {}
 
 Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
@@ -97,11 +97,10 @@ Status AggSinkLocalState::open(RuntimeState* state) {
         RETURN_IF_ERROR(
                 p._probe_expr_ctxs[i]->clone(state, 
Base::_shared_state->probe_expr_ctxs[i]));
     }
-    Base::_shared_state->agg_profile_arena = 
std::make_unique<vectorized::Arena>();
 
     if (Base::_shared_state->probe_expr_ctxs.empty()) {
         _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
-                
Base::_shared_state->agg_profile_arena->alloc(p._total_size_of_aggregate_states));
+                _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
 
         if (p._is_merge) {
             _executor = std::make_unique<Executor<true, true>>();
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 96f068b6dca..579b9eda1a6 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -120,6 +120,7 @@ protected:
 
     AggregatedDataVariants* _agg_data = nullptr;
     vectorized::Arena* _agg_arena_pool = nullptr;
+    std::unique_ptr<vectorized::Arena> _agg_profile_arena;
 
     std::unique_ptr<ExecutorBase> _executor = nullptr;
 };
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 0c05c965f1f..3264ad56f3c 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -37,7 +37,7 @@ AggLocalState::AggLocalState(RuntimeState* state, 
OperatorXBase* parent)
 Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     _get_results_timer = ADD_TIMER(profile(), "GetResultsTime");
     _serialize_result_timer = ADD_TIMER(profile(), "SerializeResultTime");
     _hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime");
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 406108fbc4f..3583642273b 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -34,7 +34,8 @@ AnalyticLocalState::AnalyticLocalState(RuntimeState* state, 
OperatorXBase* paren
           _rows_end_offset(0),
           _fn_place_ptr(nullptr),
           _agg_functions_size(0),
-          _agg_functions_created(false) {}
+          _agg_functions_created(false),
+          _agg_arena_pool(std::make_unique<vectorized::Arena>()) {}
 
 //_partition_by_columns,_order_by_columns save in blocks, so if need to 
calculate the boundary, may find in which blocks firstly
 BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int idx, BlockRowPos 
start,
@@ -168,7 +169,6 @@ Status AnalyticLocalState::open(RuntimeState* state) {
     RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::open(state));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    _agg_arena_pool = std::make_unique<vectorized::Arena>();
 
     auto& p = _parent->cast<AnalyticSourceOperatorX>();
     _agg_functions_size = p._agg_functions.size();
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 686f8be3021..8c1e4d19407 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -93,6 +93,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
 
 Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::init(state, info));
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<FileScanOperatorX>();
     _output_tuple_id = p._output_tuple_id;
     return Status::OK();
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 3a55fdd9b86..cde42eae1e1 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -37,7 +37,7 @@ 
HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase*
 Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 30943b56ff7..d953e80b701 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -28,12 +28,13 @@
 namespace doris::pipeline {
 
 HashJoinProbeLocalState::HashJoinProbeLocalState(RuntimeState* state, 
OperatorXBase* parent)
-        : JoinProbeLocalState<HashJoinSharedState, 
HashJoinProbeLocalState>(state, parent) {}
+        : JoinProbeLocalState<HashJoinSharedState, 
HashJoinProbeLocalState>(state, parent),
+          
_process_hashtable_ctx_variants(std::make_unique<HashTableCtxVariants>()) {}
 
 Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& 
info) {
     RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<HashJoinProbeOperatorX>();
     _shared_state->probe_ignore_null = p._probe_ignore_null;
     _probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
@@ -71,7 +72,6 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(JoinProbeLocalState::open(state));
 
-    _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
     auto& p = _parent->cast<HashJoinProbeOperatorX>();
     std::visit(
             [&](auto&& join_op_variants, auto have_other_join_conjunct) {
diff --git a/be/src/pipeline/exec/hive_table_sink_operator.cpp 
b/be/src/pipeline/exec/hive_table_sink_operator.cpp
index b931d48e832..f7cb31eea5e 100644
--- a/be/src/pipeline/exec/hive_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/hive_table_sink_operator.cpp
@@ -24,23 +24,10 @@ namespace doris::pipeline {
 Status HiveTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<Parent>();
     RETURN_IF_ERROR(_writer->init_properties(p._pool));
     return Status::OK();
 }
 
-Status HiveTableSinkLocalState::close(RuntimeState* state, Status exec_status) 
{
-    if (Base::_closed) {
-        return Status::OK();
-    }
-    SCOPED_TIMER(_close_timer);
-    SCOPED_TIMER(exec_time_counter());
-    if (_closed) {
-        return _close_status;
-    }
-    _close_status = Base::close(state, exec_status);
-    return _close_status;
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hive_table_sink_operator.h 
b/be/src/pipeline/exec/hive_table_sink_operator.h
index e06338e1427..bee90a9f6c6 100644
--- a/be/src/pipeline/exec/hive_table_sink_operator.h
+++ b/be/src/pipeline/exec/hive_table_sink_operator.h
@@ -39,11 +39,7 @@ public:
         return Base::open(state);
     }
 
-    Status close(RuntimeState* state, Status exec_status) override;
     friend class HiveTableSinkOperatorX;
-
-private:
-    Status _close_status = Status::OK();
 };
 
 class HiveTableSinkOperatorX final : public 
DataSinkOperatorX<HiveTableSinkLocalState> {
diff --git a/be/src/pipeline/exec/iceberg_table_sink_operator.cpp 
b/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
index f63f23ddec5..44bde4e8812 100644
--- a/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
@@ -24,23 +24,10 @@ namespace doris::pipeline {
 Status IcebergTableSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<Parent>();
     RETURN_IF_ERROR(_writer->init_properties(p._pool));
     return Status::OK();
 }
 
-Status IcebergTableSinkLocalState::close(RuntimeState* state, Status 
exec_status) {
-    if (Base::_closed) {
-        return Status::OK();
-    }
-    SCOPED_TIMER(_close_timer);
-    SCOPED_TIMER(exec_time_counter());
-    if (_closed) {
-        return _close_status;
-    }
-    _close_status = Base::close(state, exec_status);
-    return _close_status;
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/iceberg_table_sink_operator.h 
b/be/src/pipeline/exec/iceberg_table_sink_operator.h
index 09df1c20b40..dd93d6934e1 100644
--- a/be/src/pipeline/exec/iceberg_table_sink_operator.h
+++ b/be/src/pipeline/exec/iceberg_table_sink_operator.h
@@ -38,12 +38,7 @@ public:
         SCOPED_TIMER(_open_timer);
         return Base::open(state);
     }
-
-    Status close(RuntimeState* state, Status exec_status) override;
     friend class IcebergTableSinkOperatorX;
-
-private:
-    Status _close_status = Status::OK();
 };
 
 class IcebergTableSinkOperatorX final : public 
DataSinkOperatorX<IcebergTableSinkLocalState> {
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index e01505b5f79..9e44a399bd8 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -61,7 +61,7 @@ 
NestedLoopJoinBuildSinkLocalState::NestedLoopJoinBuildSinkLocalState(DataSinkOpe
 Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
     _runtime_filters.resize(p._runtime_filter_descs.size());
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.cpp 
b/be/src/pipeline/exec/olap_table_sink_operator.cpp
deleted file mode 100644
index 60e6180469c..00000000000
--- a/be/src/pipeline/exec/olap_table_sink_operator.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap_table_sink_operator.h"
-
-#include "common/status.h"
-
-namespace doris::pipeline {
-
-Status OlapTableSinkLocalState::close(RuntimeState* state, Status exec_status) 
{
-    if (Base::_closed) {
-        return Status::OK();
-    }
-    SCOPED_TIMER(_close_timer);
-    SCOPED_TIMER(exec_time_counter());
-    if (_closed) {
-        return _close_status;
-    }
-    _close_status = Base::close(state, exec_status);
-    return _close_status;
-}
-
-} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h 
b/be/src/pipeline/exec/olap_table_sink_operator.h
index 74decf9c278..5eafc2ea25f 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -32,11 +32,7 @@ public:
     ENABLE_FACTORY_CREATOR(OlapTableSinkLocalState);
     OlapTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state) {};
-    Status close(RuntimeState* state, Status exec_status) override;
     friend class OlapTableSinkOperatorX;
-
-private:
-    Status _close_status = Status::OK();
 };
 class OlapTableSinkOperatorX final : public 
DataSinkOperatorX<OlapTableSinkLocalState> {
 public:
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp 
b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
deleted file mode 100644
index b476611b719..00000000000
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
+++ /dev/null
@@ -1,37 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap_table_sink_v2_operator.h"
-
-#include "common/status.h"
-
-namespace doris::pipeline {
-
-Status OlapTableSinkV2LocalState::close(RuntimeState* state, Status 
exec_status) {
-    if (Base::_closed) {
-        return Status::OK();
-    }
-    SCOPED_TIMER(_close_timer);
-    SCOPED_TIMER(exec_time_counter());
-    if (_closed) {
-        return _close_status;
-    }
-    _close_status = Base::close(state, exec_status);
-    return _close_status;
-}
-
-} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h 
b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index 2cd82016f9e..4ffd062f99e 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -32,11 +32,7 @@ public:
     ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState);
     OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state)
             : Base(parent, state) {};
-    Status close(RuntimeState* state, Status exec_status) override;
     friend class OlapTableSinkV2OperatorX;
-
-private:
-    Status _close_status = Status::OK();
 };
 
 class OlapTableSinkV2OperatorX final : public 
DataSinkOperatorX<OlapTableSinkV2LocalState> {
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 404d9095f96..0c165350613 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -99,7 +99,7 @@ Status PartitionBlocks::do_partition_topn_sort() {
 Status PartitionSortSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     
RETURN_IF_ERROR(PipelineXSinkLocalState<PartitionSortNodeSharedState>::init(state,
 info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<PartitionSortSinkOperatorX>();
     RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
     _partition_expr_ctxs.resize(p._partition_expr_ctxs.size());
@@ -108,8 +108,6 @@ Status PartitionSortSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
         RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, 
_partition_expr_ctxs[i]));
     }
     _partition_exprs_num = p._partition_exprs_num;
-    _partitioned_data = std::make_unique<PartitionedHashMapVariants>();
-    _agg_arena_pool = std::make_unique<vectorized::Arena>();
     _hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", 
TUnit::UNIT);
     _build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
     _selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 25ad0309bde..5c1484ed3bc 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -214,7 +214,9 @@ class PartitionSortSinkLocalState : public 
PipelineXSinkLocalState<PartitionSort
 
 public:
     PartitionSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state)
-            : PipelineXSinkLocalState<PartitionSortNodeSharedState>(parent, 
state) {}
+            : PipelineXSinkLocalState<PartitionSortNodeSharedState>(parent, 
state),
+              
_partitioned_data(std::make_unique<PartitionedHashMapVariants>()),
+              _agg_arena_pool(std::make_unique<vectorized::Arena>()) {}
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp 
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 17400d108d0..2f94a652a89 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -27,7 +27,7 @@ namespace pipeline {
 Status PartitionSortSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortNodeSharedState>::init(state, 
info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
     _sorted_partition_output_rows_counter =
             ADD_COUNTER(profile(), "SortedPartitionOutputRows", TUnit::UNIT);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 153676851ac..5e030e7ab49 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -36,18 +36,20 @@ 
PartitionedAggLocalState::PartitionedAggLocalState(RuntimeState* state, Operator
 Status PartitionedAggLocalState::init(RuntimeState* state, LocalStateInfo& 
info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     _init_counters();
     return Status::OK();
 }
 
 Status PartitionedAggLocalState::open(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::open(state));
+    SCOPED_TIMER(_open_timer);
     if (_opened) {
         return Status::OK();
     }
     _opened = true;
     RETURN_IF_ERROR(setup_in_memory_agg_op(state));
-    return Base::open(state);
+    return Status::OK();
 }
 
 void PartitionedAggLocalState::_init_counters() {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 0ba727543cd..8871a299cbb 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -85,7 +85,7 @@ Status ResultFileSinkOperatorX::open(RuntimeState* state) {
 Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     _sender_id = info.sender_id;
 
     _brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to