pvary commented on code in PR #10331:
URL: https://github.com/apache/iceberg/pull/10331#discussion_r1599574241


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -44,51 +46,76 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
- * DataStatisticsOperator} every subtask and then merge them together. Once aggregation for all
- * subtasks data statistics completes, DataStatisticsCoordinator will send the aggregated data
- * statistics back to {@link DataStatisticsOperator}. In the end a custom partitioner will
- * distribute traffic based on the aggregated data statistics to improve data clustering.
+ * DataStatisticsCoordinator receives {@link StatisticsEvent} from {@link DataStatisticsOperator}
+ * every subtask and then merge them together. Once aggregation for all subtasks data statistics
+ * completes, DataStatisticsCoordinator will send the aggregated data statistics back to {@link
+ * DataStatisticsOperator}. In the end a custom partitioner will distribute traffic based on the
+ * aggregated data statistics to improve data clustering.
  */
 @Internal
-class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements OperatorCoordinator {
+class DataStatisticsCoordinator implements OperatorCoordinator {
   private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
 
   private final String operatorName;
+  private final OperatorCoordinator.Context context;
+  private final Schema schema;
+  private final SortOrder sortOrder;
+  private final int downstreamParallelism;
+  private final StatisticsType statisticsType;
+
   private final ExecutorService coordinatorExecutor;
-  private final OperatorCoordinator.Context operatorCoordinatorContext;
   private final SubtaskGateways subtaskGateways;
   private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
-  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
-  private final transient AggregatedStatisticsTracker<D, S> aggregatedStatisticsTracker;
-  private volatile AggregatedStatistics<D, S> completedStatistics;
-  private volatile boolean started;
+  private final TypeSerializer<AggregatedStatistics> aggregatedStatisticsSerializer;
+
+  private transient boolean started;
+  private transient AggregatedStatisticsTracker aggregatedStatisticsTracker;
+  private transient AggregatedStatistics completedStatistics;
 
   DataStatisticsCoordinator(
       String operatorName,
       OperatorCoordinator.Context context,
-      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+      Schema schema,
+      SortOrder sortOrder,
+      int downstreamParallelism,

Review Comment:
   How will this behave with the autoscaler?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to