This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 47832e3e323 [refactor](rename) Rename open function in operator (#48402)
47832e3e323 is described below

commit 47832e3e323db0dc36a9bfd13c595cb1beeb36cc
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Thu Feb 27 16:20:25 2025 +0800

    [refactor](rename) Rename open function in operator (#48402)
    
    Change `open` to `prepare` in operator class.
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  4 ++--
 be/src/pipeline/exec/aggregation_sink_operator.h   |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  4 ++--
 be/src/pipeline/exec/analytic_sink_operator.h      |  2 +-
 be/src/pipeline/exec/analytic_source_operator.cpp  |  4 ++--
 be/src/pipeline/exec/analytic_source_operator.h    |  2 +-
 be/src/pipeline/exec/cache_source_operator.h       |  5 -----
 be/src/pipeline/exec/datagen_operator.cpp          |  4 ++--
 be/src/pipeline/exec/datagen_operator.h            |  2 +-
 .../distinct_streaming_aggregation_operator.cpp    |  4 ++--
 .../exec/distinct_streaming_aggregation_operator.h |  2 +-
 be/src/pipeline/exec/es_scan_operator.cpp          |  4 ++--
 be/src/pipeline/exec/es_scan_operator.h            |  2 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  4 ++--
 be/src/pipeline/exec/exchange_sink_operator.h      |  2 +-
 be/src/pipeline/exec/exchange_source_operator.cpp  |  4 ++--
 be/src/pipeline/exec/exchange_source_operator.h    |  2 +-
 be/src/pipeline/exec/file_scan_operator.cpp        |  4 ++--
 be/src/pipeline/exec/file_scan_operator.h          |  2 +-
 .../exec/group_commit_block_sink_operator.cpp      |  4 ++--
 .../exec/group_commit_block_sink_operator.h        |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  4 ++--
 be/src/pipeline/exec/hashjoin_build_sink.h         |  2 +-
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  4 ++--
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  2 +-
 be/src/pipeline/exec/hive_table_sink_operator.h    |  4 ++--
 be/src/pipeline/exec/iceberg_table_sink_operator.h |  4 ++--
 be/src/pipeline/exec/jdbc_table_sink_operator.cpp  |  4 ++--
 be/src/pipeline/exec/jdbc_table_sink_operator.h    |  2 +-
 .../pipeline/exec/memory_scratch_sink_operator.cpp |  4 ++--
 .../pipeline/exec/memory_scratch_sink_operator.h   |  2 +-
 .../pipeline/exec/multi_cast_data_stream_source.h  |  4 ++--
 .../exec/nested_loop_join_build_operator.cpp       |  4 ++--
 .../exec/nested_loop_join_build_operator.h         |  2 +-
 .../exec/nested_loop_join_probe_operator.cpp       |  4 ++--
 .../exec/nested_loop_join_probe_operator.h         |  2 +-
 be/src/pipeline/exec/olap_table_sink_operator.h    |  4 ++--
 be/src/pipeline/exec/olap_table_sink_v2_operator.h |  4 ++--
 be/src/pipeline/exec/operator.cpp                  |  4 ++--
 be/src/pipeline/exec/operator.h                    |  6 +++---
 .../pipeline/exec/partition_sort_sink_operator.cpp |  4 ++--
 .../pipeline/exec/partition_sort_sink_operator.h   |  2 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |  4 ++--
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../partitioned_aggregation_source_operator.cpp    |  6 +++---
 .../exec/partitioned_aggregation_source_operator.h |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  8 ++++----
 .../exec/partitioned_hash_join_probe_operator.h    |  2 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  6 +++---
 .../exec/partitioned_hash_join_sink_operator.h     |  2 +-
 be/src/pipeline/exec/repeat_operator.cpp           |  4 ++--
 be/src/pipeline/exec/repeat_operator.h             |  2 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |  4 ++--
 be/src/pipeline/exec/result_file_sink_operator.h   |  2 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |  4 ++--
 be/src/pipeline/exec/result_sink_operator.h        |  2 +-
 be/src/pipeline/exec/scan_operator.cpp             |  4 ++--
 be/src/pipeline/exec/scan_operator.h               |  2 +-
 be/src/pipeline/exec/schema_scan_operator.cpp      |  4 ++--
 be/src/pipeline/exec/schema_scan_operator.h        |  2 +-
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  4 ++--
 be/src/pipeline/exec/set_probe_sink_operator.h     |  2 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  4 ++--
 be/src/pipeline/exec/set_sink_operator.h           |  2 +-
 be/src/pipeline/exec/sort_sink_operator.cpp        |  4 ++--
 be/src/pipeline/exec/sort_sink_operator.h          |  2 +-
 be/src/pipeline/exec/sort_source_operator.cpp      |  4 ++--
 be/src/pipeline/exec/sort_source_operator.h        |  2 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  6 +++---
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  2 +-
 .../pipeline/exec/spill_sort_source_operator.cpp   |  6 +++---
 be/src/pipeline/exec/spill_sort_source_operator.h  |  2 +-
 .../exec/streaming_aggregation_operator.cpp        |  4 ++--
 .../pipeline/exec/streaming_aggregation_operator.h |  2 +-
 be/src/pipeline/exec/table_function_operator.cpp   |  4 ++--
 be/src/pipeline/exec/table_function_operator.h     |  2 +-
 be/src/pipeline/exec/union_sink_operator.cpp       |  4 ++--
 be/src/pipeline/exec/union_sink_operator.h         |  2 +-
 be/src/pipeline/exec/union_source_operator.h       |  4 ++--
 .../local_exchange_sink_operator.cpp               |  4 ++--
 .../local_exchange/local_exchange_sink_operator.h  |  2 +-
 .../local_exchange_source_operator.h               |  2 +-
 be/src/pipeline/pipeline.cpp                       |  4 ++--
 be/test/pipeline/operator/agg_operator_test.cpp    | 24 +++++++++++-----------
 .../operator/exchange_sink_operator_test.cpp       |  2 +-
 be/test/vec/exec/vfile_scanner_exception_test.cpp  |  2 +-
 be/test/vec/exec/vwal_scanner_test.cpp             |  2 +-
 87 files changed, 149 insertions(+), 154 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index a21d8aefdce..0f1c9c9ffeb 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -770,8 +770,8 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
     return Status::OK();
 }
 
-Status AggSinkOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::open(state));
+Status AggSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::prepare(state));
 
     RETURN_IF_ERROR(_init_probe_expr_ctx(state));
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 7ee1d973be7..e664c3bf297 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -150,7 +150,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 939a924b9bd..fb3117c800f 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -644,8 +644,8 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state)
     return Status::OK();
 }
 
