Copilot commented on code in PR #61988:
URL: https://github.com/apache/doris/pull/61988#discussion_r3019849310


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java:
##########
@@ -254,13 +254,18 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li
             }
         }
 
+        // When instanceNum > numBuckets (physical bucket count), extra instances would get
+        // no data through BUCKET_HASH_SHUFFLE local exchange because hash % numBuckets never
+        // maps to them. Fix: assign virtual bucket indexes (beyond physical range) to extra
+        // instances so the expanded bucket space covers all instances evenly.
         int thisWorkerInstanceNum = instances.size() - existsInstanceNum;
+        int numBuckets = fullBucketNum();
         for (int i = thisWorkerInstanceNum; i < instanceNum; ++i) {
+            int virtualBucketIndex = numBuckets + (i - thisWorkerInstanceNum);
             LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob(
                     instances.size(), shareScanId, context.nextInstanceId(),
                     this, worker, emptyShareScanSource,
-                    // these instance not need to join, because no any bucket assign to it
-                    ImmutableSet.of()
+                    ImmutableSet.of(virtualBucketIndex)
             );

Review Comment:
   The computed virtualBucketIndex can exceed the valid bucket-id range used by
BE local exchange. Here numBuckets = fullBucketNum() (the table's physical
bucket count), so if this worker scans fewer than fullBucketNum() buckets (the
common case) and instanceNum > thisWorkerInstanceNum, virtualBucketIndex can
reach or exceed params.num_buckets. BE uses these bucket ids to index arrays
sized (num_buckets+1), so such ids fall outside the valid range and can lead to
an out-of-bounds access or a crash. Consider deriving a per-worker “expanded
bucket space” N (e.g., N = max(physicalBucketNum, instanceNum) or another
explicit full/virtual bucket count) and ensuring that all
assignedJoinBucketIndexes lie within [0, N).



##########
fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java:
##########
@@ -411,11 +411,16 @@ private static TPipelineFragmentParams fragmentToThriftIfAbsent(
             params.setFileScanParams(fileScanRangeParamsMap);
 
             if (fragmentPlan.getFragmentJob() instanceof UnassignedScanBucketOlapTableJob) {
-                int bucketNum = ((UnassignedScanBucketOlapTableJob) fragmentPlan.getFragmentJob())
+                int physicalBucketNum = ((UnassignedScanBucketOlapTableJob) fragmentPlan.getFragmentJob())
                         .getOlapScanNodes()
                         .get(0)
                         .getBucketNum();
-                params.setNumBuckets(bucketNum);
+                // When local shuffle creates more instances than physical buckets,
+                // virtual bucket indexes are assigned beyond the physical range.
+                // Expand num_buckets to cover all virtual buckets so that
+                // hash % num_buckets can route data to every instance.
+                int workerInstanceNum = instancesPerWorker.get(worker).size();
+                params.setNumBuckets(Math.max(physicalBucketNum, workerInstanceNum));

Review Comment:
   Setting params.num_buckets to max(physicalBucketNum, workerInstanceNum) does
not necessarily “cover all virtual buckets” assigned by
UnassignedScanBucketOlapTableJob. That job currently assigns virtual bucket ids
starting at physicalBucketNum, and it can produce bucket ids > workerInstanceNum-1
when this worker scans fewer physical buckets than fullBucketNum(), even when
physicalBucketNum >= workerInstanceNum (in which case num_buckets stays
physicalBucketNum). In BE, bucket_seq_to_instance_idx keys are treated as
bucket ids in [0, num_buckets), so any key >= num_buckets can trigger an
out-of-bounds access. Suggest either setting num_buckets to the maximum bucket
id actually used in bucketSeqToInstanceIdx plus one, or adjusting the virtual
bucket id scheme so that every assigned bucket id is < num_buckets on every
worker.
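The first suggestion (num_buckets = maximum used bucket id + 1) could look
roughly like the sketch below. It is hypothetical: the class and method names
are invented, bucketSeqToInstanceIdx is modeled as a plain Map<Integer, Integer>
from bucket id to instance index, and keeping physicalBucketNum as a floor is a
design choice made here so the value never drops below the current physical
bucket count.

```java
import java.util.Map;
import java.util.TreeMap;

final class NumBucketsFromAssignedIds {
    // Returns a num_buckets value covering every key of bucketSeqToInstanceIdx,
    // never smaller than the physical bucket count.
    static int computeNumBuckets(int physicalBucketNum, Map<Integer, Integer> bucketSeqToInstanceIdx) {
        int maxBucketId = -1;
        for (int bucketId : bucketSeqToInstanceIdx.keySet()) {
            if (bucketId < 0) {
                throw new IllegalArgumentException("negative bucket id: " + bucketId);
            }
            maxBucketId = Math.max(maxBucketId, bucketId);
        }
        // Every used id must satisfy id < num_buckets, hence max id + 1.
        return Math.max(physicalBucketNum, maxBucketId + 1);
    }

    public static void main(String[] args) {
        Map<Integer, Integer> bucketSeqToInstanceIdx = new TreeMap<>();
        // Hypothetical layout: physical buckets 0..3 on instances 0..3,
        // virtual buckets 16..19 (the diff's scheme) on instances 4..7.
        for (int b = 0; b < 4; b++) {
            bucketSeqToInstanceIdx.put(b, b);
        }
        for (int k = 0; k < 4; k++) {
            bucketSeqToInstanceIdx.put(16 + k, 4 + k);
        }
        System.out.println(computeNumBuckets(16, bucketSeqToInstanceIdx));
    }
}
```

For this layout the method returns 20, so every key in the map, including the
virtual ids 16 to 19, lies within [0, num_buckets). Note that this only fixes
the bounds; the alternative suggestion (keeping all assigned ids below the
existing num_buckets) would leave num_buckets unchanged instead.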



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.