yegangy0718 commented on code in PR #7360:
URL: https://github.com/apache/iceberg/pull/7360#discussion_r1170639078


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+class AggregateDataStatistics<K> implements Serializable {

Review Comment:
   Makes sense to me. A reader may think `AggregateDataStatistics` is another 
implementation of `DataStatistics`. I will rename the class to 
`GlobalStatisticsAccumulator` and add Javadoc. 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+class AggregateDataStatistics<K> implements Serializable {
+
+  private final long checkpointId;
+  private final DataStatistics<K> dataStatistics;
+  private final Set<Integer> subtaskSet = Sets.newHashSet();
+
+  AggregateDataStatistics(long checkpoint, final DataStatisticsFactory<K> 
statisticsFactory) {
+    this.checkpointId = checkpoint;
+    this.dataStatistics = statisticsFactory.createDataStatistics();
+  }
+
+  long checkpointId() {
+    return checkpointId;
+  }
+
+  DataStatistics<K> dataStatistics() {
+    return dataStatistics;
+  }
+
+  void mergeDataStatistic(int subtask, DataStatisticsEvent<K> event) {
+    Preconditions.checkArgument(

Review Comment:
   `mergeDataStatistic` is called by `handleDataStatisticRequest`, and 
`handleDataStatisticRequest` is called by `handleEventFromOperator` in the 
coordinator thread.
   If the `checkArgument` fails, it throws an `IllegalArgumentException`; the 
`runInCoordinatorThread` method catches it and calls `context.failJob`:
   ```
     private void runInCoordinatorThread(
         ThrowingRunnable<Throwable> action, String actionName, Object... actionNameFormatParameters) {
       ensureStarted();
       coordinatorExecutor.execute(
           () -> {
             try {
               action.run();
             } catch (Throwable t) {
               // if we have a JVM critical error, promote it immediately, there is a good
               // chance the logging or job failing will not succeed anymore
               ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
   
               String actionString = String.format(actionName, actionNameFormatParameters);
               LOG.error(
                   "Uncaught exception in the data statistics {} while {}. Triggering job failover.",
                   operatorName,
                   actionString,
                   t);
               context.failJob(t);
             }
           });
     }
   ```
   Note also that `handleEventFromOperator` is not called as part of a 
checkpoint, so the coordinator cannot abort a checkpoint from this path. 
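
   The failure path described in this comment can be sketched with the JDK 
alone. This is a minimal illustration, not the PR's code: 
`CoordinatorFailureSketch` and its `Consumer<Throwable>` callback are 
hypothetical stand-ins for the coordinator and `context.failJob`, and the 
sketch omits the fatal-error rethrow and logging that the real method performs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the coordinator; not one of the PR's classes.
class CoordinatorFailureSketch {
  // Single-thread executor, playing the role of the coordinator thread.
  private final ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor();
  // Stands in for context.failJob(Throwable).
  private final Consumer<Throwable> failJob;

  CoordinatorFailureSketch(Consumer<Throwable> failJob) {
    this.failJob = failJob;
  }

  // Same condition as the checkArgument in mergeDataStatistic: an event for a
  // different checkpoint is rejected with an IllegalArgumentException.
  static void mergeDataStatistic(long expectedCheckpointId, long eventCheckpointId) {
    if (expectedCheckpointId != eventCheckpointId) {
      throw new IllegalArgumentException(
          String.format(
              "Received unexpected event from checkpoint %s. Expected checkpoint %s",
              eventCheckpointId, expectedCheckpointId));
    }
  }

  // Runs the action on the coordinator thread and routes any exception to failJob.
  // The real method first rethrows fatal JVM errors; this sketch skips that step.
  void runInCoordinatorThread(Runnable action) {
    coordinatorExecutor.execute(
        () -> {
          try {
            action.run();
          } catch (Throwable t) {
            failJob.accept(t);
          }
        });
  }

  // Lets already-submitted actions finish, then stops the executor.
  void close() {
    coordinatorExecutor.shutdown();
    try {
      coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

   Submitting an action that calls `mergeDataStatistic(2L, 1L)` makes the 
callback receive the `IllegalArgumentException`, which corresponds to the job 
failover in the real coordinator.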



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... 
actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there 
is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, 
actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> 
event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore 
the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / 
context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's more than the expected percentage {}. Thus sending the 
aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());
+        return;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's less than the expected percentage {}. Thus dropping the 
incomplete aggregate data statistics {} and starting collecting data statistics 
from new checkpoint {}",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            incompleteAggregateDataStatistics,
+            checkpointId);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+      }
+    } else if (incompleteAggregateDataStatistics.checkpointId() > 
checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, but receive event from 
older checkpoint {}. Ignore it.",
+          incompleteAggregateDataStatistics.checkpointId(),
+          checkpointId);
+      return;
+    } else {
+      incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+    }
+
+    if (incompleteAggregateDataStatistics.aggregateSize() == 
context.currentParallelism()) {
+      completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+      LOG.info(
+          "Received data statistics from all {} operators for checkpoint {}. 
Sending the aggregated data statistics {} to subtasks.",
+          context.currentParallelism(),
+          incompleteAggregateDataStatistics.checkpointId(),
+          completeAggregateDataStatistics);
+      incompleteAggregateDataStatistics = null;
+      context.sendDataStatisticsToSubtasks(
+          checkpointId, completeAggregateDataStatistics.dataStatistics());
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          if (event instanceof DataStatisticsEvent) {
+            handleDataStatisticRequest(subtask, ((DataStatisticsEvent<K>) 
event));
+          } else {

Review Comment:
   Will update to:
   ```
   Preconditions.checkArgument(event instanceof DataStatisticsEvent);
   handleDataStatisticRequest(subtask, ((DataStatisticsEvent<K>) event));
   ```
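
   As a rough illustration of the check-then-cast pattern, here is a 
JDK-only sketch. All type names (`OperatorEventSketch`, 
`StatisticsEventSketch`, `OtherEventSketch`) are hypothetical, and the local 
`checkArgument` helper only imitates the precondition utility. Passing an 
error message is a suggestion, not part of the proposed change.

```java
// Hypothetical event types for illustration; the PR uses Flink's OperatorEvent.
interface OperatorEventSketch {}

class StatisticsEventSketch implements OperatorEventSketch {
  final long checkpointId;

  StatisticsEventSketch(long checkpointId) {
    this.checkpointId = checkpointId;
  }
}

class OtherEventSketch implements OperatorEventSketch {}

class EventTypeCheckSketch {
  // Local imitation of a precondition check: throws IllegalArgumentException when false.
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  // Validates the event type first, so the cast below cannot fail.
  static long handleEvent(OperatorEventSketch event) {
    String typeName = event == null ? "null" : event.getClass().getName();
    checkArgument(
        event instanceof StatisticsEventSketch, "Unexpected operator event type: " + typeName);
    return ((StatisticsEventSketch) event).checkpointId;
  }
}
```

   With a message, an unexpected event produces an exception that names the 
offending type instead of a bare `IllegalArgumentException`.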



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+class AggregateDataStatistics<K> implements Serializable {
+
+  private final long checkpointId;
+  private final DataStatistics<K> dataStatistics;
+  private final Set<Integer> subtaskSet = Sets.newHashSet();
+
+  AggregateDataStatistics(long checkpoint, final DataStatisticsFactory<K> 
statisticsFactory) {
+    this.checkpointId = checkpoint;
+    this.dataStatistics = statisticsFactory.createDataStatistics();
+  }
+
+  long checkpointId() {
+    return checkpointId;
+  }
+
+  DataStatistics<K> dataStatistics() {
+    return dataStatistics;
+  }
+
+  void mergeDataStatistic(int subtask, DataStatisticsEvent<K> event) {
+    Preconditions.checkArgument(
+        checkpointId == event.checkpointId(),
+        "Received unexpected event from checkpoint %s. Expected checkpoint %s",
+        event.checkpointId(),
+        checkpointId);
+    if (!subtaskSet.add(subtask)) {
+      return;

Review Comment:
   Sounds good. I will add:
   ```
   LOG.debug("Receive duplicated data statistics for checkpoint {} subtask {}. Ignore it.", checkpointId, subtask);
   ```
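
   The duplicate handling relies on `Set.add` returning `false` when the 
element is already present. A minimal JDK-only sketch of that behaviour 
follows. `DuplicateSubtaskSketch` is a hypothetical class, and a plain `long` 
counter stands in for the merged `DataStatistics`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical accumulator; a long counter stands in for the merged statistics.
class DuplicateSubtaskSketch {
  private final Set<Integer> subtaskSet = new HashSet<>();
  private long mergedCount = 0;

  // Merges the subtask's count once. Returns false if this subtask already reported,
  // which is the branch where the debug log from the comment would be emitted.
  boolean merge(int subtask, long count) {
    if (!subtaskSet.add(subtask)) {
      return false;
    }
    mergedCount += count;
    return true;
  }

  long mergedCount() {
    return mergedCount;
  }

  // Number of distinct subtasks that have reported so far.
  int aggregateSize() {
    return subtaskSet.size();
  }
}
```

   A second report from the same subtask leaves both the merged value and 
the subtask count unchanged.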



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... 
actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there 
is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, 
actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> 
event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore 
the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / 
context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's more than the expected percentage {}. Thus sending the 
aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());

Review Comment:
   <img width="715" alt="Screenshot 2023-04-18 at 5 00 48 PM" src="https://user-images.githubusercontent.com/8229749/232930010-a8f74c25-240e-4ab8-bbd6-4431653ab79e.png">
   I also use the `dataStatistics` function in unit tests to verify the 
merged data statistics result of `GlobalStatisticsAccumulator`. 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> 
globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format(
+            "Failed to send global data statistics %s for checkpoint %d",
+            globalDataStatistics, checkpointId));

Review Comment:
   Will remove `globalDataStatistics` from the log message.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> 
globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format(
+            "Failed to send global data statistics %s for checkpoint %d",
+            globalDataStatistics, checkpointId));
+  }
+
+  int currentParallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  void attemptReady(OperatorCoordinator.SubtaskGateway gateway) {
+    
Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.registerSubtaskGateway(gateway);
+  }
+
+  void attemptFailed(int subtaskIndex, int attemptNumber) {
+    
Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.unregisterSubtaskGateway(subtaskIndex, attemptNumber);
+  }
+
+  void subtaskReset(int subtaskIndex) {
+    
Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.reset(subtaskIndex);
+  }
+
+  void failJob(Throwable cause) {
+    operatorCoordinatorContext.failJob(cause);
+  }
+
+  /**
+   * A helper method that delegates the callable to the coordinator thread if 
the current thread is
+   * not the coordinator thread, otherwise call the callable right away.
+   *
+   * @param callable the callable to delegate.
+   */
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  private static class SubtaskGateways {
+    private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;
+
+    private SubtaskGateways(int parallelism) {
+      this.gateways = new Map[parallelism];
+
+      for (int i = 0; i < parallelism; ++i) {
+        this.gateways[i] = new HashMap<>();
+      }
+    }
+
+    private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway 
gateway) {

Review Comment:
   The Flink side doesn't log it. My only concern is that, when there are lots of 
subtasks, a debug log here will generate too many log lines. But it doesn't hurt. 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(

Review Comment:
   Do you mean renaming `runInCoordinatorThread` to `runInCoordinatorTasks`?
   The executor is defined by 
   ```
   ExecutorService coordinatorExecutor =
           Executors.newSingleThreadExecutor(coordinatorThreadFactory);
   ```
   When implementing this class, I actually used 
https://github.com/apache/flink/blob/423cdcb99c3f66be435ad2e70d6a15f10a69e252/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java#L183
   as a reference, so I think `runInCoordinatorThread` is the more appropriate name. WDYT?
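
To illustrate why the "thread" wording fits, here is a minimal sketch of a single-thread executor whose thread factory can tell whether the caller is already on the coordinator thread. `SingleCoordinatorThreadFactory` and `CoordinatorThreadDemo` are hypothetical names for this example only; this is not the PR's `CoordinatorExecutorThreadFactory`, just the same idea under simplified assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a coordinator thread factory (illustration only).
class SingleCoordinatorThreadFactory implements ThreadFactory {
  private final String threadName;
  private volatile Thread coordinatorThread;

  SingleCoordinatorThreadFactory(String threadName) {
    this.threadName = threadName;
  }

  @Override
  public synchronized Thread newThread(Runnable runnable) {
    // Remember the most recently created worker so callers can compare against it.
    coordinatorThread = new Thread(runnable, threadName);
    return coordinatorThread;
  }

  boolean isCurrentThreadCoordinatorThread() {
    return Thread.currentThread() == coordinatorThread;
  }
}

class CoordinatorThreadDemo {
  // Returns true when a submitted task observes that it runs on the coordinator thread.
  static boolean taskRunsOnCoordinatorThread() {
    SingleCoordinatorThreadFactory factory =
        new SingleCoordinatorThreadFactory("demo-coordinator");
    ExecutorService executor = Executors.newSingleThreadExecutor(factory);
    try {
      Callable<Boolean> check = factory::isCurrentThreadCoordinatorThread;
      return executor.submit(check).get(10, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException("Coordinator thread check failed", e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(taskRunsOnCoordinatorThread());
  }
}
```

Because the executor has exactly one worker, every submitted action runs on that one thread, which is what the method name is meant to convey.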
   



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -138,6 +137,14 @@ public void snapshotState(StateSnapshotContext context) 
throws Exception {
     localStatistics = statisticsFactory.createDataStatistics();
   }
 
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    super.notifyCheckpointComplete(checkpointId);
+    // Send global statistics to partitioners at checkpoint to update data 
distribution at the same

Review Comment:
   It's the comment for line 145.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... 
actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there 
is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, 
actionNameFormatParameters);

Review Comment:
   will remove `final` 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();

Review Comment:
   Indeed, there is not much to do in the `start` method. The executor doesn't 
need any function call to start. Whenever there is a job/task, we submit it to 
the executor, and that's it.
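
A small sketch of that behavior, assuming a plain `ThreadPoolExecutor` configured with the same sizing that `Executors.newSingleThreadExecutor()` uses (built directly here only so that `getPoolSize()` can be inspected). `LazyExecutorStartDemo` is a hypothetical name for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LazyExecutorStartDemo {
  // One core thread, one max thread, unbounded queue: a single-thread executor.
  static ThreadPoolExecutor newSingleThreadPool() {
    return new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
  }

  // Returns {pool size before the first task, pool size after the first task}.
  static int[] poolSizesAroundFirstTask() {
    ThreadPoolExecutor executor = newSingleThreadPool();
    try {
      // No explicit start call: the worker thread does not exist yet.
      int before = executor.getPoolSize();
      // Submitting the first task is what creates the worker thread.
      executor.submit(() -> {}).get(10, TimeUnit.SECONDS);
      int after = executor.getPoolSize();
      return new int[] {before, after};
    } catch (Exception e) {
      throw new IllegalStateException("Executor demo failed", e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    int[] sizes = poolSizesAroundFirstTask();
    System.out.println(sizes[0] + " -> " + sizes[1]);
  }
}
```

The worker thread is created on the first submission, so `start()` only needs to flip the `started` flag.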



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... 
actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there 
is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, 
actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> 
event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore 
the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / 
context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's more than the expected percentage {}. Thus sending the 
aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());
+        return;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's less than the expected percentage {}. Thus dropping the 
incomplete aggregate data statistics {} and starting collecting data statistics 
from new checkpoint {}",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            incompleteAggregateDataStatistics,
+            checkpointId);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+      }
+    } else if (incompleteAggregateDataStatistics.checkpointId() > 
checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, but receive event from 
older checkpoint {}. Ignore it.",
+          incompleteAggregateDataStatistics.checkpointId(),
+          checkpointId);
+      return;
+    } else {
+      incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+    }
+
+    if (incompleteAggregateDataStatistics.aggregateSize() == 
context.currentParallelism()) {
+      completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+      LOG.info(
+          "Received data statistics from all {} operators for checkpoint {}. 
Sending the aggregated data statistics {} to subtasks.",
+          context.currentParallelism(),
+          incompleteAggregateDataStatistics.checkpointId(),
+          completeAggregateDataStatistics);
+      incompleteAggregateDataStatistics = null;
+      context.sendDataStatisticsToSubtasks(
+          checkpointId, completeAggregateDataStatistics.dataStatistics());
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          if (event instanceof DataStatisticsEvent) {
+            handleDataStatisticRequest(subtask, ((DataStatisticsEvent<K>) 
event));
+          } else {
+            throw new FlinkException("Unrecognized data statistics operator 
event: " + event);
+          }
+        },
+        "handling operator event %s from data statistics operator subtask %d 
(#%d)",
+        event,

Review Comment:
   This is the error message, which will only be logged when an exception 
happens. 
   I will update it to log only the event class. 
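
Roughly what I have in mind, as a sketch only: format the action name with the event's class name instead of the event itself, so a large statistics payload never ends up in the error message. `ActionNameDemo` and `BigEvent` are hypothetical names for this example, and the final change in the PR may look different.

```java
class ActionNameDemo {
  // Hypothetical event whose toString() is large, standing in for a statistics event.
  static final class BigEvent {
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder("BigEvent{");
      for (int i = 0; i < 1000; i++) {
        sb.append("key").append(i).append("=1,");
      }
      return sb.append('}').toString();
    }
  }

  // Builds the action description from the event class only, not the event contents.
  static String describeAction(Object event, int subtask, int attemptNumber) {
    return String.format(
        "handling operator event %s from data statistics operator subtask %d (#%d)",
        event.getClass().getSimpleName(), subtask, attemptNumber);
  }

  public static void main(String[] args) {
    System.out.println(describeAction(new BigEvent(), 3, 1));
  }
}
```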
   
   



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... 
actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there 
is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, 
actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> 
event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore 
the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {

Review Comment:
   SGTM. Will rename it. 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... 
actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there 
is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, 
actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> 
event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore 
the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / 
context.currentParallelism()

Review Comment:
   The challenge with moving this logic into `GlobalStatisticsAccumulator` is
that the operations on `inProgressAggregation` and `lastCompleteAggregation`
differ depending on which condition is hit.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregateDataStatistics.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+class AggregateDataStatistics<K> implements Serializable {
+
+  private final long checkpointId;
+  private final DataStatistics<K> dataStatistics;
+  private final Set<Integer> subtaskSet = Sets.newHashSet();
+
+  AggregateDataStatistics(long checkpoint, final DataStatisticsFactory<K> 
statisticsFactory) {
+    this.checkpointId = checkpoint;
+    this.dataStatistics = statisticsFactory.createDataStatistics();
+  }
+
+  long checkpointId() {
+    return checkpointId;
+  }
+
+  DataStatistics<K> dataStatistics() {
+    return dataStatistics;
+  }
+
+  void mergeDataStatistic(int subtask, DataStatisticsEvent<K> event) {
+    Preconditions.checkArgument(
+        checkpointId == event.checkpointId(),
+        "Received unexpected event from checkpoint %s. Expected checkpoint %s",
+        event.checkpointId(),
+        checkpointId);
+    if (!subtaskSet.add(subtask)) {
+      return;
+    }
+
+    dataStatistics.merge(event.dataStatistics());
+  }
+
+  long aggregateSize() {

Review Comment:
   OK. I will rename the function to `accumulatedSubtasksCount`



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... 
actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there 
is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, 
actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> 
event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore 
the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / 
context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's more than the expected percentage {}. Thus sending the 
aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());
+        return;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's less than the expected percentage {}. Thus dropping the 
incomplete aggregate data statistics {} and starting collecting data statistics 
from new checkpoint {}",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            incompleteAggregateDataStatistics,
+            checkpointId);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+      }
+    } else if (incompleteAggregateDataStatistics.checkpointId() > 
checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, but receive event from 
older checkpoint {}. Ignore it.",
+          incompleteAggregateDataStatistics.checkpointId(),
+          checkpointId);
+      return;
+    } else {
+      incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+    }
+
+    if (incompleteAggregateDataStatistics.aggregateSize() == 
context.currentParallelism()) {
+      completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+      LOG.info(
+          "Received data statistics from all {} operators for checkpoint {}. 
Sending the aggregated data statistics {} to subtasks.",
+          context.currentParallelism(),
+          incompleteAggregateDataStatistics.checkpointId(),
+          completeAggregateDataStatistics);
+      incompleteAggregateDataStatistics = null;
+      context.sendDataStatisticsToSubtasks(
+          checkpointId, completeAggregateDataStatistics.dataStatistics());
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          if (event instanceof DataStatisticsEvent) {
+            handleDataStatisticRequest(subtask, ((DataStatisticsEvent<K>) 
event));
+          } else {
+            throw new FlinkException("Unrecognized data statistics operator 
event: " + event);
+          }
+        },
+        "handling operator event %s from data statistics operator subtask %d 
(#%d)",
+        event,
+        subtask,
+        attemptNumber);
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for 
checkpoint {}",
+              operatorName,
+              checkpointId);
+          try {
+            byte[] serializedDataDistributionWeight =

Review Comment:
   We need to checkpoint both the statistics and the checkpoint id (the subtask
set is not needed), because `handleDataStatisticRequest` compares the
checkpoint id of `lastCompleteAggregation` with the checkpoint id of the event:
   ```
       if (lastCompleteAggregation != null
           && lastCompleteAggregation.checkpointId() >= checkpointId) {
         LOG.debug(
             "Data statistics aggregation for checkpoint {} has completed. Ignore the event from subtask {} for checkpoint {}",
             lastCompleteAggregation.checkpointId(),
             subtask,
             checkpointId);
         return;
       }
   ```
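
A minimal, self-contained sketch of that idea, assuming plain Java serialization and a simple map as a stand-in for the statistics. `Snapshot`, `serialize`, `deserialize` and `isStale` are hypothetical names used only for illustration and are not the code in this PR. The snapshot carries the checkpoint id and the merged statistics, so the guard quoted above can still be evaluated after a restore, and it omits the subtask set.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class CompletedAggregationSnapshotSketch {

  /** Hypothetical snapshot: checkpoint id plus merged statistics, no subtask set. */
  static final class Snapshot implements Serializable {
    private static final long serialVersionUID = 1L;
    final long checkpointId;
    final HashMap<String, Long> statistics;

    Snapshot(long checkpointId, Map<String, Long> statistics) {
      this.checkpointId = checkpointId;
      this.statistics = new HashMap<>(statistics);
    }
  }

  /** Writes the checkpoint id and the statistics into one byte array. */
  static byte[] serialize(Snapshot snapshot) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(snapshot);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Restores the snapshot, including the checkpoint id needed by the guard. */
  static Snapshot deserialize(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (Snapshot) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Same comparison as the guard: events at or before the last completed checkpoint are ignored. */
  static boolean isStale(Snapshot lastComplete, long eventCheckpointId) {
    return lastComplete != null && lastComplete.checkpointId >= eventCheckpointId;
  }
}
```

If only the statistics were checkpointed, the restored coordinator would have no checkpoint id to compare against, so late events for an already completed checkpoint could not be recognized.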



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  public DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... 
actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there 
is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, 
actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> 
event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null

Review Comment:
   SGTM. Will rename it. 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();

Review Comment:
   `ExecutorService#shutdown` (initiates an orderly shutdown in which previously
submitted tasks are executed, but no new tasks are accepted) and
`ExecutorService#shutdownNow` (attempts to stop all actively executing tasks)
serve different purposes.
   `DataStatisticsCoordinator#close` first calls `context.close()`, which uses
`ExecutorService#shutdown`, and then calls `coordinatorExecutor.shutdownNow()`
to stop anything that is still running.
   
   ```
     @Override
     public void close() throws Exception {
       LOG.info("Closing data statistics coordinator for {}.", operatorName);
       try {
         if (started) {
           context.close();
         }
       } finally {
         coordinatorExecutor.shutdownNow();
         // We do not expect this to actually block for long. At this point, there should
         // be very few task running in the executor, if any.
         coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
       }
     }
   ``` 
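
For reference, a small standalone sketch of the difference between the two calls, using only `java.util.concurrent`. The class and method names are hypothetical and chosen for illustration. It assumes a single-thread executor, as the coordinator uses, but is otherwise unrelated to the PR's code.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorShutdownSketch {

  /** shutdown(): tasks submitted before the call still run to completion. */
  static int completedAfterShutdown(int tasks) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicInteger completed = new AtomicInteger();
    for (int i = 0; i < tasks; i++) {
      executor.execute(completed::incrementAndGet);
    }
    executor.shutdown();
    awaitQuietly(executor);
    return completed.get();
  }

  /** shutdown(): tasks submitted after the call are rejected. */
  static boolean rejectsNewTasksAfterShutdown() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.shutdown();
    try {
      executor.execute(() -> {});
      return false;
    } catch (RejectedExecutionException e) {
      return true;
    }
  }

  /** shutdownNow(): interrupts the running task and returns the queued tasks that never started. */
  static int drainedByShutdownNow(int queuedTasks) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch running = new CountDownLatch(1);
    CountDownLatch neverReleased = new CountDownLatch(1);
    // Occupy the only worker thread until it is interrupted.
    executor.execute(
        () -> {
          running.countDown();
          try {
            neverReleased.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
    try {
      running.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    for (int i = 0; i < queuedTasks; i++) {
      executor.execute(() -> {});
    }
    List<Runnable> neverStarted = executor.shutdownNow();
    awaitQuietly(executor);
    return neverStarted.size();
  }

  private static void awaitQuietly(ExecutorService executor) {
    try {
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Calling `shutdown` first lets pending coordinator actions drain, while the later `shutdownNow` acts as a backstop for anything that is still running.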
   



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> 
globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format(
+            "Failed to send global data statistics %s for checkpoint %d",
+            globalDataStatistics, checkpointId));
+  }
+
+  int currentParallelism() {

Review Comment:
   Will rename it to `parallelism`.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> 
globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);

Review Comment:
   I double-checked how `SourceCoordinatorContext` handles that.
   It calls `callInCoordinatorThread` once for every subtask:
   
   ```
       void sendEventToSourceOperator(int subtaskId, OperatorEvent event) {
           checkSubtaskIndex(subtaskId);
   
           callInCoordinatorThread(
                   () -> {
                       final OperatorCoordinator.SubtaskGateway gateway =
                               subtaskGateways.getOnlyGatewayAndCheckReady(subtaskId);
                       gateway.sendEvent(event);
                       return null;
                   },
                   String.format("Failed to send event %s to subtask %d", event, subtaskId));
       }
   ```
   There are two options: run the loop over all subtasks inside a single
   coordinator-thread call, or loop outside and call the coordinator thread once
   per subtask. I feel there is not much difference between them. Since
   `callInCoordinatorThread` is a synchronous call, if any send fails or throws
   an exception, the remaining ones won't be executed in either case.
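
   To make the comparison concrete, here is a minimal, self-contained sketch of
   the two options. It is not the PR's code or Flink's: `callSync`, `send`, and
   the class name are hypothetical stand-ins, and a plain single-thread
   `ExecutorService` plays the role of the coordinator thread. It assumes the
   call blocks until the submitted work finishes, as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CoordinatorSendSketch {

  // Hypothetical stand-in for a synchronous callInCoordinatorThread:
  // submit to the single coordinator thread and block until it finishes.
  static void callSync(ExecutorService executor, Callable<Void> callable) {
    try {
      executor.submit(callable).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  // Hypothetical send: records the subtask, or throws for the failing one.
  static void send(int subtask, int failingSubtask, List<Integer> sent) {
    if (subtask == failingSubtask) {
      throw new IllegalArgumentException("Gateway not ready for subtask " + subtask);
    }
    sent.add(subtask);
  }

  // Option 1: a single coordinator-thread call that loops over all subtasks.
  static List<Integer> loopInsideOneCall(int parallelism, int failingSubtask) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    List<Integer> sent = new ArrayList<>();
    try {
      callSync(
          executor,
          () -> {
            for (int i = 0; i < parallelism; ++i) {
              send(i, failingSubtask, sent);
            }
            return null;
          });
    } catch (IllegalStateException e) {
      // The first failure aborts the loop; later subtasks are never reached.
    } finally {
      executor.shutdown();
    }
    return sent;
  }

  // Option 2: loop outside, one synchronous coordinator-thread call per subtask.
  static List<Integer> oneCallPerSubtask(int parallelism, int failingSubtask) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    List<Integer> sent = new ArrayList<>();
    try {
      for (int i = 0; i < parallelism; ++i) {
        final int subtask = i;
        callSync(
            executor,
            () -> {
              send(subtask, failingSubtask, sent);
              return null;
            });
      }
    } catch (IllegalStateException e) {
      // The failed call propagates to the caller, which ends the outer loop.
    } finally {
      executor.shutdown();
    }
    return sent;
  }

  public static void main(String[] args) {
    System.out.println(loopInsideOneCall(4, 2));
    System.out.println(oneCallPerSubtask(4, 2));
  }
}
```

   With four subtasks and subtask 2 failing, both variants reach only subtasks
   0 and 1, which matches the point above that the choice makes little
   practical difference when the call is synchronous.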



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... 
actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there 
is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, 
actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> 
event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore 
the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / 
context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's more than the expected percentage {}. Thus sending the 
aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());
+        return;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's less than the expected percentage {}. Thus dropping the 
incomplete aggregate data statistics {} and starting collecting data statistics 
from new checkpoint {}",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            incompleteAggregateDataStatistics,
+            checkpointId);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+      }
+    } else if (incompleteAggregateDataStatistics.checkpointId() > 
checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, but receive event from 
older checkpoint {}. Ignore it.",
+          incompleteAggregateDataStatistics.checkpointId(),
+          checkpointId);
+      return;
+    } else {
+      incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);

Review Comment:
   Yes, we can do that.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;

Review Comment:
   I looked up percentage vs. ratio. Here is the result:
   ```
   A ratio is a comparison of two similar quantities. Given any two similar 
quantities a and b, the ratio of a to b that is a:b is defined as a:b = a/b, 
where b≠0. Percentage means 'by the hundred' or 'divide by one hundred'. The 
percentage is also used to compare quantities, which means 'per 100'.
   ```
   In our case, we want to make sure we have received data statistics from a
   large enough share of the subtasks, so I think PERCENTAGE is the more
   appropriate name.
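
   For reference, here is a minimal sketch of how the constant is used in the
   diff above. The class, constant, and method names are hypothetical; only the
   value 0.8 and the comparison come from the diff. The constant is stored as a
   fraction between 0 and 1, so 0.8 corresponds to 80 percent of the subtasks.

```java
class ReceivedShareSketch {

  // Same value as the constant in the diff; stored as a fraction in [0, 1].
  static final double EXPECTED_SHARE = 0.8;

  // True when statistics from at least 80 percent of the subtasks have arrived.
  static boolean enoughReceived(int received, int parallelism) {
    return (double) received / parallelism >= EXPECTED_SHARE;
  }

  public static void main(String[] args) {
    System.out.println(enoughReceived(8, 10)); // 8 of 10 subtasks reported
    System.out.println(enoughReceived(7, 10)); // 7 of 10 subtasks reported
  }
}
```

   Whichever name is chosen, the check behaves the same way: 8 of 10 subtasks
   meets the threshold and 7 of 10 does not.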



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> 
globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format(
+            "Failed to send global data statistics %s for checkpoint %d",
+            globalDataStatistics, checkpointId));
+  }
+
+  int currentParallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  void attemptReady(OperatorCoordinator.SubtaskGateway gateway) {
+    
Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.registerSubtaskGateway(gateway);
+  }
+
+  void attemptFailed(int subtaskIndex, int attemptNumber) {
+    
Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.unregisterSubtaskGateway(subtaskIndex, attemptNumber);
+  }
+
+  void subtaskReset(int subtaskIndex) {
+    
Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.reset(subtaskIndex);
+  }
+
+  void failJob(Throwable cause) {
+    operatorCoordinatorContext.failJob(cause);
+  }
+
+  /**
+   * A helper method that delegates the callable to the coordinator thread if 
the current thread is
+   * not the coordinator thread, otherwise call the callable right away.
+   *
+   * @param callable the callable to delegate.
+   */
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {

Review Comment:
   Same reply as https://github.com/apache/iceberg/pull/7360/files#r1170076511



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregation
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+class DataStatisticsCoordinator<K> implements OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
0.8;
+
+  private final String operatorName;
+  // A single-thread executor to handle all the actions for coordinator
+  private final ExecutorService coordinatorExecutor;
+  private final DataStatisticsCoordinatorContext<K> context;
+  private final DataStatisticsFactory<K> statisticsFactory;
+
+  private volatile AggregateDataStatistics<K> 
incompleteAggregateDataStatistics;
+  private volatile AggregateDataStatistics<K> completeAggregateDataStatistics;
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorContext<K> context,
+      DataStatisticsFactory<K> statisticsFactory) {
+    this.operatorName = operatorName;
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.context = context;
+    this.statisticsFactory = statisticsFactory;
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    try {
+      if (started) {
+        context.close();
+      }
+    } finally {
+      coordinatorExecutor.shutdownNow();
+      // We do not expect this to actually block for long. At this point, 
there should
+      // be very few task running in the executor, if any.
+      coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    }
+  }
+
+  private void runInCoordinatorThread(
+      ThrowingRunnable<Throwable> action, String actionName, Object... 
actionNameFormatParameters) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            // if we have a JVM critical error, promote it immediately, there 
is a good
+            // chance the logging or job failing will not succeed anymore
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+            final String actionString = String.format(actionName, 
actionNameFormatParameters);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    if (!this.started) {
+      throw new IllegalStateException("The coordinator has not started yet.");
+    }
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<K> 
event) {
+    long checkpointId = event.checkpointId();
+
+    if (completeAggregateDataStatistics != null
+        && completeAggregateDataStatistics.checkpointId() >= checkpointId) {
+      LOG.debug(
+          "Data statistics aggregation for checkpoint {} has completed. Ignore 
the event from subtask {} for checkpoint {}",
+          completeAggregateDataStatistics.checkpointId(),
+          subtask,
+          checkpointId);
+      return;
+    }
+
+    if (incompleteAggregateDataStatistics == null) {
+      incompleteAggregateDataStatistics =
+          new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+    }
+
+    if (incompleteAggregateDataStatistics.checkpointId() < checkpointId) {
+      if ((double) incompleteAggregateDataStatistics.aggregateSize() / 
context.currentParallelism()
+          >= EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE) {
+        completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's more than the expected percentage {}. Thus sending the 
aggregate data statistics {} to subtasks.",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            completeAggregateDataStatistics);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+        context.sendDataStatisticsToSubtasks(
+            incompleteAggregateDataStatistics.checkpointId(),
+            completeAggregateDataStatistics.dataStatistics());
+        return;
+      } else {
+        LOG.info(
+            "Received data statistics from {} operators out of total {} for 
checkpoint {}. It's less than the expected percentage {}. Thus dropping the 
incomplete aggregate data statistics {} and starting collecting data statistics 
from new checkpoint {}",
+            incompleteAggregateDataStatistics.aggregateSize(),
+            context.currentParallelism(),
+            incompleteAggregateDataStatistics.checkpointId(),
+            EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE,
+            incompleteAggregateDataStatistics,
+            checkpointId);
+        incompleteAggregateDataStatistics =
+            new AggregateDataStatistics<>(checkpointId, statisticsFactory);
+        incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+      }
+    } else if (incompleteAggregateDataStatistics.checkpointId() > 
checkpointId) {
+      LOG.debug(
+          "Expect data statistics for checkpoint {}, but receive event from 
older checkpoint {}. Ignore it.",
+          incompleteAggregateDataStatistics.checkpointId(),
+          checkpointId);
+      return;
+    } else {
+      incompleteAggregateDataStatistics.mergeDataStatistic(subtask, event);
+    }
+
+    if (incompleteAggregateDataStatistics.aggregateSize() == 
context.currentParallelism()) {
+      completeAggregateDataStatistics = incompleteAggregateDataStatistics;
+      LOG.info(
+          "Received data statistics from all {} operators for checkpoint {}. 
Sending the aggregated data statistics {} to subtasks.",
+          context.currentParallelism(),
+          incompleteAggregateDataStatistics.checkpointId(),
+          completeAggregateDataStatistics);
+      incompleteAggregateDataStatistics = null;
+      context.sendDataStatisticsToSubtasks(
+          checkpointId, completeAggregateDataStatistics.dataStatistics());
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          if (event instanceof DataStatisticsEvent) {
+            handleDataStatisticRequest(subtask, ((DataStatisticsEvent<K>) 
event));
+          } else {
+            throw new FlinkException("Unrecognized data statistics operator 
event: " + event);
+          }
+        },
+        "handling operator event %s from data statistics operator subtask %d 
(#%d)",
+        event,
+        subtask,
+        attemptNumber);
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for 
checkpoint {}",
+              operatorName,
+              checkpointId);
+          try {
+            byte[] serializedDataDistributionWeight =
+                
InstantiationUtil.serializeObject(completeAggregateDataStatistics);
+            resultFuture.complete(serializedDataDistributionWeight);
+          } catch (Throwable e) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(e);
+            resultFuture.completeExceptionally(
+                new CompletionException(
+                    String.format("Failed to checkpoint data statistics for 
%s", operatorName), e));
+          }
+        },
+        "taking checkpoint %d",
+        checkpointId);
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void notifyCheckpointAborted(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    if (started) {
+      throw new IllegalStateException(
+          "The coordinator can only be reset if it was not yet started");
+    }
+
+    if (checkpointData == null) {
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}.", 
operatorName, checkpointId);
+    completeAggregateDataStatistics =
+        InstantiationUtil.deserializeObject(
+            checkpointData, AggregateDataStatistics.class.getClassLoader());
+  }
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    this.runInCoordinatorThread(

Review Comment:
   Right. I will remove all the unneeded `this` qualifiers.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorProvider.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.jetbrains.annotations.NotNull;
+
+public class DataStatisticsCoordinatorProvider<K extends Serializable>
+    extends RecreateOnResetOperatorCoordinator.Provider {
+
+  private final String operatorName;
+  private final DataStatisticsFactory<K> dataStatisticsFactory;
+
+  public DataStatisticsCoordinatorProvider(
+      String operatorName, OperatorID operatorID, DataStatisticsFactory<K> dataStatisticsFactory) {
+    super(operatorID);
+    this.operatorName = operatorName;
+    this.dataStatisticsFactory = dataStatisticsFactory;
+  }
+
+  @Override
+  public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
+    DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, context.getUserCodeClassloader());
+    ExecutorService coordinatorExecutor =
+        Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    DataStatisticsCoordinatorContext<K> dataStatisticsCoordinatorContext =

Review Comment:
   When implementing `DataStatisticsCoordinatorProvider`, I took 
SourceCoordinatorProvider as a reference: 
https://github.com/apache/flink/blob/423cdcb99c3f66be435ad2e70d6a15f10a69e252/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java#L75
   The SourceCoordinatorContext is more complex because it needs to assign 
splits and send events to source readers. The DataStatisticsCoordinatorContext 
is much simpler, so we can let DataStatisticsCoordinator initialize the 
executor and the context, as you suggested.
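
   As a rough illustration of that direction, here is a minimal sketch in 
which the coordinator creates and owns its single-thread executor instead of 
receiving it from the provider. `CoordinatorSketch` and its methods are 
hypothetical names, not code from this PR, and the Flink types are left out so 
the snippet compiles on its own.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a coordinator that builds its own executor.
class CoordinatorSketch implements AutoCloseable {
  private final ExecutorService coordinatorExecutor;
  // Remembered so callers can check whether they already run on the coordinator thread.
  private volatile Thread coordinatorThread;

  CoordinatorSketch(String operatorName) {
    ThreadFactory threadFactory =
        runnable -> {
          Thread thread = new Thread(runnable, "DataStatisticsCoordinator-" + operatorName);
          coordinatorThread = thread;
          return thread;
        };
    this.coordinatorExecutor = Executors.newSingleThreadExecutor(threadFactory);
  }

  boolean isCurrentThreadCoordinatorThread() {
    return Thread.currentThread() == coordinatorThread;
  }

  // Runs a task on the coordinator thread and returns that thread's name.
  String coordinatorThreadName() throws Exception {
    Callable<String> task = () -> Thread.currentThread().getName();
    return coordinatorExecutor.submit(task).get();
  }

  @Override
  public void close() throws InterruptedException {
    coordinatorExecutor.shutdown();
    coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    try (CoordinatorSketch coordinator = new CoordinatorSketch("demo")) {
      System.out.println(coordinator.coordinatorThreadName());
    }
  }
}
```

   With this shape the provider would only need to pass the operator name and 
the Flink context to the coordinator's constructor.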



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorContext.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStatisticsCoordinatorContext<K> implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinatorContext.class);
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+
+  DataStatisticsCoordinatorContext(
+      ExecutorService coordinatorExecutor,
+      DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory,
+      OperatorCoordinator.Context operatorCoordinatorContext) {
+    this.coordinatorExecutor = coordinatorExecutor;
+    this.coordinatorThreadFactory = coordinatorThreadFactory;
+    this.operatorCoordinatorContext = operatorCoordinatorContext;
+    this.subtaskGateways = new SubtaskGateways(currentParallelism());
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    coordinatorExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+  }
+
+  void sendDataStatisticsToSubtasks(long checkpointId, DataStatistics<K> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<K> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics);
+          int parallelism = currentParallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format(
+            "Failed to send global data statistics %s for checkpoint %d",
+            globalDataStatistics, checkpointId));
+  }
+
+  int currentParallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  void attemptReady(OperatorCoordinator.SubtaskGateway gateway) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.registerSubtaskGateway(gateway);
+  }
+
+  void attemptFailed(int subtaskIndex, int attemptNumber) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.unregisterSubtaskGateway(subtaskIndex, attemptNumber);
+  }
+
+  void subtaskReset(int subtaskIndex) {
+    Preconditions.checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+    this.subtaskGateways.reset(subtaskIndex);
+  }
+
+  void failJob(Throwable cause) {
+    operatorCoordinatorContext.failJob(cause);
+  }
+
+  /**
+   * A helper method that delegates the callable to the coordinator thread if the current thread is
+   * not the coordinator thread, otherwise call the callable right away.
+   *
+   * @param callable the callable to delegate.
+   */
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  private static class SubtaskGateways {
+    private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;
+
+    private SubtaskGateways(int parallelism) {
+      this.gateways = new Map[parallelism];

Review Comment:
   `gateways` is an array of `Map`. We can use `Maps.newHashMap()` to 
initialize each map in the array, but I am not sure how to use `Maps` to 
define the array itself. Any example?
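
   For reference, one possible shape is sketched below. Java does not allow 
creating a generic array directly, so the raw `new Map[parallelism]` 
allocation stays and only the element initialization changes. 
`SubtaskGatewaysSketch` is a hypothetical stand-in: `String` replaces 
`OperatorCoordinator.SubtaskGateway`, and `new HashMap<>()` is used where the 
PR would presumably call `Maps.newHashMap()`, so the snippet compiles without 
Flink or Guava.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the SubtaskGateways class in the diff.
class SubtaskGatewaysSketch {
  // One map per subtask index: attempt number -> gateway (String is a placeholder type).
  private final Map<Integer, String>[] gateways;

  @SuppressWarnings("unchecked")
  SubtaskGatewaysSketch(int parallelism) {
    // Generic array creation is not allowed in Java, so the raw type is used here.
    this.gateways = new Map[parallelism];
    for (int i = 0; i < parallelism; i++) {
      // With Guava available, this line could be Maps.newHashMap() instead.
      gateways[i] = new HashMap<>();
    }
  }

  void register(int subtaskIndex, int attemptNumber, String gateway) {
    gateways[subtaskIndex].put(attemptNumber, gateway);
  }

  int attemptCount(int subtaskIndex) {
    return gateways[subtaskIndex].size();
  }

  public static void main(String[] args) {
    SubtaskGatewaysSketch sketch = new SubtaskGatewaysSketch(3);
    sketch.register(1, 0, "gateway-1-0");
    System.out.println(sketch.attemptCount(1));
    System.out.println(sketch.attemptCount(0));
  }
}
```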



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