-Status AnalyticSinkOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::open(state));
+Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::prepare(state));
     for (const auto& ctx : _agg_expr_ctxs) {
         RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, 
_child->row_desc()));
     }
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index 31b15d5a245..99e09372302 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -176,7 +176,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution() const override {
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index ada7fa511a8..44e334de6a1 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -86,8 +86,8 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Block
     return Status::OK();
 }
 
-Status AnalyticSourceOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
+Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::prepare(state));
     DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/analytic_source_operator.h 
b/be/src/pipeline/exec/analytic_source_operator.h
index be1fdb2c9e5..b1828eeabe6 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -50,7 +50,7 @@ public:
 
     bool is_source() const override { return true; }
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
 private:
     friend class AnalyticLocalState;
diff --git a/be/src/pipeline/exec/cache_source_operator.h 
b/be/src/pipeline/exec/cache_source_operator.h
index 146c984d04a..651f9ff5596 100644
--- a/be/src/pipeline/exec/cache_source_operator.h
+++ b/be/src/pipeline/exec/cache_source_operator.h
@@ -81,11 +81,6 @@ public:
 
     bool is_source() const override { return true; }
 
-    Status open(RuntimeState* state) override {
-        static_cast<void>(Base::open(state));
-        return Status::OK();
-    }
-
     const RowDescriptor& intermediate_row_desc() const override {
         return _child->intermediate_row_desc();
     }
diff --git a/be/src/pipeline/exec/datagen_operator.cpp 
b/be/src/pipeline/exec/datagen_operator.cpp
index 4e6e4ec513f..7118574282a 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -50,8 +50,8 @@ Status DataGenSourceOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state)
     return Status::OK();
 }
 
-Status DataGenSourceOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(OperatorX<DataGenLocalState>::open(state));
+Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorX<DataGenLocalState>::prepare(state));
     // get tuple desc
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
 
diff --git a/be/src/pipeline/exec/datagen_operator.h 
b/be/src/pipeline/exec/datagen_operator.h
index 8284db35c76..a45f7381f57 100644
--- a/be/src/pipeline/exec/datagen_operator.h
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -58,7 +58,7 @@ public:
 #endif
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
     [[nodiscard]] bool is_source() const override { return true; }
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 6187e356351..24df662bb57 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -363,8 +363,8 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode& 
tnode, RuntimeState*
     return Status::OK();
 }
 
-Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
-    
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
+Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state));
     _intermediate_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
     _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     DCHECK_EQ(_intermediate_tuple_desc->slots().size(), 
_output_tuple_desc->slots().size());
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 4c5fcd5efa7..1066ea37236 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -98,7 +98,7 @@ public:
     DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const 
TPlanNode& tnode,
                                   const DescriptorTbl& descs, bool 
require_bucket_distribution);
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
     Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) 
const override;
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) 
const override;
     bool need_more_input_data(RuntimeState* state) const override;
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp 
b/be/src/pipeline/exec/es_scan_operator.cpp
index 2cb3cd5e0b2..030753cd231 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -136,8 +136,8 @@ Status EsScanOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
     return Status::OK();
 }
 
-Status EsScanOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::open(state));
+Status EsScanOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state));
 
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     if (_tuple_desc == nullptr) {
diff --git a/be/src/pipeline/exec/es_scan_operator.h 
b/be/src/pipeline/exec/es_scan_operator.h
index 6e64110997e..daf391ae982 100644
--- a/be/src/pipeline/exec/es_scan_operator.h
+++ b/be/src/pipeline/exec/es_scan_operator.h
@@ -72,7 +72,7 @@ public:
                     const DescriptorTbl& descs, int parallel_tasks);
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
 private:
     friend class EsScanLocalState;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 71311e4d51f..18f3f0740c5 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -313,8 +313,8 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
     return Status::OK();
 }
 
-Status ExchangeSinkOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
+Status ExchangeSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::prepare(state));
     _state = state;
     _compression_type = state->fragement_transmission_compression_type();
     if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index eb82474d694..3254992bbd5 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -195,7 +195,7 @@ public:
 
     RuntimeState* state() { return _state; }
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 2aefdf8cd3d..e81e2e5c7b4 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -128,8 +128,8 @@ Status ExchangeSourceOperatorX::init(const TPlanNode& 
tnode, RuntimeState* state
     return Status::OK();
 }
 
-Status ExchangeSourceOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::open(state));
+Status ExchangeSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::prepare(state));
     DCHECK_GT(_num_senders, 0);
 
     if (_is_merging) {
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index ff9c5840033..414b0f79c34 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -70,7 +70,7 @@ public:
     ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                             const DescriptorTbl& descs, int num_senders);
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 8d9dc657501..4fcf51f8320 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -125,8 +125,8 @@ Status FileScanLocalState::_process_conjuncts(RuntimeState* 
state) {
     return Status::OK();
 }
 
