This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 0245397a4f0 branch-4.0: [test](be) test revert local shuffle union #58813 (#59203)
0245397a4f0 is described below

commit 0245397a4f0f0d37886822fddbba6552d5461ad5
Author: 924060929 <[email protected]>
AuthorDate: Mon Dec 22 14:36:24 2025 +0800

    branch-4.0: [test](be) test revert local shuffle union #58813 (#59203)
    
    cherry pick from #58813
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  23 +---
 be/src/pipeline/exec/exchange_sink_operator.h      |   1 -
 .../org/apache/doris/nereids/NereidsPlanner.java   |   8 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  47 -------
 .../doris/nereids/parser/LogicalPlanBuilder.java   |   7 +-
 .../mv/InitMaterializationContextHook.java         |   6 +-
 .../worker/job/UnassignedJobBuilder.java           |   6 +-
 .../worker/job/UnassignedLocalShuffleUnionJob.java |  99 ---------------
 .../org/apache/doris/planner/DataPartition.java    |   3 +-
 .../org/apache/doris/planner/ExchangeNode.java     |  13 +-
 .../java/org/apache/doris/planner/UnionNode.java   |  10 --
 .../java/org/apache/doris/qe/ConnectContext.java   |   5 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  14 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   3 -
 .../nereids/distribute/LocalShuffleUnionTest.java  | 141 ---------------------
 .../org/apache/doris/nereids/util/PlanChecker.java |  13 +-
 .../org/apache/doris/qe/OlapQueryCacheTest.java    |  23 ----
 gensrc/thrift/Partitions.thrift                    |   3 -
 18 files changed, 29 insertions(+), 396 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index be65e5f176d..71a0edf31b7 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -104,7 +104,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
     for (int i = 0; i < channels.size(); ++i) {
         if (channels[i]->is_local()) {
             local_size++;
-            local_channel_ids.emplace_back(i);
             _last_local_channel_idx = i;
         }
     }
@@ -287,8 +286,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
            sink.output_partition.type == 
TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
            sink.output_partition.type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
            sink.output_partition.type == 
TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
-           sink.output_partition.type == 
TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED ||
-           sink.output_partition.type == TPartitionType::RANDOM_LOCAL_SHUFFLE);
+           sink.output_partition.type == 
TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED);
 #endif
     _name = "ExchangeSinkOperatorX";
     _pool = std::make_shared<ObjectPool>();
