stevenzwu commented on code in PR #10457:
URL: https://github.com/apache/iceberg/pull/10457#discussion_r1630358521
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -103,14 +105,41 @@ public void initializeState(StateInitializationContext context) throws Exception
     if (context.isRestored()) {
       if (globalStatisticsState.get() == null
           || !globalStatisticsState.get().iterator().hasNext()) {
-        LOG.warn(
+        LOG.info(
             "Operator {} subtask {} doesn't have global statistics state to restore",
             operatorName,
             subtaskIndex);
+        // If Flink deprecates union state in the future, RequestGlobalStatisticsEvent can be
+        // leveraged to request global statistics from coordinator if new subtasks (scale-up case)
+        // has nothing to restore from.
       } else {
-        LOG.info(
-            "Operator {} subtask {} restoring global statistics state", operatorName, subtaskIndex);
-        this.globalStatistics = globalStatisticsState.get().iterator().next();
+        AggregatedStatistics restoredStatistics = globalStatisticsState.get().iterator().next();
+        // Range bounds is calculated from reservoir samplings with the determined partition
+        // (downstream parallelism). If downstream parallelism changed due to rescale, the computed
+        // range bounds array is not applicable. Operators should request coordinator to recompute
+        // the range bounds array using the new parallelism.
+        if (restoredStatistics.type() == StatisticsType.Sketch
+            && restoredStatistics.keySamples().length + 1 != downstreamParallelism) {
+          LOG.info(
+              "Operator {} subtask {} ignored restored sketch statistics as range bound length "
+                  + "not matching downstream parallelism due to rescale. "
+                  + "Old parallelism is {}, new parallelism is {}",
+              operatorName,
+              subtaskIndex,
+              restoredStatistics.keySamples().length + 1,
+              downstreamParallelism);
+          // Asynchronously request the latest global statistics calculated with new downstream
+          // parallelism. It is possible events may have started flowing before coordinator responds
+          // with global statistics. In this case, range partitioner would just blindly shuffle
+          // records in round robin fashion.

Review Comment:
   Great question. I also thought about the fallback heuristic. Initially, I was thinking we could start simple and improve this part if it turns out to be a problem. Communication should be relatively fast (maybe a few to 10 ms) if the parallelism/fan-out is not very high.

   > If we have higher number of subtasks, just use the old ranges, and leave idle tasks

   Scale-up doesn't require any code change; it already works like this.

   > If we have lower number of subtasks, then use modulo?

   I agree modulo could be a sensible strategy:
   * pro: better clustering than round-robin
   * con: uneven distribution. Some subtasks may get double the load compared to others. But if the stats refresh is fast (less than a dozen ms), maybe this is not a concern.

   Hence, I am in favor of implementing the fallback behavior for rescale.
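To make the trade-off in the comment concrete, below is a minimal sketch of the two fallback behaviors for the window between operator restore and the coordinator responding with statistics recomputed for the new parallelism: round-robin keeps the load even but loses clustering, while modulo on the stale range index (scale-down case) keeps clustering but can roughly double the load on some subtasks. The class and method names are hypothetical illustrations, not the actual Iceberg range partitioner code in this PR.

```java
// Hypothetical sketch of the fallback strategies discussed in the review comment.
// Names here are illustrative only and do NOT appear in the Iceberg Flink sink.
public class RescaleFallbackSketch {

  // Round-robin fallback: used while waiting for fresh global statistics.
  // Load stays even across subtasks, but key clustering is lost.
  static int roundRobin(long recordCounter, int newParallelism) {
    return (int) (recordCounter % newParallelism);
  }

  // Modulo fallback for scale-down: keep the range index computed from the restored
  // (stale) range bounds, which targeted the old parallelism, and fold it into the
  // smaller new parallelism. Clustering is mostly preserved; load can be uneven.
  static int moduloFallback(int staleRangeIndex, int newParallelism) {
    return staleRangeIndex % newParallelism;
  }

  public static void main(String[] args) {
    int oldParallelism = 5;
    int newParallelism = 3;
    // Old range indexes {0, 3} -> subtask 0, {1, 4} -> subtask 1, {2} -> subtask 2:
    // subtasks 0 and 1 receive roughly double the load of subtask 2.
    for (int oldIndex = 0; oldIndex < oldParallelism; oldIndex++) {
      System.out.printf(
          "old range %d -> new subtask %d%n", oldIndex, moduloFallback(oldIndex, newParallelism));
    }
  }
}
```

The imbalance in the modulo case appears whenever the old parallelism is not a multiple of the new one: some new subtasks absorb one extra old range. If the coordinator answers the statistics request within a few milliseconds, as the comment suggests, that skew only lasts for a very short window.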