-Status FileScanOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::open(state));
+Status FileScanOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::prepare(state));
     if (state->get_query_ctx() != nullptr &&
         
state->get_query_ctx()->file_scan_range_params_map.contains(node_id())) {
         TFileScanRangeParams& params =
diff --git a/be/src/pipeline/exec/file_scan_operator.h 
b/be/src/pipeline/exec/file_scan_operator.h
index 8b7b25a025e..73b8454c532 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -77,7 +77,7 @@ public:
         _output_tuple_id = tnode.file_scan_node.tuple_id;
     }
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     bool is_file_scan_operator() const override { return true; }
 
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp 
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 9f99d55d3ea..958a60b66e3 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -269,8 +269,8 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink& 
t_sink) {
     return Status::OK();
 }
 
-Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(Base::open(state));
+Status GroupCommitBlockSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
     // get table's tuple descriptor
     _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
     if (_output_tuple_desc == nullptr) {
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h 
b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index a4d2fa16f96..89dbf76a6bc 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -100,7 +100,7 @@ public:
 
     Status init(const TDataSink& sink) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index e271dbacba0..e2b911a583b 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -574,8 +574,8 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* st
     return Status::OK();
 }
 
-Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) {
-    
RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::open(state));
+Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::prepare(state));
     if (_is_broadcast_join) {
         if (state->enable_share_hash_table_for_broadcast_join()) {
             _shared_hashtable_controller =
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index b1f0cdca640..62a4be16792 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -121,7 +121,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index e3212800d5c..a55379a02ad 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -493,8 +493,8 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state)
     return Status::OK();
 }
 
-Status HashJoinProbeOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::open(state));
+Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state));
     // init left/right output slots flags, only column of slot_id in 
_hash_output_slot_ids need
     // insert to output block of hash join.
     // _left_output_slots_flags : column of left table need to output set flag 
= true
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index d36658b995a..7928c4bc411 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -129,7 +129,7 @@ public:
     HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                            const DescriptorTbl& descs);
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) 
const override;
     Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
diff --git a/be/src/pipeline/exec/hive_table_sink_operator.h 
b/be/src/pipeline/exec/hive_table_sink_operator.h
index 9304d15962c..725fd8e0490 100644
--- a/be/src/pipeline/exec/hive_table_sink_operator.h
+++ b/be/src/pipeline/exec/hive_table_sink_operator.h
@@ -60,8 +60,8 @@ public:
         return Status::OK();
     }
 
-    Status open(RuntimeState* state) override {
-        RETURN_IF_ERROR(Base::open(state));
+    Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::prepare(state));
         RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
         return vectorized::VExpr::open(_output_vexpr_ctxs, state);
     }
diff --git a/be/src/pipeline/exec/iceberg_table_sink_operator.h 
b/be/src/pipeline/exec/iceberg_table_sink_operator.h
index 9b7ed510936..284657a7219 100644
--- a/be/src/pipeline/exec/iceberg_table_sink_operator.h
+++ b/be/src/pipeline/exec/iceberg_table_sink_operator.h
@@ -59,8 +59,8 @@ public:
         return Status::OK();
     }
 
-    Status open(RuntimeState* state) override {
-        RETURN_IF_ERROR(Base::open(state));
+    Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::prepare(state));
         RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
         return vectorized::VExpr::open(_output_vexpr_ctxs, state);
     }
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp 
b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
index df47a6ea3ac..d0abe6aa0d2 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -39,8 +39,8 @@ Status JdbcTableSinkOperatorX::init(const TDataSink& 
thrift_sink) {
     return Status::OK();
 }
 
-Status JdbcTableSinkOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::open(state));
+Status JdbcTableSinkOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::prepare(state));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
     RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
     return Status::OK();
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h 
b/be/src/pipeline/exec/jdbc_table_sink_operator.h
index a0dae301a5f..d1b617402b2 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.h
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h
@@ -45,7 +45,7 @@ public:
     JdbcTableSinkOperatorX(const RowDescriptor& row_desc, int operator_id,
                            const std::vector<TExpr>& select_exprs);
     Status init(const TDataSink& thrift_sink) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp 
b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
index 69ef3ec8b58..86afd607432 100644
--- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -77,8 +77,8 @@ Status MemoryScratchSinkOperatorX::init(const TDataSink& 
thrift_sink) {
     return Status::OK();
 }
 
-Status MemoryScratchSinkOperatorX::open(RuntimeState* state) {
-    
RETURN_IF_ERROR(DataSinkOperatorX<MemoryScratchSinkLocalState>::open(state));
+Status MemoryScratchSinkOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(DataSinkOperatorX<MemoryScratchSinkLocalState>::prepare(state));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
     _timezone_obj = state->timezone_obj();
     RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.h 
b/be/src/pipeline/exec/memory_scratch_sink_operator.h
index 352826955fc..6b5c15feed7 100644
--- a/be/src/pipeline/exec/memory_scratch_sink_operator.h
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.h
@@ -56,7 +56,7 @@ public:
     MemoryScratchSinkOperatorX(const RowDescriptor& row_desc, int operator_id,
                                const std::vector<TExpr>& t_output_expr);
     Status init(const TDataSink& thrift_sink) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index a69ef52ba1c..bfe6b00a29a 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -91,8 +91,8 @@ public:
     };
     ~MultiCastDataStreamerSourceOperatorX() override = default;
 
-    Status open(RuntimeState* state) override {
-        RETURN_IF_ERROR(Base::open(state));
+    Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::prepare(state));
         // init profile for runtime filter
         // 
RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile());
         if (_t_data_stream_sink.__isset.output_exprs) {
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index a4dd990f666..bc6c7e274da 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -105,8 +105,8 @@ Status NestedLoopJoinBuildSinkOperatorX::init(const 
TPlanNode& tnode, RuntimeSta
     return Status::OK();
 }
 
-Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) {
-    
RETURN_IF_ERROR(JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>::open(state));
+Status NestedLoopJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>::prepare(state));
     size_t num_build_tuples = _child->row_desc().tuple_descriptors().size();
 
     for (size_t i = 0; i < num_build_tuples; ++i) {
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h 
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index c5a9dd7dcde..2ffaec3873d 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -67,7 +67,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index b860b454e41..c602c6d82d4 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -409,8 +409,8 @@ Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& 
tnode, RuntimeState*
     return Status::OK();
 }
 
-Status NestedLoopJoinProbeOperatorX::open(RuntimeState* state) {
-    
RETURN_IF_ERROR(JoinProbeOperatorX<NestedLoopJoinProbeLocalState>::open(state));
+Status NestedLoopJoinProbeOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(JoinProbeOperatorX<NestedLoopJoinProbeLocalState>::prepare(state));
     for (auto& conjunct : _join_conjuncts) {
         RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
     }
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 4f50a98efdb..63ce1f2e2df 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -204,7 +204,7 @@ public:
     NestedLoopJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                                  const DescriptorTbl& descs);
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) 
const override;
     Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h 
b/be/src/pipeline/exec/olap_table_sink_operator.h
index fe4fdb13b0b..755fa6cdc21 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -52,8 +52,8 @@ public:
         return Status::OK();
     }
 
-    Status open(RuntimeState* state) override {
-        RETURN_IF_ERROR(Base::open(state));
+    Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::prepare(state));
         RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
         return vectorized::VExpr::open(_output_vexpr_ctxs, state);
     }
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h 
b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index 69ac9796866..af49c34d581 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -53,8 +53,8 @@ public:
         return Status::OK();
     }
 
-    Status open(RuntimeState* state) override {
-        RETURN_IF_ERROR(Base::open(state));
+    Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::prepare(state));
         RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
         return vectorized::VExpr::open(_output_vexpr_ctxs, state);
     }
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 443f49b572f..d2d0d6a6827 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -208,7 +208,7 @@ Status OperatorXBase::init(const TPlanNode& tnode, 
RuntimeState* /*state*/) {
     return Status::OK();
 }
 
-Status OperatorXBase::open(RuntimeState* state) {
+Status OperatorXBase::prepare(RuntimeState* state) {
     for (auto& conjunct : _conjuncts) {
         RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
     }
@@ -231,7 +231,7 @@ Status OperatorXBase::open(RuntimeState* state) {
         RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
     }
     if (_child && !is_source()) {
-        RETURN_IF_ERROR(_child->open(state));
+        RETURN_IF_ERROR(_child->prepare(state));
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 85a9059db97..1f46fa7ea61 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -101,7 +101,7 @@ public:
     [[nodiscard]] virtual Status init(const TDataSink& tsink) { return 
Status::OK(); }
 
     [[nodiscard]] virtual std::string get_name() const = 0;
-    [[nodiscard]] virtual Status open(RuntimeState* state) = 0;
+    [[nodiscard]] virtual Status prepare(RuntimeState* state) = 0;
     [[nodiscard]] virtual Status close(RuntimeState* state);
 
     [[nodiscard]] virtual Status set_child(OperatorPtr child) {
@@ -550,7 +550,7 @@ public:
         return Status::InternalError("init() is only implemented in local 
exchange!");
     }
 
-    Status open(RuntimeState* state) override { return Status::OK(); }
+    Status prepare(RuntimeState* state) override { return Status::OK(); }
     [[nodiscard]] bool is_finished(RuntimeState* state) const {
         auto result = state->get_sink_local_state_result();
         if (!result) {
@@ -807,7 +807,7 @@ public:
 
     // Tablets should be hold before open phase.
     [[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return 
Status::OK(); }
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     [[nodiscard]] virtual Status get_block(RuntimeState* state, 
vectorized::Block* block,
                                            bool* eos) = 0;
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 1569fc12170..b2444414dde 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -94,8 +94,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* st
     return Status::OK();
 }
 
-Status PartitionSortSinkOperatorX::open(RuntimeState* state) {
-    
RETURN_IF_ERROR(DataSinkOperatorX<PartitionSortSinkLocalState>::open(state));
+Status PartitionSortSinkOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(DataSinkOperatorX<PartitionSortSinkLocalState>::prepare(state));
     RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), 
_row_descriptor));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state, 
_child->row_desc()));
     RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index a6f18ea8448..a158a3772a9 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -79,7 +79,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution() const override {
         if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index f70a115d686..e003ea23240 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -135,8 +135,8 @@ Status PartitionedAggSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* s
     return _agg_sink_operator->init(tnode, state);
 }
 
-Status PartitionedAggSinkOperatorX::open(RuntimeState* state) {
-    return _agg_sink_operator->open(state);
+Status PartitionedAggSinkOperatorX::prepare(RuntimeState* state) {
+    return _agg_sink_operator->prepare(state);
 }
 
 Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, 
vectorized::Block* in_block,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 47ca1a47a42..a625ac20d36 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -299,7 +299,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 5a68c69a7e2..524a8b7db64 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -106,9 +106,9 @@ Status PartitionedAggSourceOperatorX::init(const TPlanNode& 
tnode, RuntimeState*
     return _agg_source_operator->init(tnode, state);
 }
 
-Status PartitionedAggSourceOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(OperatorXBase::open(state));
-    return _agg_source_operator->open(state);
+Status PartitionedAggSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorXBase::prepare(state));
+    return _agg_source_operator->prepare(state);
 }
 
 Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index bc800f02fb5..24e56df1be8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -74,7 +74,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status close(RuntimeState* state) override;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 1aafab2a7cd..af84d515e4a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -501,16 +501,16 @@ Status PartitionedHashJoinProbeOperatorX::init(const 
TPlanNode& tnode, RuntimeSt
     return Status::OK();
 }
 
-Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
+Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
     // to avoid open _child twice
     auto child = std::move(_child);
-    RETURN_IF_ERROR(JoinProbeOperatorX::open(state));
+    RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state));
     RETURN_IF_ERROR(_inner_probe_operator->set_child(child));
     DCHECK(_build_side_child != nullptr);
     _inner_probe_operator->set_build_side_child(_build_side_child);
     RETURN_IF_ERROR(_inner_sink_operator->set_child(_build_side_child));
