stevenzwu commented on code in PR #10331:
URL: https://github.com/apache/iceberg/pull/10331#discussion_r1625081174


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java:
##########
@@ -104,30 +144,135 @@ AggregatedStatistics<D, S> updateAndCheckCompletion(
           subtask,
           checkpointId);
     } else {
-      inProgressStatistics.mergeDataStatistic(
+      merge(dataStatistics);
+      LOG.debug(
+          "Merge data statistics from operator {} subtask {} for checkpoint {}.",
           operatorName,
-          event.checkpointId(),
-          DataStatisticsUtil.deserializeDataStatistics(
-              event.statisticsBytes(), statisticsSerializer));
+          subtask,
+          checkpointId);
     }
 
+    // This should be the happy path where all subtasks reports are received
     if (inProgressSubtaskSet.size() == parallelism) {
-      completedStatistics = inProgressStatistics;
+      completedStatistics = completedStatistics();
+      resetAggregates();
       LOG.info(
-          "Received data statistics from all {} operators {} for checkpoint {}. Return last completed aggregator {}.",
+          "Received data statistics from all {} operators {} for checkpoint {}. Return last completed aggregator.",
           parallelism,
           operatorName,
-          inProgressStatistics.checkpointId(),
-          completedStatistics.dataStatistics());
-      inProgressStatistics = new AggregatedStatistics<>(checkpointId + 1, statisticsSerializer);
-      inProgressSubtaskSet.clear();
+          inProgressCheckpointId);
     }
 
     return completedStatistics;
   }
 
+  private boolean inProgress() {
+    return inProgressCheckpointId != CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
+  }
+
+  private AggregatedStatistics completedStatistics() {
+    if (coordinatorStatisticsType == StatisticsType.Map) {
+      LOG.info(
+          "Completed map statistics aggregation with {} keys", coordinatorMapStatistics.size());
+      return AggregatedStatistics.fromKeyFrequency(
+          inProgressCheckpointId, coordinatorMapStatistics);
+    } else {
+      ReservoirItemsSketch<SortKey> sketch = coordinatorSketchStatistics.getResult();
+      LOG.info(
+          "Completed sketch statistics aggregation: "
+              + "reservoir size = {}, number of items seen = {}, number of samples = {}",
+          sketch.getK(),
+          sketch.getN(),
+          sketch.getNumSamples());
+      return AggregatedStatistics.fromRangeBounds(
+          inProgressCheckpointId,
+          SketchUtil.rangeBounds(downstreamParallelism, comparator, sketch));
+    }
+  }
+
+  private void initializeAggregates(long checkpointId, DataStatistics taskStatistics) {
+    LOG.info("Starting a new statistics aggregation for checkpoint {}", checkpointId);
+    this.inProgressCheckpointId = checkpointId;
+    this.coordinatorStatisticsType = taskStatistics.type();
+
+    if (coordinatorStatisticsType == StatisticsType.Map) {
+      this.coordinatorMapStatistics = Maps.newHashMap();
+      this.coordinatorSketchStatistics = null;
+    } else {
+      this.coordinatorMapStatistics = null;
+      this.coordinatorSketchStatistics =
+          ReservoirItemsUnion.newInstance(
+              SketchUtil.determineCoordinatorReservoirSize(downstreamParallelism));
+    }
+  }
+
+  private void resetAggregates() {
+    inProgressSubtaskSet.clear();
+    this.inProgressCheckpointId = CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
+    this.coordinatorMapStatistics = null;
+    this.coordinatorSketchStatistics = null;
+  }
+
+  @SuppressWarnings("unchecked")
+  private void merge(DataStatistics taskStatistics) {
+    if (taskStatistics.type() == StatisticsType.Map) {
+      Map<SortKey, Long> taskMapStats = (Map<SortKey, Long>) taskStatistics.result();
+      if (coordinatorStatisticsType == StatisticsType.Map) {
+        taskMapStats.forEach((key, count) -> coordinatorMapStatistics.merge(key, count, Long::sum));
+        if (coordinatorMapStatistics.size() > switchToSketchThreshold) {

Review Comment:
   Good catch. It is related to your other comment that `statisticsType` was not used. It should be used and checked here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to