This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9214a5c0b43afec2047e43b4a60a2f5f18c1afb7
Author: 924060929 <[email protected]>
AuthorDate: Mon Mar 30 12:52:09 2026 +0800

    [fix](local shuffle) fix downstream pipeline num_tasks in LOCAL_EXCHANGE_NODE handler
    
    In the FE-planned local exchange path, the LOCAL_EXCHANGE_NODE handler
    creates a downstream pipeline (containing LocalExchangeSource) that
    inherits num_tasks from its parent.  When the parent pipeline has been
    reduced by a serial operator (e.g., UNPARTITIONED Exchange), the
    downstream pipeline inherits num_tasks=1.  However, the deferred
    exchanger still creates _num_instances channels, so only one source task
    initializes mem_counters[0] and the remaining entries stay nullptr.  The
    sink round-robins across all channels and crashes on the uninitialized
    mem_counters (SIGSEGV on Linux ASAN, deadlock on macOS).
    
    Fix: set downstream_num_tasks = _num_instances unconditionally, matching
    the BE-native _inherit_pipeline_properties, which always calls
    pipe_with_source.set_num_tasks(_num_instances).
    
    Also revert the deriveAndEnforceChildLocalExchange PASSTHROUGH insertion,
    which was an incorrect attempt to fix this from the FE side.
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 15 +++++++++-----
 .../java/org/apache/doris/planner/PlanNode.java    | 23 +---------------------
 2 files changed, 11 insertions(+), 27 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp 
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 2c44a5637f6..9a4d14494ea 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -1802,11 +1802,16 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
         op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, 
next_operator_id(), descs);
-        // Save downstream pipeline's num_tasks before add_operator 
potentially reduces it
-        // (is_serial_operator on the LocalExchangeSourceOperatorX would set 
num_tasks=1,
-        // but the downstream pipeline needs _num_instances tasks — the serial 
semantics
-        // should only apply to the upstream scan pipeline).
-        auto downstream_num_tasks = cur_pipe->num_tasks();
+        // The downstream pipeline (containing LocalExchangeSource) must have
+        // _num_instances tasks — matching BE-native 
_inherit_pipeline_properties
+        // which sets pipe_with_source.set_num_tasks(_num_instances).
+        // Without this, when the parent pipeline was reduced by a serial 
operator
+        // (e.g., serial Exchange with use_serial_exchange=true, or 
UNPARTITIONED
+        // Exchange), the downstream inherits the reduced num_tasks via
+        // add_pipeline(parent).  The deferred exchanger creates _num_instances
+        // channels but only fewer source tasks initialize mem_counters — the
+        // sink round-robins to all channels and crashes on uninitialized ones.
+        auto downstream_num_tasks = _num_instances;
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
         // Restore downstream pipeline's num_tasks (mirroring 
_inherit_pipeline_properties:
         // downstream keeps _num_instances, upstream gets the serial/reduced 
count)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index be96e7b9e4d..e5114f9cbd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1014,28 +1014,7 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> {
                 ? false
                 : hasSerialAncestorInPipeline || isSerialOperator();
         translatorContext.setHasSerialAncestorInPipeline(child, 
childHasSerialAncestorInPipeline);
-        Pair<PlanNode, LocalExchangeType> result =
-                child.enforceAndDeriveLocalExchange(translatorContext, this, 
requireChild);
-
-        // If the returned node is a serial EXCHANGE operator, insert a
-        // PASSTHROUGH local exchange to create a pipeline boundary.  This
-        // mirrors BE's _add_local_exchange which inserts an exchange after
-        // serial operators, ensuring downstream pipelines have _num_instances
-        // tasks for shared state injection and proper mem_counters
-        // initialization.  Without this, the serial Exchange reduces the
-        // pipeline's num_tasks to 1, causing SIGSEGV (null mem_counters) or
-        // "must set shared state" errors.
-        //
-        // Only apply when fragment uses serial source (pooling scan mode).
-        // Without useSerialSource(), non-pooling fragments may get unnecessary
-        // PASSTHROUGH exchanges that change pipeline structure and cause 
deadlocks.
-        if (result.first instanceof ExchangeNode
-                && result.first.isSerialOperator()
-                && 
fragment.useSerialSource(translatorContext.getConnectContext())) {
-            return Pair.of(
-                    new LocalExchangeNode(translatorContext.nextPlanNodeId(), 
result.first,
-                            LocalExchangeType.PASSTHROUGH, null),
-                    result.second);
+        return child.enforceAndDeriveLocalExchange(translatorContext, this, 
requireChild);
         }
         return result;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to