@@ -495,25 +493,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
             }
         }
         local_state.current_channel_idx = (local_state.current_channel_idx + 
1) % _writer_count;
-    } else if (_part_type == TPartitionType::RANDOM_LOCAL_SHUFFLE) {
-        DCHECK_LT(local_state.current_channel_idx, 
local_state.local_channel_ids.size())
-                << "local_state.current_channel_idx: " << 
local_state.current_channel_idx
-                << ", local_channel_ids: " << 
to_string(local_state.local_channel_ids);
-
-        // 1. select channel
-        auto& current_channel =
-                local_state
-                        
.channels[local_state.local_channel_ids[local_state.current_channel_idx]];
-        DCHECK(current_channel->is_local())
-                << "Only local channel are supported, current_channel_idx: "
-                << 
local_state.local_channel_ids[local_state.current_channel_idx];
-        if (!current_channel->is_receiver_eof()) {
-            // 2. serialize, send and rollover block
-            auto status = current_channel->send_local_block(block, eos, true);
-            HANDLE_CHANNEL_STATUS(state, current_channel, status);
-        }
-        local_state.current_channel_idx =
-                (local_state.current_channel_idx + 1) % 
local_state.local_channel_ids.size();
     } else {
         // Range partition
         // 1. calculate range
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 37651f268ea..6900c5491b4 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -106,7 +106,6 @@ public:
     std::vector<std::shared_ptr<vectorized::Channel>> channels;
     int current_channel_idx {0}; // index of current channel to send to if 
_random == true
     bool _only_local_exchange {false};
-    std::vector<uint32_t> local_channel_ids;
 
     void on_channel_finished(InstanceLoId channel_id);
     vectorized::PartitionerBase* partitioner() const { return 
_partitioner.get(); }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 6038c672790..c54f2c13a6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -702,10 +702,10 @@ public class NereidsPlanner extends Planner {
         }
 
         boolean notNeedBackend = false;
-        // the internal query not support process Resultset, so must process 
by backend
-        if (cascadesContext.getConnectContext().supportHandleByFe()
-                && physicalPlan instanceof ComputeResultSet
-                && 
!cascadesContext.getConnectContext().getState().isInternal()) {
+        // if the query can compute without backend, we can skip check cluster 
privileges
+        if (Config.isCloudMode()
+                && cascadesContext.getConnectContext().supportHandleByFe()
+                && physicalPlan instanceof ComputeResultSet) {
             Optional<ResultSet> resultSet = ((ComputeResultSet) 
physicalPlan).computeResultInFe(
                     cascadesContext, Optional.empty(), 
physicalPlan.getOutput());
             if (resultSet.isPresent()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index d6001464573..4b84e792e2b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -186,7 +186,6 @@ import 
org.apache.doris.planner.BackendPartitionedSchemaScanNode;
 import org.apache.doris.planner.BlackholeSink;
 import org.apache.doris.planner.CTEScanNode;
 import org.apache.doris.planner.DataPartition;
-import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.DataStreamSink;
 import org.apache.doris.planner.DictionarySink;
 import org.apache.doris.planner.EmptySetNode;
@@ -247,7 +246,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -2326,37 +2324,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             setOperationNode.setColocate(true);
         }
 
-        // whether accept LocalShuffleUnion.
-        // the backend need `enable_local_exchange=true` to compute whether a 
channel is `local`,
-        // and LocalShuffleUnion need `local` channels to do random local 
shuffle, so we need check
-        // `enable_local_exchange`
-        if (setOperation instanceof PhysicalUnion
-                && 
context.getConnectContext().getSessionVariable().getEnableLocalExchange()
-                && SessionVariable.canUseNereidsDistributePlanner()) {
-            boolean isLocalShuffleUnion = false;
-            if (setOperation.getPhysicalProperties().getDistributionSpec() 
instanceof DistributionSpecExecutionAny) {
-                Map<Integer, ExchangeNode> exchangeIdToExchangeNode = new 
IdentityHashMap<>();
-                for (PlanNode child : setOperationNode.getChildren()) {
-                    if (child instanceof ExchangeNode) {
-                        exchangeIdToExchangeNode.put(child.getId().asInt(), 
(ExchangeNode) child);
-                    }
-                }
-
-                for (PlanFragment childFragment : 
setOperationFragment.getChildren()) {
-                    DataSink sink = childFragment.getSink();
-                    if (sink instanceof DataStreamSink) {
-                        isLocalShuffleUnion |= 
setLocalRandomPartition(exchangeIdToExchangeNode, (DataStreamSink) sink);
-                    } else if (sink instanceof MultiCastDataSink) {
-                        MultiCastDataSink multiCastDataSink = 
(MultiCastDataSink) sink;
-                        for (DataStreamSink dataStreamSink : 
multiCastDataSink.getDataStreamSinks()) {
-                            isLocalShuffleUnion |= 
setLocalRandomPartition(exchangeIdToExchangeNode, dataStreamSink);
-                        }
-                    }
-                }
-            }
-            ((UnionNode) 
setOperationNode).setLocalShuffleUnion(isLocalShuffleUnion);
-        }
-
         return setOperationFragment;
     }
 
@@ -3325,18 +3292,4 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
         return child instanceof PhysicalRelation;
     }
-
-    private boolean setLocalRandomPartition(
-            Map<Integer, ExchangeNode> exchangeIdToExchangeNode, 
DataStreamSink dataStreamSink) {
-        ExchangeNode exchangeNode = exchangeIdToExchangeNode.get(
-                dataStreamSink.getExchNodeId().asInt());
-        if (exchangeNode == null) {
-            return false;
-        }
-        exchangeNode.setPartitionType(TPartitionType.RANDOM_LOCAL_SHUFFLE);
-
-        DataPartition p2pPartition = new 
DataPartition(TPartitionType.RANDOM_LOCAL_SHUFFLE);
-        dataStreamSink.setOutputPartition(p2pPartition);
-        return true;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index ad1495a7a2d..8721f11038f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -503,7 +503,6 @@ import 
org.apache.doris.nereids.analyzer.UnboundVariable.VariableType;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.nereids.exceptions.ParseException;
-import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.hint.DistributeHint;
 import org.apache.doris.nereids.hint.JoinSkewInfo;
 import org.apache.doris.nereids.load.NereidsDataDescription;
@@ -2032,10 +2031,8 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
                 connectContext.setStatementContext(statementContext);
                 statementContext.setConnectContext(connectContext);
             }
-            Pair<LogicalPlan, StatementContext> planAndContext = Pair.of(
-                    ParserUtils.withOrigin(ctx, () -> (LogicalPlan) 
visit(statement)), statementContext);
-            statementContext.setParsedStatement(new 
LogicalPlanAdapter(planAndContext.first, statementContext));
-            logicalPlans.add(planAndContext);
+            logicalPlans.add(Pair.of(
+                    ParserUtils.withOrigin(ctx, () -> (LogicalPlan) 
visit(statement)), statementContext));
             List<Placeholder> params = new 
ArrayList<>(tokenPosToParameters.values());
             statementContext.setPlaceholders(params);
             tokenPosToParameters.clear();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
index 7362646888a..bc2c705a4f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
@@ -200,10 +200,8 @@ public class InitMaterializationContextHook implements 
PlannerHook {
             return ImmutableList.of();
         }
         if (CollectionUtils.isEmpty(availableMTMVs)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Enable materialized view rewrite but availableMTMVs 
is empty, query id "
-                        + "is {}", 
cascadesContext.getConnectContext().getQueryIdentifier());
-            }
+            LOG.info("Enable materialized view rewrite but availableMTMVs is 
empty, query id "
+                    + "is {}", 
cascadesContext.getConnectContext().getQueryIdentifier());
             return ImmutableList.of();
         }
         List<MaterializationContext> asyncMaterializationContext = new 
