This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e0a2e861cf [pipelineX](refactor) rename functions (#28846)
9e0a2e861cf is described below

commit 9e0a2e861cf05fcb7cb22fbcfe95955b16ab7f73
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Dec 22 17:24:39 2023 +0800

    [pipelineX](refactor) rename functions (#28846)
---
 be/src/pipeline/exec/aggregation_sink_operator.h           | 4 ++--
 be/src/pipeline/exec/analytic_sink_operator.h              | 4 ++--
 be/src/pipeline/exec/assert_num_rows_operator.h            | 2 +-
 be/src/pipeline/exec/exchange_source_operator.h            | 2 +-
 be/src/pipeline/exec/hashjoin_build_sink.h                 | 2 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h             | 2 +-
 be/src/pipeline/exec/nested_loop_join_build_operator.h     | 2 +-
 be/src/pipeline/exec/nested_loop_join_probe_operator.h     | 2 +-
 be/src/pipeline/exec/partition_sort_sink_operator.h        | 4 ++--
 be/src/pipeline/exec/scan_operator.h                       | 2 +-
 be/src/pipeline/exec/set_probe_sink_operator.h             | 2 +-
 be/src/pipeline/exec/set_sink_operator.h                   | 2 +-
 be/src/pipeline/exec/sort_sink_operator.h                  | 4 ++--
 be/src/pipeline/exec/streaming_aggregation_sink_operator.h | 2 +-
 be/src/pipeline/pipeline.h                                 | 2 +-
 be/src/pipeline/pipeline_x/operator.h                      | 4 ++--
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 8 ++++----
 17 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 2cd6ef50939..97be9dcd6a3 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -366,12 +366,12 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
 
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         if (_probe_expr_ctxs.empty()) {
             return _needs_finalize || 
DataSinkOperatorX<LocalStateType>::_child_x
                                               ->ignore_data_distribution()
                            ? DataDistribution(ExchangeType::PASSTHROUGH)
-                           : 
DataSinkOperatorX<LocalStateType>::get_local_exchange_type();
+                           : 
DataSinkOperatorX<LocalStateType>::required_data_distribution();
         }
         return _is_colocate ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                             : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 14ed8c815b1..3e0eb85f76d 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -107,7 +107,7 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         if (_partition_by_eq_expr_ctxs.empty()) {
             return {ExchangeType::PASSTHROUGH};
         } else if (_order_by_eq_expr_ctxs.empty()) {
@@ -115,7 +115,7 @@ public:
                            ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
         }
-        return 
DataSinkOperatorX<AnalyticSinkLocalState>::get_local_exchange_type();
+        return 
DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
     }
 
 private:
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h
index 1e796b622dc..bb5e65168b6 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -57,7 +57,7 @@ public:
 
     [[nodiscard]] bool is_source() const override { return false; }
 
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         return {ExchangeType::PASSTHROUGH};
     }
 
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 221a43779a1..b621da38072 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -117,7 +117,7 @@ public:
         return _sub_plan_query_statistics_recvr;
     }
 
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
             return {ExchangeType::NOOP};
         }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index ecf0a4a3122..24faa4115dd 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -156,7 +156,7 @@ public:
                                               ._should_build_hash_table;
     }
 
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
         } else if (_is_broadcast_join) {
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 16b455e4f6c..5dde597ec76 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -163,7 +163,7 @@ public:
                 SourceState& source_state) const override;
 
     bool need_more_input_data(RuntimeState* state) const override;
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
         }
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index daa976b4e78..ea0820253cc 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -102,7 +102,7 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
 
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
         }
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 5e57399eae8..bc8913f5d08 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -227,7 +227,7 @@ public:
         return _old_version_flag ? _row_descriptor : *_intermediate_row_desc;
     }
 
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
         }
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 1a47e0fa133..486e7056213 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -105,9 +105,9 @@ public:
     Status open(RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
-            return 
DataSinkOperatorX<PartitionSortSinkLocalState>::get_local_exchange_type();
+            return 
DataSinkOperatorX<PartitionSortSinkLocalState>::required_data_distribution();
         }
         return {ExchangeType::PASSTHROUGH};
     }
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 9bc42453c79..3690e9eb39c 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -434,7 +434,7 @@ public:
 
     TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
 
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         if (_col_distribute_ids.empty() || 
OperatorX<LocalStateType>::ignore_data_distribution()) {
             // 1. `_col_distribute_ids` is empty means storage distribution is not effective, so we prefer to do local shuffle.
             // 2. `ignore_data_distribution()` returns true means we ignore the distribution.
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index a86bf491721..6f453ff31fc 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -144,7 +144,7 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         return _is_colocate ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                             : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
     }
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 375906b5aa3..635d1ee8675 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -129,7 +129,7 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         return _is_colocate ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                             : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
     }
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index 3146e915eef..2f5512e108b 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -93,12 +93,12 @@ public:
     Status open(RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         if (_merge_by_exchange) {
             // The current sort node is used for the ORDER BY
             return {ExchangeType::PASSTHROUGH};
         }
-        return 
DataSinkOperatorX<SortSinkLocalState>::get_local_exchange_type();
+        return 
DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
     }
 
 private:
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index ef7f71b7e29..a7fcdcf847b 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -120,7 +120,7 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
-    DataDistribution get_local_exchange_type() const override {
+    DataDistribution required_data_distribution() const override {
         return {ExchangeType::PASSTHROUGH};
     }
 };
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 305676856a0..2775c45019e 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -148,7 +148,7 @@ public:
         }
     }
     void init_data_distribution() {
-        set_data_distribution(operatorXs.front()->get_local_exchange_type());
+        
set_data_distribution(operatorXs.front()->required_data_distribution());
     }
     void set_data_distribution(const DataDistribution& data_distribution) {
         _data_distribution = data_distribution;
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index fc95785924b..da52706b56c 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -181,7 +181,7 @@ public:
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
     [[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
-    [[nodiscard]] virtual DataDistribution get_local_exchange_type() const {
+    [[nodiscard]] virtual DataDistribution required_data_distribution() const {
         return _child_x && _child_x->ignore_data_distribution() && !is_source()
                        ? DataDistribution(ExchangeType::PASSTHROUGH)
                        : DataDistribution(ExchangeType::NOOP);
@@ -481,7 +481,7 @@ public:
     }
 
     virtual void get_dependency(std::vector<DependencySPtr>& dependency, 
QueryContext* ctx) = 0;
-    virtual DataDistribution get_local_exchange_type() const {
+    virtual DataDistribution required_data_distribution() const {
         return _child_x && _child_x->ignore_data_distribution()
                        ? DataDistribution(ExchangeType::PASSTHROUGH)
                        : DataDistribution(ExchangeType::NOOP);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index fe7388735ed..7efe476c6de 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -288,10 +288,10 @@ Status PipelineXFragmentContext::_plan_local_exchange(
         do_local_exchange = false;
         // Plan local exchange for each operator.
         for (; idx < ops.size();) {
-            if (ops[idx]->get_local_exchange_type().need_local_exchange()) {
+            if (ops[idx]->required_data_distribution().need_local_exchange()) {
                 RETURN_IF_ERROR(_add_local_exchange(
                         pip_idx, idx, ops[idx]->node_id(), 
_runtime_state->obj_pool(), pip,
-                        ops[idx]->get_local_exchange_type(), 
&do_local_exchange, num_buckets,
+                        ops[idx]->required_data_distribution(), 
&do_local_exchange, num_buckets,
                         bucket_seq_to_instance_idx, 
ignore_data_hash_distribution));
             }
             if (do_local_exchange) {
@@ -305,10 +305,10 @@ Status PipelineXFragmentContext::_plan_local_exchange(
             idx++;
         }
     } while (do_local_exchange);
-    if (pip->sink_x()->get_local_exchange_type().need_local_exchange()) {
+    if (pip->sink_x()->required_data_distribution().need_local_exchange()) {
         RETURN_IF_ERROR(_add_local_exchange(
                 pip_idx, idx, pip->sink_x()->node_id(), 
_runtime_state->obj_pool(), pip,
-                pip->sink_x()->get_local_exchange_type(), &do_local_exchange, 
num_buckets,
+                pip->sink_x()->required_data_distribution(), 
&do_local_exchange, num_buckets,
                 bucket_seq_to_instance_idx, ignore_data_hash_distribution));
     }
     return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to