-    RETURN_IF_ERROR(_inner_probe_operator->open(state));
-    RETURN_IF_ERROR(_inner_sink_operator->open(state));
+    RETURN_IF_ERROR(_inner_probe_operator->prepare(state));
+    RETURN_IF_ERROR(_inner_sink_operator->prepare(state));
     _child = std::move(child);
     RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
     RETURN_IF_ERROR(_partitioner->open(state));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index cceac79b357..4bbdfa7371c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -118,7 +118,7 @@ public:
     PartitionedHashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& 
tnode, int operator_id,
                                       const DescriptorTbl& descs, uint32_t 
partition_count);
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     [[nodiscard]] Status get_block(RuntimeState* state, vectorized::Block* 
block,
                                    bool* eos) override;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 24c91ea3180..83f094bf92e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -510,12 +510,12 @@ Status PartitionedHashJoinSinkOperatorX::init(const 
TPlanNode& tnode, RuntimeSta
     return Status::OK();
 }
 
-Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) {
-    
RETURN_IF_ERROR(JoinBuildSinkOperatorX<PartitionedHashJoinSinkLocalState>::open(state));
+Status PartitionedHashJoinSinkOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(JoinBuildSinkOperatorX<PartitionedHashJoinSinkLocalState>::prepare(state));
     RETURN_IF_ERROR(_inner_sink_operator->set_child(_child));
     RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
     RETURN_IF_ERROR(_partitioner->open(state));
-    return _inner_sink_operator->open(state);
+    return _inner_sink_operator->prepare(state);
 }
 
 Status 
PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState* 
state) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 155a1500e28..d073a69516b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -101,7 +101,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/repeat_operator.cpp 
b/be/src/pipeline/exec/repeat_operator.cpp
index c3649a1f8c0..43f5a5f4944 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -63,9 +63,9 @@ Status RepeatOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
     return Status::OK();
 }
 
-Status RepeatOperatorX::open(RuntimeState* state) {
+Status RepeatOperatorX::prepare(RuntimeState* state) {
     VLOG_CRITICAL << "VRepeatNode::open";
-    RETURN_IF_ERROR(OperatorXBase::open(state));
+    RETURN_IF_ERROR(OperatorXBase::prepare(state));
     _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     if (_output_tuple_desc == nullptr) {
         return Status::InternalError("Failed to get tuple descriptor.");
diff --git a/be/src/pipeline/exec/repeat_operator.h 
b/be/src/pipeline/exec/repeat_operator.h
index 2c2af32de0b..337ce1e08c5 100644
--- a/be/src/pipeline/exec/repeat_operator.h
+++ b/be/src/pipeline/exec/repeat_operator.h
@@ -68,7 +68,7 @@ public:
                     const DescriptorTbl& descs);
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     bool need_more_input_data(RuntimeState* state) const override;
     Status pull(RuntimeState* state, vectorized::Block* output_block, bool* 
eos) const override;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 76d60bbdf1a..60e3663ee8c 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -71,8 +71,8 @@ Status ResultFileSinkOperatorX::init(const TDataSink& tsink) {
     return Status::OK();
 }
 
-Status ResultFileSinkOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::open(state));
+Status ResultFileSinkOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::prepare(state));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
     if (state->query_options().enable_parallel_outfile) {
         
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->query_id(),
 _buf_size,
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h 
b/be/src/pipeline/exec/result_file_sink_operator.h
index c3c5e345f77..6e570b8181b 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -60,7 +60,7 @@ public:
                             const std::vector<TExpr>& t_output_expr, 
DescriptorTbl& descs);
     Status init(const TDataSink& thrift_sink) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index 256a90d8852..e9f1661b3a2 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -117,8 +117,8 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, 
const RowDescriptor& r
     _name = "ResultSink";
 }
 
-Status ResultSinkOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSinkOperatorX<ResultSinkLocalState>::open(state));
+Status ResultSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX<ResultSinkLocalState>::prepare(state));
     // prepare output_expr
     // From the thrift expressions create the real exprs.
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, 
_output_vexpr_ctxs));
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index 479343ed6d5..6659d19025f 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -146,7 +146,7 @@ class ResultSinkOperatorX final : public 
DataSinkOperatorX<ResultSinkLocalState>
 public:
     ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc,
                         const std::vector<TExpr>& select_exprs, const 
TResultSink& sink);
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index b1028be10d2..ea8ee0e5cbe 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1212,10 +1212,10 @@ Status ScanOperatorX<LocalStateType>::init(const 
TPlanNode& tnode, RuntimeState*
 }
 
 template <typename LocalStateType>
