yegangy0718 commented on code in PR #7360:
URL: https://github.com/apache/iceberg/pull/7360#discussion_r1263279238


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;

Review Comment:
   the operator name is only used for logging. 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -121,6 +124,10 @@ public void snapshotState(StateSnapshotContext context) 
throws Exception {
         checkpointId,
         subTaskId);
 
+    // Send global statistics to partitioners at checkpoint to update data 
distribution at the same

Review Comment:
   it only applies to single line below



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregatorTracker.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GlobalStatisticsAggregator is used by {@link DataStatisticsCoordinator} to 
collect {@link
+ * DataStatistics} from {@link DataStatisticsOperator} subtasks for specific 
checkpoint. It stores
+ * the merged {@link DataStatistics} result and uses set to keep a record of 
all reported subtasks.
+ */
+@Internal
+class GlobalStatisticsAggregatorTracker<D extends DataStatistics<D, S>, S> 
implements Serializable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GlobalStatisticsAggregatorTracker.class);
+  private static final double EXPECTED_DATA_STATISTICS_RECEIVED_PERCENTAGE = 
90;
+  private final transient TypeSerializer<DataStatistics<D, S>> 
statisticsSerializer;
+  private final transient int parallelism;
+  private transient volatile GlobalStatisticsAggregator<D, S> 
inProgressAggregator;
+  private volatile GlobalStatisticsAggregator<D, S> lastCompletedAggregator;
+
+  GlobalStatisticsAggregatorTracker(
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer, int 
parallelism) {
+    this.statisticsSerializer = statisticsSerializer;
+    this.parallelism = parallelism;
+  }
+
+  boolean receiveDataStatisticEventAndCheckCompletion(

Review Comment:
   We still need a variable to save the `completedStatistics` since in 
`DataStatisticsCoordiantor` functions `checkpointCoordinator` and 
`resetToCheckpoint` they  need to get the value of `completedStatistics` .
   Will define a class variable called `completedStatistics` in coordinator to 
record the value. 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    if 
(globalStatisticsAggregatorTracker.receiveDataStatisticEventAndCheckCompletion(

Review Comment:
   `isCurrentTrackingComplete` depends on `receiveDataStatisticsEvent` 
conditionally. It's hard to split them



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    if 
(globalStatisticsAggregatorTracker.receiveDataStatisticEventAndCheckCompletion(
+        subtask, event)) {
+      GlobalStatisticsAggregator<D, S> lastCompletedAggregator =
+          globalStatisticsAggregatorTracker.lastCompletedAggregator();
+      sendDataStatisticsToSubtasks(
+          lastCompletedAggregator.checkpointId(), 
lastCompletedAggregator.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format("Failed to send global data statistics for checkpoint 
%d", checkpointId));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format(
+            "handling operator event %s from subtask %d (#%d)",
+            event.getClass(), subtask, attemptNumber));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for 
checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              
globalStatisticsAggregatorTracker.serializeLastCompletedAggregator());
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    if (started) {

Review Comment:
   updated



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");

Review Comment:
   done



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");

Review Comment:
   done



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+

Review Comment:
   done



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);

Review Comment:
   Got it. Update `DataStatistics` to `data statistics`



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);

Review Comment:
   updated



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsEvent.java:
##########
@@ -30,28 +34,47 @@
 class DataStatisticsEvent<D extends DataStatistics<D, S>, S> implements 
OperatorEvent {
 
   private static final long serialVersionUID = 1L;
-
   private final long checkpointId;
-  private final DataStatistics<D, S> dataStatistics;
+  private final byte[] dataStatisticsBytes;
 
-  DataStatisticsEvent(long checkpointId, DataStatistics<D, S> dataStatistics) {
+  DataStatisticsEvent(
+      long checkpointId,
+      DataStatistics<D, S> dataStatistics,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
     this.checkpointId = checkpointId;
-    this.dataStatistics = dataStatistics;
+    DataOutputSerializer out = new DataOutputSerializer(64);
+    try {
+      statisticsSerializer.serialize(dataStatistics, out);
+      this.dataStatisticsBytes = out.getCopyOfBuffer();
+    } catch (IOException e) {
+      throw new IllegalStateException("Fail to serialize data statistics", e);
+    }
   }
 
   long checkpointId() {
     return checkpointId;
   }
 
-  DataStatistics<D, S> dataStatistics() {
+  @SuppressWarnings("unchecked")
+  D dataStatistics(TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {

Review Comment:
   created a separate `DataStatisticsUtil` class to serialize/deserialize 
DataStatistics to bytes 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinatorProvider.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.ThreadFactory;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * DataStatisticsCoordinatorProvider provides the method to create new {@link
+ * DataStatisticsCoordinator} and defines {@link 
CoordinatorExecutorThreadFactory} to create new
+ * thread for {@link DataStatisticsCoordinator} to execute task
+ */
+public class DataStatisticsCoordinatorProvider<D extends DataStatistics<D, S>, 
S>
+    extends RecreateOnResetOperatorCoordinator.Provider {
+
+  private final String operatorName;
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  public DataStatisticsCoordinatorProvider(
+      String operatorName,
+      OperatorID operatorID,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    super(operatorID);
+    this.operatorName = operatorName;
+    this.statisticsSerializer = statisticsSerializer;
+  }
+
+  @Override
+  public OperatorCoordinator getCoordinator(OperatorCoordinator.Context 
context) {
+    return new DataStatisticsCoordinator<>(operatorName, context, 
statisticsSerializer);
+  }
+
+  static class CoordinatorExecutorThreadFactory

Review Comment:
   yeah, can move ThreadFactory to  `DataStatisticsCoordinator`



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);

Review Comment:
   updated



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {

Review Comment:
   followed 
https://github.com/apache/flink/blob/8829b5e60a71b871462d1d2bb849c926b0de9b80/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java#L150
 earlier. In Iceberg repo, indeed most places use simple shut down



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/GlobalStatisticsAggregatorTracker.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GlobalStatisticsAggregator is used by {@link DataStatisticsCoordinator} to 
collect {@link
+ * DataStatistics} from {@link DataStatisticsOperator} subtasks for specific 
checkpoint. It stores
+ * the merged {@link DataStatistics} result and uses set to keep a record of 
all reported subtasks.
+ */
+@Internal
+class GlobalStatisticsAggregatorTracker<D extends DataStatistics<D, S>, S> 
implements Serializable {

Review Comment:
   I'm OK  to remove aggregator from class name



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    if 
(globalStatisticsAggregatorTracker.receiveDataStatisticEventAndCheckCompletion(
+        subtask, event)) {
+      GlobalStatisticsAggregator<D, S> lastCompletedAggregator =
+          globalStatisticsAggregatorTracker.lastCompletedAggregator();
+      sendDataStatisticsToSubtasks(
+          lastCompletedAggregator.checkpointId(), 
lastCompletedAggregator.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format("Failed to send global data statistics for checkpoint 
%d", checkpointId));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format(
+            "handling operator event %s from subtask %d (#%d)",
+            event.getClass(), subtask, attemptNumber));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for 
checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              
globalStatisticsAggregatorTracker.serializeLastCompletedAggregator());
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    if (started) {
+      throw new IllegalStateException(
+          "The coordinator can only be reset if it was not yet started");
+    }
+
+    if (checkpointData == null) {
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}.", 
operatorName, checkpointId);
+
+    
globalStatisticsAggregatorTracker.deserializeLastCompletedAggregator(checkpointData);
+  }
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Resetting subtask {} to checkpoint {} for data statistics {}.",
+              subtask,
+              checkpointId,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.reset(subtask);
+        },
+        String.format("handling subtask %d recovery to checkpoint %d", 
subtask, checkpointId));
+  }
+
+  @Override
+  public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable 
Throwable reason) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Unregistering gateway after failure for subtask {} (#{}) of 
data statistic {}.",
+              subtask,
+              attemptNumber,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
+        },
+        String.format("handling subtask %d (#%d) failure", subtask, 
attemptNumber));
+  }
+
+  @Override
+  public void executionAttemptReady(int subtask, int attemptNumber, 
SubtaskGateway gateway) {
+    Preconditions.checkArgument(subtask == gateway.getSubtask());
+    Preconditions.checkArgument(attemptNumber == 
gateway.getExecution().getAttemptNumber());
+    runInCoordinatorThread(
+        () -> {
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.registerSubtaskGateway(gateway);
+        },
+        String.format(
+            "making event gateway to subtask %d (#%d) available", subtask, 
attemptNumber));
+  }
+
+  @VisibleForTesting
+  GlobalStatisticsAggregatorTracker<D, S> globalStatisticsAggregatorTracker() {
+    return globalStatisticsAggregatorTracker;
+  }
+
+  private static class SubtaskGateways {
+    private final Map<Integer, SubtaskGateway>[] gateways;
+
+    private SubtaskGateways(int parallelism) {
+      gateways = new Map[parallelism];
+
+      for (int i = 0; i < parallelism; ++i) {
+        gateways[i] = Maps.newHashMap();
+      }
+    }
+
+    private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway 
gateway) {
+      int subtaskIndex = gateway.getSubtask();
+      int attemptNumber = gateway.getExecution().getAttemptNumber();
+      Preconditions.checkState(
+          !gateways[subtaskIndex].containsKey(attemptNumber),
+          "Already have a subtask gateway for %d (#%d).",
+          subtaskIndex,
+          attemptNumber);
+      LOG.debug("Register gateway for subtask {} attempt {}", subtaskIndex, 
attemptNumber);
+      gateways[subtaskIndex].put(attemptNumber, gateway);
+    }
+
+    private void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber) 
{
+      LOG.debug("Unregister gateway for subtask {} attempt {}", subtaskIndex, 
attemptNumber);
+      gateways[subtaskIndex].remove(attemptNumber);
+    }
+
+    private OperatorCoordinator.SubtaskGateway getOnlyGatewayAndCheckReady(int 
subtaskIndex) {
+      Preconditions.checkState(
+          gateways[subtaskIndex].size() > 0,
+          "Subtask %d is not ready yet to receive events.",

Review Comment:
   subtask class doesn't know the operator name. Will add it into class 
variable. 



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    if 
(globalStatisticsAggregatorTracker.receiveDataStatisticEventAndCheckCompletion(
+        subtask, event)) {
+      GlobalStatisticsAggregator<D, S> lastCompletedAggregator =
+          globalStatisticsAggregatorTracker.lastCompletedAggregator();
+      sendDataStatisticsToSubtasks(
+          lastCompletedAggregator.checkpointId(), 
lastCompletedAggregator.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format("Failed to send global data statistics for checkpoint 
%d", checkpointId));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format(
+            "handling operator event %s from subtask %d (#%d)",
+            event.getClass(), subtask, attemptNumber));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for 
checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              
globalStatisticsAggregatorTracker.serializeLastCompletedAggregator());
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    if (started) {
+      throw new IllegalStateException(
+          "The coordinator can only be reset if it was not yet started");
+    }
+
+    if (checkpointData == null) {
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}.", 
operatorName, checkpointId);
+
+    
globalStatisticsAggregatorTracker.deserializeLastCompletedAggregator(checkpointData);
+  }
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Resetting subtask {} to checkpoint {} for data statistics {}.",
+              subtask,
+              checkpointId,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.reset(subtask);
+        },
+        String.format("handling subtask %d recovery to checkpoint %d", 
subtask, checkpointId));
+  }
+
+  @Override
+  public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable 
Throwable reason) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Unregistering gateway after failure for subtask {} (#{}) of 
data statistic {}.",
+              subtask,
+              attemptNumber,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
+        },
+        String.format("handling subtask %d (#%d) failure", subtask, 
attemptNumber));
+  }
+
+  @Override
+  public void executionAttemptReady(int subtask, int attemptNumber, 
SubtaskGateway gateway) {
+    Preconditions.checkArgument(subtask == gateway.getSubtask());
+    Preconditions.checkArgument(attemptNumber == 
gateway.getExecution().getAttemptNumber());
+    runInCoordinatorThread(
+        () -> {
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.registerSubtaskGateway(gateway);
+        },
+        String.format(
+            "making event gateway to subtask %d (#%d) available", subtask, 
attemptNumber));
+  }
+
+  @VisibleForTesting
+  GlobalStatisticsAggregatorTracker<D, S> globalStatisticsAggregatorTracker() {
+    return globalStatisticsAggregatorTracker;
+  }
+
+  private static class SubtaskGateways {
+    private final Map<Integer, SubtaskGateway>[] gateways;
+
+    private SubtaskGateways(int parallelism) {
+      gateways = new Map[parallelism];
+
+      for (int i = 0; i < parallelism; ++i) {
+        gateways[i] = Maps.newHashMap();
+      }
+    }
+
+    private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway 
gateway) {
+      int subtaskIndex = gateway.getSubtask();
+      int attemptNumber = gateway.getExecution().getAttemptNumber();
+      Preconditions.checkState(
+          !gateways[subtaskIndex].containsKey(attemptNumber),
+          "Already have a subtask gateway for %d (#%d).",
+          subtaskIndex,
+          attemptNumber);
+      LOG.debug("Register gateway for subtask {} attempt {}", subtaskIndex, 
attemptNumber);

Review Comment:
   Will add operator name into SubtaskGateways constructor



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);

