stevenzwu commented on code in PR #10457: URL: https://github.com/apache/iceberg/pull/10457#discussion_r1630376841
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java: ########## @@ -189,28 +198,75 @@ private void handleDataStatisticRequest(int subtask, StatisticsEvent event) { aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event); if (aggregatedStatistics != null) { - completedStatistics = aggregatedStatistics; - sendAggregatedStatisticsToSubtasks(completedStatistics.checkpointId(), completedStatistics); + // completedStatistics contains the complete samples, which is needed to compute + // the range bounds in globalStatistics if downstreamParallelism changed. + this.completedStatistics = aggregatedStatistics; + // globalStatistics only contains the range bounds, which has size equal to + // downstreamParallelism - 1. + this.globalStatistics = + globalStatistics(aggregatedStatistics, downstreamParallelism, comparator); + sendGlobalStatisticsToSubtasks(globalStatistics); + } + } + + private static AggregatedStatistics globalStatistics( + AggregatedStatistics aggregatedStatistics, + int downstreamParallelism, + Comparator<StructLike> comparator) { + if (aggregatedStatistics.type() == StatisticsType.Map) { + return aggregatedStatistics; + } else { + // range bound is a much smaller array compared to the complete samples. + // It helps reduce the amount of data transfer from coordinator to operator subtasks. 
+ SortKey[] rangeBounds = + SketchUtil.rangeBounds( + downstreamParallelism, comparator, aggregatedStatistics.keySamples()); + return AggregatedStatistics.fromKeySamples(aggregatedStatistics.checkpointId(), rangeBounds); } } @SuppressWarnings("FutureReturnValueIgnored") - private void sendAggregatedStatisticsToSubtasks( - long checkpointId, AggregatedStatistics globalStatistics) { + private void sendGlobalStatisticsToSubtasks(AggregatedStatistics statistics) { callInCoordinatorThread( () -> { + // applyImmediately is set to false so that operator subtasks can apply the change at + // checkpoint boundary StatisticsEvent statisticsEvent = StatisticsEvent.createAggregatedStatisticsEvent( - checkpointId, globalStatistics, aggregatedStatisticsSerializer); + statistics, aggregatedStatisticsSerializer, false); for (int i = 0; i < context.currentParallelism(); ++i) { + // Ignore future return value for potential error, e.g. subtask down subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent); } - Review Comment: Haha, this is a good question. I guess we were just avoiding the complexity of handling errors from the `CompletableFuture` returned by `sendEvent`. Coming back to error handling, what we really need is a reconciliation process/protocol. I think we can compute a hash code / signature in the `GlobalStatistics`. Upon startup, subtasks always send the `RequestStatisticsEvent` with the signature of their local copy. The coordinator can then check whether the signature matches; if it does not, the coordinator resends the latest statistics. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org