pvary commented on code in PR #10457:
URL: https://github.com/apache/iceberg/pull/10457#discussion_r1630164955


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -103,14 +105,41 @@ public void initializeState(StateInitializationContext context) throws Exception
     if (context.isRestored()) {
       if (globalStatisticsState.get() == null
           || !globalStatisticsState.get().iterator().hasNext()) {
-        LOG.warn(
+        LOG.info(
             "Operator {} subtask {} doesn't have global statistics state to restore",
             operatorName,
             subtaskIndex);
+        // If Flink deprecates union state in the future, RequestGlobalStatisticsEvent can be
+        // leveraged to request global statistics from coordinator if new subtasks (scale-up case)
+        // has nothing to restore from.
       } else {
-        LOG.info(
-            "Operator {} subtask {} restoring global statistics state", operatorName, subtaskIndex);
-        this.globalStatistics = globalStatisticsState.get().iterator().next();
+        AggregatedStatistics restoredStatistics = globalStatisticsState.get().iterator().next();
+        // Range bounds is calculated from reservoir samplings with the determined partition
+        // (downstream parallelism). If downstream parallelism changed due to rescale, the computed
+        // range bounds array is not applicable. Operators should request coordinator to recompute
+        // the range bounds array using the new parallelism.
+        if (restoredStatistics.type() == StatisticsType.Sketch
+            && restoredStatistics.keySamples().length + 1 != downstreamParallelism) {
+          LOG.info(
+              "Operator {} subtask {} ignored restored sketch statistics as range bound length "
+                  + "not matching downstream parallelism due to rescale. "
+                  + "Old parallelism is {}, new parallelism is {}",
+              operatorName,
+              subtaskIndex,
+              restoredStatistics.keySamples().length + 1,
+              downstreamParallelism);
+          // Asynchronously request the latest global statistics calculated with new downstream
+          // parallelism. It is possible events may have started flowing before coordinator responds
+          // with global statistics. In this case, range partitioner would just blindly shuffle
+          // records in round robin fashion.

Review Comment:
   Maybe we would be better off using the old ranges with some heuristics?
   - If we have a higher number of subtasks, just use the old ranges and leave the extra tasks idle
   - If we have a lower number of subtasks, use modulo?
   
   Or is the communication fast enough that this isn't worth the complexity?
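
   To make the heuristic concrete, here is a rough sketch of what I have in mind. It is
   not code from this PR: the class and method names are hypothetical, and plain `int`
   keys stand in for the real sort keys. It assumes the old bounds array has
   (old parallelism - 1) sorted entries, as the `keySamples().length + 1` check implies.

```java
import java.util.Arrays;

// Hypothetical illustration only; not part of DataStatisticsOperator or the PR.
final class RescaleHeuristicSketch {

  // Maps a key to a subtask index using range bounds computed for the OLD parallelism
  // (oldBounds.length + 1), while the job now runs with newParallelism subtasks.
  static int partition(int[] oldBounds, int key, int newParallelism) {
    int pos = Arrays.binarySearch(oldBounds, key);
    // binarySearch returns -(insertionPoint) - 1 when the key is not a bound itself.
    int oldPartition = pos >= 0 ? pos : -(pos + 1);

    if (oldPartition < newParallelism) {
      // Scale-up (or unchanged): keep the old assignment; subtasks with an index
      // at or above the old parallelism simply receive nothing.
      return oldPartition;
    }
    // Scale-down: fold the old partition onto the smaller set of subtasks.
    return oldPartition % newParallelism;
  }
}
```

   With old bounds {10, 20, 30} (old parallelism 4), scaling up to 6 would leave
   subtasks 4 and 5 idle, and scaling down to 2 would fold old partitions 2 and 3 onto
   subtasks 0 and 1. The modulo fold interleaves ranges, so it gives up contiguous
   ranges per subtask until the coordinator sends fresh statistics.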



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