Review Comment:
   you're right. Removed this line



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated
+ * result(global data statistics) back to {@link DataStatisticsOperator}. In 
the end a custom
+ * partitioner will distribute traffic based on the global data statistics to 
improve data
+ * clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final SubtaskGateways subtaskGateways;
+  private final 
DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory
+      coordinatorThreadFactory;
+  private final GlobalStatisticsAggregatorTracker<D, S> 
globalStatisticsAggregatorTracker;
+
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new DataStatisticsCoordinatorProvider.CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways = new SubtaskGateways(parallelism());
+    this.statisticsSerializer = statisticsSerializer;
+    this.globalStatisticsAggregatorTracker =
+        new GlobalStatisticsAggregatorTracker<>(statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator for {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Closing data statistics coordinator for {}.", operatorName);
+    coordinatorExecutor.shutdown();
+    try {
+      if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.warn(
+            "Fail to shut down data statistics coordinator context gracefully. 
Shutting down now");
+        coordinatorExecutor.shutdownNow();
+        if (!coordinatorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          LOG.warn("Fail to terminate data statistics coordinator context");
+          return;
+        }
+      }
+      LOG.info("Data statistics coordinator context closed.");
+    } catch (InterruptedException e) {
+      coordinatorExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+      LOG.error("Errors occurred while closing the data statistics coordinator 
context", e);
+    }
+
+    LOG.info("Data statistics coordinator for {} closed.", operatorName);
+  }
+
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        final Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error("Uncaught Exception in DataStatistics Coordinator 
Executor", t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error("Uncaught Exception in DataStatistics coordinator executor", 
t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics {} while {}. 
Triggering job failover.",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    if 
(globalStatisticsAggregatorTracker.receiveDataStatisticEventAndCheckCompletion(
+        subtask, event)) {
+      GlobalStatisticsAggregator<D, S> lastCompletedAggregator =
+          globalStatisticsAggregatorTracker.lastCompletedAggregator();
+      sendDataStatisticsToSubtasks(
+          lastCompletedAggregator.checkpointId(), 
lastCompletedAggregator.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              new DataStatisticsEvent<>(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            
subtaskGateways.getOnlyGatewayAndCheckReady(i).sendEvent(dataStatisticsEvent);
+          }
+          return null;
+        },
+        String.format("Failed to send global data statistics for checkpoint 
%d", checkpointId));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName,
+              event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format(
+            "handling operator event %s from subtask %d (#%d)",
+            event.getClass(), subtask, attemptNumber));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Taking a state snapshot on data statistics coordinator {} for 
checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              
globalStatisticsAggregatorTracker.serializeLastCompletedAggregator());
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    if (started) {
+      throw new IllegalStateException(
+          "The coordinator can only be reset if it was not yet started");
+    }
+
+    if (checkpointData == null) {
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}.", 
operatorName, checkpointId);
+
+    
globalStatisticsAggregatorTracker.deserializeLastCompletedAggregator(checkpointData);
+  }
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Resetting subtask {} to checkpoint {} for data statistics {}.",
+              subtask,
+              checkpointId,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.reset(subtask);
+        },
+        String.format("handling subtask %d recovery to checkpoint %d", 
subtask, checkpointId));
+  }
+
+  @Override
+  public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable 
Throwable reason) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Unregistering gateway after failure for subtask {} (#{}) of 
data statistic {}.",
+              subtask,
+              attemptNumber,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
+        },
+        String.format("handling subtask %d (#%d) failure", subtask, 
attemptNumber));
+  }
+
+  @Override
+  public void executionAttemptReady(int subtask, int attemptNumber, 
SubtaskGateway gateway) {
+    Preconditions.checkArgument(subtask == gateway.getSubtask());
+    Preconditions.checkArgument(attemptNumber == 
gateway.getExecution().getAttemptNumber());
+    runInCoordinatorThread(
+        () -> {
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.registerSubtaskGateway(gateway);
+        },
+        String.format(
+            "making event gateway to subtask %d (#%d) available", subtask, 
attemptNumber));
+  }
+
+  @VisibleForTesting
+  GlobalStatisticsAggregatorTracker<D, S> globalStatisticsAggregatorTracker() {
+    return globalStatisticsAggregatorTracker;
+  }
+
+  private static class SubtaskGateways {
+    private final Map<Integer, SubtaskGateway>[] gateways;
+
+    private SubtaskGateways(int parallelism) {
+      gateways = new Map[parallelism];
+
+      for (int i = 0; i < parallelism; ++i) {
+        gateways[i] = Maps.newHashMap();
+      }
+    }
+
+    private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway 
gateway) {
+      int subtaskIndex = gateway.getSubtask();
+      int attemptNumber = gateway.getExecution().getAttemptNumber();
+      Preconditions.checkState(
+          !gateways[subtaskIndex].containsKey(attemptNumber),
+          "Already have a subtask gateway for %d (#%d).",
+          subtaskIndex,
+          attemptNumber);
+      LOG.debug("Register gateway for subtask {} attempt {}", subtaskIndex, 
attemptNumber);
+      gateways[subtaskIndex].put(attemptNumber, gateway);
+    }
+
+    private void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber) 
{
+      LOG.debug("Unregister gateway for subtask {} attempt {}", subtaskIndex, 
attemptNumber);
+      gateways[subtaskIndex].remove(attemptNumber);
+    }
+
+    private OperatorCoordinator.SubtaskGateway getOnlyGatewayAndCheckReady(int 
subtaskIndex) {

Review Comment:
   OK. Make sense to me. I will rename the function name to `getSubtaskGateway` 
since other functions name is `registerSbutaskGateway`



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -129,9 +136,14 @@ public void snapshotState(StateSnapshotContext context) 
throws Exception {
       globalStatisticsState.add(globalStatistics);
     }
 
-    // For now, we make it simple to send globalStatisticsState at checkpoint
+    // For now, we make it simple to send localStatistics at checkpoint
     operatorEventGateway.sendEventToCoordinator(
-        new DataStatisticsEvent<>(checkpointId, localStatistics));
+        new DataStatisticsEvent(checkpointId, localStatistics, 
statisticsSerializer));
+    LOG.debug(
+        "Send local statistics {} from subtask {} at checkpoint {} to 
coordinator",

Review Comment:
   `DataStatisticsOperator` doesn't have operator name 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to