pvary commented on code in PR #10331: URL: https://github.com/apache/iceberg/pull/10331#discussion_r1599575140
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java: ########## @@ -44,51 +46,76 @@ import org.slf4j.LoggerFactory; /** - * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link - * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all - * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregated data - * statistics back to {@link DataStatisticsOperator}. In the end a custom partitioner will - * distribute traffic based on the aggregated data statistics to improve data clustering. + * DataStatisticsCoordinator receives {@link StatisticsEvent} from {@link DataStatisticsOperator} + * every subtask and then merge them together. Once aggregation for all subtasks data statistics + * completes, DataStatisticsCoordinator will send the aggregated data statistics back to {@link + * DataStatisticsOperator}. In the end a custom partitioner will distribute traffic based on the + * aggregated data statistics to improve data clustering. 
*/ @Internal -class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements OperatorCoordinator { +class DataStatisticsCoordinator implements OperatorCoordinator { private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class); private final String operatorName; + private final OperatorCoordinator.Context context; + private final Schema schema; + private final SortOrder sortOrder; + private final int downstreamParallelism; + private final StatisticsType statisticsType; + private final ExecutorService coordinatorExecutor; - private final OperatorCoordinator.Context operatorCoordinatorContext; private final SubtaskGateways subtaskGateways; private final CoordinatorExecutorThreadFactory coordinatorThreadFactory; - private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer; - private final transient AggregatedStatisticsTracker<D, S> aggregatedStatisticsTracker; - private volatile AggregatedStatistics<D, S> completedStatistics; - private volatile boolean started; + private final TypeSerializer<AggregatedStatistics> aggregatedStatisticsSerializer; + + private transient boolean started; + private transient AggregatedStatisticsTracker aggregatedStatisticsTracker; + private transient AggregatedStatistics completedStatistics; DataStatisticsCoordinator( String operatorName, OperatorCoordinator.Context context, - TypeSerializer<DataStatistics<D, S>> statisticsSerializer) { + Schema schema, + SortOrder sortOrder, + int downstreamParallelism, Review Comment: Never mind: scaling requires a restart, so we will have the correct `downstreamParallelism` at hand. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org