This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 47832e3e323 [refactor](rename) Rename open function in operator (#48402) 47832e3e323 is described below commit 47832e3e323db0dc36a9bfd13c595cb1beeb36cc Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Thu Feb 27 16:20:25 2025 +0800 [refactor](rename) Rename open function in operator (#48402) Change `open` to `prepare` in operator class. --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/aggregation_sink_operator.h | 2 +- be/src/pipeline/exec/analytic_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- be/src/pipeline/exec/analytic_source_operator.cpp | 4 ++-- be/src/pipeline/exec/analytic_source_operator.h | 2 +- be/src/pipeline/exec/cache_source_operator.h | 5 ----- be/src/pipeline/exec/datagen_operator.cpp | 4 ++-- be/src/pipeline/exec/datagen_operator.h | 2 +- .../distinct_streaming_aggregation_operator.cpp | 4 ++-- .../exec/distinct_streaming_aggregation_operator.h | 2 +- be/src/pipeline/exec/es_scan_operator.cpp | 4 ++-- be/src/pipeline/exec/es_scan_operator.h | 2 +- be/src/pipeline/exec/exchange_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/exchange_sink_operator.h | 2 +- be/src/pipeline/exec/exchange_source_operator.cpp | 4 ++-- be/src/pipeline/exec/exchange_source_operator.h | 2 +- be/src/pipeline/exec/file_scan_operator.cpp | 4 ++-- be/src/pipeline/exec/file_scan_operator.h | 2 +- .../exec/group_commit_block_sink_operator.cpp | 4 ++-- .../exec/group_commit_block_sink_operator.h | 2 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 ++-- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- be/src/pipeline/exec/hashjoin_probe_operator.cpp | 4 ++-- be/src/pipeline/exec/hashjoin_probe_operator.h | 2 +- be/src/pipeline/exec/hive_table_sink_operator.h | 4 ++-- be/src/pipeline/exec/iceberg_table_sink_operator.h | 4 ++-- be/src/pipeline/exec/jdbc_table_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/jdbc_table_sink_operator.h 
| 2 +- .../pipeline/exec/memory_scratch_sink_operator.cpp | 4 ++-- .../pipeline/exec/memory_scratch_sink_operator.h | 2 +- .../pipeline/exec/multi_cast_data_stream_source.h | 4 ++-- .../exec/nested_loop_join_build_operator.cpp | 4 ++-- .../exec/nested_loop_join_build_operator.h | 2 +- .../exec/nested_loop_join_probe_operator.cpp | 4 ++-- .../exec/nested_loop_join_probe_operator.h | 2 +- be/src/pipeline/exec/olap_table_sink_operator.h | 4 ++-- be/src/pipeline/exec/olap_table_sink_v2_operator.h | 4 ++-- be/src/pipeline/exec/operator.cpp | 4 ++-- be/src/pipeline/exec/operator.h | 6 +++--- .../pipeline/exec/partition_sort_sink_operator.cpp | 4 ++-- .../pipeline/exec/partition_sort_sink_operator.h | 2 +- .../exec/partitioned_aggregation_sink_operator.cpp | 4 ++-- .../exec/partitioned_aggregation_sink_operator.h | 2 +- .../partitioned_aggregation_source_operator.cpp | 6 +++--- .../exec/partitioned_aggregation_source_operator.h | 2 +- .../exec/partitioned_hash_join_probe_operator.cpp | 8 ++++---- .../exec/partitioned_hash_join_probe_operator.h | 2 +- .../exec/partitioned_hash_join_sink_operator.cpp | 6 +++--- .../exec/partitioned_hash_join_sink_operator.h | 2 +- be/src/pipeline/exec/repeat_operator.cpp | 4 ++-- be/src/pipeline/exec/repeat_operator.h | 2 +- be/src/pipeline/exec/result_file_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/result_file_sink_operator.h | 2 +- be/src/pipeline/exec/result_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/result_sink_operator.h | 2 +- be/src/pipeline/exec/scan_operator.cpp | 4 ++-- be/src/pipeline/exec/scan_operator.h | 2 +- be/src/pipeline/exec/schema_scan_operator.cpp | 4 ++-- be/src/pipeline/exec/schema_scan_operator.h | 2 +- be/src/pipeline/exec/set_probe_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/set_probe_sink_operator.h | 2 +- be/src/pipeline/exec/set_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/set_sink_operator.h | 2 +- be/src/pipeline/exec/sort_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/sort_sink_operator.h | 
2 +- be/src/pipeline/exec/sort_source_operator.cpp | 4 ++-- be/src/pipeline/exec/sort_source_operator.h | 2 +- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 6 +++--- be/src/pipeline/exec/spill_sort_sink_operator.h | 2 +- .../pipeline/exec/spill_sort_source_operator.cpp | 6 +++--- be/src/pipeline/exec/spill_sort_source_operator.h | 2 +- .../exec/streaming_aggregation_operator.cpp | 4 ++-- .../pipeline/exec/streaming_aggregation_operator.h | 2 +- be/src/pipeline/exec/table_function_operator.cpp | 4 ++-- be/src/pipeline/exec/table_function_operator.h | 2 +- be/src/pipeline/exec/union_sink_operator.cpp | 4 ++-- be/src/pipeline/exec/union_sink_operator.h | 2 +- be/src/pipeline/exec/union_source_operator.h | 4 ++-- .../local_exchange_sink_operator.cpp | 4 ++-- .../local_exchange/local_exchange_sink_operator.h | 2 +- .../local_exchange_source_operator.h | 2 +- be/src/pipeline/pipeline.cpp | 4 ++-- be/test/pipeline/operator/agg_operator_test.cpp | 24 +++++++++++----------- .../operator/exchange_sink_operator_test.cpp | 2 +- be/test/vec/exec/vfile_scanner_exception_test.cpp | 2 +- be/test/vec/exec/vwal_scanner_test.cpp | 2 +- 87 files changed, 149 insertions(+), 154 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index a21d8aefdce..0f1c9c9ffeb 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -770,8 +770,8 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status AggSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::open(state)); +Status AggSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::prepare(state)); RETURN_IF_ERROR(_init_probe_expr_ctx(state)); diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h 
index 7ee1d973be7..e664c3bf297 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -150,7 +150,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 939a924b9bd..fb3117c800f 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -644,8 +644,8 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -Status AnalyticSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::open(state)); +Status AnalyticSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::prepare(state)); for (const auto& ctx : _agg_expr_ctxs) { RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc())); } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 31b15d5a245..99e09372302 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -176,7 +176,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index ada7fa511a8..44e334de6a1 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ 
b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -86,8 +86,8 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block return Status::OK(); } -Status AnalyticSourceOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state)); +Status AnalyticSourceOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::prepare(state)); DCHECK(_child->row_desc().is_prefix_of(_row_descriptor)); return Status::OK(); } diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index be1fdb2c9e5..b1828eeabe6 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -50,7 +50,7 @@ public: bool is_source() const override { return true; } - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; private: friend class AnalyticLocalState; diff --git a/be/src/pipeline/exec/cache_source_operator.h b/be/src/pipeline/exec/cache_source_operator.h index 146c984d04a..651f9ff5596 100644 --- a/be/src/pipeline/exec/cache_source_operator.h +++ b/be/src/pipeline/exec/cache_source_operator.h @@ -81,11 +81,6 @@ public: bool is_source() const override { return true; } - Status open(RuntimeState* state) override { - static_cast<void>(Base::open(state)); - return Status::OK(); - } - const RowDescriptor& intermediate_row_desc() const override { return _child->intermediate_row_desc(); } diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 4e6e4ec513f..7118574282a 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -50,8 +50,8 @@ Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -Status DataGenSourceOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(OperatorX<DataGenLocalState>::open(state)); 
+Status DataGenSourceOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(OperatorX<DataGenLocalState>::prepare(state)); // get tuple desc _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h index 8284db35c76..a45f7381f57 100644 --- a/be/src/pipeline/exec/datagen_operator.h +++ b/be/src/pipeline/exec/datagen_operator.h @@ -58,7 +58,7 @@ public: #endif Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; [[nodiscard]] bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 6187e356351..24df662bb57 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -363,8 +363,8 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* return Status::OK(); } -Status DistinctStreamingAggOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state)); +Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state)); _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 4c5fcd5efa7..1066ea37236 100644 --- 
a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -98,7 +98,7 @@ public: DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; bool need_more_input_data(RuntimeState* state) const override; diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index 2cb3cd5e0b2..030753cd231 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -136,8 +136,8 @@ Status EsScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status EsScanOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::open(state)); +Status EsScanOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state)); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); if (_tuple_desc == nullptr) { diff --git a/be/src/pipeline/exec/es_scan_operator.h b/be/src/pipeline/exec/es_scan_operator.h index 6e64110997e..daf391ae982 100644 --- a/be/src/pipeline/exec/es_scan_operator.h +++ b/be/src/pipeline/exec/es_scan_operator.h @@ -72,7 +72,7 @@ public: const DescriptorTbl& descs, int parallel_tasks); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; private: friend class EsScanLocalState; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp index 71311e4d51f..18f3f0740c5 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -313,8 +313,8 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { return Status::OK(); } -Status ExchangeSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state)); +Status ExchangeSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::prepare(state)); _state = state; _compression_type = state->fragement_transmission_compression_type(); if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index eb82474d694..3254992bbd5 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -195,7 +195,7 @@ public: RuntimeState* state() { return _state; } - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 2aefdf8cd3d..e81e2e5c7b4 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -128,8 +128,8 @@ Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state return Status::OK(); } -Status ExchangeSourceOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::open(state)); +Status ExchangeSourceOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::prepare(state)); DCHECK_GT(_num_senders, 0); if (_is_merging) { diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h index ff9c5840033..414b0f79c34 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -70,7 +70,7 @@ public: ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs, int num_senders); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 8d9dc657501..4fcf51f8320 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -125,8 +125,8 @@ Status FileScanLocalState::_process_conjuncts(RuntimeState* state) { return Status::OK(); } -Status FileScanOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::open(state)); +Status FileScanOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::prepare(state)); if (state->get_query_ctx() != nullptr && state->get_query_ctx()->file_scan_range_params_map.contains(node_id())) { TFileScanRangeParams& params = diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index 8b7b25a025e..73b8454c532 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -77,7 +77,7 @@ public: _output_tuple_id = tnode.file_scan_node.tuple_id; } - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; bool is_file_scan_operator() const override { return true; } diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index 9f99d55d3ea..958a60b66e3 100644 --- 
a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -269,8 +269,8 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) { return Status::OK(); } -Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(Base::open(state)); +Status GroupCommitBlockSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); // get table's tuple descriptor _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); if (_output_tuple_desc == nullptr) { diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h index a4d2fa16f96..89dbf76a6bc 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.h +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h @@ -100,7 +100,7 @@ public: Status init(const TDataSink& sink) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* block, bool eos) override; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index e271dbacba0..e2b911a583b 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -574,8 +574,8 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st return Status::OK(); } -Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::open(state)); +Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::prepare(state)); if (_is_broadcast_join) { if (state->enable_share_hash_table_for_broadcast_join()) { _shared_hashtable_controller = diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h index b1f0cdca640..62a4be16792 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -121,7 +121,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index e3212800d5c..a55379a02ad 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -493,8 +493,8 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -Status HashJoinProbeOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::open(state)); +Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state)); // init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need // insert to output block of hash join. 
// _left_output_slots_flags : column of left table need to output set flag = true diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index d36658b995a..7928c4bc411 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -129,7 +129,7 @@ public: HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; Status pull(doris::RuntimeState* state, vectorized::Block* output_block, diff --git a/be/src/pipeline/exec/hive_table_sink_operator.h b/be/src/pipeline/exec/hive_table_sink_operator.h index 9304d15962c..725fd8e0490 100644 --- a/be/src/pipeline/exec/hive_table_sink_operator.h +++ b/be/src/pipeline/exec/hive_table_sink_operator.h @@ -60,8 +60,8 @@ public: return Status::OK(); } - Status open(RuntimeState* state) override { - RETURN_IF_ERROR(Base::open(state)); + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); return vectorized::VExpr::open(_output_vexpr_ctxs, state); } diff --git a/be/src/pipeline/exec/iceberg_table_sink_operator.h b/be/src/pipeline/exec/iceberg_table_sink_operator.h index 9b7ed510936..284657a7219 100644 --- a/be/src/pipeline/exec/iceberg_table_sink_operator.h +++ b/be/src/pipeline/exec/iceberg_table_sink_operator.h @@ -59,8 +59,8 @@ public: return Status::OK(); } - Status open(RuntimeState* state) override { - RETURN_IF_ERROR(Base::open(state)); + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); 
return vectorized::VExpr::open(_output_vexpr_ctxs, state); } diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp index df47a6ea3ac..d0abe6aa0d2 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp @@ -39,8 +39,8 @@ Status JdbcTableSinkOperatorX::init(const TDataSink& thrift_sink) { return Status::OK(); } -Status JdbcTableSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::open(state)); +Status JdbcTableSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state)); return Status::OK(); diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h index a0dae301a5f..d1b617402b2 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.h +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h @@ -45,7 +45,7 @@ public: JdbcTableSinkOperatorX(const RowDescriptor& row_desc, int operator_id, const std::vector<TExpr>& select_exprs); Status init(const TDataSink& thrift_sink) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp index 69ef3ec8b58..86afd607432 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp @@ -77,8 +77,8 @@ Status MemoryScratchSinkOperatorX::init(const TDataSink& thrift_sink) { return Status::OK(); } -Status MemoryScratchSinkOperatorX::open(RuntimeState* state) { - 
RETURN_IF_ERROR(DataSinkOperatorX<MemoryScratchSinkLocalState>::open(state)); +Status MemoryScratchSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<MemoryScratchSinkLocalState>::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); _timezone_obj = state->timezone_obj(); RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state)); diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.h b/be/src/pipeline/exec/memory_scratch_sink_operator.h index 352826955fc..6b5c15feed7 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.h +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.h @@ -56,7 +56,7 @@ public: MemoryScratchSinkOperatorX(const RowDescriptor& row_desc, int operator_id, const std::vector<TExpr>& t_output_expr); Status init(const TDataSink& thrift_sink) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index a69ef52ba1c..bfe6b00a29a 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -91,8 +91,8 @@ public: }; ~MultiCastDataStreamerSourceOperatorX() override = default; - Status open(RuntimeState* state) override { - RETURN_IF_ERROR(Base::open(state)); + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); // init profile for runtime filter // RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile()); if (_t_data_stream_sink.__isset.output_exprs) { diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index a4dd990f666..bc6c7e274da 100644 --- 
a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -105,8 +105,8 @@ Status NestedLoopJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta return Status::OK(); } -Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>::open(state)); +Status NestedLoopJoinBuildSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>::prepare(state)); size_t num_build_tuples = _child->row_desc().tuple_descriptors().size(); for (size_t i = 0; i < num_build_tuples; ++i) { diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index c5a9dd7dcde..2ffaec3873d 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -67,7 +67,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index b860b454e41..c602c6d82d4 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -409,8 +409,8 @@ Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* return Status::OK(); } -Status NestedLoopJoinProbeOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(JoinProbeOperatorX<NestedLoopJoinProbeLocalState>::open(state)); +Status NestedLoopJoinProbeOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(JoinProbeOperatorX<NestedLoopJoinProbeLocalState>::prepare(state)); for (auto& conjunct : 
_join_conjuncts) { RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); } diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 4f50a98efdb..63ce1f2e2df 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -204,7 +204,7 @@ public: NestedLoopJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; Status pull(doris::RuntimeState* state, vectorized::Block* output_block, diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h index fe4fdb13b0b..755fa6cdc21 100644 --- a/be/src/pipeline/exec/olap_table_sink_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_operator.h @@ -52,8 +52,8 @@ public: return Status::OK(); } - Status open(RuntimeState* state) override { - RETURN_IF_ERROR(Base::open(state)); + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); return vectorized::VExpr::open(_output_vexpr_ctxs, state); } diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h index 69ac9796866..af49c34d581 100644 --- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h @@ -53,8 +53,8 @@ public: return Status::OK(); } - Status open(RuntimeState* state) override { - RETURN_IF_ERROR(Base::open(state)); + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); 
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); return vectorized::VExpr::open(_output_vexpr_ctxs, state); } diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 443f49b572f..d2d0d6a6827 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -208,7 +208,7 @@ Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) { return Status::OK(); } -Status OperatorXBase::open(RuntimeState* state) { +Status OperatorXBase::prepare(RuntimeState* state) { for (auto& conjunct : _conjuncts) { RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc())); } @@ -231,7 +231,7 @@ Status OperatorXBase::open(RuntimeState* state) { RETURN_IF_ERROR(vectorized::VExpr::open(projections, state)); } if (_child && !is_source()) { - RETURN_IF_ERROR(_child->open(state)); + RETURN_IF_ERROR(_child->prepare(state)); } return Status::OK(); } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 85a9059db97..1f46fa7ea61 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -101,7 +101,7 @@ public: [[nodiscard]] virtual Status init(const TDataSink& tsink) { return Status::OK(); } [[nodiscard]] virtual std::string get_name() const = 0; - [[nodiscard]] virtual Status open(RuntimeState* state) = 0; + [[nodiscard]] virtual Status prepare(RuntimeState* state) = 0; [[nodiscard]] virtual Status close(RuntimeState* state); [[nodiscard]] virtual Status set_child(OperatorPtr child) { @@ -550,7 +550,7 @@ public: return Status::InternalError("init() is only implemented in local exchange!"); } - Status open(RuntimeState* state) override { return Status::OK(); } + Status prepare(RuntimeState* state) override { return Status::OK(); } [[nodiscard]] bool is_finished(RuntimeState* state) const { auto result = state->get_sink_local_state_result(); if (!result) { @@ -807,7 +807,7 @@ public: // Tablets should be hold before open 
phase. [[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return Status::OK(); } - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; [[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) = 0; diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 1569fc12170..b2444414dde 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -94,8 +94,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st return Status::OK(); } -Status PartitionSortSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<PartitionSortSinkLocalState>::open(state)); +Status PartitionSortSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<PartitionSortSinkLocalState>::prepare(state)); RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state, _child->row_desc())); RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index a6f18ea8448..a158a3772a9 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -79,7 +79,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index f70a115d686..e003ea23240 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -135,8 +135,8 @@ Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* s return _agg_sink_operator->init(tnode, state); } -Status PartitionedAggSinkOperatorX::open(RuntimeState* state) { - return _agg_sink_operator->open(state); +Status PartitionedAggSinkOperatorX::prepare(RuntimeState* state) { + return _agg_sink_operator->prepare(state); } Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 47ca1a47a42..a625ac20d36 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -299,7 +299,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 5a68c69a7e2..524a8b7db64 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -106,9 +106,9 @@ Status PartitionedAggSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* return _agg_source_operator->init(tnode, state); } -Status PartitionedAggSourceOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(OperatorXBase::open(state)); - return _agg_source_operator->open(state); +Status PartitionedAggSourceOperatorX::prepare(RuntimeState* state) { + 
RETURN_IF_ERROR(OperatorXBase::prepare(state)); + return _agg_source_operator->prepare(state); } Status PartitionedAggSourceOperatorX::close(RuntimeState* state) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index bc800f02fb5..24e56df1be8 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -74,7 +74,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status close(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 1aafab2a7cd..af84d515e4a 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -501,16 +501,16 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt return Status::OK(); } -Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) { +Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) { // to avoid open _child twice auto child = std::move(_child); - RETURN_IF_ERROR(JoinProbeOperatorX::open(state)); + RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state)); RETURN_IF_ERROR(_inner_probe_operator->set_child(child)); DCHECK(_build_side_child != nullptr); _inner_probe_operator->set_build_side_child(_build_side_child); RETURN_IF_ERROR(_inner_sink_operator->set_child(_build_side_child)); - RETURN_IF_ERROR(_inner_probe_operator->open(state)); - RETURN_IF_ERROR(_inner_sink_operator->open(state)); + RETURN_IF_ERROR(_inner_probe_operator->prepare(state)); + RETURN_IF_ERROR(_inner_sink_operator->prepare(state)); _child = std::move(child); RETURN_IF_ERROR(_partitioner->prepare(state, 
_child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index cceac79b357..4bbdfa7371c 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -118,7 +118,7 @@ public: PartitionedHashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs, uint32_t partition_count); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; [[nodiscard]] Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 24c91ea3180..83f094bf92e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -510,12 +510,12 @@ Status PartitionedHashJoinSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta return Status::OK(); } -Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(JoinBuildSinkOperatorX<PartitionedHashJoinSinkLocalState>::open(state)); +Status PartitionedHashJoinSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(JoinBuildSinkOperatorX<PartitionedHashJoinSinkLocalState>::prepare(state)); RETURN_IF_ERROR(_inner_sink_operator->set_child(_child)); RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); - return _inner_sink_operator->open(state); + return _inner_sink_operator->prepare(state); } Status PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState* state) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 155a1500e28..d073a69516b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -101,7 +101,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index c3649a1f8c0..43f5a5f4944 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -63,9 +63,9 @@ Status RepeatOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status RepeatOperatorX::open(RuntimeState* state) { +Status RepeatOperatorX::prepare(RuntimeState* state) { VLOG_CRITICAL << "VRepeatNode::open"; - RETURN_IF_ERROR(OperatorXBase::open(state)); + RETURN_IF_ERROR(OperatorXBase::prepare(state)); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); if (_output_tuple_desc == nullptr) { return Status::InternalError("Failed to get tuple descriptor."); diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h index 2c2af32de0b..337ce1e08c5 100644 --- a/be/src/pipeline/exec/repeat_operator.h +++ b/be/src/pipeline/exec/repeat_operator.h @@ -68,7 +68,7 @@ public: const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; bool need_more_input_data(RuntimeState* state) const override; Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) const override; diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 
76d60bbdf1a..60e3663ee8c 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -71,8 +71,8 @@ Status ResultFileSinkOperatorX::init(const TDataSink& tsink) { return Status::OK(); } -Status ResultFileSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::open(state)); +Status ResultFileSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); if (state->query_options().enable_parallel_outfile) { RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->query_id(), _buf_size, diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h index c3c5e345f77..6e570b8181b 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.h +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -60,7 +60,7 @@ public: const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs); Status init(const TDataSink& thrift_sink) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 256a90d8852..e9f1661b3a2 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -117,8 +117,8 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r _name = "ResultSink"; } -Status ResultSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<ResultSinkLocalState>::open(state)); +Status ResultSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<ResultSinkLocalState>::prepare(state)); // prepare 
output_expr // From the thrift expressions create the real exprs. RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 479343ed6d5..6659d19025f 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -146,7 +146,7 @@ class ResultSinkOperatorX final : public DataSinkOperatorX<ResultSinkLocalState> public: ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs, const TResultSink& sink); - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index b1028be10d2..ea8ee0e5cbe 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1212,10 +1212,10 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState* } template <typename LocalStateType> -Status ScanOperatorX<LocalStateType>::open(RuntimeState* state) { +Status ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) { _input_tuple_desc = state->desc_tbl().get_tuple_descriptor(_input_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); - RETURN_IF_ERROR(OperatorX<LocalStateType>::open(state)); + RETURN_IF_ERROR(OperatorX<LocalStateType>::prepare(state)); const auto slots = _output_tuple_desc->slots(); for (auto* slot : slots) { diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index a7e769319a9..aace8f40e24 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -350,7 +350,7 @@ template <typename LocalStateType> class ScanOperatorX : public OperatorX<LocalStateType> { 
public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; Status get_block_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos) override { diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index af578637c31..d22943b740a 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -135,8 +135,8 @@ Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status SchemaScanOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(Base::open(state)); +Status SchemaScanOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); // get dest tuple desc _dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); diff --git a/be/src/pipeline/exec/schema_scan_operator.h b/be/src/pipeline/exec/schema_scan_operator.h index 2d861002748..594901e812c 100644 --- a/be/src/pipeline/exec/schema_scan_operator.h +++ b/be/src/pipeline/exec/schema_scan_operator.h @@ -65,7 +65,7 @@ public: ~SchemaScanOperatorX() override = default; Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; [[nodiscard]] bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index ee1d1bc128c..be8191ecc0f 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -57,8 +57,8 @@ Status SetProbeSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, Runtime } 
template <bool is_intersect> -Status SetProbeSinkOperatorX<is_intersect>::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>::open(state)); +Status SetProbeSinkOperatorX<is_intersect>::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child->row_desc())); return vectorized::VExpr::open(_child_exprs, state); } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 5e4098e863c..a90a9775d1b 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -94,7 +94,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 82bf523c60a..4faeb975ef9 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -233,8 +233,8 @@ size_t SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state, } template <bool is_intersect> -Status SetSinkOperatorX<is_intersect>::open(RuntimeState* state) { - RETURN_IF_ERROR(Base::open(state)); +Status SetSinkOperatorX<is_intersect>::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child->row_desc())); return vectorized::VExpr::open(_child_exprs, state); } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 4d57c0351f9..08f789f702a 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ 
b/be/src/pipeline/exec/set_sink_operator.h @@ -87,7 +87,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index a1851d4ee41..12301c60501 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -114,8 +114,8 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status SortSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<SortSinkLocalState>::open(state)); +Status SortSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<SortSinkLocalState>::prepare(state)); RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); return _vsort_exec_exprs.open(state); } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index d2edaf7c281..ee6d364fa57 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -64,7 +64,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index b57d422309d..d8982ce1466 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -41,8 +41,8 @@ Status 
SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status SortSourceOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(Base::open(state)); +Status SortSourceOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); // spill sort _child may be nullptr. if (_child) { RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h index 7902e4815bf..14ac7b81da9 100644 --- a/be/src/pipeline/exec/sort_source_operator.h +++ b/be/src/pipeline/exec/sort_source_operator.h @@ -49,7 +49,7 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; bool is_source() const override { return true; } diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 7893fd57bb4..5d1df0b4be0 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -116,9 +116,9 @@ Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return _sort_sink_operator->init(tnode, state); } -Status SpillSortSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::open(state)); - return _sort_sink_operator->open(state); +Status SpillSortSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::prepare(state)); + return _sort_sink_operator->prepare(state); } size_t SpillSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 0dda4e83e15..4094bb9a36f 100644 --- 
a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -69,7 +69,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { return _sort_sink_operator->required_data_distribution(); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index d815bfb7d1b..ef777f19403 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -235,9 +235,9 @@ Status SpillSortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* stat return _sort_source_operator->init(tnode, state); } -Status SpillSortSourceOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(OperatorXBase::open(state)); - return _sort_source_operator->open(state); +Status SpillSortSourceOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(OperatorXBase::prepare(state)); + return _sort_source_operator->prepare(state); } Status SpillSortSourceOperatorX::close(RuntimeState* state) { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index fae64e051f4..1706f5dda72 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -73,7 +73,7 @@ public: ~SpillSortSourceOperatorX() override = default; Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status close(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 
a2cc5e72b15..16ab1703059 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -1167,8 +1167,8 @@ Status StreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -Status StreamingAggOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::open(state)); +Status StreamingAggOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::prepare(state)); _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index 3ee52eeb6ec..a75cb16a436 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -205,7 +205,7 @@ public: const DescriptorTbl& descs); ~StreamingAggOperatorX() override = default; Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; bool need_more_input_data(RuntimeState* state) const override; diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index acd867fcc6d..975e5907177 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -287,8 +287,8 @@ Status TableFunctionOperatorX::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -Status 
TableFunctionOperatorX::open(doris::RuntimeState* state) { - RETURN_IF_ERROR(Base::open(state)); +Status TableFunctionOperatorX::prepare(doris::RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); for (auto* fn : _fns) { RETURN_IF_ERROR(fn->prepare()); } diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index 9aa26e9ae22..ada828cab6e 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -83,7 +83,7 @@ public: TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(doris::RuntimeState* state) override; + Status prepare(doris::RuntimeState* state) override; bool need_more_input_data(RuntimeState* state) const override { auto& local_state = state->get_local_state(operator_id())->cast<TableFunctionLocalState>(); diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index d04ae2130d8..4bbb5eba3e3 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -76,8 +76,8 @@ Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -Status UnionSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<UnionSinkLocalState>::open(state)); +Status UnionSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<UnionSinkLocalState>::prepare(state)); RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _row_descriptor)); // open const expr lists. 
diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 75b3adab49e..170b99f12f1 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -74,7 +74,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 8a00c35b04b..6619b623ef5 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -83,8 +83,8 @@ public: return Status::OK(); } - Status open(RuntimeState* state) override { - static_cast<void>(Base::open(state)); + Status prepare(RuntimeState* state) override { + static_cast<void>(Base::prepare(state)); // Prepare const expr lists. 
for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) { RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state, _row_descriptor)); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 1cf94af663e..45d99004750 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -66,8 +66,8 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets return Status::OK(); } -Status LocalExchangeSinkOperatorX::open(RuntimeState* state) { - RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::open(state)); +Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::prepare(state)); if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 148dab29604..16e40c2a428 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -113,7 +113,7 @@ public: Status init(ExchangeType type, const int num_buckets, const bool use_global_hash_shuffle, const std::map<int, int>& shuffle_idx_to_instance_idx) override; - Status open(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index d6c8cecfef3..a71cdaafacc 100644 --- 
a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -78,7 +78,7 @@ public: _exchange_type = type; return Status::OK(); } - Status open(RuntimeState* state) override { return Status::OK(); } + Status prepare(RuntimeState* state) override { return Status::OK(); } const RowDescriptor& intermediate_row_desc() const override { return _child->intermediate_row_desc(); } diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 27980976d81..4ef3cff9dbd 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -86,8 +86,8 @@ Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) { } Status Pipeline::prepare(RuntimeState* state) { - RETURN_IF_ERROR(_operators.back()->open(state)); - RETURN_IF_ERROR(_sink->open(state)); + RETURN_IF_ERROR(_operators.back()->prepare(state)); + RETURN_IF_ERROR(_sink->prepare(state)); _name.append(std::to_string(id())); _name.push_back('-'); for (auto& op : _operators) { diff --git a/be/test/pipeline/operator/agg_operator_test.cpp b/be/test/pipeline/operator/agg_operator_test.cpp index 7f0c0e6ae07..de9af15c1ba 100644 --- a/be/test/pipeline/operator/agg_operator_test.cpp +++ b/be/test/pipeline/operator/agg_operator_test.cpp @@ -96,7 +96,7 @@ std::shared_ptr<AggSinkOperatorX> create_agg_sink_op(OperatorContext& ctx, bool op->_aggregate_evaluators.push_back( vectorized::create_mock_agg_fn_evaluator(ctx.pool, is_merge, without_key)); op->_pool = &ctx.pool; - EXPECT_TRUE(op->open(&ctx.state).ok()); + EXPECT_TRUE(op->prepare(&ctx.state).ok()); return op; } @@ -107,7 +107,7 @@ std::shared_ptr<AggSourceOperatorX> create_agg_source_op(OperatorContext& ctx, b new MockRowDescriptor {{std::make_shared<vectorized::DataTypeInt64>()}, &ctx.pool}); op->_without_key = without_key; op->_needs_finalize = needs_finalize; - EXPECT_TRUE(op->open(&ctx.state).ok()); + EXPECT_TRUE(op->prepare(&ctx.state).ok()); return op; } @@ 
-231,7 +231,7 @@ TEST(AggOperatorTestWithOutGroupBy, test_multi_input) { ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<const DataTypeInt64>()), false, true)); sink_op->_pool = &ctx.pool; - EXPECT_TRUE(sink_op->open(&ctx.state).ok()); + EXPECT_TRUE(sink_op->prepare(&ctx.state).ok()); auto source_op = std::make_shared<MockAggSourceOperator>(); source_op->mock_row_descriptor.reset( @@ -240,7 +240,7 @@ TEST(AggOperatorTestWithOutGroupBy, test_multi_input) { &ctx.pool}); source_op->_without_key = true; source_op->_needs_finalize = true; - EXPECT_TRUE(source_op->open(&ctx.state).ok()); + EXPECT_TRUE(source_op->prepare(&ctx.state).ok()); auto shared_state = init_sink_and_source(sink_op, source_op, ctx); @@ -297,7 +297,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_need_finalize_only_key) { sink_op->_aggregate_evaluators.push_back( vectorized::create_mock_agg_fn_evaluator(ctx.pool, false, false)); sink_op->_pool = &ctx.pool; - EXPECT_TRUE(sink_op->open(&ctx.state).ok()); + EXPECT_TRUE(sink_op->prepare(&ctx.state).ok()); sink_op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>()); @@ -308,7 +308,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_need_finalize_only_key) { &ctx.pool}); source_op->_without_key = false; source_op->_needs_finalize = true; - EXPECT_TRUE(source_op->open(&ctx.state).ok()); + EXPECT_TRUE(source_op->prepare(&ctx.state).ok()); auto shared_state = init_sink_and_source(sink_op, source_op, ctx); @@ -359,7 +359,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_need_finalize) { ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<DataTypeInt64>()), false, false)); sink_op->_pool = &ctx.pool; - EXPECT_TRUE(sink_op->open(&ctx.state).ok()); + EXPECT_TRUE(sink_op->prepare(&ctx.state).ok()); sink_op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(0, std::make_shared<DataTypeInt64>()); @@ -370,7 +370,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_need_finalize) { &ctx.pool}); source_op->_without_key = 
false; source_op->_needs_finalize = true; - EXPECT_TRUE(source_op->open(&ctx.state).ok()); + EXPECT_TRUE(source_op->prepare(&ctx.state).ok()); auto shared_state = init_sink_and_source(sink_op, source_op, ctx); @@ -425,7 +425,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) { ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<DataTypeInt64>()), false, false)); sink_op->_pool = &ctx.pool; - EXPECT_TRUE(sink_op->open(&ctx.state).ok()); + EXPECT_TRUE(sink_op->prepare(&ctx.state).ok()); sink_op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(0, std::make_shared<DataTypeInt64>()); @@ -436,7 +436,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) { &ctx.pool}); source_op->_without_key = false; source_op->_needs_finalize = false; - EXPECT_TRUE(source_op->open(&ctx.state).ok()); + EXPECT_TRUE(source_op->prepare(&ctx.state).ok()); auto shared_state = init_sink_and_source(sink_op, source_op, ctx); @@ -466,7 +466,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) { ctx.pool, MockSlotRef::create_mock_contexts(1, std::make_shared<DataTypeInt64>()), true, false)); sink_op->_pool = &ctx.pool; - EXPECT_TRUE(sink_op->open(&ctx.state).ok()); + EXPECT_TRUE(sink_op->prepare(&ctx.state).ok()); sink_op->_probe_expr_ctxs = MockSlotRef::create_mock_contexts(0, std::make_shared<DataTypeInt64>()); @@ -477,7 +477,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) { &ctx.pool}); source_op->_without_key = false; source_op->_needs_finalize = true; - EXPECT_TRUE(source_op->open(&ctx.state).ok()); + EXPECT_TRUE(source_op->prepare(&ctx.state).ok()); auto shared_state = init_sink_and_source(sink_op, source_op, ctx); diff --git a/be/test/pipeline/operator/exchange_sink_operator_test.cpp b/be/test/pipeline/operator/exchange_sink_operator_test.cpp index efa30c82b57..945eab84ec5 100644 --- a/be/test/pipeline/operator/exchange_sink_operator_test.cpp +++ b/be/test/pipeline/operator/exchange_sink_operator_test.cpp @@ -77,7 +77,7 @@ auto 
create_exchange_sink(std::vector<ChannelInfo> channel_info) { std::shared_ptr<MockExchangeSinkOperatorX> op = std::make_shared<MockExchangeSinkOperatorX>(*ctx); - EXPECT_TRUE(op->open(&ctx->state)); + EXPECT_TRUE(op->prepare(&ctx->state)); auto local_state = std::make_unique<MockExchangeLocalState>(op.get(), &ctx->state); diff --git a/be/test/vec/exec/vfile_scanner_exception_test.cpp b/be/test/vec/exec/vfile_scanner_exception_test.cpp index 4b6ce46bd88..bf0d35d9eba 100644 --- a/be/test/vec/exec/vfile_scanner_exception_test.cpp +++ b/be/test/vec/exec/vfile_scanner_exception_test.cpp @@ -247,7 +247,7 @@ void VfileScannerExceptionTest::init() { std::make_shared<pipeline::FileScanOperatorX>(&_obj_pool, _tnode, 0, *_desc_tbl, 1); _scan_node->_output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id); WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init scan_node"); - WARN_IF_ERROR(_scan_node->open(&_runtime_state), "fail to open scan_node"); + WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to open scan_node"); auto local_state = pipeline::FileScanLocalState::create_unique(&_runtime_state, _scan_node.get()); diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index 2e6d4bf5cde..af32b8677d4 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -261,7 +261,7 @@ void VWalScannerTest::init() { std::make_shared<pipeline::FileScanOperatorX>(&_obj_pool, _tnode, 0, *_desc_tbl, 1); _scan_node->_output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id); WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init scan_node"); - WARN_IF_ERROR(_scan_node->open(&_runtime_state), "fail to prepare scan_node"); + WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare scan_node"); auto local_state = pipeline::FileScanLocalState::create_unique(&_runtime_state, _scan_node.get()); 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org