stevenzwu commented on code in PR #10457:
URL: https://github.com/apache/iceberg/pull/10457#discussion_r1630015642


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -71,6 +76,7 @@ class DataStatisticsCoordinator implements OperatorCoordinator {
   private transient boolean started;
   private transient AggregatedStatisticsTracker aggregatedStatisticsTracker;
   private transient AggregatedStatistics completedStatistics;
+  private transient AggregatedStatistics globalStatistics;

Review Comment:
   For the sketch statistics type, `globalStatistics` only stores the range bounds (a much smaller array than the complete samples).
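The size difference is easiest to see with a small example. This is a minimal sketch, assuming `int` keys in place of Iceberg's `SortKey` and a simple evenly spaced selection over the sorted samples. `RangeBoundsExample` and `rangeBounds` are hypothetical names, and this is not necessarily how `SketchUtil.rangeBounds` selects its bounds.

```java
import java.util.Arrays;

/** Hypothetical helper: derives range bounds from a reservoir of key samples. */
final class RangeBoundsExample {
  private RangeBoundsExample() {}

  /**
   * Picks (parallelism - 1) evenly spaced bounds from the sorted samples, so the
   * result size depends on the downstream parallelism, not on the sample count.
   */
  static int[] rangeBounds(int[] samples, int downstreamParallelism) {
    if (downstreamParallelism < 1) {
      throw new IllegalArgumentException("parallelism must be >= 1");
    }
    int[] sorted = samples.clone();
    Arrays.sort(sorted);
    int boundCount = Math.min(downstreamParallelism - 1, sorted.length);
    int[] bounds = new int[boundCount];
    for (int i = 0; i < boundCount; i++) {
      // Position of the (i + 1)-th partition boundary within the sorted samples.
      int index = (int) ((long) (i + 1) * sorted.length / downstreamParallelism);
      bounds[i] = sorted[Math.min(index, sorted.length - 1)];
    }
    return bounds;
  }
}
```

With the 100 samples `0..99` and a downstream parallelism of 4, only 3 bounds (`[25, 50, 75]`) are kept, regardless of how many samples the reservoir holds.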



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -103,14 +105,41 @@ public void initializeState(StateInitializationContext context) throws Exception
     if (context.isRestored()) {
       if (globalStatisticsState.get() == null
           || !globalStatisticsState.get().iterator().hasNext()) {
-        LOG.warn(
+        LOG.info(
             "Operator {} subtask {} doesn't have global statistics state to restore",
             operatorName,
             subtaskIndex);
+        // If Flink deprecates union state in the future, RequestGlobalStatisticsEvent can be
+        // leveraged to request global statistics from coordinator if new subtasks (scale-up case)
+        // have nothing to restore from.
       } else {
-        LOG.info(
-            "Operator {} subtask {} restoring global statistics state", 
operatorName, subtaskIndex);
-        this.globalStatistics = globalStatisticsState.get().iterator().next();
+        AggregatedStatistics restoredStatistics = globalStatisticsState.get().iterator().next();
+        // Range bounds are calculated from reservoir samples with the determined partition
+        // (downstream parallelism). If downstream parallelism changed due to rescale, the computed
+        // range bounds array is not applicable. Operators should request coordinator to recompute
+        // the range bounds array using the new parallelism.
+        if (restoredStatistics.type() == StatisticsType.Sketch

Review Comment:
   This handles the rescale scenario for sketch stats (range bounds).
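The restore-time decision can be sketched as a small predicate. This assumes that range bounds computed for a parallelism of `p` always contain `p - 1` entries, and that only sketch statistics are checked, as in the hunk above. `RescaleCheckExample` and `shouldRequestRecompute` are hypothetical names; the actual condition on line 54 of the diff is truncated here.

```java
/** Hypothetical restore-time check for sketch statistics after a rescale. */
final class RescaleCheckExample {
  enum StatisticsType { MAP, SKETCH }

  private RescaleCheckExample() {}

  /**
   * Returns true when restored range bounds were computed for a different
   * downstream parallelism (assumption: p partitions are described by p - 1 bounds).
   */
  static boolean shouldRequestRecompute(
      StatisticsType type, int restoredBoundsCount, int downstreamParallelism) {
    if (type != StatisticsType.SKETCH) {
      // Only sketch statistics (range bounds) are checked in this sketch;
      // map statistics are restored as-is.
      return false;
    }
    return restoredBoundsCount != downstreamParallelism - 1;
  }
}
```

For example, bounds restored from a parallelism-4 job (3 bounds) would trigger a recompute request after scaling the downstream operator to 8.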



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatistics.java:
##########
@@ -35,28 +35,28 @@ class AggregatedStatistics implements Serializable {
   private final long checkpointId;
   private final StatisticsType type;
   private final Map<SortKey, Long> keyFrequency;
-  private final SortKey[] rangeBounds;
+  private final SortKey[] keySamples;

Review Comment:
   This rename is needed so that `AggregatedStatistics` can be used to store both the complete samples and the calculated range bounds.
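A simplified, hypothetical stand-in for `AggregatedStatistics` shows why the neutral field name helps: one factory covers both the large complete-sample array and the small range-bounds array. It assumes `String` keys instead of `SortKey`; only the `fromKeySamples` factory name and the field names come from the diff, and the rest is illustrative.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

/** Hypothetical, simplified holder mirroring the fields shown in the diff. */
final class StatisticsHolderExample {
  enum StatisticsType { MAP, SKETCH }

  private final long checkpointId;
  private final StatisticsType type;
  private final Map<String, Long> keyFrequency;
  private final String[] keySamples;

  private StatisticsHolderExample(
      long checkpointId, StatisticsType type, Map<String, Long> keyFrequency, String[] keySamples) {
    this.checkpointId = checkpointId;
    this.type = type;
    this.keyFrequency = keyFrequency;
    this.keySamples = keySamples;
  }

  static StatisticsHolderExample fromKeyFrequency(long checkpointId, Map<String, Long> stats) {
    return new StatisticsHolderExample(
        checkpointId, StatisticsType.MAP, Collections.unmodifiableMap(stats), null);
  }

  /** Holds either the complete samples or the derived range bounds. */
  static StatisticsHolderExample fromKeySamples(long checkpointId, String[] keys) {
    return new StatisticsHolderExample(
        checkpointId, StatisticsType.SKETCH, null, Arrays.copyOf(keys, keys.length));
  }

  long checkpointId() { return checkpointId; }

  StatisticsType type() { return type; }

  Map<String, Long> keyFrequency() { return keyFrequency; }

  String[] keySamples() { return keySamples == null ? null : keySamples.clone(); }
}
```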



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -139,14 +168,16 @@ public void handleOperatorEvent(OperatorEvent event) {
         operatorName,
         subtaskIndex,
         statisticsEvent.checkpointId());
-    globalStatistics =
+    this.globalStatistics =
         StatisticsUtil.deserializeAggregatedStatistics(
             statisticsEvent.statisticsBytes(), aggregatedStatisticsSerializer);
     checkStatisticsTypeMigration();
-    output.collect(new StreamRecord<>(StatisticsOrRecord.fromStatistics(globalStatistics)));

Review Comment:
   This was a bug previously. We actually don't want to apply the new stats immediately during the normal aggregation and propagation phase; the switch happens at the checkpoint boundary.
   
   An `applyImmediately` flag is added in this PR to distinguish the stats requested in the rescale case, where immediate application is desired.
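The two application modes can be modeled as follows. This is a hypothetical sketch: `StatisticsSwitchExample`, `onStatisticsEvent`, and `onCheckpointBoundary` are illustrative names, emitted statistics go to a list instead of Flink's `Output`, and the exact hook where the deferred switch happens in the operator is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of when new global statistics take effect. */
final class StatisticsSwitchExample {
  private final List<String> emitted = new ArrayList<>();
  private String pendingStatistics;

  /** Called when the coordinator sends new global statistics. */
  void onStatisticsEvent(String statistics, boolean applyImmediately) {
    if (applyImmediately) {
      // Rescale case: the statistics were explicitly requested, so use them now.
      // Any pending statistics are superseded by this newer result.
      emitted.add(statistics);
      pendingStatistics = null;
    } else {
      // Normal aggregation/propagation phase: defer until the checkpoint boundary.
      pendingStatistics = statistics;
    }
  }

  /** Called at a checkpoint boundary (assumed trigger point for the switch). */
  void onCheckpointBoundary() {
    if (pendingStatistics != null) {
      emitted.add(pendingStatistics);
      pendingStatistics = null;
    }
  }

  List<String> emitted() {
    return emitted;
  }
}
```

Deferring normal updates keeps every subtask switching statistics at the same checkpoint, while the immediate path avoids routing records with range bounds computed for the old parallelism.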



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java:
##########
@@ -258,8 +258,7 @@ private AggregatedStatistics completedStatistics(long checkpointId) {
             sketch.getK(),
             sketch.getN(),
             sketch.getNumSamples());
-        return AggregatedStatistics.fromRangeBounds(
-            checkpointId, SketchUtil.rangeBounds(downstreamParallelism, comparator, sketch));
+        return AggregatedStatistics.fromKeySamples(checkpointId, sketch.getSamples());

Review Comment:
   This returns the complete samples from the tracker. The coordinator will store both the complete samples and the range bounds.
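Keeping the complete samples on the coordinator is what makes a later recompute possible. Below is a minimal sketch, assuming `int` keys and the same simplified evenly spaced bound selection (not necessarily what `SketchUtil.rangeBounds` does). `CoordinatorStateExample` and its methods are hypothetical names.

```java
import java.util.Arrays;

/** Hypothetical coordinator state holding complete samples plus derived range bounds. */
final class CoordinatorStateExample {
  private int[] completedSamples = new int[0];
  private int[] globalRangeBounds = new int[0];

  /** Stores the complete samples from the tracker and derives the range bounds. */
  void onCompletedSamples(int[] samples, int downstreamParallelism) {
    this.completedSamples = samples.clone();
    Arrays.sort(completedSamples);
    this.globalRangeBounds = rangeBounds(completedSamples, downstreamParallelism);
  }

  /** Recomputes range bounds for a new parallelism from the retained samples. */
  int[] recomputeRangeBounds(int newParallelism) {
    return rangeBounds(completedSamples, newParallelism);
  }

  int[] globalRangeBounds() {
    return globalRangeBounds.clone();
  }

  // Evenly spaced bounds over already sorted samples (simplified selection).
  private static int[] rangeBounds(int[] sorted, int parallelism) {
    int count = Math.max(0, Math.min(parallelism - 1, sorted.length));
    int[] bounds = new int[count];
    for (int i = 0; i < count; i++) {
      // (i + 1) < parallelism, so the index is always below sorted.length.
      bounds[i] = sorted[(int) ((long) (i + 1) * sorted.length / parallelism)];
    }
    return bounds;
  }
}
```

With samples `0..99`, the stored bounds for parallelism 4 are `[25, 50, 75]`, and a rescale to 5 can be served as `[20, 40, 60, 80]` without re-collecting samples.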



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

