pvary commented on code in PR #10331: URL: https://github.com/apache/iceberg/pull/10331#discussion_r1599571779
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java: ########## @@ -104,30 +144,135 @@ AggregatedStatistics<D, S> updateAndCheckCompletion( subtask, checkpointId); } else { - inProgressStatistics.mergeDataStatistic( + merge(dataStatistics); + LOG.debug( + "Merge data statistics from operator {} subtask {} for checkpoint {}.", operatorName, - event.checkpointId(), - DataStatisticsUtil.deserializeDataStatistics( - event.statisticsBytes(), statisticsSerializer)); + subtask, + checkpointId); } + // This should be the happy path where all subtasks reports are received if (inProgressSubtaskSet.size() == parallelism) { - completedStatistics = inProgressStatistics; + completedStatistics = completedStatistics(); + resetAggregates(); LOG.info( - "Received data statistics from all {} operators {} for checkpoint {}. Return last completed aggregator {}.", + "Received data statistics from all {} operators {} for checkpoint {}. Return last completed aggregator.", parallelism, operatorName, - inProgressStatistics.checkpointId(), - completedStatistics.dataStatistics()); - inProgressStatistics = new AggregatedStatistics<>(checkpointId + 1, statisticsSerializer); - inProgressSubtaskSet.clear(); + inProgressCheckpointId); } return completedStatistics; } + private boolean inProgress() { + return inProgressCheckpointId != CheckpointStoreUtil.INVALID_CHECKPOINT_ID; + } + + private AggregatedStatistics completedStatistics() { + if (coordinatorStatisticsType == StatisticsType.Map) { + LOG.info( + "Completed map statistics aggregation with {} keys", coordinatorMapStatistics.size()); + return AggregatedStatistics.fromKeyFrequency( + inProgressCheckpointId, coordinatorMapStatistics); + } else { + ReservoirItemsSketch<SortKey> sketch = coordinatorSketchStatistics.getResult(); + LOG.info( + "Completed sketch statistics aggregation: " + + "reservoir size = {}, number of items seen = {}, number of samples = {}", + sketch.getK(), + 
sketch.getN(), + sketch.getNumSamples()); + return AggregatedStatistics.fromRangeBounds( + inProgressCheckpointId, + SketchUtil.rangeBounds(downstreamParallelism, comparator, sketch)); + } + } + + private void initializeAggregates(long checkpointId, DataStatistics taskStatistics) { + LOG.info("Starting a new statistics aggregation for checkpoint {}", checkpointId); + this.inProgressCheckpointId = checkpointId; + this.coordinatorStatisticsType = taskStatistics.type(); + + if (coordinatorStatisticsType == StatisticsType.Map) { + this.coordinatorMapStatistics = Maps.newHashMap(); + this.coordinatorSketchStatistics = null; + } else { + this.coordinatorMapStatistics = null; + this.coordinatorSketchStatistics = + ReservoirItemsUnion.newInstance( + SketchUtil.determineCoordinatorReservoirSize(downstreamParallelism)); + } + } + + private void resetAggregates() { + inProgressSubtaskSet.clear(); + this.inProgressCheckpointId = CheckpointStoreUtil.INVALID_CHECKPOINT_ID; + this.coordinatorMapStatistics = null; + this.coordinatorSketchStatistics = null; + } + + @SuppressWarnings("unchecked") + private void merge(DataStatistics taskStatistics) { + if (taskStatistics.type() == StatisticsType.Map) { + Map<SortKey, Long> taskMapStats = (Map<SortKey, Long>) taskStatistics.result(); + if (coordinatorStatisticsType == StatisticsType.Map) { + taskMapStats.forEach((key, count) -> coordinatorMapStatistics.merge(key, count, Long::sum)); + if (coordinatorMapStatistics.size() > switchToSketchThreshold) { + convertCoordinatorToSketch(); + } + } else { + // convert task stats to sketch first + ReservoirItemsSketch<SortKey> taskSketch = + ReservoirItemsSketch.newInstance( + SketchUtil.determineOperatorReservoirSize(parallelism, downstreamParallelism)); + SketchUtil.convertMapToSketch(taskMapStats, taskSketch::update); + coordinatorSketchStatistics.update(taskSketch); Review Comment: I'm wondering which is better: 1. 
Getting a map from the task -> converting the task map to a sketch -> merging that task sketch into the coordinator sketch
2. Updating the coordinator sketch directly with the values from the task map (e.g. `SketchUtil.convertMapToSketch(taskMapStats, coordinatorSketchStatistics::update)`)

Which one performs better? Which one results in a better approximation in the resulting sketch?

If we consciously choose the 1st solution, then we probably want to send a message to the tasks when the coordinator switches to sketch statistics, so that they don't bother sending the whole map, but just the sketch (which might be a smaller message).

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org