BiteTheDDDDt commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3301894165


##########
be/src/exec/exchange/local_exchange_sink_operator.cpp:
##########
@@ -37,24 +37,30 @@ std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
     return deps;
 }
 
-Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type,
-                                        const int num_buckets, const bool use_global_hash_shuffle,
+Status LocalExchangeSinkOperatorX::init(RuntimeState* state, TLocalPartitionType::type type,
+                                        const int num_buckets,
                                         const std::map<int, int>& shuffle_idx_to_instance_idx) {
+    DCHECK(!_planned_by_fe);
     _name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(type) + ")";
     _type = type;
-    if (_type == ExchangeType::HASH_SHUFFLE) {
-        _shuffle_idx_to_instance_idx.clear();
-        _use_global_shuffle = use_global_hash_shuffle;
+    if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
         // For shuffle join, if data distribution has been broken by previous operator, we
         // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
         // we should use map shuffle idx to instance idx because all instances will be
         // distributed to all BEs. Otherwise, we should use shuffle idx directly.
-        if (use_global_hash_shuffle) {
-            _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
+        _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
+        if (state->query_options().__isset.enable_new_shuffle_hash_method &&

Review Comment:
   This seems somewhat redundant with init_partitioner.
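
One way to read the redundancy concern is that the option check could live in a single helper that both `init` and `init_partitioner` call. The sketch below is only an illustration of that idea, not code from the PR: `QueryOptions`, `use_new_shuffle_hash_method`, and the two `*_uses_new_hash` functions are hypothetical stand-ins, the Thrift `__isset` member is modelled as `isset`, and because the quoted diff is cut off after `&&`, the second operand of the condition is an assumption.

```cpp
// Hypothetical stand-in for the Thrift-generated query options.
// Only the field named in the diff is modelled; Thrift's `__isset`
// member is called `isset` here to avoid a reserved identifier.
struct QueryOptionsIsSet {
    bool enable_new_shuffle_hash_method = false;
};

struct QueryOptions {
    QueryOptionsIsSet isset;
    bool enable_new_shuffle_hash_method = false;
};

// Hypothetical helper: the single place that decides whether the new
// shuffle hash method applies. The second operand is an assumption,
// since the condition in the quoted diff is truncated after `&&`.
inline bool use_new_shuffle_hash_method(const QueryOptions& opts) {
    return opts.isset.enable_new_shuffle_hash_method &&
           opts.enable_new_shuffle_hash_method;
}

// Stand-ins for the two call sites mentioned in the review. Both defer
// to the helper instead of repeating the condition.
inline bool init_uses_new_hash(const QueryOptions& opts) {
    return use_new_shuffle_hash_method(opts);
}

inline bool init_partitioner_uses_new_hash(const QueryOptions& opts) {
    return use_new_shuffle_hash_method(opts);
}
```

With a helper like this, the two call sites cannot drift apart if the condition changes later.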



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -290,6 +290,32 @@ Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thr
         RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
                                          &_root_op, root_pipeline));
 
+        // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
+        // that inherited reduced num_tasks from a serial operator.
+        _propagate_local_exchange_num_tasks();
+
+        // Create deferred local exchangers now that all pipelines have final num_tasks.
+        RETURN_IF_ERROR(_create_deferred_local_exchangers());

Review Comment:
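   If the FE fully takes over local exchange planning, the BE should no longer need to
   compute required_data_distribution and operator parallelism itself, right? The BE would
   not need to be aware of required_data_distribution at all; simply following the FE's
   plan should be enough?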
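
The question above can be pictured as a single decision point: use the FE's plan verbatim when one exists, and fall back to BE-side derivation only when it does not. The following is a minimal sketch under that reading, not the PR's design. Every type and function in it (`PartitionType`, `ExchangePlan`, `OperatorInfo`, `derive_on_be`, `resolve_plan`) is hypothetical and does not correspond to a Doris class.

```cpp
#include <optional>

// Hypothetical, simplified types; none of these are Doris classes.
enum class PartitionType { NOOP, PASSTHROUGH, HASH_SHUFFLE };

struct ExchangePlan {
    PartitionType type;
    int num_tasks;
};

struct OperatorInfo {
    // Set only when the FE has already planned the local exchange.
    std::optional<ExchangePlan> fe_plan;
    // Inputs the BE would otherwise use to derive a plan on its own.
    bool needs_hash_distribution = false;
    int be_parallelism = 1;
};

// BE-side derivation, used only as a fallback in this sketch.
inline ExchangePlan derive_on_be(const OperatorInfo& op) {
    return {op.needs_hash_distribution ? PartitionType::HASH_SHUFFLE
                                       : PartitionType::NOOP,
            op.be_parallelism};
}

// Follow the FE's plan as-is when present; the BE inputs are ignored
// in that case, so the two sources are never mixed.
inline ExchangePlan resolve_plan(const OperatorInfo& op) {
    return op.fe_plan ? *op.fe_plan : derive_on_be(op);
}
```

If the FE always supplies a plan, the fallback branch and the BE-side inputs become dead code, which is the simplification the comment is asking about.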


