github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3417844764


##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -538,6 +543,98 @@ public Map<String, Integer> getBeToInstancesNum() {
         return result;
     }
 
+    public static final class AdaptiveRandomBucketSinkContext {
+        private final List<Long> sinkBackendIds;
+        private final int planFragmentNum;
+
+        private AdaptiveRandomBucketSinkContext(List<Long> sinkBackendIds, int 
planFragmentNum) {
+            this.sinkBackendIds = sinkBackendIds;
+            this.planFragmentNum = planFragmentNum;
+        }
+
+        public List<Long> getSinkBackendIds() {
+            return sinkBackendIds;
+        }
+
+        public int getPlanFragmentNum() {
+            return planFragmentNum;
+        }
+    }
+
+    public Optional<AdaptiveRandomBucketSinkContext> 
getAdaptiveRandomBucketSinkContext(long tableId) {
+        Set<Long> sinkBackendIds = new TreeSet<>();
+        int planFragmentNum = 0;
+        for (PipelineExecContext context : pipelineExecContexts.values()) {
+            TPipelineFragmentParams params = context.rpcParams;
+            if (params.getFragment().getOutputSink() == null
+                    || params.getFragment().getOutputSink().getType() != 
TDataSinkType.OLAP_TABLE_SINK) {
+                continue;
+            }
+            TOlapTableSink sink = 
params.getFragment().getOutputSink().getOlapTableSink();
+            if (sink.getTableId() != tableId) {
+                continue;
+            }
+            if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) {
+                continue;
+            }
+            sinkBackendIds.add(params.getBackendId());
+            planFragmentNum += params.getLocalParamsSize();
+        }
+        if (sinkBackendIds.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(new AdaptiveRandomBucketSinkContext(
+                new ArrayList<>(sinkBackendIds), Math.max(planFragmentNum, 
1)));
+    }
+
+    private static void assignAdaptiveRandomBucketForFragment(
+            Collection<TPipelineFragmentParams> fragmentParamsList) {
+        List<TPipelineFragmentParams> sinkParams = fragmentParamsList.stream()
+                .filter(param -> param.getFragment().getOutputSink() != null
+                        && param.getFragment().getOutputSink().getType() == 
TDataSinkType.OLAP_TABLE_SINK)
+                .collect(Collectors.toList());
+        if (sinkParams.isEmpty()) {
+            return;
+        }
+        TOlapTableSink sink = 
sinkParams.get(0).getFragment().getOutputSink().getOlapTableSink();
+        if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) {
+            return;
+        }
+        List<Long> sinkBackendIds = sinkParams.stream()
+                .map(TPipelineFragmentParams::getBackendId)
+                .distinct()
+                .sorted()
+                .collect(Collectors.toList());
+        int planFragmentNum = sinkParams.stream()
+                .mapToInt(TPipelineFragmentParams::getLocalParamsSize)
+                .sum();
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Adaptive random bucket planning in legacy fragment={}, 
sinkBackendIds={}, "
+                            + "planFragmentNum={}",
+                    sinkParams.get(0).getFragmentId(), sinkBackendIds, 
planFragmentNum);
+        }
+        Map<Long, Map<Long, OlapTableSink.AdaptiveBucketAssignment>> 
assignments =
+                
OlapTableSink.computeAdaptiveRandomBucketAssignments(sinkBackendIds,
+                        sink.getPartition().getPartitions(), 
sink.getLocation().getTablets(), planFragmentNum);
+        for (TPipelineFragmentParams sinkParam : sinkParams) {
+            Map<Long, OlapTableSink.AdaptiveBucketAssignment> 
partitionAssignments =
+                    assignments.get(sinkParam.getBackendId());
+            if (partitionAssignments == null) {
+                continue;
+            }
+            TOlapTableSink copiedSink = 
deepCopyOlapTableSinkForCurrentBackend(sinkParam);
+            OlapTableSink.applyAdaptiveRandomBucketAssignments(
+                    copiedSink.getPartition().getPartitions(),
+                    partitionAssignments);
+        }

Review Comment:
   `deepCopyOlapTableSinkForCurrentBackend()` is still not BE-local on the
classic Coordinator path. In `FragmentExecParams.toThrift()`, the legacy planner
creates a single `TPlanFragment fragmentThrift = fragment.toThrift()` before the
backend loop, and every `TPipelineFragmentParams` receives that same object via
`params.setFragment(fragmentThrift)`. The `setOutputSink()` call that attaches
the copied sink therefore mutates the shared fragment: as
`assignAdaptiveRandomBucketForFragment()` iterates over backends, each later
backend's `bucket_be_id` / `local_bucket_seqs` overwrite the assignments written
for the earlier backends, so only the last backend's assignment survives. In a
classic multi-BE load, every sender can then route according to the last BE's
adaptive assignment. This is distinct from the existing Nereids deep-copy
thread: `ThriftPlansBuilder` now creates a separate fragment per worker before
copying the sink, whereas the classic Coordinator path still shares one
fragment across all backends. Please deep-copy the `TPlanFragment` (or create
a per-backend `fragmentThrift`) before attaching the copied sink.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to