github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3491739303
##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java:
##########
@@ -574,6 +581,375 @@ private TOlapTablePartitionParam
createDummyPartition(long dbId, OlapTable table
return partitionParam;
}
+ public static final class AdaptiveBucketAssignment {
+ private final int loadTabletIdx;
+ private final AdaptiveIndexBucketAssignment firstIndexAssignment;
+ private final Map<Long, AdaptiveIndexBucketAssignment>
indexAssignments;
+
+ public AdaptiveBucketAssignment(int loadTabletIdx,
+ AdaptiveIndexBucketAssignment firstIndexAssignment,
+ Map<Long, AdaptiveIndexBucketAssignment> indexAssignments) {
+ this.loadTabletIdx = loadTabletIdx;
+ this.firstIndexAssignment = firstIndexAssignment;
+ this.indexAssignments = new HashMap<>(indexAssignments);
+ }
+
+ public long getBucketBeId() {
+ return firstIndexAssignment == null ? -1L :
firstIndexAssignment.getBucketBeId();
+ }
+
+ public int getLoadTabletIdx() {
+ return loadTabletIdx;
+ }
+
+ public List<Integer> getLocalBucketSeqs() {
+ return firstIndexAssignment == null ? Collections.emptyList()
+ : firstIndexAssignment.getLocalBucketSeqs();
+ }
+
+ public Map<Long, AdaptiveIndexBucketAssignment> getIndexAssignments() {
+ return indexAssignments;
+ }
+ }
+
+ public static final class AdaptiveIndexBucketAssignment {
+ private final long indexId;
+ private final long bucketBeId;
+ private final List<Integer> localBucketSeqs;
+
+ public AdaptiveIndexBucketAssignment(long indexId, long bucketBeId,
List<Integer> localBucketSeqs) {
+ this.indexId = indexId;
+ this.bucketBeId = bucketBeId;
+ this.localBucketSeqs = new ArrayList<>(localBucketSeqs);
+ }
+
+ public long getIndexId() {
+ return indexId;
+ }
+
+ public long getBucketBeId() {
+ return bucketBeId;
+ }
+
+ public List<Integer> getLocalBucketSeqs() {
+ return localBucketSeqs;
+ }
+
+ @Override
+ public String toString() {
+ return "indexId=" + indexId + ",bucketBeId=" + bucketBeId
+ + ",localBucketSeqs=" + localBucketSeqs;
+ }
+ }
+
+ public static boolean shouldAssignAdaptiveRandomBucket(TOlapTableSink
sink) {
+ return sink != null
+ && sink.isSetEnableAdaptiveRandomBucket()
+ && sink.isEnableAdaptiveRandomBucket()
+ && (!sink.isSetLoadToSingleTablet() ||
!sink.isLoadToSingleTablet())
+ && sink.isSetPartition()
+ && sink.getPartition() != null
+ && (!sink.getPartition().isSetDistributedColumns()
+ ||
sink.getPartition().getDistributedColumns().isEmpty());
+ }
+
+ public boolean shouldAssignAdaptiveRandomBucket() {
+ return tDataSink != null &&
shouldAssignAdaptiveRandomBucket(tDataSink.getOlapTableSink());
+ }
+
+ public static Map<Long, Map<Long, AdaptiveBucketAssignment>>
computeAdaptiveRandomBucketAssignments(
+ List<Long> sinkBackendIds, List<TOlapTablePartition> partitions,
+ List<TTabletLocation> tabletLocations, int sinkInstanceNum) {
+ Map<Long, Map<Long, AdaptiveBucketAssignment>> assignments = new
HashMap<>();
+ List<Long> orderedSinkBackendIds = sinkBackendIds.stream()
+ .distinct()
+ .sorted()
+ .collect(Collectors.toList());
+ for (Long sinkBackendId : orderedSinkBackendIds) {
+ assignments.put(sinkBackendId, new HashMap<>());
+ }
+ if (orderedSinkBackendIds.isEmpty() || partitions == null ||
tabletLocations == null) {
+ return assignments;
+ }
+
+ Map<Long, TTabletLocation> tabletLocationMap = new
HashMap<>(tabletLocations.size());
+ for (TTabletLocation tabletLocation : tabletLocations) {
+ tabletLocationMap.put(tabletLocation.getTabletId(),
tabletLocation);
+ }
+
+ for (TOlapTablePartition partition : partitions) {
+ if (!partition.isSetLoadTabletIdx() || partition.getNumBuckets()
<= 0
+ || partition.getIndexes().isEmpty()) {
+ continue;
+ }
+ TOlapTableIndexTablets baseIndex = partition.getIndexes().get(0);
+ Map<Long, List<Integer>> beToBucketSeqs =
buildBeToBucketSeqs(baseIndex, tabletLocationMap);
+ long baseTabletIndex = partition.getLoadTabletIdx();
+ int fallbackBucketIdx = (int) Math.floorMod(baseTabletIndex,
(long) partition.getNumBuckets());
+ int targetBucketNum = Math.min(
Review Comment:
The new `sinkInstanceNum` still cannot increase the number of adaptive
buckets when several sink instances run on the same BE.
`computeAdaptiveRandomBucketAssignments()` deduplicates `sinkBackendIds` and
then caps `targetBucketNum` by `orderedSinkBackendIds.size()`, and the result
is keyed only by backend id. For example, with two sink BEs, four local sink
instances per BE, and eight random buckets, `sinkInstanceNum` is 8 but this
line still plans only `min(2, 8, 8) = 2` buckets, so all four senders on a BE
open the same `local_bucket_seqs` and start on the same receiver tablet until
rotation. Please either make the assignment granular per local sender/instance,
or remove `sinkInstanceNum` from this calculation and document that adaptive
random bucket intentionally works at backend granularity rather than
sink-instance granularity.
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -5745,4 +5768,132 @@ private TStatus checkMaster() {
}
return status;
}
+
+ private static final class AdaptiveBucketSinkContext {
+ private final boolean enableAdaptiveRandomBucket;
+ private final List<Long> sinkBackendIds;
+ private final int sinkInstanceNum;
+
+ private AdaptiveBucketSinkContext(boolean enableAdaptiveRandomBucket,
List<Long> sinkBackendIds,
+ int sinkInstanceNum) {
+ this.enableAdaptiveRandomBucket = enableAdaptiveRandomBucket;
+ this.sinkBackendIds = sinkBackendIds;
+ this.sinkInstanceNum = sinkInstanceNum;
+ }
+ }
+
+ private static AdaptiveBucketSinkContext
disabledAdaptiveBucketSinkContext() {
+ return new AdaptiveBucketSinkContext(false, Lists.newArrayList(), 1);
+ }
+
+ private static AdaptiveBucketSinkContext
collectAdaptiveBucketSinkContext(TUniqueId queryId, long tableId) {
+ if (queryId == null) {
+ return disabledAdaptiveBucketSinkContext();
+ }
+ Coordinator coordinator =
QeProcessorImpl.INSTANCE.getCoordinator(queryId);
+ if (coordinator == null) {
+ return disabledAdaptiveBucketSinkContext();
+ }
+ if (!(coordinator instanceof NereidsCoordinator)) {
+ Optional<Coordinator.AdaptiveRandomBucketSinkContext> context =
+ coordinator.getAdaptiveRandomBucketSinkContext(tableId);
+ if (context.isPresent()) {
+ return new AdaptiveBucketSinkContext(
+ true, context.get().getSinkBackendIds(),
context.get().getSinkInstanceNum());
+ }
+ return disabledAdaptiveBucketSinkContext();
+ }
+ Set<Long> sinkBackendIds = new TreeSet<>();
+ int sinkInstanceNum = 0;
+ for (PipelineDistributedPlan distributedPlan :
+ ((NereidsCoordinator)
coordinator).getCoordinatorContext().distributedPlans) {
+ if (!(distributedPlan.getFragmentJob().getFragment().getSink()
instanceof OlapTableSink)) {
+ continue;
+ }
+ OlapTableSink sink = (OlapTableSink)
distributedPlan.getFragmentJob().getFragment().getSink();
+ if (sink.getDstTable().getId() != tableId) {
+ continue;
+ }
+ if (!sink.shouldAssignAdaptiveRandomBucket()) {
+ continue;
+ }
+ sinkInstanceNum += distributedPlan.getInstanceJobs().size();
+ for (AssignedJob assignedJob : distributedPlan.getInstanceJobs()) {
+ sinkBackendIds.add(assignedJob.getAssignedWorker().id());
+ }
+ }
+ if (sinkBackendIds.isEmpty()) {
+ return disabledAdaptiveBucketSinkContext();
+ }
+ return new AdaptiveBucketSinkContext(true, new
ArrayList<>(sinkBackendIds), Math.max(sinkInstanceNum, 1));
+ }
+
+ private static void assignAdaptiveBucketToPartition(TOlapTablePartition
partition,
+ List<TTabletLocation> partitionTablets, long currentBeId, long
tableId, TUniqueId queryId,
+ boolean enableAdaptiveRandomBucket) {
+ if (!enableAdaptiveRandomBucket ||
!Config.enable_adaptive_random_bucket_load
Review Comment:
This method should treat `enableAdaptiveRandomBucket` from the
create/replace-partition request as the per-load source of truth. That flag was
fixed when the sink was planned/opened and is sent by BE from
`_tablet_finder->is_adaptive_random_bucket()`, but this extra mutable global
config check can turn the feature off only for runtime-created/replaced
partitions of an in-flight adaptive load. If `ADMIN SET FRONTEND CONFIG
('enable_adaptive_random_bucket_load' = 'false')` runs after planning, FE
returns partitions without `bucket_be_id` / `local_bucket_seqs`; the adaptive
sender then falls back to its own BE id for channel selection and can later
fail with `unknown partition channel` when this BE is not the bucket owner.
Please gate runtime replanning on the request's per-load flag, not the current
global default.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]