This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 584a256a252 [opt](coordinator) optimize parallel degree of shuffle 
when use nereids (#44754)
584a256a252 is described below

commit 584a256a25274722d22c36eafe57ba3cb8e0dc88
Author: 924060929 <lanhuaj...@selectdb.com>
AuthorDate: Fri Jan 10 11:16:13 2025 +0800

    [opt](coordinator) optimize parallel degree of shuffle when use nereids 
(#44754)
    
    optimize the parallel degree of shuffle when using Nereids; this PR can fix
    some performance regressions observed when upgrading Doris from 1.2 to 2.x/3.x
---
 .../aggregate_function_collect.h                   |   8 +-
 .../trees/plans/distribute/DistributePlanner.java  |  24 +++--
 .../worker/job/AbstractUnassignedScanJob.java      |  18 ++--
 .../worker/job/LocalShuffleAssignedJob.java        |  13 +--
 .../job/LocalShuffleBucketJoinAssignedJob.java     |   6 +-
 .../distribute/worker/job/UnassignedGatherJob.java |  16 ++--
 .../job/UnassignedScanBucketOlapTableJob.java      |  99 +++++++++++++++------
 .../worker/job/UnassignedShuffleJob.java           |  34 +------
 .../main/java/org/apache/doris/qe/Coordinator.java |   3 +-
 .../doris/qe/runtime/ThriftPlansBuilder.java       |  21 ++---
 .../tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out | Bin 101642 -> 101666 bytes
 .../orc_tvf/test_hdfs_orc_group0_orc_files.groovy  |   6 +-
 12 files changed, 127 insertions(+), 121 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.h 
b/be/src/vec/aggregate_functions/aggregate_function_collect.h
index 2d18a56313f..1b4eadf259d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_collect.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_collect.h
@@ -263,10 +263,10 @@ struct AggregateFunctionCollectListData<StringRef, 
HasLimit> {
             }
             max_size = rhs.max_size;
 
-            data->insert_range_from(
-                    *rhs.data, 0,
-                    std::min(assert_cast<size_t, 
TypeCheckOnRelease::DISABLE>(max_size - size()),
-                             rhs.size()));
+            data->insert_range_from(*rhs.data, 0,
+                                    std::min(assert_cast<size_t, 
TypeCheckOnRelease::DISABLE>(
+                                                     
static_cast<size_t>(max_size - size())),
+                                             rhs.size()));
         } else {
             data->insert_range_from(*rhs.data, 0, rhs.size());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index 75a2326236f..388cef6f062 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -46,7 +46,6 @@ import org.apache.doris.thrift.TUniqueId;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.SetMultimap;
 import org.apache.logging.log4j.LogManager;
@@ -136,6 +135,16 @@ public class DistributePlanner {
                         link.getKey(),
                         enableShareHashTableForBroadcastJoin
                 );
+                for (Entry<DataSink, List<AssignedJob>> kv :
+                        ((PipelineDistributedPlan) 
link.getValue()).getDestinations().entrySet()) {
+                    if (kv.getValue().isEmpty()) {
+                        int sourceFragmentId = 
link.getValue().getFragmentJob().getFragment().getFragmentId().asInt();
+                        String msg = "Invalid plan which exchange not contains 
receiver, "
+                                + "exchange id: " + 
kv.getKey().getExchNodeId().asInt()
+                                + ", source fragmentId: " + sourceFragmentId;
+                        throw new IllegalStateException(msg);
+                    }
+                }
             }
         }
         return plans;
