Copilot commented on code in PR #61988:
URL: https://github.com/apache/doris/pull/61988#discussion_r3019849310
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java:
##########
@@ -254,13 +254,18 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li
             }
         }
+        // When instanceNum > numBuckets (physical bucket count), extra instances would get
+        // no data through BUCKET_HASH_SHUFFLE local exchange because hash % numBuckets never
+        // maps to them. Fix: assign virtual bucket indexes (beyond physical range) to extra
+        // instances so the expanded bucket space covers all instances evenly.
         int thisWorkerInstanceNum = instances.size() - existsInstanceNum;
+        int numBuckets = fullBucketNum();
         for (int i = thisWorkerInstanceNum; i < instanceNum; ++i) {
+            int virtualBucketIndex = numBuckets + (i - thisWorkerInstanceNum);
             LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
                     instances.size(), shareScanId, context.nextInstanceId(), this, worker, emptyShareScanSource,
-                    // these instance not need to join, because no any bucket assign to it
-                    ImmutableSet.of()
+                    ImmutableSet.of(virtualBucketIndex)
             );
Review Comment:
The computed virtualBucketIndex can exceed the valid bucket-id range used by
BE local exchange. Here numBuckets = fullBucketNum() (the table's physical
bucket count), so if this worker scans fewer than fullBucketNum() buckets
(common) and instanceNum > thisWorkerInstanceNum, virtualBucketIndex may become
>= params.num_buckets, and can even exceed params.num_buckets itself; in BE it
is used to index arrays sized (num_buckets+1), which can lead to out-of-bounds
access and a crash. Consider deriving a per-worker "expanded bucket space" N
(e.g., N = max(physicalBucketNum, instanceNum) or another explicit full/virtual
bucket count) and ensure all assignedJoinBucketIndexes are within [0, N).
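
One way to read the suggestion above is to size the expanded bucket space explicitly as "physical buckets plus one virtual bucket per extra instance", so every assigned id is inside [0, N) by construction. A minimal sketch, with hypothetical names (this is not the actual Doris code, just an illustration of the invariant):

```java
import java.util.ArrayList;
import java.util.List;

public class ExpandedBucketSpaceSketch {
    /** Instances beyond the ones that already own physical buckets on this worker. */
    static int extraInstances(int thisWorkerInstanceNum, int instanceNum) {
        return Math.max(0, instanceNum - thisWorkerInstanceNum);
    }

    /** Expanded bucket space N = physical buckets + one virtual bucket per extra instance. */
    static int expandedBucketSpace(int physicalBucketNum, int thisWorkerInstanceNum, int instanceNum) {
        return physicalBucketNum + extraInstances(thisWorkerInstanceNum, instanceNum);
    }

    /** Virtual ids physicalBucketNum .. N-1, one per extra instance; all guaranteed < N. */
    static List<Integer> virtualBucketIds(int physicalBucketNum, int thisWorkerInstanceNum, int instanceNum) {
        List<Integer> ids = new ArrayList<>();
        int extra = extraInstances(thisWorkerInstanceNum, instanceNum);
        for (int i = 0; i < extra; i++) {
            ids.add(physicalBucketNum + i);
        }
        return ids;
    }
}
```

If the FE then also reports this N as num_buckets for the worker, every assigned bucket id is a valid index on the BE side.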
##########
fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java:
##########
@@ -411,11 +411,16 @@ private static TPipelineFragmentParams fragmentToThriftIfAbsent(
         params.setFileScanParams(fileScanRangeParamsMap);
         if (fragmentPlan.getFragmentJob() instanceof UnassignedScanBucketOlapTableJob) {
-            int bucketNum = ((UnassignedScanBucketOlapTableJob) fragmentPlan.getFragmentJob())
+            int physicalBucketNum = ((UnassignedScanBucketOlapTableJob) fragmentPlan.getFragmentJob())
                     .getOlapScanNodes()
                     .get(0)
                     .getBucketNum();
-            params.setNumBuckets(bucketNum);
+            // When local shuffle creates more instances than physical buckets,
+            // virtual bucket indexes are assigned beyond the physical range.
+            // Expand num_buckets to cover all virtual buckets so that
+            // hash % num_buckets can route data to every instance.
+            int workerInstanceNum = instancesPerWorker.get(worker).size();
+            params.setNumBuckets(Math.max(physicalBucketNum, workerInstanceNum));
Review Comment:
Setting params.num_buckets to max(physicalBucketNum, workerInstanceNum) does
not necessarily “cover all virtual buckets” assigned by
UnassignedScanBucketOlapTableJob. That job currently assigns virtual bucket ids
starting at physicalBucketNum and can produce bucket ids > workerInstanceNum-1
when this worker scans fewer physical buckets than fullBucketNum(), even when
physicalBucketNum >= workerInstanceNum (so num_buckets stays
physicalBucketNum). In BE, bucket_seq_to_instance_idx keys are treated as
bucket ids in [0, num_buckets), so any key >= num_buckets can trigger
out-of-bounds. Suggest computing num_buckets from the maximum bucket id
actually used in bucketSeqToInstanceIdx (+1), or adjusting the virtual bucket
id scheme so all assigned bucket ids are < num_buckets for every worker.
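
The first alternative suggested above — deriving num_buckets from the largest bucket id actually present in bucketSeqToInstanceIdx, plus one — can be sketched as follows (method and variable names are hypothetical, not the actual ThriftPlansBuilder code):

```java
import java.util.Map;

public class NumBucketsFromAssignment {
    /**
     * num_buckets = (max assigned bucket id) + 1, so every key of
     * bucketSeqToInstanceIdx lies in [0, num_buckets) by construction.
     */
    static int numBucketsCovering(Map<Integer, Integer> bucketSeqToInstanceIdx) {
        int maxBucketId = -1;
        for (int bucketId : bucketSeqToInstanceIdx.keySet()) {
            maxBucketId = Math.max(maxBucketId, bucketId);
        }
        // An empty map yields 0 here; real code would presumably fall back
        // to the physical bucket count instead.
        return maxBucketId + 1;
    }
}
```

This makes the invariant hold regardless of how the virtual bucket ids were generated, at the cost of coupling num_buckets to the assignment map rather than to the table metadata.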
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]