pvary commented on code in PR #10331:
URL: https://github.com/apache/iceberg/pull/10331#discussion_r1599575140
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -44,51 +46,76 @@
import org.slf4j.LoggerFactory;
/**
- * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
- * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
- * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregated data
- * statistics back to {@link DataStatisticsOperator}. In the end a custom partitioner will
- * distribute traffic based on the aggregated data statistics to improve data clustering.
+ * DataStatisticsCoordinator receives {@link StatisticsEvent} from {@link DataStatisticsOperator}
+ * every subtask and then merge them together. Once aggregation for all subtasks data statistics
+ * completes, DataStatisticsCoordinator will send the aggregated data statistics back to {@link
+ * DataStatisticsOperator}. In the end a custom partitioner will distribute traffic based on the
+ * aggregated data statistics to improve data clustering.
*/
@Internal
-class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements OperatorCoordinator {
+class DataStatisticsCoordinator implements OperatorCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
private final String operatorName;
+ private final OperatorCoordinator.Context context;
+ private final Schema schema;
+ private final SortOrder sortOrder;
+ private final int downstreamParallelism;
+ private final StatisticsType statisticsType;
+
private final ExecutorService coordinatorExecutor;
- private final OperatorCoordinator.Context operatorCoordinatorContext;
private final SubtaskGateways subtaskGateways;
private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
- private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
- private final transient AggregatedStatisticsTracker<D, S> aggregatedStatisticsTracker;
- private volatile AggregatedStatistics<D, S> completedStatistics;
- private volatile boolean started;
+ private final TypeSerializer<AggregatedStatistics> aggregatedStatisticsSerializer;
+
+ private transient boolean started;
+ private transient AggregatedStatisticsTracker aggregatedStatisticsTracker;
+ private transient AggregatedStatistics completedStatistics;
DataStatisticsCoordinator(
String operatorName,
OperatorCoordinator.Context context,
- TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+ Schema schema,
+ SortOrder sortOrder,
+ int downstreamParallelism,
Review Comment:
Never mind — rescaling requires a restart, so we will have the correct
`downstreamParallelism` at hand.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]