ArrayList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
index ce75dd3dbfa..bc20d3efa17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
@@ -31,7 +31,6 @@ import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SchemaScanNode;
-import org.apache.doris.planner.UnionNode;
 import org.apache.doris.thrift.TExplainLevel;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -216,10 +215,7 @@ public class UnassignedJobBuilder {
     private UnassignedJob buildShuffleJob(
             StatementContext statementContext, PlanFragment planFragment,
             ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
-        if 
(planFragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance)
-                
.stream().map(UnionNode.class::cast).anyMatch(UnionNode::isLocalShuffleUnion)) {
-            return new UnassignedLocalShuffleUnionJob(statementContext, 
planFragment, inputJobs);
-        } else if (planFragment.isPartitioned()) {
+        if (planFragment.isPartitioned()) {
             return new UnassignedShuffleJob(statementContext, planFragment, 
inputJobs);
         } else {
             return new UnassignedGatherJob(statementContext, planFragment, 
inputJobs);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
deleted file mode 100644
index 5c212507275..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
+++ /dev/null
@@ -1,99 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.trees.plans.distribute.worker.job;
-
-import org.apache.doris.nereids.StatementContext;
-import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
-import org.apache.doris.planner.ExchangeNode;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanNode;
-import org.apache.doris.planner.UnionNode;
-import org.apache.doris.qe.ConnectContext;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * this class is used to local shuffle between the same backend, to save 
network io.
- *
- * for example: we have A/B/C three backend, and every backend process 3 
instances before the Union,
- *              then the Union will generate same instances for the source 
backend, and every source
- *              instances will random local shuffle to the self backend's 
three target instances, like this:
- *
- * UnionNode(9 target instances, [A4, B4, C4, A5, B5, C5, A6, B6, C6]) -- say 
there has 3 backends: A/B/C
- * |
- * +- ExchangeNode(3 source instances, [A1, B1, C1]) -- A1 random local 
shuffle to A4/A5/A6,
- * |                                                    B1 random local 
shuffle to B4/B5/B6,
- * |                                                    C1 random local 
shuffle to C4/C5/C6
- * |
- * +- ExchangeNode(3 source instances, [A2, B2, C2]) -- A2 random local 
shuffle to A4/A5/A6,
- * |                                                    B2 random local 
shuffle to B4/B5/B6,
- * |                                                    C2 random local 
shuffle to C4/C5/C6
- * |
- * +- ExchangeNode(3 source instances, [A3, B3, C3]) -- A3 random local 
shuffle to A4/A5/A6,
- *                                                      B3 random local 
shuffle to B4/B5/B6,
- *                                                      C3 random local 
shuffle to C4/C5/C6
- */
-public class UnassignedLocalShuffleUnionJob extends AbstractUnassignedJob {
-
-    public UnassignedLocalShuffleUnionJob(StatementContext statementContext, 
PlanFragment fragment,
-            ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
-        super(statementContext, fragment, ImmutableList.of(), 
exchangeToChildJob);
-    }
-
-    @Override
-    public List<AssignedJob> computeAssignedJobs(
-            DistributeContext context, ListMultimap<ExchangeNode, AssignedJob> 
inputJobs) {
-        ConnectContext connectContext = statementContext.getConnectContext();
-        DefaultScanSource noScanSource = DefaultScanSource.empty();
-        List<AssignedJob> unionInstances = 
Lists.newArrayListWithCapacity(inputJobs.size());
-
-        List<UnionNode> unionNodes = 
fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance);
-        Set<Integer> exchangeIdToUnion = Sets.newLinkedHashSet();
-        for (UnionNode unionNode : unionNodes) {
-            for (PlanNode child : unionNode.getChildren()) {
-                if (child instanceof ExchangeNode) {
-                    exchangeIdToUnion.add(child.getId().asInt());
-                }
-            }
-        }
-
-        int id = 0;
-        for (Entry<ExchangeNode, Collection<AssignedJob>> 
exchangeNodeToSources : inputJobs.asMap().entrySet()) {
-            ExchangeNode exchangeNode = exchangeNodeToSources.getKey();
-            if (!exchangeIdToUnion.contains(exchangeNode.getId().asInt())) {
-                continue;
-            }
-            for (AssignedJob inputInstance : exchangeNodeToSources.getValue()) 
{
-                StaticAssignedJob unionInstance = new StaticAssignedJob(
-                        id++, connectContext.nextInstanceId(), this,
-                        inputInstance.getAssignedWorker(), noScanSource
-                );
-                unionInstances.add(unionInstance);
-            }
-        }
-        return unionInstances;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index b78bd43677b..648fbace47c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -65,8 +65,7 @@ public class DataPartition {
         Preconditions.checkState(type == TPartitionType.UNPARTITIONED
                 || type == TPartitionType.RANDOM
                 || type == TPartitionType.HIVE_TABLE_SINK_UNPARTITIONED
-                || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED
-                || type == TPartitionType.RANDOM_LOCAL_SHUFFLE);
+                || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED);
         this.type = type;
         this.partitionExprs = ImmutableList.of();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 09f02890879..69883fe7988 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -68,7 +68,7 @@ public class ExchangeNode extends PlanNode {
         offset = 0;
         limit = -1;
         this.conjuncts = Collections.emptyList();
-        this.children.add(inputNode);
+        children.add(inputNode);
         computeTupleIds();
     }
 
@@ -175,15 +175,8 @@ public class ExchangeNode extends PlanNode {
      */
     @Override
     public boolean isSerialOperator() {
-        return (
-                (
-                    ConnectContext.get() != null
-                        && 
ConnectContext.get().getSessionVariable().isUseSerialExchange()
-                        && (partitionType != 
TPartitionType.RANDOM_LOCAL_SHUFFLE)
-                )
-                || partitionType == TPartitionType.UNPARTITIONED
-            )
-            && mergeInfo == null;
+        return (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().isUseSerialExchange()
+                || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo 
== null;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
index 15cc0ae0d74..09d366f5456 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
@@ -26,8 +26,6 @@ import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 
 public class UnionNode extends SetOperationNode {
-    private boolean localShuffleUnion;
-
     public UnionNode(PlanNodeId id, TupleId tupleId) {
         super(id, tupleId, "UNION", StatisticalType.UNION_NODE);
     }
@@ -43,12 +41,4 @@ public class UnionNode extends SetOperationNode {
     public boolean isSerialOperator() {
         return children.isEmpty();
     }
-
-    public boolean isLocalShuffleUnion() {
-        return localShuffleUnion;
-    }
-
-    public void setLocalShuffleUnion(boolean localShuffleUnion) {
-        this.localShuffleUnion = localShuffleUnion;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 03b94143521..8df0a2fef0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -984,11 +984,8 @@ public class ConnectContext {
     public TUniqueId nextInstanceId() {
         if (loadId != null) {
             return new TUniqueId(loadId.hi, loadId.lo + 
instanceIdGenerator.incrementAndGet());
-        } else if (queryId != null) {
-            return new TUniqueId(queryId.hi, queryId.lo + 
instanceIdGenerator.incrementAndGet());
         } else {
-            // for test
-            return new TUniqueId(0, instanceIdGenerator.incrementAndGet());
+            return new TUniqueId(queryId.hi, queryId.lo + 
instanceIdGenerator.incrementAndGet());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index ec7a4f4a25b..25b97590d45 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -18,6 +18,7 @@
 package org.apache.doris.qe;
 
 import org.apache.doris.analysis.SetVar;
+import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
@@ -28,6 +29,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.metrics.Event;
 import org.apache.doris.nereids.metrics.EventSwitchParser;
 import org.apache.doris.nereids.parser.Dialect;
@@ -1919,7 +1921,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public boolean enableCommonExprPushdown = true;
 
     @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = false, flag = 
VariableMgr.INVISIBLE,
-            varType = VariableAnnotation.DEPRECATED, needForward = true)
+            varType = VariableAnnotation.DEPRECATED)
     public boolean enableLocalExchange = true;
 
     /**
@@ -4376,7 +4378,15 @@ public class SessionVariable implements Serializable, 
Writable {
         if (connectContext == null) {
             return true;
         }
-        return 
connectContext.getSessionVariable().enableNereidsDistributePlanner;
+        SessionVariable sessionVariable = connectContext.getSessionVariable();
+        StatementContext statementContext = 
connectContext.getStatementContext();
+        if (statementContext != null) {
+            StatementBase parsedStatement = 
statementContext.getParsedStatement();
+            if (!(parsedStatement instanceof LogicalPlanAdapter)) {
+                return false;
+            }
+        }
+        return sessionVariable.enableNereidsDistributePlanner;
     }
 
     public boolean isEnableNereidsDistributePlanner() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 61ccbc7b07b..0c2478a5ae0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -862,9 +862,6 @@ public class StmtExecutor {
             }
             parsedStmt = statements.get(originStmt.idx);
         }
-        if (parsedStmt != null && statementContext.getParsedStatement() == 
null) {
-            statementContext.setParsedStatement(parsedStmt);
-        }
     }
 
     public void finalizeQuery() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
deleted file mode 100644
index 66f5e55c539..00000000000
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.distribute;
-
-import org.apache.doris.nereids.NereidsPlanner;
-import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
-import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
-import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
-import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
-import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedLocalShuffleUnionJob;
-import org.apache.doris.planner.DataSink;
-import org.apache.doris.planner.DataStreamSink;
-import org.apache.doris.planner.ExchangeNode;
-import org.apache.doris.planner.MultiCastDataSink;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanNode;
-import org.apache.doris.planner.UnionNode;
-import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.thrift.TPartitionType;
-import org.apache.doris.utframe.TestWithFeService;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class LocalShuffleUnionTest extends TestWithFeService {
-    @Override
-    protected void runBeforeAll() throws Exception {
-        createDatabase("test");
-        connectContext.setDatabase("test");
-        createTable("create table test.tbl(id int) 
properties('replication_num' = '1')");
-    }
-
-    @Test
-    public void testLocalShuffleUnion() throws Exception {
-        
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
-        StmtExecutor stmtExecutor = executeNereidsSql(
-                "explain distributed plan select * from test.tbl union all 
select * from test.tbl");
-        List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
-        assertHasLocalShuffleUnion(fragments);
-    }
-
-    @Test
-    public void testLocalShuffleUnionWithCte() throws Exception {
-        
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
-        StmtExecutor stmtExecutor = executeNereidsSql(
-                "explain distributed plan with a as (select * from test.tbl) 
select * from a union all select * from a");
-        List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
-        assertHasLocalShuffleUnion(fragments);
-    }
-
-    @Test
-    public void testLocalShuffleUnionWithJoin() throws Exception {
-        
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
-        StmtExecutor stmtExecutor = executeNereidsSql(
-                "explain distributed plan select * from (select * from 
test.tbl union all select * from test.tbl)a left join[broadcast] (select * from 
test.tbl)b on a.id=b.id");
-        List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
-        assertHasLocalShuffleUnion(fragments);
-
-        FragmentIdMapping<DistributedPlan> distributedPlans
-                = ((NereidsPlanner) 
stmtExecutor.planner()).getDistributedPlans();
-        for (DistributedPlan plan : distributedPlans.values()) {
-            PipelineDistributedPlan pipelineDistributedPlan = 
(PipelineDistributedPlan) plan;
-            if (pipelineDistributedPlan.getFragmentJob() instanceof 
UnassignedLocalShuffleUnionJob) {
-                List<AssignedJob> sourcesInstances = 
pipelineDistributedPlan.getInputs()
-                        .values()
-                        .stream()
-                        .flatMap(source -> ((PipelineDistributedPlan) 
source).getInstanceJobs().stream())
-                        .collect(Collectors.toList());
-
-                List<AssignedJob> broadSourceInstances = 
pipelineDistributedPlan.getInputs()
-                        .entries()
-                        .stream()
-                        .filter(kv -> kv.getKey().getPartitionType() != 
TPartitionType.RANDOM_LOCAL_SHUFFLE)
-                        .flatMap(kv -> ((PipelineDistributedPlan) 
kv.getValue()).getInstanceJobs().stream())
-                        .collect(Collectors.toList());
-
-                Assertions.assertTrue(
-                        pipelineDistributedPlan.getInstanceJobs().size() < 
sourcesInstances.size()
-                );
-
-                Assertions.assertEquals(
-                        (sourcesInstances.size() - 
broadSourceInstances.size()),
-                        pipelineDistributedPlan.getInstanceJobs().size()
-                );
-            }
-        }
-    }
-
-    private void assertHasLocalShuffleUnion(List<PlanFragment> fragments) {
-        boolean hasLocalShuffleUnion = false;
-        for (PlanFragment fragment : fragments) {
-            List<PlanNode> unions = 
fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance);
-            for (PlanNode planNode : unions) {
-                UnionNode union = (UnionNode) planNode;
-                assertUnionIsInplace(union, fragment);
-                hasLocalShuffleUnion = true;
-            }
-        }
-        Assertions.assertTrue(hasLocalShuffleUnion);
-    }
-
-    private void assertUnionIsInplace(UnionNode unionNode, PlanFragment 
unionFragment) {
-        Assertions.assertTrue(unionNode.isLocalShuffleUnion());
-        for (PlanNode child : unionNode.getChildren()) {
-            if (child instanceof ExchangeNode) {
-                ExchangeNode exchangeNode = (ExchangeNode) child;
-                Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE, 
exchangeNode.getPartitionType());
-                for (PlanFragment childFragment : unionFragment.getChildren()) 
{
-                    DataSink sink = childFragment.getSink();
-                    if (sink instanceof DataStreamSink && 
sink.getExchNodeId().asInt() == exchangeNode.getId().asInt()) {
-                        
Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE, 
sink.getOutputPartition().getType());
-                    } else if (sink instanceof MultiCastDataSink) {
-                        for (DataStreamSink dataStreamSink : 
((MultiCastDataSink) sink).getDataStreamSinks()) {
-                            if (dataStreamSink.getExchNodeId().asInt() == 
exchangeNode.getId().asInt()) {
-                                
Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE, 
dataStreamSink.getOutputPartition().getType());
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
index 8a1e3da7018..c23ff24c868 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
@@ -128,7 +128,8 @@ public class PlanChecker {
     public AbstractInsertExecutor getInsertExecutor(String sql) throws 
Exception {
         StatementContext statementContext = 
MemoTestUtils.createStatementContext(connectContext, sql);
         LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
-        setQueryId();
+        UUID uuid = UUID.randomUUID();
+        connectContext.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits()));
         InsertIntoTableCommand insertIntoTableCommand = 
(InsertIntoTableCommand) parsedPlan;
         LogicalPlanAdapter logicalPlanAdapter = new 
LogicalPlanAdapter(parsedPlan, statementContext);
         return insertIntoTableCommand.initPlan(connectContext,
@@ -210,7 +211,6 @@ public class PlanChecker {
     public List<PlanProcess> explainPlanProcess(String sql) {
         NereidsParser parser = new NereidsParser();
         LogicalPlan command = parser.parseSingle(sql);
-        setQueryId();
         NereidsPlanner planner = new NereidsPlanner(
                 new StatementContext(connectContext, new OriginStatement(sql, 
0)));
         planner.planWithLock(command, PhysicalProperties.ANY, 
ExplainLevel.ALL_PLAN, true);
@@ -395,7 +395,6 @@ public class PlanChecker {
         connectContext.setStatementContext(statementContext);
         NereidsPlanner planner = new NereidsPlanner(statementContext);
         LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
-        setQueryId();
         LogicalPlanAdapter parsedPlanAdaptor = new 
LogicalPlanAdapter(parsedPlan, statementContext);
         statementContext.setParsedStatement(parsedPlanAdaptor);
 
@@ -723,7 +722,6 @@ public class PlanChecker {
         connectContext.setStatementContext(statementContext);
 
         LogicalPlan parsed = new NereidsParser().parseSingle(sql);
-        setQueryId();
         NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext);
         LogicalPlanAdapter adapter = LogicalPlanAdapter.of(parsed);
         adapter.setIsExplain(new ExplainOptions(ExplainLevel.ALL_PLAN, false));
@@ -751,7 +749,6 @@ public class PlanChecker {
         connectContext.setStatementContext(statementContext);
 
         LogicalPlan parsed = new NereidsParser().parseSingle(sql);
-        setQueryId();
         NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext);
         SessionVariable sessionVariable = connectContext.getSessionVariable();
         try {
@@ -881,12 +878,6 @@ public class PlanChecker {
         return this;
     }
 
-    private void setQueryId() {
-        UUID uuid = UUID.randomUUID();
-        TUniqueId id = new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
-        connectContext.setQueryId(id);
-    }
-
     public static boolean isPlanEqualWithoutID(Plan plan1, Plan plan2) {
         if (plan1.arity() != plan2.arity()
                 || !plan1.getOutput().equals(plan2.getOutput()) || 
plan1.getClass() != plan2.getClass()) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
index 54bdb3aebeb..f499caa96f6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
@@ -68,11 +68,9 @@ import org.apache.doris.qe.cache.RowBatchBuilder;
 import org.apache.doris.qe.cache.SqlCache;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
-import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TUniqueId;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import mockit.Expectations;
@@ -92,7 +90,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
-import java.util.UUID;
 
 public class OlapQueryCacheTest {
     private static final Logger LOG = 
LogManager.getLogger(OlapQueryCacheTest.class);
@@ -275,24 +272,6 @@ public class OlapQueryCacheTest {
         db.registerTable(view3);
         View view4 = createEventNestedView();
         db.registerTable(view4);
-
-        SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo();
-        Backend be = new Backend(0, "127.0.0.1", 0);
-        be.setAlive(true);
-        ImmutableMap<Long, Backend> backends = ImmutableMap.of(0L, be);
-        new Expectations(clusterInfo) {
-            {
-                clusterInfo.getBackendsByCurrentCluster();
-                minTimes = 0;
-                result = backends;
-            }
-
-            {
-                clusterInfo.getAllBackendsByAllCluster();
-                minTimes = 0;
-                result = backends;
-            }
-        };
     }
 
     private OlapTable createOrderTable() {
@@ -523,8 +502,6 @@ public class OlapQueryCacheTest {
             LogicalPlan plan = new NereidsParser().parseSingle(sql);
             OriginStatement originStatement = new OriginStatement(sql, 0);
             StatementContext statementContext = new StatementContext(ctx, 
originStatement);
-            UUID uuid = UUID.randomUUID();
-            ctx.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits()));
             ctx.setStatementContext(statementContext);
             NereidsPlanner nereidsPlanner = new 
NereidsPlanner(statementContext);
             LogicalPlanAdapter adapter = new LogicalPlanAdapter(plan, 
statementContext);
diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift
index 69100109a1b..86a2d9be555 100644
--- a/gensrc/thrift/Partitions.thrift
+++ b/gensrc/thrift/Partitions.thrift
@@ -50,9 +50,6 @@ enum TPartitionType {
 
   // used for hive unparititoned table
   HIVE_TABLE_SINK_UNPARTITIONED = 8
-
-  // used for random local shuffle union, one source instance random send data 
to target instances in the same backend
-  RANDOM_LOCAL_SHUFFLE = 9
 }
 
 enum TDistributionType {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to