stevenzwu commented on code in PR #10457: URL: https://github.com/apache/iceberg/pull/10457#discussion_r1630457171
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java: ########## @@ -189,28 +198,75 @@ private void handleDataStatisticRequest(int subtask, StatisticsEvent event) { aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event); if (aggregatedStatistics != null) { - completedStatistics = aggregatedStatistics; - sendAggregatedStatisticsToSubtasks(completedStatistics.checkpointId(), completedStatistics); + // completedStatistics contains the complete samples, which is needed to compute + // the range bounds in globalStatistics if downstreamParallelism changed. + this.completedStatistics = aggregatedStatistics; + // globalStatistics only contains the range bounds, which has size equal to + // downstreamParallelism - 1. + this.globalStatistics = + globalStatistics(aggregatedStatistics, downstreamParallelism, comparator); + sendGlobalStatisticsToSubtasks(globalStatistics); + } + } + + private static AggregatedStatistics globalStatistics( + AggregatedStatistics aggregatedStatistics, + int downstreamParallelism, + Comparator<StructLike> comparator) { + if (aggregatedStatistics.type() == StatisticsType.Map) { + return aggregatedStatistics; + } else { + // range bound is a much smaller array compared to the complete samples. + // It helps reduce the amount of data transfer from coordinator to operator subtasks. 
+      SortKey[] rangeBounds = +          SketchUtil.rangeBounds( +              downstreamParallelism, comparator, aggregatedStatistics.keySamples()); +      return AggregatedStatistics.fromKeySamples(aggregatedStatistics.checkpointId(), rangeBounds); } } @SuppressWarnings("FutureReturnValueIgnored") - private void sendAggregatedStatisticsToSubtasks( - long checkpointId, AggregatedStatistics globalStatistics) { + private void sendGlobalStatisticsToSubtasks(AggregatedStatistics statistics) { callInCoordinatorThread( () -> { + // applyImmediately is set to false so that operator subtasks can apply the change at + // checkpoint boundary StatisticsEvent statisticsEvent = StatisticsEvent.createAggregatedStatisticsEvent( - checkpointId, globalStatistics, aggregatedStatisticsSerializer); + statistics, aggregatedStatisticsSerializer, false); for (int i = 0; i < context.currentParallelism(); ++i) { + // Ignore future return value for potential error, e.g. subtask down subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent); } - Review Comment: I have implemented the reconciliation part, which can handle both send failures and rescaling. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org