-Status ScanOperatorX<LocalStateType>::open(RuntimeState* state) {
+Status ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
     _input_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
     _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
-    RETURN_IF_ERROR(OperatorX<LocalStateType>::open(state));
+    RETURN_IF_ERROR(OperatorX<LocalStateType>::prepare(state));
 
     const auto slots = _output_tuple_desc->slots();
     for (auto* slot : slots) {
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index a7e769319a9..aace8f40e24 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -350,7 +350,7 @@ template <typename LocalStateType>
 class ScanOperatorX : public OperatorX<LocalStateType> {
 public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
     Status get_block_after_projects(RuntimeState* state, vectorized::Block* 
block,
                                     bool* eos) override {
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp 
b/be/src/pipeline/exec/schema_scan_operator.cpp
index af578637c31..d22943b740a 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -135,8 +135,8 @@ Status SchemaScanOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaScanOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(Base::open(state));
+Status SchemaScanOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
 
     // get dest tuple desc
     _dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
diff --git a/be/src/pipeline/exec/schema_scan_operator.h 
b/be/src/pipeline/exec/schema_scan_operator.h
index 2d861002748..594901e812c 100644
--- a/be/src/pipeline/exec/schema_scan_operator.h
+++ b/be/src/pipeline/exec/schema_scan_operator.h
@@ -65,7 +65,7 @@ public:
     ~SchemaScanOperatorX() override = default;
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
     [[nodiscard]] bool is_source() const override { return true; }
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index ee1d1bc128c..be8191ecc0f 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -57,8 +57,8 @@ Status SetProbeSinkOperatorX<is_intersect>::init(const 
TPlanNode& tnode, Runtime
 }
 
 template <bool is_intersect>
-Status SetProbeSinkOperatorX<is_intersect>::open(RuntimeState* state) {
-    
RETURN_IF_ERROR(DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>::open(state));
+Status SetProbeSinkOperatorX<is_intersect>::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>::prepare(state));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, 
_child->row_desc()));
     return vectorized::VExpr::open(_child_exprs, state);
 }
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 5e4098e863c..a90a9775d1b 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -94,7 +94,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution() const override {
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 82bf523c60a..4faeb975ef9 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -233,8 +233,8 @@ size_t 
SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state,
 }
 
 template <bool is_intersect>
-Status SetSinkOperatorX<is_intersect>::open(RuntimeState* state) {
-    RETURN_IF_ERROR(Base::open(state));
+Status SetSinkOperatorX<is_intersect>::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, 
_child->row_desc()));
     return vectorized::VExpr::open(_child_exprs, state);
 }
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 4d57c0351f9..08f789f702a 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -87,7 +87,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution() const override {
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index a1851d4ee41..12301c60501 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -114,8 +114,8 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
     return Status::OK();
 }
 
-Status SortSinkOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSinkOperatorX<SortSinkLocalState>::open(state));
+Status SortSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX<SortSinkLocalState>::prepare(state));
     RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), 
_row_descriptor));
     return _vsort_exec_exprs.open(state);
 }
diff --git a/be/src/pipeline/exec/sort_sink_operator.h 
b/be/src/pipeline/exec/sort_sink_operator.h
index d2edaf7c281..ee6d364fa57 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -64,7 +64,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution() const override {
         if (_is_analytic_sort) {
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp 
b/be/src/pipeline/exec/sort_source_operator.cpp
index b57d422309d..d8982ce1466 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -41,8 +41,8 @@ Status SortSourceOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
     return Status::OK();
 }
 
-Status SortSourceOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(Base::open(state));
+Status SortSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
     // spill sort _child may be nullptr.
     if (_child) {
         RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), 
_row_descriptor));
diff --git a/be/src/pipeline/exec/sort_source_operator.h 
b/be/src/pipeline/exec/sort_source_operator.h
index 7902e4815bf..14ac7b81da9 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -49,7 +49,7 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     bool is_source() const override { return true; }
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 7893fd57bb4..5d1df0b4be0 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -116,9 +116,9 @@ Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state)
     return _sort_sink_operator->init(tnode, state);
 }
 
-Status SpillSortSinkOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::open(state));
-    return _sort_sink_operator->open(state);
+Status SpillSortSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::prepare(state));
+    return _sort_sink_operator->prepare(state);
 }
 
 size_t SpillSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool 
eos) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h 
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 0dda4e83e15..4094bb9a36f 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -69,7 +69,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution() const override {
         return _sort_sink_operator->required_data_distribution();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index d815bfb7d1b..ef777f19403 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -235,9 +235,9 @@ Status SpillSortSourceOperatorX::init(const TPlanNode& 
tnode, RuntimeState* stat
     return _sort_source_operator->init(tnode, state);
 }
 
-Status SpillSortSourceOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(OperatorXBase::open(state));
-    return _sort_source_operator->open(state);
+Status SpillSortSourceOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(OperatorXBase::prepare(state));
+    return _sort_source_operator->prepare(state);
 }
 
 Status SpillSortSourceOperatorX::close(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h 
b/be/src/pipeline/exec/spill_sort_source_operator.h
index fae64e051f4..1706f5dda72 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -73,7 +73,7 @@ public:
     ~SpillSortSourceOperatorX() override = default;
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status close(RuntimeState* state) override;
 
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index a2cc5e72b15..16ab1703059 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1167,8 +1167,8 @@ Status StreamingAggOperatorX::init(const TPlanNode& 
tnode, RuntimeState* state)
     return Status::OK();
 }
 
-Status StreamingAggOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::open(state));
+Status StreamingAggOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::prepare(state));
     _intermediate_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
     _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     DCHECK_EQ(_intermediate_tuple_desc->slots().size(), 
_output_tuple_desc->slots().size());
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 3ee52eeb6ec..a75cb16a436 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -205,7 +205,7 @@ public:
                           const DescriptorTbl& descs);
     ~StreamingAggOperatorX() override = default;
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
     Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) 
const override;
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) 
const override;
     bool need_more_input_data(RuntimeState* state) const override;
