This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 584a256a252 [opt](coordinator) optimize parallel degree of shuffle when use nereids (#44754)
584a256a252 is described below

commit 584a256a25274722d22c36eafe57ba3cb8e0dc88
Author: 924060929 <lanhuaj...@selectdb.com>
AuthorDate: Fri Jan 10 11:16:13 2025 +0800

    [opt](coordinator) optimize parallel degree of shuffle when use nereids (#44754)

    Optimize the parallel degree of shuffle when using Nereids. This PR fixes
    some performance regressions seen when upgrading Doris from 1.2 to 2.x/3.x.
---
 .../aggregate_function_collect.h                   |  8 +-
 .../trees/plans/distribute/DistributePlanner.java  | 24 +++--
 .../worker/job/AbstractUnassignedScanJob.java      | 18 ++--
 .../worker/job/LocalShuffleAssignedJob.java        | 13 +--
 .../job/LocalShuffleBucketJoinAssignedJob.java     |  6 +-
 .../distribute/worker/job/UnassignedGatherJob.java | 16 ++--
 .../job/UnassignedScanBucketOlapTableJob.java      | 99 +++++++++++++++------
 .../worker/job/UnassignedShuffleJob.java           | 34 +------
 .../main/java/org/apache/doris/qe/Coordinator.java |  3 +-
 .../doris/qe/runtime/ThriftPlansBuilder.java       | 21 ++---
 .../tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out | Bin 101642 -> 101666 bytes
 .../orc_tvf/test_hdfs_orc_group0_orc_files.groovy  |  6 +-
 12 files changed, 127 insertions(+), 121 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.h b/be/src/vec/aggregate_functions/aggregate_function_collect.h
index 2d18a56313f..1b4eadf259d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_collect.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_collect.h
@@ -263,10 +263,10 @@ struct AggregateFunctionCollectListData<StringRef, HasLimit> {
         }
         max_size = rhs.max_size;
 
-        data->insert_range_from(
-                *rhs.data, 0,
-                std::min(assert_cast<size_t, TypeCheckOnRelease::DISABLE>(max_size - size()),
-                         rhs.size()));
+        data->insert_range_from(*rhs.data, 0,
+                                std::min(assert_cast<size_t, TypeCheckOnRelease::DISABLE>(
+                                                 static_cast<size_t>(max_size - size())),
+                                         rhs.size()));
     } else {
         data->insert_range_from(*rhs.data, 0, rhs.size());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index 75a2326236f..388cef6f062 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -46,7 +46,6 @@ import org.apache.doris.thrift.TUniqueId;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.SetMultimap;
 import org.apache.logging.log4j.LogManager;
@@ -136,6 +135,16 @@ public class DistributePlanner {
                         link.getKey(), enableShareHashTableForBroadcastJoin
                 );
+                for (Entry<DataSink, List<AssignedJob>> kv :
+                        ((PipelineDistributedPlan) link.getValue()).getDestinations().entrySet()) {
+                    if (kv.getValue().isEmpty()) {
+                        int sourceFragmentId = link.getValue().getFragmentJob().getFragment().getFragmentId().asInt();
+                        String msg = "Invalid plan which exchange not contains receiver, "
+                                + "exchange id: " + kv.getKey().getExchNodeId().asInt()
+                                + ", source fragmentId: " + sourceFragmentId;
+                        throw new IllegalStateException(msg);
+                    }
+                }
             }
         }
         return plans;
@@ -184,7 +193,7 @@ public class DistributePlanner {
         boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream()
                 .anyMatch(LocalShuffleAssignedJob.class::isInstance);
         if (useLocalShuffle) {
-            return getFirstInstancePerShareScan(receiverPlan);
+            return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
         } else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) {
             return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
         } else {
@@ -221,17 +230,6 @@ public class DistributePlanner {
         return Arrays.asList(instances);
     }
 
-    private List<AssignedJob> getFirstInstancePerShareScan(PipelineDistributedPlan plan) {
-        List<AssignedJob> canReceiveDataFromRemote = Lists.newArrayListWithCapacity(plan.getInstanceJobs().size());
-        for (AssignedJob instanceJob : plan.getInstanceJobs()) {
-            LocalShuffleAssignedJob localShuffleJob = (LocalShuffleAssignedJob) instanceJob;
-            if (!localShuffleJob.receiveDataFromLocal) {
-                canReceiveDataFromRemote.add(localShuffleJob);
-            }
-        }
-        return canReceiveDataFromRemote;
-    }
-
     private List<AssignedJob> getFirstInstancePerWorker(List<AssignedJob> instances) {
         Map<DistributedPlanWorker, AssignedJob> firstInstancePerWorker = Maps.newLinkedHashMap();
         for (AssignedJob instance : instances) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
index d2fbb9905e1..37d665adcc4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
@@ -91,7 +91,7 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
 
         // now we should compute how many instances to process the data,
         // for example: two instances
-        int instanceNum = degreeOfParallelism(scanSourceMaxParallel);
+        int instanceNum = degreeOfParallelism(scanSourceMaxParallel, useLocalShuffleToAddParallel);
 
         if (useLocalShuffleToAddParallel) {
             assignLocalShuffleJobs(scanSource, instanceNum, instances, context, worker);
@@ -129,7 +129,7 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
     protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List<AssignedJob> instances,
             ConnectContext context, DistributedPlanWorker worker) {
         // only generate one instance to scan all data, in this step
-        List<ScanSource> instanceToScanRanges = scanSource.parallelize(scanNodes, 1);
+        List<ScanSource> assignedJoinBuckets = scanSource.parallelize(scanNodes, instanceNum);
 
         // when data not big, but aggregation too slow, we will use 1 instance to scan data,
         // and use more instances (to ***add parallel***) to process aggregate.
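[Aside: the hunks above and below rewrite the instance-count rule. When local shuffle is used to add parallelism, the degree of parallelism now follows the fragment's parallel execution number instead of being capped by the number of scan splits. A minimal standalone sketch of the new rule, with the fragment fields flattened into parameters ("partitioned" stands in for fragment.getDataPartition().isPartitioned(), "parallelExecNum" for fragment.getParallelExecNum(); names are illustrative, not the patch's API):

    // Sketch of the new degree-of-parallelism rule from AbstractUnassignedScanJob.
    // Assumption: maxParallel is the maximum parallelism the scan source can be
    // split into (roughly the tablet/split count on this worker).
    public class DegreeOfParallelismSketch {
        static int degreeOfParallelism(int maxParallel, int parallelExecNum,
                boolean partitioned, boolean useLocalShuffleToAddParallel) {
            if (!partitioned) {
                return 1; // an unpartitioned fragment always runs as a single instance
            }
            if (useLocalShuffleToAddParallel) {
                // local shuffle decouples the instance count from the split count:
                // one instance scans, the others consume the data via local exchange
                return Math.max(parallelExecNum, 1);
            }
            // without local shuffle, the scan instance num may not exceed the splits
            return Math.min(maxParallel, Math.max(parallelExecNum, 1));
        }

        public static void main(String[] args) {
            // 2 tablets on an 8-core backend: local shuffle now yields 8 instances, not 2
            System.out.println(degreeOfParallelism(2, 8, true, true));  // prints 8
            System.out.println(degreeOfParallelism(2, 8, true, false)); // prints 2
        }
    }

Under this reading, small tablet counts no longer throttle CPU-bound operators above the scan, which is where the 1.2-to-2.x regressions came from.]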
@@ -144,7 +144,7 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
         // |(share scan node, instance1 will scan all data and local shuffle to other local instances |
         // | to parallel compute this data)                                                           |
         // +------------------------------------------------------------------------------------------------+
-        ScanSource shareScanSource = instanceToScanRanges.get(0);
+        ScanSource shareScanSource = assignedJoinBuckets.get(0);
 
         // one scan range generate multiple instances,
         // different instances reference the same scan source
@@ -152,15 +152,15 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
         ScanSource emptyShareScanSource = shareScanSource.newEmpty();
         for (int i = 0; i < instanceNum; i++) {
             LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
-                    instances.size(), shareScanId, i > 0,
-                    context.nextInstanceId(), this, worker,
-                    i == 0 ? shareScanSource : emptyShareScanSource
+                    instances.size(), shareScanId, context.nextInstanceId(), this, worker,
+                    // only first instance need to scan data
+                    i == 0 ? scanSource : emptyShareScanSource
             );
             instances.add(instance);
         }
     }
 
-    protected int degreeOfParallelism(int maxParallel) {
+    protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) {
         Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive");
         if (!fragment.getDataPartition().isPartitioned()) {
             return 1;
@@ -179,6 +179,10 @@ public abstract class AbstractUnassignedScanJob extends AbstractUnassignedJob {
             }
         }
 
+        if (useLocalShuffleToAddParallel) {
+            return Math.max(fragment.getParallelExecNum(), 1);
+        }
+
         // the scan instance num should not larger than the tablets num
         return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 1));
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
index 2ba269a5a7b..9bb7c61ffaf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
@@ -31,28 +31,17 @@ import java.util.Map;
  */
 public class LocalShuffleAssignedJob extends StaticAssignedJob {
     public final int shareScanId;
-    public final boolean receiveDataFromLocal;
 
     public LocalShuffleAssignedJob(
-            int indexInUnassignedJob, int shareScanId, boolean receiveDataFromLocal, TUniqueId instanceId,
+            int indexInUnassignedJob, int shareScanId, TUniqueId instanceId,
             UnassignedJob unassignedJob, DistributedPlanWorker worker, ScanSource scanSource) {
         super(indexInUnassignedJob, instanceId, unassignedJob, worker, scanSource);
         this.shareScanId = shareScanId;
-        this.receiveDataFromLocal = receiveDataFromLocal;
     }
 
     @Override
     protected Map<String, String> extraInfo() {
         return ImmutableMap.of("shareScanIndex", String.valueOf(shareScanId));
     }
-
-    @Override
-    protected String formatScanSourceString() {
-        if (receiveDataFromLocal) {
-            return "read data from first instance of " + getAssignedWorker();
-        } else {
-            return super.formatScanSourceString();
-        }
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
index 443acb50d78..9090bf98b36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
@@ -30,11 +30,11 @@ public class LocalShuffleBucketJoinAssignedJob extends LocalShuffleAssignedJob {
     private volatile Set<Integer> assignedJoinBucketIndexes;
 
     public LocalShuffleBucketJoinAssignedJob(
-            int indexInUnassignedJob, int shareScanId, boolean receiveDataFromLocal,
+            int indexInUnassignedJob, int shareScanId,
             TUniqueId instanceId, UnassignedJob unassignedJob,
             DistributedPlanWorker worker, ScanSource scanSource,
             Set<Integer> assignedJoinBucketIndexes) {
-        super(indexInUnassignedJob, shareScanId, receiveDataFromLocal, instanceId, unassignedJob, worker, scanSource);
+        super(indexInUnassignedJob, shareScanId, instanceId, unassignedJob, worker, scanSource);
         this.assignedJoinBucketIndexes = Utils.fastToImmutableSet(assignedJoinBucketIndexes);
     }
 
@@ -42,7 +42,7 @@ public class LocalShuffleBucketJoinAssignedJob extends LocalShuffleAssignedJob {
         return assignedJoinBucketIndexes;
     }
 
-    public void addAssignedJoinBucketIndexes(Set<Integer> joinBucketIndexes) {
+    public synchronized void addAssignedJoinBucketIndexes(Set<Integer> joinBucketIndexes) {
         this.assignedJoinBucketIndexes = ImmutableSet.<Integer>builder()
                 .addAll(assignedJoinBucketIndexes)
                 .addAll(joinBucketIndexes)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
index 830342514bd..6fe9fcfb439 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
@@ -32,7 +32,7 @@ import java.util.List;
 
 /** UnassignedGatherJob */
 public class UnassignedGatherJob extends AbstractUnassignedJob {
-    private boolean useLocalShuffleToAddParallel;
+    private boolean useSerialSource;
 
     public UnassignedGatherJob(
             StatementContext statementContext, PlanFragment fragment,
@@ -44,24 +44,24 @@ public class UnassignedGatherJob extends AbstractUnassignedJob {
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
         ConnectContext connectContext = statementContext.getConnectContext();
-        useLocalShuffleToAddParallel = fragment.useSerialSource(connectContext);
+        useSerialSource = fragment.useSerialSource(connectContext);
 
         int expectInstanceNum = degreeOfParallelism();
 
         DistributedPlanWorker selectedWorker = distributeContext.selectedWorkers.tryToSelectRandomUsedWorker();
-        if (useLocalShuffleToAddParallel) {
+        if (useSerialSource) {
+            // Using serial source means a serial source operator will be used in this fragment (e.g. data will be
+            // shuffled to only 1 exchange operator) and then split by followed local exchanger
             ImmutableList.Builder<AssignedJob> instances = ImmutableList.builder();
-
             DefaultScanSource shareScan = new DefaultScanSource(ImmutableMap.of());
             LocalShuffleAssignedJob receiveDataFromRemote = new LocalShuffleAssignedJob(
-                    0, 0, false,
+                    0, 0,
                     connectContext.nextInstanceId(), this, selectedWorker, shareScan);
 
             instances.add(receiveDataFromRemote);
             for (int i = 1; i < expectInstanceNum; ++i) {
                 LocalShuffleAssignedJob receiveDataFromLocal = new LocalShuffleAssignedJob(
-                        i, 0, true,
-                        connectContext.nextInstanceId(), this, selectedWorker, shareScan);
+                        i, 0, connectContext.nextInstanceId(), this, selectedWorker, shareScan);
                 instances.add(receiveDataFromLocal);
             }
             return instances.build();
@@ -76,6 +76,6 @@ public class UnassignedGatherJob extends AbstractUnassignedJob {
     }
 
     protected int degreeOfParallelism() {
-        return useLocalShuffleToAddParallel ? fragment.getParallelExecNum() : 1;
+        return useSerialSource ? Math.max(1, fragment.getParallelExecNum()) : 1;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index f90fe7ea6e2..88612640d50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -38,12 +38,16 @@ import org.apache.doris.qe.ConnectContext;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -184,13 +188,23 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob
             Set<Integer> assignedJoinBuckets
                     = ((BucketScanSource) assignJoinBuckets.get(i)).bucketIndexToScanNodeToTablets.keySet();
             LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
-                    instances.size(), shareScanId, i > 0,
-                    context.nextInstanceId(), this, worker,
+                    instances.size(), shareScanId, context.nextInstanceId(),
+                    this, worker,
                     i == 0 ? shareScanSource : emptyShareScanSource,
                     Utils.fastToImmutableSet(assignedJoinBuckets)
             );
             instances.add(instance);
         }
+
+        for (int i = assignJoinBuckets.size(); i < instanceNum; ++i) {
+            LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
+                    instances.size(), shareScanId, context.nextInstanceId(),
+                    this, worker, emptyShareScanSource,
+                    // these instance not need to join, because no any bucket assign to it
+                    ImmutableSet.of()
+            );
+            instances.add(instance);
+        }
     }
 
     private boolean shouldFillUpInstances(List<HashJoinNode> hashJoinNodes) {
@@ -224,10 +238,21 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob
                 olapScanNode, randomPartition, missingBucketIndexes);
 
         boolean useLocalShuffle = instances.stream().anyMatch(LocalShuffleAssignedJob.class::isInstance);
+        Multimap<DistributedPlanWorker, AssignedJob> workerToAssignedJobs = ArrayListMultimap.create();
+        int maxNumInstancePerWorker = 1;
+        if (useLocalShuffle) {
+            for (AssignedJob instance : instances) {
+                workerToAssignedJobs.put(instance.getAssignedWorker(), instance);
+            }
+            for (Collection<AssignedJob> instanceList : workerToAssignedJobs.asMap().values()) {
+                maxNumInstancePerWorker = Math.max(maxNumInstancePerWorker, instanceList.size());
+            }
+        }
+
         List<AssignedJob> newInstances = new ArrayList<>(instances);
+
         for (Entry<DistributedPlanWorker, Collection<Integer>> workerToBuckets : missingBuckets.asMap().entrySet()) {
             Map<Integer, Map<ScanNode, ScanRanges>> scanEmptyBuckets = Maps.newLinkedHashMap();
-            Set<Integer> assignedJoinBuckets = Utils.fastToImmutableSet(workerToBuckets.getValue());
             for (Integer bucketIndex : workerToBuckets.getValue()) {
                 Map<ScanNode, ScanRanges> scanTableWithEmptyData = Maps.newLinkedHashMap();
                 for (ScanNode scanNode : scanNodes) {
@@ -236,42 +261,62 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob
                 scanEmptyBuckets.put(bucketIndex, scanTableWithEmptyData);
             }
 
-            AssignedJob fillUpInstance = null;
             DistributedPlanWorker worker = workerToBuckets.getKey();
             BucketScanSource scanSource = new BucketScanSource(scanEmptyBuckets);
             if (useLocalShuffle) {
-                // when use local shuffle, we should ensure every backend only process one instance!
-                // so here we should try to merge the missing buckets into exist instances
-                boolean mergedBucketsInSameWorkerInstance = false;
-                for (AssignedJob newInstance : newInstances) {
-                    if (newInstance.getAssignedWorker().equals(worker)) {
-                        BucketScanSource bucketScanSource = (BucketScanSource) newInstance.getScanSource();
-                        bucketScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets);
-                        mergedBucketsInSameWorkerInstance = true;
-
-                        LocalShuffleBucketJoinAssignedJob instance = (LocalShuffleBucketJoinAssignedJob) newInstance;
-                        instance.addAssignedJoinBucketIndexes(assignedJoinBuckets);
-                    }
+                List<AssignedJob> sameWorkerInstances = (List) workerToAssignedJobs.get(worker);
+                if (sameWorkerInstances.isEmpty()) {
+                    sameWorkerInstances = fillUpEmptyInstances(
+                            maxNumInstancePerWorker, scanSource, worker, newInstances, context);
                 }
-                if (!mergedBucketsInSameWorkerInstance) {
-                    fillUpInstance = new LocalShuffleBucketJoinAssignedJob(
-                            newInstances.size(), shareScanIdGenerator.getAndIncrement(),
-                            false, context.nextInstanceId(), this, worker, scanSource,
-                            assignedJoinBuckets
-                    );
+
+                LocalShuffleBucketJoinAssignedJob firstInstance
+                        = (LocalShuffleBucketJoinAssignedJob) sameWorkerInstances.get(0);
+                BucketScanSource firstInstanceScanSource
+                        = (BucketScanSource) firstInstance.getScanSource();
+                firstInstanceScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets);
+
+                Iterator<Integer> assignedJoinBuckets = new LinkedHashSet<>(workerToBuckets.getValue()).iterator();
+                // make sure the first instance must be assigned some buckets:
+                // if the first instance assigned some buckets, we start assign empty
+                // bucket for second instance for balance, or else assign for first instance
+                int index = firstInstance.getAssignedJoinBucketIndexes().isEmpty() ? -1 : 0;
+                while (assignedJoinBuckets.hasNext()) {
+                    Integer bucketIndex = assignedJoinBuckets.next();
+                    assignedJoinBuckets.remove();
+
+                    index = (index + 1) % sameWorkerInstances.size();
+                    LocalShuffleBucketJoinAssignedJob instance
+                            = (LocalShuffleBucketJoinAssignedJob) sameWorkerInstances.get(index);
+                    instance.addAssignedJoinBucketIndexes(ImmutableSet.of(bucketIndex));
                 }
             } else {
-                fillUpInstance = assignWorkerAndDataSources(
+                newInstances.add(assignWorkerAndDataSources(
                         newInstances.size(), context.nextInstanceId(), worker, scanSource
-                );
-            }
-            if (fillUpInstance != null) {
-                newInstances.add(fillUpInstance);
+                ));
             }
         }
         return newInstances;
     }
 
+    private List<AssignedJob> fillUpEmptyInstances(
+            int maxNumInstancePerWorker, BucketScanSource scanSource, DistributedPlanWorker worker,
+            List<AssignedJob> existsInstances, ConnectContext context) {
+        int shareScanId = shareScanIdGenerator.getAndIncrement();
+        List<AssignedJob> newInstances = new ArrayList<>(maxNumInstancePerWorker);
+        for (int i = 0; i < maxNumInstancePerWorker; i++) {
+            LocalShuffleBucketJoinAssignedJob newInstance = new LocalShuffleBucketJoinAssignedJob(
+                    existsInstances.size(), shareScanId,
+                    context.nextInstanceId(), this, worker,
+                    scanSource.newEmpty(),
+                    ImmutableSet.of()
+            );
+            existsInstances.add(newInstance);
+            newInstances.add(newInstance);
+        }
+        return newInstances;
+    }
+
     private int fullBucketNum() {
         for (ScanNode scanNode : scanNodes) {
             if (scanNode instanceof OlapScanNode) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
index 3d937bfb35d..d8eac65cded 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
@@ -17,13 +17,11 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker.job;
 
-import org.apache.doris.common.Pair;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanNode;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -43,7 +41,7 @@ import java.util.function.Function;
 
 /** UnassignedShuffleJob */
 public class UnassignedShuffleJob extends AbstractUnassignedJob {
-    private boolean useLocalShuffleToAddParallel;
+    private boolean useSerialSource;
 
     public UnassignedShuffleJob(
             StatementContext statementContext, PlanFragment fragment,
@@ -54,7 +52,7 @@ public class UnassignedShuffleJob extends AbstractUnassignedJob {
     @Override
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
-        useLocalShuffleToAddParallel = fragment.useSerialSource(statementContext.getConnectContext());
+        useSerialSource = fragment.useSerialSource(statementContext.getConnectContext());
 
         int expectInstanceNum = degreeOfParallelism();
 
         List<AssignedJob> biggestParallelChildFragment = getInstancesOfBiggestParallelChildFragment(inputJobs);
@@ -83,18 +81,10 @@ public class UnassignedShuffleJob extends AbstractUnassignedJob {
     protected int degreeOfParallelism() {
         // TODO: check we use nested loop join do right outer / semi / anti join,
         // we should add an exchange node with gather distribute under the nested loop join
-
         int expectInstanceNum = -1;
         if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
             expectInstanceNum = ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
         }
-
-        // TODO: check nested loop join do right outer / semi / anti join
-        PlanNode leftMostNode = findLeftmostNode(fragment.getPlanRoot()).second;
-        // when we use nested loop join do right outer / semi / anti join, the instance must be 1.
-        if (leftMostNode.getNumInstances() == 1) {
-            expectInstanceNum = 1;
-        }
         return expectInstanceNum;
     }
@@ -116,7 +106,7 @@ public class UnassignedShuffleJob extends AbstractUnassignedJob {
     private List<AssignedJob> buildInstances(
             int instanceNum, Function<Integer, DistributedPlanWorker> workerSelector) {
         ConnectContext connectContext = statementContext.getConnectContext();
-        if (useLocalShuffleToAddParallel) {
+        if (useSerialSource) {
            return buildInstancesWithLocalShuffle(instanceNum, workerSelector, connectContext);
         } else {
             return buildInstancesWithoutLocalShuffle(instanceNum, workerSelector, connectContext);
@@ -150,17 +140,13 @@ public class UnassignedShuffleJob extends AbstractUnassignedJob {
         for (Entry<DistributedPlanWorker, Collection<Integer>> kv : workerToInstanceIds.asMap().entrySet()) {
             DistributedPlanWorker worker = kv.getKey();
             Collection<Integer> indexesInFragment = kv.getValue();
-
             DefaultScanSource shareScanSource = new DefaultScanSource(ImmutableMap.of());
-
-            boolean receiveDataFromLocal = false;
             for (Integer indexInFragment : indexesInFragment) {
                 LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
-                        indexInFragment, shareScanId, receiveDataFromLocal, connectContext.nextInstanceId(),
+                        indexInFragment, shareScanId, connectContext.nextInstanceId(),
                         this, worker, shareScanSource
                 );
                 instances.add(instance);
-                receiveDataFromLocal = true;
             }
             shareScanId++;
         }
@@ -176,16 +162,4 @@ public class UnassignedShuffleJob extends AbstractUnassignedJob {
         Collections.shuffle(candidateWorkers);
         return candidateWorkers;
     }
-
-    // Returns the id of the leftmost node of any of the gives types in 'plan_root',
-    // or INVALID_PLAN_NODE_ID if no such node present.
-    private Pair<PlanNode, PlanNode> findLeftmostNode(PlanNode plan) {
-        PlanNode childPlan = plan;
-        PlanNode fatherPlan = null;
-        while (childPlan.getChildren().size() != 0 && !(childPlan instanceof ExchangeNode)) {
-            fatherPlan = childPlan;
-            childPlan = childPlan.getChild(0);
-        }
-        return Pair.of(fatherPlan, childPlan);
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3bf5c44d564..09a9a857f33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1809,7 +1809,8 @@ public class Coordinator implements CoordInterface {
             exchangeInstances = ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
         }
         // when we use nested loop join do right outer / semi / anti join, the instance must be 1.
-        if (leftMostNode.getNumInstances() == 1) {
+        boolean isNereids = context != null && context.getState().isNereids();
+        if (!isNereids && leftMostNode.getNumInstances() == 1) {
             exchangeInstances = 1;
         }
         // Using serial source means a serial source operator will be used in this fragment (e.g. data will be
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 54bc0b24d3e..0caca2f47c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -419,21 +419,18 @@ public class ThriftPlansBuilder {
         if (runtimeFiltersThriftBuilder.isMergeRuntimeFilterInstance(instance)) {
             runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
         }
+        boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
+        if (isLocalShuffle) {
+            // a fragment in a backend only enable local shuffle once for the first local shuffle instance,
+            // because we just skip set scan params for LocalShuffleAssignedJob.receiveDataFromLocal == true
+            ignoreDataDistribution(currentFragmentParam);
+        }
         return instanceParam;
     }
 
     private static void setScanSourceParam(
             TPipelineFragmentParams currentFragmentParam, AssignedJob instance,
             TPipelineInstanceParams instanceParams) {
-
-        boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
-        if (isLocalShuffle && ((LocalShuffleAssignedJob) instance).receiveDataFromLocal) {
-            // save thrift rpc message size, don't need perNodeScanRanges,
-            // but the perNodeScanRanges is required rpc field
-            instanceParams.setPerNodeScanRanges(Maps.newLinkedHashMap());
-            return;
-        }
-
         ScanSource scanSource = instance.getScanSource();
         PerNodeScanParams scanParams;
         if (scanSource instanceof BucketScanSource) {
@@ -443,12 +440,6 @@ public class ThriftPlansBuilder {
         }
         // perNodeScanRanges is required
         instanceParams.setPerNodeScanRanges(scanParams.perNodeScanRanges);
-
-        if (isLocalShuffle) {
-            // a fragment in a backend only enable local shuffle once for the first local shuffle instance,
-            // because we just skip set scan params for LocalShuffleAssignedJob.receiveDataFromLocal == true
-            ignoreDataDistribution(currentFragmentParam);
-        }
     }
 
     // local shuffle has two functions:
diff --git a/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out b/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out
index 01158a2fb60..cb2f86d7ba4 100644
Binary files a/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out and b/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out differ
diff --git a/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy b/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
index 924ceca4204..bf2929ee5b4 100644
--- a/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
+++ b/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
@@ -174,7 +174,11 @@ suite("test_hdfs_orc_group0_orc_files","external,hive,tvf,external_docker") {
         order_qt_test_25 """ select * from HDFS(
                     "uri" = "${uri}",
                     "hadoop.username" = "${hdfsUserName}",
-                    "format" = "orc") order by _col0 DESC limit 100; """
+                    "format" = "orc")
+                    where _col0 is not null and _col1 is not null and _col2 is not null and _col3 is not null
+                    and _col4 is not null and _col5 is not null and _col6 is not null
+                    order by _col0 DESC, _col1 DESC, _col2 DESC, _col3 DESC, _col4 DESC, _col5 DESC, _col6 DESC
+                    limit 100; """
 
         uri = "${defaultFS}" + "/user/doris/tvf_data/test_hdfs_orc/group0/TestVectorOrcFile.testLzo.orc"

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org