This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 0245397a4f0 branch-4.0: [test](be) test revert local shuffle union
#58813 (#59203)
0245397a4f0 is described below
commit 0245397a4f0f0d37886822fddbba6552d5461ad5
Author: 924060929 <[email protected]>
AuthorDate: Mon Dec 22 14:36:24 2025 +0800
branch-4.0: [test](be) test revert local shuffle union #58813 (#59203)
cherry pick from #58813
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 23 +---
be/src/pipeline/exec/exchange_sink_operator.h | 1 -
.../org/apache/doris/nereids/NereidsPlanner.java | 8 +-
.../glue/translator/PhysicalPlanTranslator.java | 47 -------
.../doris/nereids/parser/LogicalPlanBuilder.java | 7 +-
.../mv/InitMaterializationContextHook.java | 6 +-
.../worker/job/UnassignedJobBuilder.java | 6 +-
.../worker/job/UnassignedLocalShuffleUnionJob.java | 99 ---------------
.../org/apache/doris/planner/DataPartition.java | 3 +-
.../org/apache/doris/planner/ExchangeNode.java | 13 +-
.../java/org/apache/doris/planner/UnionNode.java | 10 --
.../java/org/apache/doris/qe/ConnectContext.java | 5 +-
.../java/org/apache/doris/qe/SessionVariable.java | 14 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 3 -
.../nereids/distribute/LocalShuffleUnionTest.java | 141 ---------------------
.../org/apache/doris/nereids/util/PlanChecker.java | 13 +-
.../org/apache/doris/qe/OlapQueryCacheTest.java | 23 ----
gensrc/thrift/Partitions.thrift | 3 -
18 files changed, 29 insertions(+), 396 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index be65e5f176d..71a0edf31b7 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -104,7 +104,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
for (int i = 0; i < channels.size(); ++i) {
if (channels[i]->is_local()) {
local_size++;
- local_channel_ids.emplace_back(i);
_last_local_channel_idx = i;
}
}
@@ -287,8 +286,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type ==
TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
sink.output_partition.type ==
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
sink.output_partition.type ==
TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
- sink.output_partition.type ==
TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED ||
- sink.output_partition.type == TPartitionType::RANDOM_LOCAL_SHUFFLE);
+ sink.output_partition.type ==
TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED);
#endif
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
@@ -495,25 +493,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
}
}
local_state.current_channel_idx = (local_state.current_channel_idx +
1) % _writer_count;
- } else if (_part_type == TPartitionType::RANDOM_LOCAL_SHUFFLE) {
- DCHECK_LT(local_state.current_channel_idx,
local_state.local_channel_ids.size())
- << "local_state.current_channel_idx: " <<
local_state.current_channel_idx
- << ", local_channel_ids: " <<
to_string(local_state.local_channel_ids);
-
- // 1. select channel
- auto& current_channel =
- local_state
-
.channels[local_state.local_channel_ids[local_state.current_channel_idx]];
- DCHECK(current_channel->is_local())
- << "Only local channel are supported, current_channel_idx: "
- <<
local_state.local_channel_ids[local_state.current_channel_idx];
- if (!current_channel->is_receiver_eof()) {
- // 2. serialize, send and rollover block
- auto status = current_channel->send_local_block(block, eos, true);
- HANDLE_CHANNEL_STATUS(state, current_channel, status);
- }
- local_state.current_channel_idx =
- (local_state.current_channel_idx + 1) %
local_state.local_channel_ids.size();
} else {
// Range partition
// 1. calculate range
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 37651f268ea..6900c5491b4 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -106,7 +106,6 @@ public:
std::vector<std::shared_ptr<vectorized::Channel>> channels;
int current_channel_idx {0}; // index of current channel to send to if
_random == true
bool _only_local_exchange {false};
- std::vector<uint32_t> local_channel_ids;
void on_channel_finished(InstanceLoId channel_id);
vectorized::PartitionerBase* partitioner() const { return
_partitioner.get(); }
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 6038c672790..c54f2c13a6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -702,10 +702,10 @@ public class NereidsPlanner extends Planner {
}
boolean notNeedBackend = false;
- // the internal query not support process Resultset, so must process
by backend
- if (cascadesContext.getConnectContext().supportHandleByFe()
- && physicalPlan instanceof ComputeResultSet
- &&
!cascadesContext.getConnectContext().getState().isInternal()) {
+ // if the query can compute without backend, we can skip check cluster
privileges
+ if (Config.isCloudMode()
+ && cascadesContext.getConnectContext().supportHandleByFe()
+ && physicalPlan instanceof ComputeResultSet) {
Optional<ResultSet> resultSet = ((ComputeResultSet)
physicalPlan).computeResultInFe(
cascadesContext, Optional.empty(),
physicalPlan.getOutput());
if (resultSet.isPresent()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index d6001464573..4b84e792e2b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -186,7 +186,6 @@ import
org.apache.doris.planner.BackendPartitionedSchemaScanNode;
import org.apache.doris.planner.BlackholeSink;
import org.apache.doris.planner.CTEScanNode;
import org.apache.doris.planner.DataPartition;
-import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.DictionarySink;
import org.apache.doris.planner.EmptySetNode;
@@ -247,7 +246,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -2326,37 +2324,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
setOperationNode.setColocate(true);
}
- // whether accept LocalShuffleUnion.
- // the backend need `enable_local_exchange=true` to compute whether a
channel is `local`,
- // and LocalShuffleUnion need `local` channels to do random local
shuffle, so we need check
- // `enable_local_exchange`
- if (setOperation instanceof PhysicalUnion
- &&
context.getConnectContext().getSessionVariable().getEnableLocalExchange()
- && SessionVariable.canUseNereidsDistributePlanner()) {
- boolean isLocalShuffleUnion = false;
- if (setOperation.getPhysicalProperties().getDistributionSpec()
instanceof DistributionSpecExecutionAny) {
- Map<Integer, ExchangeNode> exchangeIdToExchangeNode = new
IdentityHashMap<>();
- for (PlanNode child : setOperationNode.getChildren()) {
- if (child instanceof ExchangeNode) {
- exchangeIdToExchangeNode.put(child.getId().asInt(),
(ExchangeNode) child);
- }
- }
-
- for (PlanFragment childFragment :
setOperationFragment.getChildren()) {
- DataSink sink = childFragment.getSink();
- if (sink instanceof DataStreamSink) {
- isLocalShuffleUnion |=
setLocalRandomPartition(exchangeIdToExchangeNode, (DataStreamSink) sink);
- } else if (sink instanceof MultiCastDataSink) {
- MultiCastDataSink multiCastDataSink =
(MultiCastDataSink) sink;
- for (DataStreamSink dataStreamSink :
multiCastDataSink.getDataStreamSinks()) {
- isLocalShuffleUnion |=
setLocalRandomPartition(exchangeIdToExchangeNode, dataStreamSink);
- }
- }
- }
- }
- ((UnionNode)
setOperationNode).setLocalShuffleUnion(isLocalShuffleUnion);
- }
-
return setOperationFragment;
}
@@ -3325,18 +3292,4 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
return child instanceof PhysicalRelation;
}
-
- private boolean setLocalRandomPartition(
- Map<Integer, ExchangeNode> exchangeIdToExchangeNode,
DataStreamSink dataStreamSink) {
- ExchangeNode exchangeNode = exchangeIdToExchangeNode.get(
- dataStreamSink.getExchNodeId().asInt());
- if (exchangeNode == null) {
- return false;
- }
- exchangeNode.setPartitionType(TPartitionType.RANDOM_LOCAL_SHUFFLE);
-
- DataPartition p2pPartition = new
DataPartition(TPartitionType.RANDOM_LOCAL_SHUFFLE);
- dataStreamSink.setOutputPartition(p2pPartition);
- return true;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index ad1495a7a2d..8721f11038f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -503,7 +503,6 @@ import
org.apache.doris.nereids.analyzer.UnboundVariable.VariableType;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.exceptions.ParseException;
-import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.hint.DistributeHint;
import org.apache.doris.nereids.hint.JoinSkewInfo;
import org.apache.doris.nereids.load.NereidsDataDescription;
@@ -2032,10 +2031,8 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
connectContext.setStatementContext(statementContext);
statementContext.setConnectContext(connectContext);
}
- Pair<LogicalPlan, StatementContext> planAndContext = Pair.of(
- ParserUtils.withOrigin(ctx, () -> (LogicalPlan)
visit(statement)), statementContext);
- statementContext.setParsedStatement(new
LogicalPlanAdapter(planAndContext.first, statementContext));
- logicalPlans.add(planAndContext);
+ logicalPlans.add(Pair.of(
+ ParserUtils.withOrigin(ctx, () -> (LogicalPlan)
visit(statement)), statementContext));
List<Placeholder> params = new
ArrayList<>(tokenPosToParameters.values());
statementContext.setPlaceholders(params);
tokenPosToParameters.clear();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
index 7362646888a..bc2c705a4f7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
@@ -200,10 +200,8 @@ public class InitMaterializationContextHook implements
PlannerHook {
return ImmutableList.of();
}
if (CollectionUtils.isEmpty(availableMTMVs)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Enable materialized view rewrite but availableMTMVs
is empty, query id "
- + "is {}",
cascadesContext.getConnectContext().getQueryIdentifier());
- }
+ LOG.info("Enable materialized view rewrite but availableMTMVs is
empty, query id "
+ + "is {}",
cascadesContext.getConnectContext().getQueryIdentifier());
return ImmutableList.of();
}
List<MaterializationContext> asyncMaterializationContext = new
ArrayList<>();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
index ce75dd3dbfa..bc20d3efa17 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
@@ -31,7 +31,6 @@ import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
-import org.apache.doris.planner.UnionNode;
import org.apache.doris.thrift.TExplainLevel;
import com.google.common.collect.ArrayListMultimap;
@@ -216,10 +215,7 @@ public class UnassignedJobBuilder {
private UnassignedJob buildShuffleJob(
StatementContext statementContext, PlanFragment planFragment,
ListMultimap<ExchangeNode, UnassignedJob> inputJobs) {
- if
(planFragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance)
-
.stream().map(UnionNode.class::cast).anyMatch(UnionNode::isLocalShuffleUnion)) {
- return new UnassignedLocalShuffleUnionJob(statementContext,
planFragment, inputJobs);
- } else if (planFragment.isPartitioned()) {
+ if (planFragment.isPartitioned()) {
return new UnassignedShuffleJob(statementContext, planFragment,
inputJobs);
} else {
return new UnassignedGatherJob(statementContext, planFragment,
inputJobs);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
deleted file mode 100644
index 5c212507275..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedLocalShuffleUnionJob.java
+++ /dev/null
@@ -1,99 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.trees.plans.distribute.worker.job;
-
-import org.apache.doris.nereids.StatementContext;
-import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
-import org.apache.doris.planner.ExchangeNode;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanNode;
-import org.apache.doris.planner.UnionNode;
-import org.apache.doris.qe.ConnectContext;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * this class is used to local shuffle between the same backend, to save
network io.
- *
- * for example: we have A/B/C three backend, and every backend process 3
instances before the Union,
- * then the Union will generate same instances for the source
backend, and every source
- * instances will random local shuffle to the self backend's
three target instances, like this:
- *
- * UnionNode(9 target instances, [A4, B4, C4, A5, B5, C5, A6, B6, C6]) -- say
there has 3 backends: A/B/C
- * |
- * +- ExchangeNode(3 source instances, [A1, B1, C1]) -- A1 random local
shuffle to A4/A5/A6,
- * | B1 random local
shuffle to B4/B5/B6,
- * | C1 random local
shuffle to C4/C5/C6
- * |
- * +- ExchangeNode(3 source instances, [A2, B2, C2]) -- A2 random local
shuffle to A4/A5/A6,
- * | B2 random local
shuffle to B4/B5/B6,
- * | C2 random local
shuffle to C4/C5/C6
- * |
- * +- ExchangeNode(3 source instances, [A3, B3, C3]) -- A3 random local
shuffle to A4/A5/A6,
- * B3 random local
shuffle to B4/B5/B6,
- * C3 random local
shuffle to C4/C5/C6
- */
-public class UnassignedLocalShuffleUnionJob extends AbstractUnassignedJob {
-
- public UnassignedLocalShuffleUnionJob(StatementContext statementContext,
PlanFragment fragment,
- ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob) {
- super(statementContext, fragment, ImmutableList.of(),
exchangeToChildJob);
- }
-
- @Override
- public List<AssignedJob> computeAssignedJobs(
- DistributeContext context, ListMultimap<ExchangeNode, AssignedJob>
inputJobs) {
- ConnectContext connectContext = statementContext.getConnectContext();
- DefaultScanSource noScanSource = DefaultScanSource.empty();
- List<AssignedJob> unionInstances =
Lists.newArrayListWithCapacity(inputJobs.size());
-
- List<UnionNode> unionNodes =
fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance);
- Set<Integer> exchangeIdToUnion = Sets.newLinkedHashSet();
- for (UnionNode unionNode : unionNodes) {
- for (PlanNode child : unionNode.getChildren()) {
- if (child instanceof ExchangeNode) {
- exchangeIdToUnion.add(child.getId().asInt());
- }
- }
- }
-
- int id = 0;
- for (Entry<ExchangeNode, Collection<AssignedJob>>
exchangeNodeToSources : inputJobs.asMap().entrySet()) {
- ExchangeNode exchangeNode = exchangeNodeToSources.getKey();
- if (!exchangeIdToUnion.contains(exchangeNode.getId().asInt())) {
- continue;
- }
- for (AssignedJob inputInstance : exchangeNodeToSources.getValue())
{
- StaticAssignedJob unionInstance = new StaticAssignedJob(
- id++, connectContext.nextInstanceId(), this,
- inputInstance.getAssignedWorker(), noScanSource
- );
- unionInstances.add(unionInstance);
- }
- }
- return unionInstances;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index b78bd43677b..648fbace47c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -65,8 +65,7 @@ public class DataPartition {
Preconditions.checkState(type == TPartitionType.UNPARTITIONED
|| type == TPartitionType.RANDOM
|| type == TPartitionType.HIVE_TABLE_SINK_UNPARTITIONED
- || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED
- || type == TPartitionType.RANDOM_LOCAL_SHUFFLE);
+ || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED);
this.type = type;
this.partitionExprs = ImmutableList.of();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 09f02890879..69883fe7988 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -68,7 +68,7 @@ public class ExchangeNode extends PlanNode {
offset = 0;
limit = -1;
this.conjuncts = Collections.emptyList();
- this.children.add(inputNode);
+ children.add(inputNode);
computeTupleIds();
}
@@ -175,15 +175,8 @@ public class ExchangeNode extends PlanNode {
*/
@Override
public boolean isSerialOperator() {
- return (
- (
- ConnectContext.get() != null
- &&
ConnectContext.get().getSessionVariable().isUseSerialExchange()
- && (partitionType !=
TPartitionType.RANDOM_LOCAL_SHUFFLE)
- )
- || partitionType == TPartitionType.UNPARTITIONED
- )
- && mergeInfo == null;
+ return (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().isUseSerialExchange()
+ || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo
== null;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
index 15cc0ae0d74..09d366f5456 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
@@ -26,8 +26,6 @@ import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
public class UnionNode extends SetOperationNode {
- private boolean localShuffleUnion;
-
public UnionNode(PlanNodeId id, TupleId tupleId) {
super(id, tupleId, "UNION", StatisticalType.UNION_NODE);
}
@@ -43,12 +41,4 @@ public class UnionNode extends SetOperationNode {
public boolean isSerialOperator() {
return children.isEmpty();
}
-
- public boolean isLocalShuffleUnion() {
- return localShuffleUnion;
- }
-
- public void setLocalShuffleUnion(boolean localShuffleUnion) {
- this.localShuffleUnion = localShuffleUnion;
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 03b94143521..8df0a2fef0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -984,11 +984,8 @@ public class ConnectContext {
public TUniqueId nextInstanceId() {
if (loadId != null) {
return new TUniqueId(loadId.hi, loadId.lo +
instanceIdGenerator.incrementAndGet());
- } else if (queryId != null) {
- return new TUniqueId(queryId.hi, queryId.lo +
instanceIdGenerator.incrementAndGet());
} else {
- // for test
- return new TUniqueId(0, instanceIdGenerator.incrementAndGet());
+ return new TUniqueId(queryId.hi, queryId.lo +
instanceIdGenerator.incrementAndGet());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index ec7a4f4a25b..25b97590d45 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -18,6 +18,7 @@
package org.apache.doris.qe;
import org.apache.doris.analysis.SetVar;
+import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
@@ -28,6 +29,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.metrics.Event;
import org.apache.doris.nereids.metrics.EventSwitchParser;
import org.apache.doris.nereids.parser.Dialect;
@@ -1919,7 +1921,7 @@ public class SessionVariable implements Serializable,
Writable {
public boolean enableCommonExprPushdown = true;
@VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = false, flag =
VariableMgr.INVISIBLE,
- varType = VariableAnnotation.DEPRECATED, needForward = true)
+ varType = VariableAnnotation.DEPRECATED)
public boolean enableLocalExchange = true;
/**
@@ -4376,7 +4378,15 @@ public class SessionVariable implements Serializable,
Writable {
if (connectContext == null) {
return true;
}
- return
connectContext.getSessionVariable().enableNereidsDistributePlanner;
+ SessionVariable sessionVariable = connectContext.getSessionVariable();
+ StatementContext statementContext =
connectContext.getStatementContext();
+ if (statementContext != null) {
+ StatementBase parsedStatement =
statementContext.getParsedStatement();
+ if (!(parsedStatement instanceof LogicalPlanAdapter)) {
+ return false;
+ }
+ }
+ return sessionVariable.enableNereidsDistributePlanner;
}
public boolean isEnableNereidsDistributePlanner() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 61ccbc7b07b..0c2478a5ae0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -862,9 +862,6 @@ public class StmtExecutor {
}
parsedStmt = statements.get(originStmt.idx);
}
- if (parsedStmt != null && statementContext.getParsedStatement() ==
null) {
- statementContext.setParsedStatement(parsedStmt);
- }
}
public void finalizeQuery() {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
deleted file mode 100644
index 66f5e55c539..00000000000
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/distribute/LocalShuffleUnionTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.distribute;
-
-import org.apache.doris.nereids.NereidsPlanner;
-import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
-import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
-import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
-import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
-import
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedLocalShuffleUnionJob;
-import org.apache.doris.planner.DataSink;
-import org.apache.doris.planner.DataStreamSink;
-import org.apache.doris.planner.ExchangeNode;
-import org.apache.doris.planner.MultiCastDataSink;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanNode;
-import org.apache.doris.planner.UnionNode;
-import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.thrift.TPartitionType;
-import org.apache.doris.utframe.TestWithFeService;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class LocalShuffleUnionTest extends TestWithFeService {
- @Override
- protected void runBeforeAll() throws Exception {
- createDatabase("test");
- connectContext.setDatabase("test");
- createTable("create table test.tbl(id int)
properties('replication_num' = '1')");
- }
-
- @Test
- public void testLocalShuffleUnion() throws Exception {
-
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
- StmtExecutor stmtExecutor = executeNereidsSql(
- "explain distributed plan select * from test.tbl union all
select * from test.tbl");
- List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
- assertHasLocalShuffleUnion(fragments);
- }
-
- @Test
- public void testLocalShuffleUnionWithCte() throws Exception {
-
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
- StmtExecutor stmtExecutor = executeNereidsSql(
- "explain distributed plan with a as (select * from test.tbl)
select * from a union all select * from a");
- List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
- assertHasLocalShuffleUnion(fragments);
- }
-
- @Test
- public void testLocalShuffleUnionWithJoin() throws Exception {
-
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
- StmtExecutor stmtExecutor = executeNereidsSql(
- "explain distributed plan select * from (select * from
test.tbl union all select * from test.tbl)a left join[broadcast] (select * from
test.tbl)b on a.id=b.id");
- List<PlanFragment> fragments = stmtExecutor.planner().getFragments();
- assertHasLocalShuffleUnion(fragments);
-
- FragmentIdMapping<DistributedPlan> distributedPlans
- = ((NereidsPlanner)
stmtExecutor.planner()).getDistributedPlans();
- for (DistributedPlan plan : distributedPlans.values()) {
- PipelineDistributedPlan pipelineDistributedPlan =
(PipelineDistributedPlan) plan;
- if (pipelineDistributedPlan.getFragmentJob() instanceof
UnassignedLocalShuffleUnionJob) {
- List<AssignedJob> sourcesInstances =
pipelineDistributedPlan.getInputs()
- .values()
- .stream()
- .flatMap(source -> ((PipelineDistributedPlan)
source).getInstanceJobs().stream())
- .collect(Collectors.toList());
-
- List<AssignedJob> broadSourceInstances =
pipelineDistributedPlan.getInputs()
- .entries()
- .stream()
- .filter(kv -> kv.getKey().getPartitionType() !=
TPartitionType.RANDOM_LOCAL_SHUFFLE)
- .flatMap(kv -> ((PipelineDistributedPlan)
kv.getValue()).getInstanceJobs().stream())
- .collect(Collectors.toList());
-
- Assertions.assertTrue(
- pipelineDistributedPlan.getInstanceJobs().size() <
sourcesInstances.size()
- );
-
- Assertions.assertEquals(
- (sourcesInstances.size() -
broadSourceInstances.size()),
- pipelineDistributedPlan.getInstanceJobs().size()
- );
- }
- }
- }
-
- private void assertHasLocalShuffleUnion(List<PlanFragment> fragments) {
- boolean hasLocalShuffleUnion = false;
- for (PlanFragment fragment : fragments) {
- List<PlanNode> unions =
fragment.getPlanRoot().collectInCurrentFragment(UnionNode.class::isInstance);
- for (PlanNode planNode : unions) {
- UnionNode union = (UnionNode) planNode;
- assertUnionIsInplace(union, fragment);
- hasLocalShuffleUnion = true;
- }
- }
- Assertions.assertTrue(hasLocalShuffleUnion);
- }
-
- private void assertUnionIsInplace(UnionNode unionNode, PlanFragment
unionFragment) {
- Assertions.assertTrue(unionNode.isLocalShuffleUnion());
- for (PlanNode child : unionNode.getChildren()) {
- if (child instanceof ExchangeNode) {
- ExchangeNode exchangeNode = (ExchangeNode) child;
- Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE,
exchangeNode.getPartitionType());
- for (PlanFragment childFragment : unionFragment.getChildren())
{
- DataSink sink = childFragment.getSink();
- if (sink instanceof DataStreamSink &&
sink.getExchNodeId().asInt() == exchangeNode.getId().asInt()) {
-
Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE,
sink.getOutputPartition().getType());
- } else if (sink instanceof MultiCastDataSink) {
- for (DataStreamSink dataStreamSink :
((MultiCastDataSink) sink).getDataStreamSinks()) {
- if (dataStreamSink.getExchNodeId().asInt() ==
exchangeNode.getId().asInt()) {
-
Assertions.assertEquals(TPartitionType.RANDOM_LOCAL_SHUFFLE,
dataStreamSink.getOutputPartition().getType());
- }
- }
- }
- }
- }
- }
- }
-}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
index 8a1e3da7018..c23ff24c868 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
@@ -128,7 +128,8 @@ public class PlanChecker {
public AbstractInsertExecutor getInsertExecutor(String sql) throws
Exception {
StatementContext statementContext =
MemoTestUtils.createStatementContext(connectContext, sql);
LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
- setQueryId();
+ UUID uuid = UUID.randomUUID();
+ connectContext.setQueryId(new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits()));
InsertIntoTableCommand insertIntoTableCommand =
(InsertIntoTableCommand) parsedPlan;
LogicalPlanAdapter logicalPlanAdapter = new
LogicalPlanAdapter(parsedPlan, statementContext);
return insertIntoTableCommand.initPlan(connectContext,
@@ -210,7 +211,6 @@ public class PlanChecker {
public List<PlanProcess> explainPlanProcess(String sql) {
NereidsParser parser = new NereidsParser();
LogicalPlan command = parser.parseSingle(sql);
- setQueryId();
NereidsPlanner planner = new NereidsPlanner(
new StatementContext(connectContext, new OriginStatement(sql,
0)));
planner.planWithLock(command, PhysicalProperties.ANY,
ExplainLevel.ALL_PLAN, true);
@@ -395,7 +395,6 @@ public class PlanChecker {
connectContext.setStatementContext(statementContext);
NereidsPlanner planner = new NereidsPlanner(statementContext);
LogicalPlan parsedPlan = new NereidsParser().parseSingle(sql);
- setQueryId();
LogicalPlanAdapter parsedPlanAdaptor = new
LogicalPlanAdapter(parsedPlan, statementContext);
statementContext.setParsedStatement(parsedPlanAdaptor);
@@ -723,7 +722,6 @@ public class PlanChecker {
connectContext.setStatementContext(statementContext);
LogicalPlan parsed = new NereidsParser().parseSingle(sql);
- setQueryId();
NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext);
LogicalPlanAdapter adapter = LogicalPlanAdapter.of(parsed);
adapter.setIsExplain(new ExplainOptions(ExplainLevel.ALL_PLAN, false));
@@ -751,7 +749,6 @@ public class PlanChecker {
connectContext.setStatementContext(statementContext);
LogicalPlan parsed = new NereidsParser().parseSingle(sql);
- setQueryId();
NereidsPlanner nereidsPlanner = new NereidsPlanner(statementContext);
SessionVariable sessionVariable = connectContext.getSessionVariable();
try {
@@ -881,12 +878,6 @@ public class PlanChecker {
return this;
}
- private void setQueryId() {
- UUID uuid = UUID.randomUUID();
- TUniqueId id = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
- connectContext.setQueryId(id);
- }
-
public static boolean isPlanEqualWithoutID(Plan plan1, Plan plan2) {
if (plan1.arity() != plan2.arity()
|| !plan1.getOutput().equals(plan2.getOutput()) ||
plan1.getClass() != plan2.getClass()) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
index 54bdb3aebeb..f499caa96f6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
@@ -68,11 +68,9 @@ import org.apache.doris.qe.cache.RowBatchBuilder;
import org.apache.doris.qe.cache.SqlCache;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
-import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TUniqueId;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import mockit.Expectations;
@@ -92,7 +90,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
-import java.util.UUID;
public class OlapQueryCacheTest {
private static final Logger LOG =
LogManager.getLogger(OlapQueryCacheTest.class);
@@ -275,24 +272,6 @@ public class OlapQueryCacheTest {
db.registerTable(view3);
View view4 = createEventNestedView();
db.registerTable(view4);
-
- SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo();
- Backend be = new Backend(0, "127.0.0.1", 0);
- be.setAlive(true);
- ImmutableMap<Long, Backend> backends = ImmutableMap.of(0L, be);
- new Expectations(clusterInfo) {
- {
- clusterInfo.getBackendsByCurrentCluster();
- minTimes = 0;
- result = backends;
- }
-
- {
- clusterInfo.getAllBackendsByAllCluster();
- minTimes = 0;
- result = backends;
- }
- };
}
private OlapTable createOrderTable() {
@@ -523,8 +502,6 @@ public class OlapQueryCacheTest {
LogicalPlan plan = new NereidsParser().parseSingle(sql);
OriginStatement originStatement = new OriginStatement(sql, 0);
StatementContext statementContext = new StatementContext(ctx,
originStatement);
- UUID uuid = UUID.randomUUID();
- ctx.setQueryId(new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits()));
ctx.setStatementContext(statementContext);
NereidsPlanner nereidsPlanner = new
NereidsPlanner(statementContext);
LogicalPlanAdapter adapter = new LogicalPlanAdapter(plan,
statementContext);
diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift
index 69100109a1b..86a2d9be555 100644
--- a/gensrc/thrift/Partitions.thrift
+++ b/gensrc/thrift/Partitions.thrift
@@ -50,9 +50,6 @@ enum TPartitionType {
// used for hive unparititoned table
HIVE_TABLE_SINK_UNPARTITIONED = 8
-
- // used for random local shuffle union, one source instance random send data
to target instances in the same backend
- RANDOM_LOCAL_SHUFFLE = 9
}
enum TDistributionType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]