stevenzwu commented on code in PR #10331:
URL: https://github.com/apache/iceberg/pull/10331#discussion_r1625112317


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java:
##########
@@ -30,71 +42,99 @@
  * {@link AggregatedStatistics} received from {@link DataStatisticsOperator} 
subtasks for specific
  * checkpoint.
  */
-class AggregatedStatisticsTracker<D extends DataStatistics<D, S>, S> {
+class AggregatedStatisticsTracker {
   private static final Logger LOG = 
LoggerFactory.getLogger(AggregatedStatisticsTracker.class);
   private static final double ACCEPT_PARTIAL_AGGR_THRESHOLD = 90;
   private final String operatorName;
-  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
   private final int parallelism;
+  private final TypeSerializer<DataStatistics> statisticsSerializer;
+  private final int downstreamParallelism;
+  private final StatisticsType statisticsType;
+  private final int switchToSketchThreshold;
+  private final Comparator<StructLike> comparator;
+
   private final Set<Integer> inProgressSubtaskSet;
-  private volatile AggregatedStatistics<D, S> inProgressStatistics;
+  private volatile long inProgressCheckpointId;
+  private volatile StatisticsType coordinatorStatisticsType;
+  private volatile Map<SortKey, Long> coordinatorMapStatistics;
+  private volatile ReservoirItemsUnion<SortKey> coordinatorSketchStatistics;
 
   AggregatedStatisticsTracker(
       String operatorName,
-      TypeSerializer<DataStatistics<D, S>> statisticsSerializer,
-      int parallelism) {
+      int parallelism,
+      Schema schema,
+      SortOrder sortOrder,
+      int downstreamParallelism,
+      StatisticsType statisticsType,
+      int switchToSketchThreshold,
+      @Nullable AggregatedStatistics restoredStatistics) {
     this.operatorName = operatorName;
-    this.statisticsSerializer = statisticsSerializer;
     this.parallelism = parallelism;
+    this.statisticsSerializer =
+        new DataStatisticsSerializer(new SortKeySerializer(schema, sortOrder));
+    this.downstreamParallelism = downstreamParallelism;
+    this.statisticsType = statisticsType;
+    this.switchToSketchThreshold = switchToSketchThreshold;
+
+    this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
     this.inProgressSubtaskSet = Sets.newHashSet();
+    this.coordinatorStatisticsType = 
StatisticsUtil.collectType(statisticsType, restoredStatistics);
+    this.inProgressCheckpointId = CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
   }
 
-  AggregatedStatistics<D, S> updateAndCheckCompletion(
-      int subtask, DataStatisticsEvent<D, S> event) {
+  AggregatedStatistics updateAndCheckCompletion(int subtask, StatisticsEvent 
event) {
     long checkpointId = event.checkpointId();
+    LOG.debug(
+        "Handling statistics event from subtask {} of operator {} for 
checkpoint {}",
+        subtask,
+        operatorName,
+        checkpointId);
 
-    if (inProgressStatistics != null && inProgressStatistics.checkpointId() > 
checkpointId) {
+    if (inProgressCheckpointId > checkpointId) {
       LOG.info(
-          "Expect data statistics for operator {} checkpoint {}, but receive 
event from older checkpoint {}. Ignore it.",
+          "Ignore stale statistics event from operator {} subtask {} for older 
checkpoint {}. Was expecting data statistics from checkpoint {}",
           operatorName,
-          inProgressStatistics.checkpointId(),
-          checkpointId);
+          subtask,
+          checkpointId,
+          inProgressCheckpointId);
       return null;
     }
 
-    AggregatedStatistics<D, S> completedStatistics = null;
-    if (inProgressStatistics != null && inProgressStatistics.checkpointId() < 
checkpointId) {
+    AggregatedStatistics completedStatistics = null;
+    if (inProgress() && inProgressCheckpointId < checkpointId) {

Review Comment:
   > Is it so rare of an edge case for the statistics to arrive out of order? 
Even with unaligned checkpoints?
   
   Unaligned checkpoints probably don't really matter here. I definitely see 
your concern, especially if more than one concurrent checkpoint is configured. The default is 1.
   ```
   execution.checkpointing.max-concurrent-checkpoints=1
   ``` 
   
   To make things more generic, I will change the implementation to support 
concurrent checkpoints by using a Map to track the aggregation per checkpoint.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to