BiteTheDDDDt commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3301894165
##########
be/src/exec/exchange/local_exchange_sink_operator.cpp:
##########
@@ -37,24 +37,30 @@ std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
return deps;
}
-Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type,
- const int num_buckets, const bool use_global_hash_shuffle,
+Status LocalExchangeSinkOperatorX::init(RuntimeState* state, TLocalPartitionType::type type,
+ const int num_buckets,
const std::map<int, int>& shuffle_idx_to_instance_idx) {
+ DCHECK(!_planned_by_fe);
_name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(type) + ")";
_type = type;
- if (_type == ExchangeType::HASH_SHUFFLE) {
- _shuffle_idx_to_instance_idx.clear();
- _use_global_shuffle = use_global_hash_shuffle;
+ if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
// For shuffle join, if data distribution has been broken by previous operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
// we should use map shuffle idx to instance idx because all instances will be
// distributed to all BEs. Otherwise, we should use shuffle idx directly.
- if (use_global_hash_shuffle) {
- _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
+ _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
+ if (state->query_options().__isset.enable_new_shuffle_hash_method &&
Review Comment:
This seems somewhat redundant with init_partitioner.
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -290,6 +290,32 @@ Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thr
RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
&_root_op, root_pipeline));
+ // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
+ // that inherited reduced num_tasks from a serial operator.
+ _propagate_local_exchange_num_tasks();
+
+ // Create deferred local exchangers now that all pipelines have final num_tasks.
+ RETURN_IF_ERROR(_create_deferred_local_exchangers());
Review Comment:
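If the FE has fully taken over local exchange planning, the BE should no longer need to compute required_data_distribution or operator parallelism on its own, right? The BE doesn't need to be aware of required_data_distribution; simply following the FE's plan should be enough?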
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]