yegangy0718 commented on code in PR #8749:
URL: https://github.com/apache/iceberg/pull/8749#discussion_r1351169031


##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated data
+ * statistics back to {@link DataStatisticsOperator}. In the end a custom 
partitioner will
+ * distribute traffic based on the aggregated data statistics to improve data 
clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final OperatorCoordinator.SubtaskGateway[] subtaskGateways;
+  private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+  private final transient AggregatedStatisticsTracker<D, S> 
aggregatedStatisticsTracker;
+  private volatile AggregatedStatistics<D, S> completedStatistics;
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways =
+        new 
OperatorCoordinator.SubtaskGateway[operatorCoordinatorContext.currentParallelism()];
+    this.statisticsSerializer = statisticsSerializer;
+    this.aggregatedStatisticsTracker =
+        new AggregatedStatisticsTracker<>(operatorName, statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator: {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    LOG.info("Closed data statistics coordinator: {}.", operatorName);
+  }
+
+  @VisibleForTesting
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error(
+                    "Uncaught Exception in data statistics coordinator: {} 
executor",
+                    operatorName,
+                    t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error(
+            "Uncaught Exception in data statistics coordinator: {} executor", 
operatorName, t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics coordinator: {} 
while {}. Triggering job failover",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator of %s has not started 
yet.", operatorName);
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    AggregatedStatistics<D, S> aggregatedStatistics =
+        aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event);
+
+    if (aggregatedStatistics != null) {
+      completedStatistics = aggregatedStatistics;
+      sendDataStatisticsToSubtasks(
+          completedStatistics.checkpointId(), 
completedStatistics.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              DataStatisticsEvent.create(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            subtaskGateways[i].sendEvent(dataStatisticsEvent);
+          }
+
+          return null;
+        },
+        String.format(
+            "Failed to send operator %s coordinator global data statistics for 
checkpoint %d",
+            operatorName, checkpointId));
+  }
+
+  @Override
+  public void handleEventFromOperator(int subtask, OperatorEvent event) throws 
Exception {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug("Handling event from subtask {} of {}: {}", subtask, 
operatorName, event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format("handling operator event %s from subtask %d", 
event.getClass(), subtask));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Snapshotting data statistics coordinator {} for checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              DataStatisticsUtil.serializeAggregatedStatistics(
+                  completedStatistics, statisticsSerializer));
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    Preconditions.checkState(
+        !started, "The coordinator %s can only be reset if it was not yet 
started", operatorName);
+
+    if (checkpointData == null) {
+      LOG.info(
+          "Data statistic coordinator {} has nothing to restore from 
checkpoint {}",
+          operatorName,
+          checkpointId);
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}", 
operatorName, checkpointId);
+    completedStatistics =
+        DataStatisticsUtil.deserializeAggregatedStatistics(checkpointData, 
statisticsSerializer);
+  }
+
+  @Override
+  public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Unregistering gateway after failure for subtask {} of data 
statistic {}",
+              subtask,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways[subtask] = null;
+        },
+        String.format("handling subtask %d failure", subtask));
+  }
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {

Review Comment:
   The interface is unchanged. But the subtaskGateway in Flink 1.16 is 
`Map<Integer, SubtaskGateway>[] gateways`, whose map key is the attempt number, 
while in Flink 1.15 we don't need to track the attempt number. Thus there is 
nothing we need to do in `subtaskReset`.



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated data
+ * statistics back to {@link DataStatisticsOperator}. In the end a custom 
partitioner will
+ * distribute traffic based on the aggregated data statistics to improve data 
clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final OperatorCoordinator.SubtaskGateway[] subtaskGateways;
+  private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+  private final transient AggregatedStatisticsTracker<D, S> 
aggregatedStatisticsTracker;
+  private volatile AggregatedStatistics<D, S> completedStatistics;
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways =
+        new 
OperatorCoordinator.SubtaskGateway[operatorCoordinatorContext.currentParallelism()];
+    this.statisticsSerializer = statisticsSerializer;
+    this.aggregatedStatisticsTracker =
+        new AggregatedStatisticsTracker<>(operatorName, statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator: {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    LOG.info("Closed data statistics coordinator: {}.", operatorName);
+  }
+
+  @VisibleForTesting
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error(
+                    "Uncaught Exception in data statistics coordinator: {} 
executor",
+                    operatorName,
+                    t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error(
+            "Uncaught Exception in data statistics coordinator: {} executor", 
operatorName, t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics coordinator: {} 
while {}. Triggering job failover",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator of %s has not started 
yet.", operatorName);
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    AggregatedStatistics<D, S> aggregatedStatistics =
+        aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event);
+
+    if (aggregatedStatistics != null) {
+      completedStatistics = aggregatedStatistics;
+      sendDataStatisticsToSubtasks(
+          completedStatistics.checkpointId(), 
completedStatistics.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              DataStatisticsEvent.create(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            subtaskGateways[i].sendEvent(dataStatisticsEvent);
+          }
+
+          return null;
+        },
+        String.format(
+            "Failed to send operator %s coordinator global data statistics for 
checkpoint %d",
+            operatorName, checkpointId));
+  }
+
+  @Override
+  public void handleEventFromOperator(int subtask, OperatorEvent event) throws 
Exception {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug("Handling event from subtask {} of {}: {}", subtask, 
operatorName, event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format("handling operator event %s from subtask %d", 
event.getClass(), subtask));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Snapshotting data statistics coordinator {} for checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              DataStatisticsUtil.serializeAggregatedStatistics(
+                  completedStatistics, statisticsSerializer));
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    Preconditions.checkState(
+        !started, "The coordinator %s can only be reset if it was not yet 
started", operatorName);
+
+    if (checkpointData == null) {
+      LOG.info(
+          "Data statistic coordinator {} has nothing to restore from 
checkpoint {}",
+          operatorName,
+          checkpointId);
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}", 
operatorName, checkpointId);
+    completedStatistics =
+        DataStatisticsUtil.deserializeAggregatedStatistics(checkpointData, 
statisticsSerializer);
+  }
+
+  @Override
+  public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Unregistering gateway after failure for subtask {} of data 
statistic {}",
+              subtask,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways[subtask] = null;
+        },
+        String.format("handling subtask %d failure", subtask));
+  }
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    LOG.info(
+        "Data statistic coordinator {} subtask {} is reset to checkpoint {}",
+        operatorName,
+        subtask,
+        checkpointId);
+  }
+
+  @Override
+  public void subtaskReady(int subtask, SubtaskGateway gateway) {

Review Comment:
   The interface is changed to `public void executionAttemptReady(int 
subtask, int attemptNumber, SubtaskGateway gateway)` in Flink 1.16.



##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DataStatisticsCoordinator receives {@link DataStatisticsEvent} from {@link
+ * DataStatisticsOperator} every subtask and then merge them together. Once 
aggregation for all
+ * subtasks data statistics completes, DataStatisticsCoordinator will send the 
aggregated data
+ * statistics back to {@link DataStatisticsOperator}. In the end a custom 
partitioner will
+ * distribute traffic based on the aggregated data statistics to improve data 
clustering.
+ */
+@Internal
+class DataStatisticsCoordinator<D extends DataStatistics<D, S>, S> implements 
OperatorCoordinator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+  private final String operatorName;
+  private final ExecutorService coordinatorExecutor;
+  private final OperatorCoordinator.Context operatorCoordinatorContext;
+  private final OperatorCoordinator.SubtaskGateway[] subtaskGateways;
+  private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+  private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
+  private final transient AggregatedStatisticsTracker<D, S> 
aggregatedStatisticsTracker;
+  private volatile AggregatedStatistics<D, S> completedStatistics;
+  private volatile boolean started;
+
+  DataStatisticsCoordinator(
+      String operatorName,
+      OperatorCoordinator.Context context,
+      TypeSerializer<DataStatistics<D, S>> statisticsSerializer) {
+    this.operatorName = operatorName;
+    this.coordinatorThreadFactory =
+        new CoordinatorExecutorThreadFactory(
+            "DataStatisticsCoordinator-" + operatorName, 
context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    this.operatorCoordinatorContext = context;
+    this.subtaskGateways =
+        new 
OperatorCoordinator.SubtaskGateway[operatorCoordinatorContext.currentParallelism()];
+    this.statisticsSerializer = statisticsSerializer;
+    this.aggregatedStatisticsTracker =
+        new AggregatedStatisticsTracker<>(operatorName, statisticsSerializer, 
parallelism());
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting data statistics coordinator: {}.", operatorName);
+    started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    LOG.info("Closed data statistics coordinator: {}.", operatorName);
+  }
+
+  @VisibleForTesting
+  void callInCoordinatorThread(Callable<Void> callable, String errorMessage) {
+    ensureStarted();
+    // Ensure the task is done by the coordinator executor.
+    if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+      try {
+        Callable<Void> guardedCallable =
+            () -> {
+              try {
+                return callable.call();
+              } catch (Throwable t) {
+                LOG.error(
+                    "Uncaught Exception in data statistics coordinator: {} 
executor",
+                    operatorName,
+                    t);
+                ExceptionUtils.rethrowException(t);
+                return null;
+              }
+            };
+
+        coordinatorExecutor.submit(guardedCallable).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new FlinkRuntimeException(errorMessage, e);
+      }
+    } else {
+      try {
+        callable.call();
+      } catch (Throwable t) {
+        LOG.error(
+            "Uncaught Exception in data statistics coordinator: {} executor", 
operatorName, t);
+        throw new FlinkRuntimeException(errorMessage, t);
+      }
+    }
+  }
+
+  public void runInCoordinatorThread(Runnable runnable) {
+    this.coordinatorExecutor.execute(
+        new ThrowableCatchingRunnable(
+            throwable ->
+                
this.coordinatorThreadFactory.uncaughtException(Thread.currentThread(), 
throwable),
+            runnable));
+  }
+
+  private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+    ensureStarted();
+    runInCoordinatorThread(
+        () -> {
+          try {
+            action.run();
+          } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+            LOG.error(
+                "Uncaught exception in the data statistics coordinator: {} 
while {}. Triggering job failover",
+                operatorName,
+                actionString,
+                t);
+            operatorCoordinatorContext.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator of %s has not started 
yet.", operatorName);
+  }
+
+  private int parallelism() {
+    return operatorCoordinatorContext.currentParallelism();
+  }
+
+  private void handleDataStatisticRequest(int subtask, DataStatisticsEvent<D, 
S> event) {
+    AggregatedStatistics<D, S> aggregatedStatistics =
+        aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event);
+
+    if (aggregatedStatistics != null) {
+      completedStatistics = aggregatedStatistics;
+      sendDataStatisticsToSubtasks(
+          completedStatistics.checkpointId(), 
completedStatistics.dataStatistics());
+    }
+  }
+
+  private void sendDataStatisticsToSubtasks(
+      long checkpointId, DataStatistics<D, S> globalDataStatistics) {
+    callInCoordinatorThread(
+        () -> {
+          DataStatisticsEvent<D, S> dataStatisticsEvent =
+              DataStatisticsEvent.create(checkpointId, globalDataStatistics, 
statisticsSerializer);
+          int parallelism = parallelism();
+          for (int i = 0; i < parallelism; ++i) {
+            subtaskGateways[i].sendEvent(dataStatisticsEvent);
+          }
+
+          return null;
+        },
+        String.format(
+            "Failed to send operator %s coordinator global data statistics for 
checkpoint %d",
+            operatorName, checkpointId));
+  }
+
+  @Override
+  public void handleEventFromOperator(int subtask, OperatorEvent event) throws 
Exception {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug("Handling event from subtask {} of {}: {}", subtask, 
operatorName, event);
+          Preconditions.checkArgument(event instanceof DataStatisticsEvent);
+          handleDataStatisticRequest(subtask, ((DataStatisticsEvent<D, S>) 
event));
+        },
+        String.format("handling operator event %s from subtask %d", 
event.getClass(), subtask));
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Snapshotting data statistics coordinator {} for checkpoint {}",
+              operatorName,
+              checkpointId);
+          resultFuture.complete(
+              DataStatisticsUtil.serializeAggregatedStatistics(
+                  completedStatistics, statisticsSerializer));
+        },
+        String.format("taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData)
+      throws Exception {
+    Preconditions.checkState(
+        !started, "The coordinator %s can only be reset if it was not yet 
started", operatorName);
+
+    if (checkpointData == null) {
+      LOG.info(
+          "Data statistic coordinator {} has nothing to restore from 
checkpoint {}",
+          operatorName,
+          checkpointId);
+      return;
+    }
+
+    LOG.info(
+        "Restoring data statistic coordinator {} from checkpoint {}", 
operatorName, checkpointId);
+    completedStatistics =
+        DataStatisticsUtil.deserializeAggregatedStatistics(checkpointData, 
statisticsSerializer);
+  }
+
+  @Override
+  public void subtaskFailed(int subtask, @Nullable Throwable reason) {

Review Comment:
   The interface is changed to `public void executionAttemptFailed(int 
subtask, int attemptNumber, @Nullable Throwable reason)` in Flink 1.16.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to