pvary commented on code in PR #10331:
URL: https://github.com/apache/iceberg/pull/10331#discussion_r1599571779


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java:
##########
@@ -104,30 +144,135 @@ AggregatedStatistics<D, S> updateAndCheckCompletion(
           subtask,
           checkpointId);
     } else {
-      inProgressStatistics.mergeDataStatistic(
+      merge(dataStatistics);
+      LOG.debug(
+          "Merge data statistics from operator {} subtask {} for checkpoint 
{}.",
           operatorName,
-          event.checkpointId(),
-          DataStatisticsUtil.deserializeDataStatistics(
-              event.statisticsBytes(), statisticsSerializer));
+          subtask,
+          checkpointId);
     }
 
+    // This should be the happy path where all subtasks reports are received
     if (inProgressSubtaskSet.size() == parallelism) {
-      completedStatistics = inProgressStatistics;
+      completedStatistics = completedStatistics();
+      resetAggregates();
       LOG.info(
-          "Received data statistics from all {} operators {} for checkpoint 
{}. Return last completed aggregator {}.",
+          "Received data statistics from all {} operators {} for checkpoint 
{}. Return last completed aggregator.",
           parallelism,
           operatorName,
-          inProgressStatistics.checkpointId(),
-          completedStatistics.dataStatistics());
-      inProgressStatistics = new AggregatedStatistics<>(checkpointId + 1, 
statisticsSerializer);
-      inProgressSubtaskSet.clear();
+          inProgressCheckpointId);
     }
 
     return completedStatistics;
   }
 
+  private boolean inProgress() {
+    return inProgressCheckpointId != CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
+  }
+
+  private AggregatedStatistics completedStatistics() {
+    if (coordinatorStatisticsType == StatisticsType.Map) {
+      LOG.info(
+          "Completed map statistics aggregation with {} keys", 
coordinatorMapStatistics.size());
+      return AggregatedStatistics.fromKeyFrequency(
+          inProgressCheckpointId, coordinatorMapStatistics);
+    } else {
+      ReservoirItemsSketch<SortKey> sketch = 
coordinatorSketchStatistics.getResult();
+      LOG.info(
+          "Completed sketch statistics aggregation: "
+              + "reservoir size = {}, number of items seen = {}, number of 
samples = {}",
+          sketch.getK(),
+          sketch.getN(),
+          sketch.getNumSamples());
+      return AggregatedStatistics.fromRangeBounds(
+          inProgressCheckpointId,
+          SketchUtil.rangeBounds(downstreamParallelism, comparator, sketch));
+    }
+  }
+
+  private void initializeAggregates(long checkpointId, DataStatistics 
taskStatistics) {
+    LOG.info("Starting a new statistics aggregation for checkpoint {}", 
checkpointId);
+    this.inProgressCheckpointId = checkpointId;
+    this.coordinatorStatisticsType = taskStatistics.type();
+
+    if (coordinatorStatisticsType == StatisticsType.Map) {
+      this.coordinatorMapStatistics = Maps.newHashMap();
+      this.coordinatorSketchStatistics = null;
+    } else {
+      this.coordinatorMapStatistics = null;
+      this.coordinatorSketchStatistics =
+          ReservoirItemsUnion.newInstance(
+              
SketchUtil.determineCoordinatorReservoirSize(downstreamParallelism));
+    }
+  }
+
+  private void resetAggregates() {
+    inProgressSubtaskSet.clear();
+    this.inProgressCheckpointId = CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
+    this.coordinatorMapStatistics = null;
+    this.coordinatorSketchStatistics = null;
+  }
+
+  @SuppressWarnings("unchecked")
+  private void merge(DataStatistics taskStatistics) {
+    if (taskStatistics.type() == StatisticsType.Map) {
+      Map<SortKey, Long> taskMapStats = (Map<SortKey, Long>) 
taskStatistics.result();
+      if (coordinatorStatisticsType == StatisticsType.Map) {
+        taskMapStats.forEach((key, count) -> 
coordinatorMapStatistics.merge(key, count, Long::sum));
+        if (coordinatorMapStatistics.size() > switchToSketchThreshold) {
+          convertCoordinatorToSketch();
+        }
+      } else {
+        // convert task stats to sketch first
+        ReservoirItemsSketch<SortKey> taskSketch =
+            ReservoirItemsSketch.newInstance(
+                SketchUtil.determineOperatorReservoirSize(parallelism, 
downstreamParallelism));
+        SketchUtil.convertMapToSketch(taskMapStats, taskSketch::update);
+        coordinatorSketchStatistics.update(taskSketch);

Review Comment:
   I'm wondering which is better:
   1. Getting a map from the task -> converting the task map to a sketch -> 
merging the coordinator sketch with the sketch built from the map
   2. Updating the coordinator sketch by adding the values from the map 
directly
   
   Which one performs better? Which one results in a better approximation in 
the resulting sketch?
   
   If we deliberately use the 1st solution, then we probably want to send a 
message to the tasks when we switch to sketch, so that they send just the 
sketch instead of the whole map (it might be a smaller message).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to