stevenzwu commented on code in PR #11497:
URL: https://github.com/apache/iceberg/pull/11497#discussion_r1868436095


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java:
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commits the rewrite changes using {@link RewriteDataFilesCommitManager}. 
The input is a {@link
+ * DataFileRewriteExecutor.ExecutedGroup}. Only {@link Watermark} is emitted 
which is chained to
+ * {@link TaskResultAggregator} input 1.
+ */
+@Internal
+public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<DataFileRewriteExecutor.ExecutedGroup, 
Trigger> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteCommitter.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+
+  private transient Table table;
+  private transient Long startingSnapshotId;
+  private transient Set<RewriteFileGroup> inProgress;
+  private transient ListState<Long> startingSnapshotIdState;
+  private transient ListState<RewriteFileGroup> inProgressState;
+  private transient CommitService commitService;
+  private transient int processed;
+  private transient Counter errorCounter;
+  private transient Counter addedDataFileNumCounter;
+  private transient Counter addedDataFileSizeCounter;
+  private transient Counter removedDataFileNumCounter;
+  private transient Counter removedDataFileSizeCounter;
+
+  public DataFileRewriteCommitter(
+      String tableName, String taskName, int taskIndex, TableLoader 
tableLoader) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.errorCounter = 
taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
+    this.addedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC);
+    this.addedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC);
+    this.removedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC);
+    this.removedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC);
+
+    this.startingSnapshotIdState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-snapshot-id", 
BasicTypeInfo.LONG_TYPE_INFO));
+    this.inProgressState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-in-progress", 
TypeInformation.of(RewriteFileGroup.class)));
+
+    this.startingSnapshotId = null;
+    Iterable<Long> snapshotIdIterable = startingSnapshotIdState.get();
+    if (snapshotIdIterable != null) {
+      Iterator<Long> snapshotIdIterator = snapshotIdIterable.iterator();
+      if (snapshotIdIterator.hasNext()) {
+        this.startingSnapshotId = snapshotIdIterator.next();
+      }
+    }
+
+    this.inProgress = Sets.newHashSet();
+    Iterable<RewriteFileGroup> inProgressIterable = inProgressState.get();
+    if (inProgressIterable != null) {
+      for (RewriteFileGroup group : inProgressIterable) {
+        inProgress.add(group);
+      }
+    }
+
+    commitInProgress(System.currentTimeMillis());
+
+    this.processed = 0;
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+
+    startingSnapshotIdState.clear();
+    if (startingSnapshotId != null) {
+      startingSnapshotIdState.add(startingSnapshotId);
+    }
+
+    inProgressState.clear();
+    for (RewriteFileGroup group : inProgress) {
+      inProgressState.add(group);
+    }
+  }
+
+  @Override
+  public void 
processElement(StreamRecord<DataFileRewriteExecutor.ExecutedGroup> 
streamRecord) {
+    DataFileRewriteExecutor.ExecutedGroup executedGroup = 
streamRecord.getValue();
+    try {
+      if (commitService == null) {
+        this.commitService = createCommitService(executedGroup, 
streamRecord.getTimestamp());
+        this.startingSnapshotId = executedGroup.snapshotId();
+      }
+
+      commitService.offer(executedGroup.group());
+      inProgress.add(executedGroup.group());
+      ++processed;
+    } catch (Exception e) {
+      LOG.info(
+          "Exception processing {} for table {} with {}[{}] at {}",
+          executedGroup,
+          tableName,
+          taskName,
+          taskIndex,
+          streamRecord.getTimestamp(),
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    try {
+      if (commitService != null) {
+        commitService.close();
+        if (processed != commitService.results().size()) {
+          throw new RuntimeException(
+              String.format(
+                  "From %d commits only %d were unsuccessful for table %s with 
%s[%d] at %d",
+                  processed,
+                  commitService.results().size(),
+                  tableName,
+                  taskName,
+                  taskIndex,
+                  mark.getTimestamp()));
+        }
+      }
+
+      table.refresh();
+      LOG.info(
+          "Successfully completed data file compaction to {} for table {} with 
{}[{}] at {}",
+          table.currentSnapshot().snapshotId(),
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp());
+    } catch (Exception e) {
+      LOG.info(
+          "Exception closing commit service for table {} with {}[{}] at {}",
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp(),
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+
+    // Cleanup
+    this.commitService = null;
+    this.startingSnapshotId = null;
+    this.processed = 0;
+    inProgress.clear();
+
+    super.processWatermark(mark);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (commitService != null) {
+      commitService.close();
+    }
+  }
+
+  private CommitService createCommitService(
+      DataFileRewriteExecutor.ExecutedGroup element, long timestamp) {
+    FlinkRewriteDataFilesCommitManager commitManager =
+        new FlinkRewriteDataFilesCommitManager(table, element.snapshotId(), 
timestamp);
+    CommitService service = commitManager.service(element.groupsPerCommit());
+    service.start();
+
+    return service;
+  }
+
+  private void commitInProgress(long timestamp) {
+    if (!inProgress.isEmpty()) {
+      try {
+        FlinkRewriteDataFilesCommitManager manager =

Review Comment:
   can the above `createCommitService` method be used here?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner.RewritePlanResult;
+import org.apache.iceberg.actions.SizeBasedDataRewriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Plans the rewrite groups using the {@link RewriteFileGroupPlanner}. The 
input is the {@link
+ * Trigger}, the output is zero, or some {@link PlannedGroup}s.
+ */
+@Internal
+public class DataFileRewritePlanner
+    extends ProcessFunction<Trigger, DataFileRewritePlanner.PlannedGroup> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewritePlanner.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final int partialProgressMaxCommits;
+  private final long maxRewriteBytes;
+  private final Map<String, String> rewriterOptions;
+  private transient SizeBasedDataRewriter rewriter;
+  private transient RewriteFileGroupPlanner planner;
+  private transient Counter errorCounter;
+
+  public DataFileRewritePlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      int newPartialProgressMaxCommits,
+      long maxRewriteBytes,
+      Map<String, String> rewriterOptions) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(rewriterOptions, "Options map should no be 
null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.partialProgressMaxCommits = newPartialProgressMaxCommits;
+    this.maxRewriteBytes = maxRewriteBytes;
+    this.rewriterOptions = rewriterOptions;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    tableLoader.open();
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.rewriter =
+        new SizeBasedDataRewriter(tableLoader.loadTable()) {
+          @Override
+          public Set<DataFile> rewrite(List<FileScanTask> group) {
+            // We use the rewriter only for bin-packing the file groups to 
compact
+            throw new UnsupportedOperationException("Should not be called");
+          }
+        };
+
+    rewriter.init(rewriterOptions);
+    this.planner = new RewriteFileGroupPlanner(rewriter, RewriteJobOrder.NONE);
+  }
+
+  @Override
+  public void processElement(Trigger value, Context ctx, 
Collector<PlannedGroup> out)
+      throws Exception {
+    LOG.debug(
+        "Creating rewrite plan for table {} with {}[{}] at {}",
+        tableName,
+        taskName,
+        taskIndex,
+        ctx.timestamp());
+    try {
+      SerializableTable table =
+          (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
+      if (table.currentSnapshot() == null) {
+        LOG.info(
+            "Nothing to plan for in an empty table {} with {}[{}] at {}",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp());
+        return;
+      }
+
+      RewritePlanResult plan =
+          planner.plan(table, Expressions.alwaysTrue(), 
table.currentSnapshot().snapshotId());
+
+      long rewriteBytes = 0;
+      List<RewriteFileGroup> groups = 
plan.fileGroups().collect(Collectors.toList());
+      ListIterator<RewriteFileGroup> iter = groups.listIterator();
+      while (iter.hasNext()) {
+        RewriteFileGroup group = iter.next();
+        if (rewriteBytes + group.sizeInBytes() > maxRewriteBytes) {
+          // Keep going, maybe some other group might fit in
+          LOG.info(
+              "Skipping group {} as max rewrite size reached for table {} with 
{}[{}] at {}",
+              group,
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp());
+          iter.remove();
+        } else {
+          rewriteBytes += group.sizeInBytes();
+        }
+      }
+
+      int groupsPerCommit =
+          IntMath.divide(
+              plan.context().totalGroupCount(), partialProgressMaxCommits, 
RoundingMode.CEILING);
+
+      LOG.info(
+          "Rewrite plan created {} for table {} with {}[{}] at {}",
+          groups,
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp());
+
+      for (RewriteFileGroup group : groups) {
+        LOG.debug(
+            "Emitting {} with for table {} with {}[{}] at {}",
+            group,
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp());
+        out.collect(
+            new PlannedGroup(
+                table, groupsPerCommit, 
rewriter.splitSize(group.sizeInBytes()), group));
+      }
+    } catch (Exception e) {
+      LOG.info(
+          "Exception planning data file rewrite groups for table {} with 
{}[{}] at {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          e);
+      ctx.output(TaskResultAggregator.ERROR_STREAM, e);
+      errorCounter.inc();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    tableLoader.close();
+  }
+
+  public static class PlannedGroup {
+    private final SerializableTable table;
+    private final int groupsPerCommit;
+    private final long splitSize;
+    private final RewriteFileGroup group;
+
+    private PlannedGroup(
+        SerializableTable table, int groupsPerCommit, long splitSize, 
RewriteFileGroup group) {

Review Comment:
   pass `SerializableTable` to the constructor of `DataFileRewriteExecutor` so 
that it becomes part of the class state.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commits the compaction changes using {@link RewriteDataFilesCommitManager}. 
The input is a {@link
+ * DataFileRewriteExecutor.ExecutedGroup}. Only {@link Watermark} is emitted 
which is chained to
+ * {@link TaskResultAggregator} input 1.
+ */
+@Internal
+public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<DataFileRewriteExecutor.ExecutedGroup, 
Trigger> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteCommitter.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+
+  private transient Table table;
+  private transient Long startingSnapshotId;
+  private transient Set<RewriteFileGroup> inProgress;
+  private transient ListState<Long> startingSnapshotIdState;
+  private transient ListState<RewriteFileGroup> inProgressState;
+  private transient CommitService commitService;
+  private transient Counter errorCounter;
+  private transient Counter addedDataFileNumCounter;
+  private transient Counter addedDataFileSizeCounter;
+  private transient Counter removedDataFileNumCounter;
+  private transient Counter removedDataFileSizeCounter;
+
+  public DataFileRewriteCommitter(
+      String tableName, String taskName, int taskIndex, TableLoader 
tableLoader) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void open() throws Exception {
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.errorCounter = 
taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
+    this.addedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC);
+    this.addedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC);
+    this.removedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC);
+    this.removedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC);
+
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    this.startingSnapshotIdState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-snapshot-id", 
BasicTypeInfo.LONG_TYPE_INFO));
+    this.inProgressState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-in-progress", 
TypeInformation.of(RewriteFileGroup.class)));
+
+    this.startingSnapshotId = null;
+    Iterable<Long> snapshotIdIterable = startingSnapshotIdState.get();
+    if (snapshotIdIterable != null) {
+      Iterator<Long> snapshotIdIterator = snapshotIdIterable.iterator();
+      if (snapshotIdIterator.hasNext()) {
+        this.startingSnapshotId = snapshotIdIterator.next();
+      }
+    }
+
+    this.inProgress = Sets.newHashSet();
+    Iterable<RewriteFileGroup> inProgressIterable = inProgressState.get();
+    if (inProgressIterable != null) {
+      for (RewriteFileGroup group : inProgressIterable) {
+        inProgress.add(group);
+      }
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+
+    startingSnapshotIdState.clear();
+    if (startingSnapshotId != null) {
+      startingSnapshotIdState.add(startingSnapshotId);
+    }
+
+    inProgressState.clear();
+    for (RewriteFileGroup group : inProgress) {
+      inProgressState.add(group);
+    }
+  }
+
+  @Override
+  public void 
processElement(StreamRecord<DataFileRewriteExecutor.ExecutedGroup> 
streamRecord) {
+    DataFileRewriteExecutor.ExecutedGroup executedGroup = 
streamRecord.getValue();
+    try {
+      if (commitService == null) {
+        this.commitService = createCommitService(executedGroup, 
streamRecord.getTimestamp());
+        this.startingSnapshotId = executedGroup.snapshotId();
+      }
+
+      commitService.offer(executedGroup.group());
+      inProgress.add(executedGroup.group());
+    } catch (Exception e) {
+      LOG.info(
+          "Exception processing {} for table {} with {}[{}] at {}",
+          executedGroup,
+          tableName,
+          taskName,
+          taskIndex,
+          streamRecord.getTimestamp(),
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    try {
+      if (commitService == null && !inProgress.isEmpty()) {
+        this.commitService = createCommitService(null, mark.getTimestamp());
+      }
+
+      if (commitService != null) {
+        commitService.close();
+      }
+
+      table.refresh();
+      LOG.info(
+          "Successfully completed data file compaction to {} for table {} with 
{}[{}] at {}",
+          table.currentSnapshot().snapshotId(),
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp());
+    } catch (Exception e) {
+      LOG.info(
+          "Exception closing commit service for table {} with {}[{}] at {}",
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp(),
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+
+    // Cleanup
+    commitService = null;
+    startingSnapshotId = null;
+    inProgress.clear();
+
+    super.processWatermark(mark);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (commitService != null) {
+      commitService.close();
+    }
+  }
+
+  private CommitService createCommitService(
+      DataFileRewriteExecutor.ExecutedGroup element, long timestamp) {
+    table.refresh();
+    CommitService service;
+    RewriteDataFilesCommitManager manager;
+    if (element == null) {
+      manager = new FlinkRewriteDataFilesCommitManager(table, 
startingSnapshotId, timestamp);
+      service = manager.service(Integer.MAX_VALUE);
+    } else {
+      manager = new FlinkRewriteDataFilesCommitManager(table, 
element.snapshotId(), timestamp);
+      service = manager.service(element.groupsPerCommit());
+    }
+
+    service.start();
+    if (!inProgress.isEmpty()) {
+      try {
+        manager.commitFileGroups(inProgress);
+      } catch (Exception e) {
+        LOG.info(
+            "Failed committing pending groups {} for table {} with {}[{}], so 
skipping.",
+            inProgress,
+            tableName,
+            taskName,
+            taskIndex,
+            e);
+      }
+    }
+    return service;
+  }
+
+  private class FlinkRewriteDataFilesCommitManager extends 
RewriteDataFilesCommitManager {
+    private final long timestamp;
+
+    FlinkRewriteDataFilesCommitManager(Table table, long snapshotId, long 
timestamp) {
+      super(table, snapshotId);
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
+      super.commitFileGroups(fileGroups);
+      LOG.debug(
+          "Committed {} for table {} with {}[{}] at {}",
+          fileGroups,
+          tableName,
+          taskName,
+          taskIndex,
+          timestamp);
+      updateMetrics(fileGroups);
+      inProgress.removeAll(fileGroups);
+      LOG.debug(

Review Comment:
   if this method is executed from separate `CommitManager` thread (than Flink 
thread), then we should add some synchronization/lock around the `inProgress` 
access (read or write).
   
   regardless, it doesn't seem interesting/valueable to log this line 
immediately after the `removeAll` call above.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java:
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commits the rewrite changes using {@link RewriteDataFilesCommitManager}. 
The input is a {@link
+ * DataFileRewriteExecutor.ExecutedGroup}. Only {@link Watermark} is emitted 
which is chained to
+ * {@link TaskResultAggregator} input 1.
+ */
+@Internal
+public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<DataFileRewriteExecutor.ExecutedGroup, 
Trigger> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteCommitter.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+
+  private transient Table table;
+  private transient Long startingSnapshotId;
+  private transient Set<RewriteFileGroup> inProgress;
+  private transient ListState<Long> startingSnapshotIdState;
+  private transient ListState<RewriteFileGroup> inProgressState;
+  private transient CommitService commitService;
+  private transient int processed;
+  private transient Counter errorCounter;
+  private transient Counter addedDataFileNumCounter;
+  private transient Counter addedDataFileSizeCounter;
+  private transient Counter removedDataFileNumCounter;
+  private transient Counter removedDataFileSizeCounter;
+
+  public DataFileRewriteCommitter(
+      String tableName, String taskName, int taskIndex, TableLoader 
tableLoader) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.errorCounter = 
taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
+    this.addedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC);
+    this.addedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC);
+    this.removedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC);
+    this.removedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC);
+
+    this.startingSnapshotIdState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-snapshot-id", 
BasicTypeInfo.LONG_TYPE_INFO));
+    this.inProgressState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-in-progress", 
TypeInformation.of(RewriteFileGroup.class)));
+
+    this.startingSnapshotId = null;
+    Iterable<Long> snapshotIdIterable = startingSnapshotIdState.get();
+    if (snapshotIdIterable != null) {
+      Iterator<Long> snapshotIdIterator = snapshotIdIterable.iterator();
+      if (snapshotIdIterator.hasNext()) {
+        this.startingSnapshotId = snapshotIdIterator.next();
+      }
+    }
+
+    this.inProgress = Sets.newHashSet();
+    Iterable<RewriteFileGroup> inProgressIterable = inProgressState.get();
+    if (inProgressIterable != null) {
+      for (RewriteFileGroup group : inProgressIterable) {
+        inProgress.add(group);
+      }
+    }
+
+    commitInProgress(System.currentTimeMillis());
+
+    this.processed = 0;
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+
+    startingSnapshotIdState.clear();
+    if (startingSnapshotId != null) {
+      startingSnapshotIdState.add(startingSnapshotId);
+    }
+
+    inProgressState.clear();
+    for (RewriteFileGroup group : inProgress) {
+      inProgressState.add(group);
+    }
+  }
+
+  @Override
+  public void 
processElement(StreamRecord<DataFileRewriteExecutor.ExecutedGroup> 
streamRecord) {
+    DataFileRewriteExecutor.ExecutedGroup executedGroup = 
streamRecord.getValue();
+    try {
+      if (commitService == null) {
+        this.commitService = createCommitService(executedGroup, 
streamRecord.getTimestamp());
+        this.startingSnapshotId = executedGroup.snapshotId();
+      }
+
+      commitService.offer(executedGroup.group());
+      inProgress.add(executedGroup.group());
+      ++processed;
+    } catch (Exception e) {
+      LOG.info(
+          "Exception processing {} for table {} with {}[{}] at {}",
+          executedGroup,
+          tableName,
+          taskName,
+          taskIndex,
+          streamRecord.getTimestamp(),
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    try {
+      if (commitService != null) {
+        commitService.close();
+        if (processed != commitService.results().size()) {
+          throw new RuntimeException(
+              String.format(
+                  "From %d commits only %d were unsuccessful for table %s with 
%s[%d] at %d",
+                  processed,
+                  commitService.results().size(),
+                  tableName,
+                  taskName,
+                  taskIndex,
+                  mark.getTimestamp()));
+        }
+      }
+
+      table.refresh();
+      LOG.info(
+          "Successfully completed data file compaction to {} for table {} with 
{}[{}] at {}",
+          table.currentSnapshot().snapshotId(),
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp());
+    } catch (Exception e) {
+      LOG.info(
+          "Exception closing commit service for table {} with {}[{}] at {}",
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp(),
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+
+    // Cleanup
+    this.commitService = null;
+    this.startingSnapshotId = null;
+    this.processed = 0;
+    inProgress.clear();
+
+    super.processWatermark(mark);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (commitService != null) {
+      commitService.close();
+    }
+  }
+
+  private CommitService createCommitService(
+      DataFileRewriteExecutor.ExecutedGroup element, long timestamp) {
+    FlinkRewriteDataFilesCommitManager commitManager =
+        new FlinkRewriteDataFilesCommitManager(table, element.snapshotId(), 
timestamp);
+    CommitService service = commitManager.service(element.groupsPerCommit());
+    service.start();
+
+    return service;
+  }
+
+  private void commitInProgress(long timestamp) {
+    if (!inProgress.isEmpty()) {
+      try {
+        FlinkRewriteDataFilesCommitManager manager =
+            new FlinkRewriteDataFilesCommitManager(table, startingSnapshotId, 
timestamp);
+        CommitService service = manager.service(Integer.MAX_VALUE);
+        service.start();
+        manager.commitOrClean(inProgress);
+        service.close();

Review Comment:
   close/clear should probably happen in `finally` section. `commitOrClean` may 
throw an exception.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner.RewritePlanResult;
+import org.apache.iceberg.actions.SizeBasedDataRewriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Plans the rewrite groups using the {@link RewriteFileGroupPlanner}. The 
input is the {@link
+ * Trigger}, the output is zero, or some {@link PlannedGroup}s.
+ */
+@Internal
+public class DataFileRewritePlanner
+    extends ProcessFunction<Trigger, DataFileRewritePlanner.PlannedGroup> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewritePlanner.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final int partialProgressMaxCommits;
+  private final long maxRewriteBytes;
+  private final Map<String, String> rewriterOptions;
+  private transient SizeBasedDataRewriter rewriter;
+  private transient RewriteFileGroupPlanner planner;
+  private transient Counter errorCounter;
+
+  public DataFileRewritePlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      int newPartialProgressMaxCommits,
+      long maxRewriteBytes,
+      Map<String, String> rewriterOptions) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(rewriterOptions, "Options map should no be 
null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.partialProgressMaxCommits = newPartialProgressMaxCommits;
+    this.maxRewriteBytes = maxRewriteBytes;
+    this.rewriterOptions = rewriterOptions;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    tableLoader.open();
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.rewriter =
+        new SizeBasedDataRewriter(tableLoader.loadTable()) {
+          @Override
+          public Set<DataFile> rewrite(List<FileScanTask> group) {
+            // We use the rewriter only for bin-packing the file groups to 
compact
+            throw new UnsupportedOperationException("Should not be called");
+          }
+        };
+
+    rewriter.init(rewriterOptions);
+    this.planner = new RewriteFileGroupPlanner(rewriter, RewriteJobOrder.NONE);
+  }
+
+  @Override
+  public void processElement(Trigger value, Context ctx, 
Collector<PlannedGroup> out)
+      throws Exception {
+    LOG.debug(
+        "Creating rewrite plan for table {} with {}[{}] at {}",
+        tableName,
+        taskName,
+        taskIndex,
+        ctx.timestamp());
+    try {
+      SerializableTable table =
+          (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
+      if (table.currentSnapshot() == null) {
+        LOG.info(
+            "Nothing to plan for in an empty table {} with {}[{}] at {}",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp());
+        return;
+      }
+
+      RewritePlanResult plan =
+          planner.plan(table, Expressions.alwaysTrue(), 
table.currentSnapshot().snapshotId());
+
+      long rewriteBytes = 0;
+      List<RewriteFileGroup> groups = 
plan.fileGroups().collect(Collectors.toList());
+      ListIterator<RewriteFileGroup> iter = groups.listIterator();
+      while (iter.hasNext()) {
+        RewriteFileGroup group = iter.next();
+        if (rewriteBytes + group.sizeInBytes() > maxRewriteBytes) {

Review Comment:
   sorry, I meant ` it doesn't need to be precise`. but never mind. we are 
deleting unneeded items here, which would require iterating through the whole 
list. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to