This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9214a5c0b43afec2047e43b4a60a2f5f18c1afb7 Author: 924060929 <[email protected]> AuthorDate: Mon Mar 30 12:52:09 2026 +0800 [fix](local shuffle) fix downstream pipeline num_tasks in LOCAL_EXCHANGE_NODE handler In the FE-planned local exchange path, the LOCAL_EXCHANGE_NODE handler creates a downstream pipeline (containing LocalExchangeSource) that inherits num_tasks from its parent. When the parent pipeline was reduced by a serial operator (e.g., UNPARTITIONED Exchange), the downstream pipeline inherits num_tasks=1. But the deferred exchanger creates _num_instances channels — only 1 source task initializes mem_counters[0], leaving the remaining channels' mem_counters as nullptr. The sink round-robins to all channels and crashes on uninitialized mem_counters (SIGSEGV on Linux ASAN, deadlock on macOS). Fix: Set downstream_num_tasks = _num_instances unconditionally, matching BE-native _inherit_pipeline_properties which always sets pipe_with_source.set_num_tasks(_num_instances). Also revert the deriveAndEnforceChildLocalExchange PASSTHROUGH insertion that was an incorrect attempt to fix this from the FE side. 
--- be/src/exec/pipeline/pipeline_fragment_context.cpp | 15 +++++++++----- .../java/org/apache/doris/planner/PlanNode.java | 23 +--------------------- 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index 2c44a5637f6..9a4d14494ea 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -1802,11 +1802,16 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } case TPlanNodeType::LOCAL_EXCHANGE_NODE: { op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs); - // Save downstream pipeline's num_tasks before add_operator potentially reduces it - // (is_serial_operator on the LocalExchangeSourceOperatorX would set num_tasks=1, - // but the downstream pipeline needs _num_instances tasks — the serial semantics - // should only apply to the upstream scan pipeline). - auto downstream_num_tasks = cur_pipe->num_tasks(); + // The downstream pipeline (containing LocalExchangeSource) must have + // _num_instances tasks — matching BE-native _inherit_pipeline_properties + // which sets pipe_with_source.set_num_tasks(_num_instances). + // Without this, when the parent pipeline was reduced by a serial operator + // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED + // Exchange), the downstream inherits the reduced num_tasks via + // add_pipeline(parent). The deferred exchanger creates _num_instances + // channels but only fewer source tasks initialize mem_counters — the + // sink round-robins to all channels and crashes on uninitialized ones. 
+ auto downstream_num_tasks = _num_instances; RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); // Restore downstream pipeline's num_tasks (mirroring _inherit_pipeline_properties: // downstream keeps _num_instances, upstream gets the serial/reduced count) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index be96e7b9e4d..e5114f9cbd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -1014,28 +1014,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> { ? false : hasSerialAncestorInPipeline || isSerialOperator(); translatorContext.setHasSerialAncestorInPipeline(child, childHasSerialAncestorInPipeline); - Pair<PlanNode, LocalExchangeType> result = - child.enforceAndDeriveLocalExchange(translatorContext, this, requireChild); - - // If the returned node is a serial EXCHANGE operator, insert a - // PASSTHROUGH local exchange to create a pipeline boundary. This - // mirrors BE's _add_local_exchange which inserts an exchange after - // serial operators, ensuring downstream pipelines have _num_instances - // tasks for shared state injection and proper mem_counters - // initialization. Without this, the serial Exchange reduces the - // pipeline's num_tasks to 1, causing SIGSEGV (null mem_counters) or - // "must set shared state" errors. - // - // Only apply when fragment uses serial source (pooling scan mode). - // Without useSerialSource(), non-pooling fragments may get unnecessary - // PASSTHROUGH exchanges that change pipeline structure and cause deadlocks. 
- if (result.first instanceof ExchangeNode - && result.first.isSerialOperator() - && fragment.useSerialSource(translatorContext.getConnectContext())) { - return Pair.of( - new LocalExchangeNode(translatorContext.nextPlanNodeId(), result.first, - LocalExchangeType.PASSTHROUGH, null), - result.second); + return child.enforceAndDeriveLocalExchange(translatorContext, this, requireChild); } return result; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