diff --git a/be/src/pipeline/exec/table_function_operator.cpp 
b/be/src/pipeline/exec/table_function_operator.cpp
index acd867fcc6d..975e5907177 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -287,8 +287,8 @@ Status TableFunctionOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state)
     return Status::OK();
 }
 
-Status TableFunctionOperatorX::open(doris::RuntimeState* state) {
-    RETURN_IF_ERROR(Base::open(state));
+Status TableFunctionOperatorX::prepare(doris::RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
     for (auto* fn : _fns) {
         RETURN_IF_ERROR(fn->prepare());
     }
diff --git a/be/src/pipeline/exec/table_function_operator.h 
b/be/src/pipeline/exec/table_function_operator.h
index 9aa26e9ae22..ada828cab6e 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -83,7 +83,7 @@ public:
     TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                            const DescriptorTbl& descs);
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
-    Status open(doris::RuntimeState* state) override;
+    Status prepare(doris::RuntimeState* state) override;
 
     bool need_more_input_data(RuntimeState* state) const override {
         auto& local_state = 
state->get_local_state(operator_id())->cast<TableFunctionLocalState>();
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp 
b/be/src/pipeline/exec/union_sink_operator.cpp
index d04ae2130d8..4bbb5eba3e3 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -76,8 +76,8 @@ Status UnionSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state) {
     return Status::OK();
 }
 
-Status UnionSinkOperatorX::open(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSinkOperatorX<UnionSinkLocalState>::open(state));
+Status UnionSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX<UnionSinkLocalState>::prepare(state));
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, 
_child->row_desc()));
     RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, 
_row_descriptor));
     // open const expr lists.
diff --git a/be/src/pipeline/exec/union_sink_operator.h 
b/be/src/pipeline/exec/union_sink_operator.h
index 75b3adab49e..170b99f12f1 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -74,7 +74,7 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 8a00c35b04b..6619b623ef5 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -83,8 +83,8 @@ public:
         return Status::OK();
     }
 
-    Status open(RuntimeState* state) override {
-        static_cast<void>(Base::open(state));
+    Status prepare(RuntimeState* state) override {
+        static_cast<void>(Base::prepare(state));
         // Prepare const expr lists.
         for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) {
             RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state, 
_row_descriptor));
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 1cf94af663e..45d99004750 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -66,8 +66,8 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, 
const int num_buckets
     return Status::OK();
 }
 
-Status LocalExchangeSinkOperatorX::open(RuntimeState* state) {
-    
RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::open(state));
+Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) {
+    
RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::prepare(state));
     if (_type == ExchangeType::HASH_SHUFFLE || _type == 
ExchangeType::BUCKET_HASH_SHUFFLE) {
         RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
         RETURN_IF_ERROR(_partitioner->open(state));
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 148dab29604..16e40c2a428 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -113,7 +113,7 @@ public:
     Status init(ExchangeType type, const int num_buckets, const bool 
use_global_hash_shuffle,
                 const std::map<int, int>& shuffle_idx_to_instance_idx) 
override;
 
-    Status open(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index d6c8cecfef3..a71cdaafacc 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -78,7 +78,7 @@ public:
         _exchange_type = type;
         return Status::OK();
     }
-    Status open(RuntimeState* state) override { return Status::OK(); }
+    Status prepare(RuntimeState* state) override { return Status::OK(); }
     const RowDescriptor& intermediate_row_desc() const override {
         return _child->intermediate_row_desc();
     }
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 27980976d81..4ef3cff9dbd 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -86,8 +86,8 @@ Status Pipeline::add_operator(OperatorPtr& op, const int 
parallelism) {
 }
 
 Status Pipeline::prepare(RuntimeState* state) {
-    RETURN_IF_ERROR(_operators.back()->open(state));
-    RETURN_IF_ERROR(_sink->open(state));
+    RETURN_IF_ERROR(_operators.back()->prepare(state));
+    RETURN_IF_ERROR(_sink->prepare(state));
     _name.append(std::to_string(id()));
     _name.push_back('-');
     for (auto& op : _operators) {
diff --git a/be/test/pipeline/operator/agg_operator_test.cpp 
b/be/test/pipeline/operator/agg_operator_test.cpp
index 7f0c0e6ae07..de9af15c1ba 100644
--- a/be/test/pipeline/operator/agg_operator_test.cpp
+++ b/be/test/pipeline/operator/agg_operator_test.cpp
@@ -96,7 +96,7 @@ std::shared_ptr<AggSinkOperatorX> 
create_agg_sink_op(OperatorContext& ctx, bool
     op->_aggregate_evaluators.push_back(
             vectorized::create_mock_agg_fn_evaluator(ctx.pool, is_merge, 
without_key));
     op->_pool = &ctx.pool;
-    EXPECT_TRUE(op->open(&ctx.state).ok());
+    EXPECT_TRUE(op->prepare(&ctx.state).ok());
     return op;
 }
 
@@ -107,7 +107,7 @@ std::shared_ptr<AggSourceOperatorX> 
create_agg_source_op(OperatorContext& ctx, b
             new MockRowDescriptor 
{{std::make_shared<vectorized::DataTypeInt64>()}, &ctx.pool});
     op->_without_key = without_key;
     op->_needs_finalize = needs_finalize;
-    EXPECT_TRUE(op->open(&ctx.state).ok());
+    EXPECT_TRUE(op->prepare(&ctx.state).ok());
     return op;
 }
 
@@ -231,7 +231,7 @@ TEST(AggOperatorTestWithOutGroupBy, test_multi_input) {
             ctx.pool, MockSlotRef::create_mock_contexts(1, 
std::make_shared<const DataTypeInt64>()),
             false, true));
     sink_op->_pool = &ctx.pool;
-    EXPECT_TRUE(sink_op->open(&ctx.state).ok());
+    EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
 
     auto source_op = std::make_shared<MockAggSourceOperator>();
     source_op->mock_row_descriptor.reset(
@@ -240,7 +240,7 @@ TEST(AggOperatorTestWithOutGroupBy, test_multi_input) {
                                    &ctx.pool});
     source_op->_without_key = true;
     source_op->_needs_finalize = true;
-    EXPECT_TRUE(source_op->open(&ctx.state).ok());
+    EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
 
     auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
 
@@ -297,7 +297,7 @@ TEST_F(AggOperatorTestWithGroupBy, 
test_need_finalize_only_key) {
     sink_op->_aggregate_evaluators.push_back(
             vectorized::create_mock_agg_fn_evaluator(ctx.pool, false, false));
     sink_op->_pool = &ctx.pool;
-    EXPECT_TRUE(sink_op->open(&ctx.state).ok());
+    EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
     sink_op->_probe_expr_ctxs =
             
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
 
@@ -308,7 +308,7 @@ TEST_F(AggOperatorTestWithGroupBy, 
test_need_finalize_only_key) {
                                    &ctx.pool});
     source_op->_without_key = false;
     source_op->_needs_finalize = true;
-    EXPECT_TRUE(source_op->open(&ctx.state).ok());
+    EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
 
     auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
 
@@ -359,7 +359,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_need_finalize) {
             ctx.pool, MockSlotRef::create_mock_contexts(1, 
std::make_shared<DataTypeInt64>()),
             false, false));
     sink_op->_pool = &ctx.pool;