@@ -184,7 +193,7 @@ public class DistributePlanner {
         boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream()
                 .anyMatch(LocalShuffleAssignedJob.class::isInstance);
         if (useLocalShuffle) {
-            return getFirstInstancePerShareScan(receiverPlan);
+            return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
         } else if (enableShareHashTableForBroadcastJoin && 
linkNode.isRightChildOfBroadcastHashJoin()) {
             return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
         } else {
@@ -221,17 +230,6 @@ public class DistributePlanner {
         return Arrays.asList(instances);
     }
 
-    private List<AssignedJob> 
getFirstInstancePerShareScan(PipelineDistributedPlan plan) {
-        List<AssignedJob> canReceiveDataFromRemote = 
Lists.newArrayListWithCapacity(plan.getInstanceJobs().size());
-        for (AssignedJob instanceJob : plan.getInstanceJobs()) {
-            LocalShuffleAssignedJob localShuffleJob = 
(LocalShuffleAssignedJob) instanceJob;
-            if (!localShuffleJob.receiveDataFromLocal) {
-                canReceiveDataFromRemote.add(localShuffleJob);
-            }
-        }
-        return canReceiveDataFromRemote;
-    }
-
     private List<AssignedJob> getFirstInstancePerWorker(List<AssignedJob> 
instances) {
         Map<DistributedPlanWorker, AssignedJob> firstInstancePerWorker = 
Maps.newLinkedHashMap();
         for (AssignedJob instance : instances) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
index d2fbb9905e1..37d665adcc4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
@@ -91,7 +91,7 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
 
             // now we should compute how many instances to process the data,
             // for example: two instances
-            int instanceNum = degreeOfParallelism(scanSourceMaxParallel);
+            int instanceNum = degreeOfParallelism(scanSourceMaxParallel, 
useLocalShuffleToAddParallel);
 
             if (useLocalShuffleToAddParallel) {
                 assignLocalShuffleJobs(scanSource, instanceNum, instances, 
context, worker);
@@ -129,7 +129,7 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
     protected void assignLocalShuffleJobs(ScanSource scanSource, int 
instanceNum, List<AssignedJob> instances,
             ConnectContext context, DistributedPlanWorker worker) {
         // only generate one instance to scan all data, in this step
-        List<ScanSource> instanceToScanRanges = 
scanSource.parallelize(scanNodes, 1);
+        List<ScanSource> assignedJoinBuckets = 
scanSource.parallelize(scanNodes, instanceNum);
 
         // when data not big, but aggregation too slow, we will use 1 instance 
to scan data,
         // and use more instances (to ***add parallel***) to process aggregate.
@@ -144,7 +144,7 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
         // |(share scan node, instance1 will scan all data and local shuffle 
to other local instances       |
         // |                           to parallel compute this data)          
                             |
         // 
+------------------------------------------------------------------------------------------------+
-        ScanSource shareScanSource = instanceToScanRanges.get(0);
+        ScanSource shareScanSource = assignedJoinBuckets.get(0);
 
         // one scan range generate multiple instances,
         // different instances reference the same scan source
@@ -152,15 +152,15 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
         ScanSource emptyShareScanSource = shareScanSource.newEmpty();
         for (int i = 0; i < instanceNum; i++) {
             LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
-                    instances.size(), shareScanId, i > 0,
-                    context.nextInstanceId(), this, worker,
-                    i == 0 ? shareScanSource : emptyShareScanSource
+                    instances.size(), shareScanId, context.nextInstanceId(), 
this, worker,
+                    // only first instance need to scan data
+                    i == 0 ? scanSource : emptyShareScanSource
             );
             instances.add(instance);
         }
     }
 
-    protected int degreeOfParallelism(int maxParallel) {
+    protected int degreeOfParallelism(int maxParallel, boolean 
useLocalShuffleToAddParallel) {
         Preconditions.checkArgument(maxParallel > 0, "maxParallel must be 
positive");
         if (!fragment.getDataPartition().isPartitioned()) {
             return 1;
@@ -179,6 +179,10 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
             }
         }
 
+        if (useLocalShuffleToAddParallel) {
+            return Math.max(fragment.getParallelExecNum(), 1);
+        }
+
         // the scan instance num should not larger than the tablets num
         return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 
1));
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
index 2ba269a5a7b..9bb7c61ffaf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
@@ -31,28 +31,17 @@ import java.util.Map;
  */
 public class LocalShuffleAssignedJob extends StaticAssignedJob {
     public final int shareScanId;
-    public final boolean receiveDataFromLocal;
 
     public LocalShuffleAssignedJob(
-            int indexInUnassignedJob, int shareScanId, boolean 
receiveDataFromLocal, TUniqueId instanceId,
+            int indexInUnassignedJob, int shareScanId, TUniqueId instanceId,
             UnassignedJob unassignedJob,
             DistributedPlanWorker worker, ScanSource scanSource) {
         super(indexInUnassignedJob, instanceId, unassignedJob, worker, 
scanSource);
         this.shareScanId = shareScanId;
-        this.receiveDataFromLocal = receiveDataFromLocal;
     }
 
     @Override
     protected Map<String, String> extraInfo() {
         return ImmutableMap.of("shareScanIndex", String.valueOf(shareScanId));
     }
-
-    @Override
-    protected String formatScanSourceString() {
-        if (receiveDataFromLocal) {
-            return "read data from first instance of " + getAssignedWorker();
-        } else {
-            return super.formatScanSourceString();
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
index 443acb50d78..9090bf98b36 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
@@ -30,11 +30,11 @@ public class LocalShuffleBucketJoinAssignedJob extends 
LocalShuffleAssignedJob {
     private volatile Set<Integer> assignedJoinBucketIndexes;
 
     public LocalShuffleBucketJoinAssignedJob(
-            int indexInUnassignedJob, int shareScanId, boolean 
receiveDataFromLocal,
+            int indexInUnassignedJob, int shareScanId,
             TUniqueId instanceId, UnassignedJob unassignedJob,
             DistributedPlanWorker worker, ScanSource scanSource,
             Set<Integer> assignedJoinBucketIndexes) {
-        super(indexInUnassignedJob, shareScanId, receiveDataFromLocal, 
instanceId, unassignedJob, worker, scanSource);
+        super(indexInUnassignedJob, shareScanId, instanceId, unassignedJob, 
worker, scanSource);
         this.assignedJoinBucketIndexes = 
Utils.fastToImmutableSet(assignedJoinBucketIndexes);
     }
 
@@ -42,7 +42,7 @@ public class LocalShuffleBucketJoinAssignedJob extends 
LocalShuffleAssignedJob {
         return assignedJoinBucketIndexes;
     }
 
-    public void addAssignedJoinBucketIndexes(Set<Integer> joinBucketIndexes) {
+    public synchronized void addAssignedJoinBucketIndexes(Set<Integer> 
joinBucketIndexes) {
         this.assignedJoinBucketIndexes = ImmutableSet.<Integer>builder()
                 .addAll(assignedJoinBucketIndexes)
                 .addAll(joinBucketIndexes)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
index 830342514bd..6fe9fcfb439 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
@@ -32,7 +32,7 @@ import java.util.List;
 
 /** UnassignedGatherJob */
 public class UnassignedGatherJob extends AbstractUnassignedJob {
-    private boolean useLocalShuffleToAddParallel;
+    private boolean useSerialSource;
 
     public UnassignedGatherJob(
             StatementContext statementContext, PlanFragment fragment,
@@ -44,24 +44,24 @@ public class UnassignedGatherJob extends 
AbstractUnassignedJob {
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
         ConnectContext connectContext = statementContext.getConnectContext();
-        useLocalShuffleToAddParallel = 
fragment.useSerialSource(connectContext);
+        useSerialSource = fragment.useSerialSource(connectContext);
 
         int expectInstanceNum = degreeOfParallelism();
 
         DistributedPlanWorker selectedWorker = 
distributeContext.selectedWorkers.tryToSelectRandomUsedWorker();
-        if (useLocalShuffleToAddParallel) {
+        if (useSerialSource) {
+            // Using serial source means a serial source operator will be used 
in this fragment (e.g. data will be
+            // shuffled to only 1 exchange operator) and then split by 
followed local exchanger
             ImmutableList.Builder<AssignedJob> instances = 
ImmutableList.builder();
-
             DefaultScanSource shareScan = new 
DefaultScanSource(ImmutableMap.of());
             LocalShuffleAssignedJob receiveDataFromRemote = new 
LocalShuffleAssignedJob(
-                    0, 0, false,
+                    0, 0,
                     connectContext.nextInstanceId(), this, selectedWorker, 
shareScan);
 
             instances.add(receiveDataFromRemote);
             for (int i = 1; i < expectInstanceNum; ++i) {
                 LocalShuffleAssignedJob receiveDataFromLocal = new 
LocalShuffleAssignedJob(
-                        i, 0, true,
-                        connectContext.nextInstanceId(), this, selectedWorker, 
shareScan);
+                        i, 0, connectContext.nextInstanceId(), this, 
selectedWorker, shareScan);
                 instances.add(receiveDataFromLocal);
             }
             return instances.build();
@@ -76,6 +76,6 @@ public class UnassignedGatherJob extends 
AbstractUnassignedJob {
     }
 
     protected int degreeOfParallelism() {
-        return useLocalShuffleToAddParallel ? fragment.getParallelExecNum() : 
1;
+        return useSerialSource ? Math.max(1, fragment.getParallelExecNum()) : 
1;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index f90fe7ea6e2..88612640d50 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -38,12 +38,16 @@ import org.apache.doris.qe.ConnectContext;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -184,13 +188,23 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
             Set<Integer> assignedJoinBuckets
                     = ((BucketScanSource) 
assignJoinBuckets.get(i)).bucketIndexToScanNodeToTablets.keySet();
             LocalShuffleBucketJoinAssignedJob instance = new 
LocalShuffleBucketJoinAssignedJob(
-                    instances.size(), shareScanId, i > 0,
-                    context.nextInstanceId(), this, worker,
+                    instances.size(), shareScanId, context.nextInstanceId(),
+                    this, worker,
                     i == 0 ? shareScanSource : emptyShareScanSource,
                     Utils.fastToImmutableSet(assignedJoinBuckets)
             );
             instances.add(instance);
         }
+
+        for (int i = assignJoinBuckets.size(); i < instanceNum; ++i) {
+            LocalShuffleBucketJoinAssignedJob instance = new 
LocalShuffleBucketJoinAssignedJob(
+                    instances.size(), shareScanId, context.nextInstanceId(),
+                    this, worker, emptyShareScanSource,
+                    // these instance not need to join, because no any bucket 
assign to it
+                    ImmutableSet.of()
+            );
+            instances.add(instance);
+        }
     }
 
     private boolean shouldFillUpInstances(List<HashJoinNode> hashJoinNodes) {
@@ -224,10 +238,21 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
                 olapScanNode, randomPartition, missingBucketIndexes);
 
         boolean useLocalShuffle = 
instances.stream().anyMatch(LocalShuffleAssignedJob.class::isInstance);
+        Multimap<DistributedPlanWorker, AssignedJob> workerToAssignedJobs = 
ArrayListMultimap.create();
+        int maxNumInstancePerWorker = 1;
+        if (useLocalShuffle) {
+            for (AssignedJob instance : instances) {
+                workerToAssignedJobs.put(instance.getAssignedWorker(), 
instance);
+            }
+            for (Collection<AssignedJob> instanceList : 
workerToAssignedJobs.asMap().values()) {
+                maxNumInstancePerWorker = Math.max(maxNumInstancePerWorker, 
instanceList.size());
+            }
+        }
+
         List<AssignedJob> newInstances = new ArrayList<>(instances);
+
         for (Entry<DistributedPlanWorker, Collection<Integer>> workerToBuckets 
: missingBuckets.asMap().entrySet()) {
             Map<Integer, Map<ScanNode, ScanRanges>> scanEmptyBuckets = 
Maps.newLinkedHashMap();
-            Set<Integer> assignedJoinBuckets = 
Utils.fastToImmutableSet(workerToBuckets.getValue());
             for (Integer bucketIndex : workerToBuckets.getValue()) {
                 Map<ScanNode, ScanRanges> scanTableWithEmptyData = 
Maps.newLinkedHashMap();
                 for (ScanNode scanNode : scanNodes) {
@@ -236,42 +261,62 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
                 scanEmptyBuckets.put(bucketIndex, scanTableWithEmptyData);
             }
 
-            AssignedJob fillUpInstance = null;
             DistributedPlanWorker worker = workerToBuckets.getKey();
             BucketScanSource scanSource = new 
BucketScanSource(scanEmptyBuckets);
             if (useLocalShuffle) {
-                // when use local shuffle, we should ensure every backend only 
process one instance!
-                // so here we should try to merge the missing buckets into 
exist instances
-                boolean mergedBucketsInSameWorkerInstance = false;
-                for (AssignedJob newInstance : newInstances) {
-                    if (newInstance.getAssignedWorker().equals(worker)) {
-                        BucketScanSource bucketScanSource = (BucketScanSource) 
newInstance.getScanSource();
-                        
bucketScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets);
-                        mergedBucketsInSameWorkerInstance = true;
-
-                        LocalShuffleBucketJoinAssignedJob instance = 
(LocalShuffleBucketJoinAssignedJob) newInstance;
-                        
instance.addAssignedJoinBucketIndexes(assignedJoinBuckets);
-                    }
+                List<AssignedJob> sameWorkerInstances = (List) 
workerToAssignedJobs.get(worker);
+                if (sameWorkerInstances.isEmpty()) {
+                    sameWorkerInstances = fillUpEmptyInstances(
+                            maxNumInstancePerWorker, scanSource, worker, 
newInstances, context);
                 }
-                if (!mergedBucketsInSameWorkerInstance) {
-                    fillUpInstance = new LocalShuffleBucketJoinAssignedJob(
-                            newInstances.size(), 
shareScanIdGenerator.getAndIncrement(),
-                            false, context.nextInstanceId(), this, worker, 
scanSource,
-                            assignedJoinBuckets
-                    );
+
+                LocalShuffleBucketJoinAssignedJob firstInstance
+                        = (LocalShuffleBucketJoinAssignedJob ) 
sameWorkerInstances.get(0);
+                BucketScanSource firstInstanceScanSource
+                        = (BucketScanSource) firstInstance.getScanSource();
+                
firstInstanceScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets);
+
+                Iterator<Integer> assignedJoinBuckets = new 
LinkedHashSet<>(workerToBuckets.getValue()).iterator();
+                // make sure the first instance must be assigned some buckets:
+                // if the first instance assigned some buckets, we start 
assign empty
+                // bucket for second instance for balance, or else assign for 
first instance
+                int index = 
firstInstance.getAssignedJoinBucketIndexes().isEmpty() ? -1 : 0;
+                while (assignedJoinBuckets.hasNext()) {
+                    Integer bucketIndex = assignedJoinBuckets.next();
+                    assignedJoinBuckets.remove();
+
+                    index = (index + 1) % sameWorkerInstances.size();
+                    LocalShuffleBucketJoinAssignedJob instance
+                            = (LocalShuffleBucketJoinAssignedJob) 
sameWorkerInstances.get(index);
+                    
instance.addAssignedJoinBucketIndexes(ImmutableSet.of(bucketIndex));
                 }
             } else {
-                fillUpInstance = assignWorkerAndDataSources(
+                newInstances.add(assignWorkerAndDataSources(
                         newInstances.size(), context.nextInstanceId(), worker, 
scanSource
-                );
-            }
-            if (fillUpInstance != null) {
-                newInstances.add(fillUpInstance);
+                ));
             }
         }
         return newInstances;
     }
 
+    private List<AssignedJob> fillUpEmptyInstances(
+            int maxNumInstancePerWorker, BucketScanSource scanSource, 
DistributedPlanWorker worker,
+            List<AssignedJob> existsInstances, ConnectContext context) {
+        int shareScanId = shareScanIdGenerator.getAndIncrement();
+        List<AssignedJob> newInstances = new 
ArrayList<>(maxNumInstancePerWorker);
+        for (int i = 0; i < maxNumInstancePerWorker; i++) {
+            LocalShuffleBucketJoinAssignedJob newInstance = new 
LocalShuffleBucketJoinAssignedJob(
+                    existsInstances.size(), shareScanId,
+                    context.nextInstanceId(), this, worker,
+                    scanSource.newEmpty(),
+                    ImmutableSet.of()
+            );
+            existsInstances.add(newInstance);
+            newInstances.add(newInstance);
+        }
+        return newInstances;
+    }
+
     private int fullBucketNum() {
         for (ScanNode scanNode : scanNodes) {
             if (scanNode instanceof OlapScanNode) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
index 3d937bfb35d..d8eac65cded 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
@@ -17,13 +17,11 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker.job;
 
-import org.apache.doris.common.Pair;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanNode;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -43,7 +41,7 @@ import java.util.function.Function;
 
 /** UnassignedShuffleJob */
 public class UnassignedShuffleJob extends AbstractUnassignedJob {
-    private boolean useLocalShuffleToAddParallel;
+    private boolean useSerialSource;
 
     public UnassignedShuffleJob(
             StatementContext statementContext, PlanFragment fragment,
@@ -54,7 +52,7 @@ public class UnassignedShuffleJob extends 
AbstractUnassignedJob {
     @Override
     public List<AssignedJob> computeAssignedJobs(
             DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
-        useLocalShuffleToAddParallel = 
fragment.useSerialSource(statementContext.getConnectContext());
+        useSerialSource = 
fragment.useSerialSource(statementContext.getConnectContext());
 
         int expectInstanceNum = degreeOfParallelism();
         List<AssignedJob> biggestParallelChildFragment = 
getInstancesOfBiggestParallelChildFragment(inputJobs);
@@ -83,18 +81,10 @@ public class UnassignedShuffleJob extends 
AbstractUnassignedJob {
     protected int degreeOfParallelism() {
         // TODO: check we use nested loop join do right outer / semi / anti 
join,
         //       we should add an exchange node with gather distribute under 
the nested loop join
-
         int expectInstanceNum = -1;
         if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null) {
             expectInstanceNum = 
ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
         }
-
-        // TODO: check nested loop join do right outer / semi / anti join
-        PlanNode leftMostNode = 
findLeftmostNode(fragment.getPlanRoot()).second;
-        // when we use nested loop join do right outer / semi / anti join, the 
instance must be 1.
-        if (leftMostNode.getNumInstances() == 1) {
-            expectInstanceNum = 1;
-        }
         return expectInstanceNum;
     }
 
@@ -116,7 +106,7 @@ public class UnassignedShuffleJob extends 
AbstractUnassignedJob {
     private List<AssignedJob> buildInstances(
             int instanceNum, Function<Integer, DistributedPlanWorker> 
workerSelector) {
         ConnectContext connectContext = statementContext.getConnectContext();
-        if (useLocalShuffleToAddParallel) {
+        if (useSerialSource) {
             return buildInstancesWithLocalShuffle(instanceNum, workerSelector, 
connectContext);
         } else {
             return buildInstancesWithoutLocalShuffle(instanceNum, 
workerSelector, connectContext);
@@ -150,17 +140,13 @@ public class UnassignedShuffleJob extends 
AbstractUnassignedJob {
         for (Entry<DistributedPlanWorker, Collection<Integer>> kv : 
workerToInstanceIds.asMap().entrySet()) {
             DistributedPlanWorker worker = kv.getKey();
             Collection<Integer> indexesInFragment = kv.getValue();
-
             DefaultScanSource shareScanSource = new 
DefaultScanSource(ImmutableMap.of());
-
-            boolean receiveDataFromLocal = false;
             for (Integer indexInFragment : indexesInFragment) {
                 LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
-                        indexInFragment, shareScanId, receiveDataFromLocal, 
connectContext.nextInstanceId(),
+                        indexInFragment, shareScanId, 
connectContext.nextInstanceId(),
                         this, worker, shareScanSource
                 );
                 instances.add(instance);
-                receiveDataFromLocal = true;
             }
             shareScanId++;
         }
@@ -176,16 +162,4 @@ public class UnassignedShuffleJob extends 
AbstractUnassignedJob {
         Collections.shuffle(candidateWorkers);
         return candidateWorkers;
     }
-
-    // Returns the id of the leftmost node of any of the gives types in 
'plan_root',
-    // or INVALID_PLAN_NODE_ID if no such node present.
-    private Pair<PlanNode, PlanNode> findLeftmostNode(PlanNode plan) {
-        PlanNode childPlan = plan;
-        PlanNode fatherPlan = null;
-        while (childPlan.getChildren().size() != 0 && !(childPlan instanceof 
ExchangeNode)) {
-            fatherPlan = childPlan;
-            childPlan = childPlan.getChild(0);
-        }
-        return Pair.of(fatherPlan, childPlan);
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3bf5c44d564..09a9a857f33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1809,7 +1809,8 @@ public class Coordinator implements CoordInterface {
                     exchangeInstances = 
ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
                 }
                 // when we use nested loop join do right outer / semi / anti 
join, the instance must be 1.
-                if (leftMostNode.getNumInstances() == 1) {
+                boolean isNereids = context != null && 
context.getState().isNereids();
+                if (!isNereids && leftMostNode.getNumInstances() == 1) {
                     exchangeInstances = 1;
                 }
                 // Using serial source means a serial source operator will be 
used in this fragment (e.g. data will be
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 54bc0b24d3e..0caca2f47c1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -419,21 +419,18 @@ public class ThriftPlansBuilder {
         if 
(runtimeFiltersThriftBuilder.isMergeRuntimeFilterInstance(instance)) {
             
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
         }
+        boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
+        if (isLocalShuffle) {
+            // a fragment in a backend only enable local shuffle once for the 
first local shuffle instance,
+            // because we just skip set scan params for 
LocalShuffleAssignedJob.receiveDataFromLocal == true
+            ignoreDataDistribution(currentFragmentParam);
+        }
         return instanceParam;
     }
 
     private static void setScanSourceParam(
             TPipelineFragmentParams currentFragmentParam, AssignedJob instance,
             TPipelineInstanceParams instanceParams) {
-
-        boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
-        if (isLocalShuffle && ((LocalShuffleAssignedJob) 
instance).receiveDataFromLocal) {
-            // save thrift rpc message size, don't need perNodeScanRanges,
-            // but the perNodeScanRanges is required rpc field
-            instanceParams.setPerNodeScanRanges(Maps.newLinkedHashMap());
-            return;
-        }
-
         ScanSource scanSource = instance.getScanSource();
         PerNodeScanParams scanParams;
         if (scanSource instanceof BucketScanSource) {
@@ -443,12 +440,6 @@ public class ThriftPlansBuilder {
         }
         // perNodeScanRanges is required
         instanceParams.setPerNodeScanRanges(scanParams.perNodeScanRanges);
-
-        if (isLocalShuffle) {
-            // a fragment in a backend only enable local shuffle once for the 
first local shuffle instance,
-            // because we just skip set scan params for 
LocalShuffleAssignedJob.receiveDataFromLocal == true
-            ignoreDataDistribution(currentFragmentParam);
-        }
     }
 
     // local shuffle has two functions:
diff --git 
a/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out
 
b/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out
index 01158a2fb60..cb2f86d7ba4 100644
Binary files 
a/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out
 and 
b/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out
 differ
diff --git 
a/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
 
b/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
index 924ceca4204..bf2929ee5b4 100644
--- 
a/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
+++ 
b/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
@@ -174,7 +174,11 @@ 
suite("test_hdfs_orc_group0_orc_files","external,hive,tvf,external_docker") {
             order_qt_test_25 """ select * from HDFS(
                         "uri" = "${uri}",
                         "hadoop.username" = "${hdfsUserName}",
-                        "format" = "orc") order by _col0 DESC limit 100; """
+                        "format" = "orc")
+                        where _col0 is not null and _col1 is not null and 
_col2 is not null and _col3 is not null
+                              and _col4 is not null and _col5 is not null and 
_col6 is not null
+                        order by _col0 DESC, _col1 DESC, _col2 DESC, _col3 
DESC, _col4 DESC, _col5 DESC, _col6 DESC
+                        limit 100; """
 
 
             uri = "${defaultFS}" + 
"/user/doris/tvf_data/test_hdfs_orc/group0/TestVectorOrcFile.testLzo.orc"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to