-    EXPECT_TRUE(sink_op->open(&ctx.state).ok());
+    EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
     sink_op->_probe_expr_ctxs =
             MockSlotRef::create_mock_contexts(0, 
std::make_shared<DataTypeInt64>());
 
@@ -370,7 +370,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_need_finalize) {
                                    &ctx.pool});
     source_op->_without_key = false;
     source_op->_needs_finalize = true;
-    EXPECT_TRUE(source_op->open(&ctx.state).ok());
+    EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
 
     auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
 
@@ -425,7 +425,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) {
                 ctx.pool, MockSlotRef::create_mock_contexts(1, 
std::make_shared<DataTypeInt64>()),
                 false, false));
         sink_op->_pool = &ctx.pool;
-        EXPECT_TRUE(sink_op->open(&ctx.state).ok());
+        EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
         sink_op->_probe_expr_ctxs =
                 MockSlotRef::create_mock_contexts(0, 
std::make_shared<DataTypeInt64>());
 
@@ -436,7 +436,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) {
                                        &ctx.pool});
         source_op->_without_key = false;
         source_op->_needs_finalize = false;
-        EXPECT_TRUE(source_op->open(&ctx.state).ok());
+        EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
 
         auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
 
@@ -466,7 +466,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) {
                 ctx.pool, MockSlotRef::create_mock_contexts(1, 
std::make_shared<DataTypeInt64>()),
                 true, false));
         sink_op->_pool = &ctx.pool;
-        EXPECT_TRUE(sink_op->open(&ctx.state).ok());
+        EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
         sink_op->_probe_expr_ctxs =
                 MockSlotRef::create_mock_contexts(0, 
std::make_shared<DataTypeInt64>());
 
@@ -477,7 +477,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) {
                                        &ctx.pool});
         source_op->_without_key = false;
         source_op->_needs_finalize = true;
-        EXPECT_TRUE(source_op->open(&ctx.state).ok());
+        EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
 
         auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
 
diff --git a/be/test/pipeline/operator/exchange_sink_operator_test.cpp 
b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
index efa30c82b57..945eab84ec5 100644
--- a/be/test/pipeline/operator/exchange_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
@@ -77,7 +77,7 @@ auto create_exchange_sink(std::vector<ChannelInfo> 
channel_info) {
 
     std::shared_ptr<MockExchangeSinkOperatorX> op =
             std::make_shared<MockExchangeSinkOperatorX>(*ctx);
-    EXPECT_TRUE(op->open(&ctx->state));
+    EXPECT_TRUE(op->prepare(&ctx->state));
 
     auto local_state = std::make_unique<MockExchangeLocalState>(op.get(), 
&ctx->state);
 
diff --git a/be/test/vec/exec/vfile_scanner_exception_test.cpp 
b/be/test/vec/exec/vfile_scanner_exception_test.cpp
index 4b6ce46bd88..bf0d35d9eba 100644
--- a/be/test/vec/exec/vfile_scanner_exception_test.cpp
+++ b/be/test/vec/exec/vfile_scanner_exception_test.cpp
@@ -247,7 +247,7 @@ void VfileScannerExceptionTest::init() {
             std::make_shared<pipeline::FileScanOperatorX>(&_obj_pool, _tnode, 
0, *_desc_tbl, 1);
     _scan_node->_output_tuple_desc = 
_runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
     WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init scan_node");
-    WARN_IF_ERROR(_scan_node->open(&_runtime_state), "fail to open scan_node");
+    WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to open scan_node");
 
     auto local_state =
             pipeline::FileScanLocalState::create_unique(&_runtime_state, 
_scan_node.get());
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp 
b/be/test/vec/exec/vwal_scanner_test.cpp
index 2e6d4bf5cde..af32b8677d4 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -261,7 +261,7 @@ void VWalScannerTest::init() {
             std::make_shared<pipeline::FileScanOperatorX>(&_obj_pool, _tnode, 
0, *_desc_tbl, 1);
     _scan_node->_output_tuple_desc = 
_runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
     WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init scan_node");
-    WARN_IF_ERROR(_scan_node->open(&_runtime_state), "fail to prepare scan_node");
+    WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare scan_node");
 
     auto local_state =
             pipeline::FileScanLocalState::create_unique(&_runtime_state, 
_scan_node.get());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to