stevenzwu commented on code in PR #11497:
URL: https://github.com/apache/iceberg/pull/11497#discussion_r1870155972


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commits the rewrite changes using {@link RewriteDataFilesCommitManager}. 
The input is a {@link
+ * DataFileRewriteExecutor.ExecutedGroup}. Only {@link Watermark} is emitted 
which is chained to
+ * {@link TaskResultAggregator} input 1.
+ */
+@Internal
+public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<DataFileRewriteExecutor.ExecutedGroup, 
Trigger> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteCommitter.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+
+  private transient Table table;
+  private transient Long startingSnapshotId;
+  private transient Set<RewriteFileGroup> inProgress;
+  private transient ListState<Long> startingSnapshotIdState;
+  private transient ListState<RewriteFileGroup> inProgressState;
+  private transient CommitService commitService;
+  private transient int processed;
+  private transient Counter errorCounter;
+  private transient Counter addedDataFileNumCounter;
+  private transient Counter addedDataFileSizeCounter;
+  private transient Counter removedDataFileNumCounter;
+  private transient Counter removedDataFileSizeCounter;
+
+  public DataFileRewriteCommitter(
+      String tableName, String taskName, int taskIndex, TableLoader 
tableLoader) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.errorCounter = 
taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
+    this.addedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC);
+    this.addedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC);
+    this.removedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC);
+    this.removedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC);
+
+    this.startingSnapshotIdState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-snapshot-id", 
BasicTypeInfo.LONG_TYPE_INFO));
+    this.inProgressState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-in-progress", 
TypeInformation.of(RewriteFileGroup.class)));
+
+    this.startingSnapshotId = null;
+    Iterable<Long> snapshotIdIterable = startingSnapshotIdState.get();
+    if (snapshotIdIterable != null) {
+      Iterator<Long> snapshotIdIterator = snapshotIdIterable.iterator();
+      if (snapshotIdIterator.hasNext()) {
+        this.startingSnapshotId = snapshotIdIterator.next();
+      }
+    }
+
+    // Has to be concurrent since it is accessed by the CommitService from 
another thread
+    this.inProgress = Sets.newConcurrentHashSet();
+    Iterable<RewriteFileGroup> inProgressIterable = inProgressState.get();
+    if (inProgressIterable != null) {
+      for (RewriteFileGroup group : inProgressIterable) {
+        inProgress.add(group);
+      }
+    }
+
+    commitInProgress(System.currentTimeMillis());
+
+    this.processed = 0;
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+
+    startingSnapshotIdState.clear();
+    if (startingSnapshotId != null) {
+      startingSnapshotIdState.add(startingSnapshotId);
+    }
+
+    inProgressState.clear();
+    for (RewriteFileGroup group : inProgress) {
+      inProgressState.add(group);
+    }
+  }
+
+  @Override
+  public void 
processElement(StreamRecord<DataFileRewriteExecutor.ExecutedGroup> 
streamRecord) {
+    DataFileRewriteExecutor.ExecutedGroup executedGroup = 
streamRecord.getValue();
+    try {
+      if (commitService == null) {
+        this.startingSnapshotId = executedGroup.snapshotId();
+        this.commitService =
+            createCommitService(streamRecord.getTimestamp(), 
executedGroup.groupsPerCommit());
+      }
+
+      commitService.offer(executedGroup.group());
+      inProgress.add(executedGroup.group());
+      ++processed;
+    } catch (Exception e) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception processing {}",

Review Comment:
   When I said standardization, I meant a consistent logging style. We don't have 
to use a constant, which adds a bit of indirection when reading the log message. But I 
would be OK with it if this is what you prefer.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner.RewritePlanResult;
+import org.apache.iceberg.actions.SizeBasedDataRewriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Plans the rewrite groups using the {@link RewriteFileGroupPlanner}. The 
input is the {@link
+ * Trigger}, the output is zero, or some {@link PlannedGroup}s.
+ */
+@Internal
+public class DataFileRewritePlanner
+    extends ProcessFunction<Trigger, DataFileRewritePlanner.PlannedGroup> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewritePlanner.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final int partialProgressMaxCommits;
+  private final long maxRewriteBytes;
+  private final Map<String, String> rewriterOptions;
+  private transient SizeBasedDataRewriter rewriter;
+  private transient RewriteFileGroupPlanner planner;
+  private transient Counter errorCounter;
+
+  public DataFileRewritePlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      int newPartialProgressMaxCommits,
+      long maxRewriteBytes,
+      Map<String, String> rewriterOptions) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(rewriterOptions, "Options map should no be 
null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.partialProgressMaxCommits = newPartialProgressMaxCommits;
+    this.maxRewriteBytes = maxRewriteBytes;
+    this.rewriterOptions = rewriterOptions;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    tableLoader.open();
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.rewriter =
+        new SizeBasedDataRewriter(tableLoader.loadTable()) {
+          @Override
+          public Set<DataFile> rewrite(List<FileScanTask> group) {
+            // We use the rewriter only for bin-packing the file groups to 
compact
+            throw new UnsupportedOperationException("Should not be called");
+          }
+        };
+
+    rewriter.init(rewriterOptions);
+    this.planner = new RewriteFileGroupPlanner(rewriter, RewriteJobOrder.NONE);
+  }
+
+  @Override
+  public void processElement(Trigger value, Context ctx, 
Collector<PlannedGroup> out)
+      throws Exception {
+    LOG.debug(
+        LogUtil.MESSAGE_PREFIX + "Creating rewrite plan",
+        tableName,
+        taskName,
+        taskIndex,
+        ctx.timestamp());
+    try {
+      SerializableTable table =
+          (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
+      if (table.currentSnapshot() == null) {
+        LOG.info(
+            LogUtil.MESSAGE_PREFIX + "Nothing to plan for in an empty table",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp());
+        return;
+      }
+
+      RewritePlanResult plan =
+          planner.plan(

Review Comment:
   So we are doing a full table scan to figure out the rewrite candidates, which 
can be expensive for large tables. I am wondering if an incremental scan would be 
a good fit here.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.iceberg.actions.SizeBasedDataRewriter;
+import org.apache.iceberg.actions.SizeBasedFileRewriter;
+import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteCommitter;
+import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteExecutor;
+import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner;
+import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Creates the data file rewriter data stream. Which runs a single iteration 
of the task for every
+ * {@link Trigger} event.
+ *
+ * <p>The input is a {@link DataStream} with {@link Trigger} events and every 
event should be
+ * immediately followed by a {@link 
org.apache.flink.streaming.api.watermark.Watermark} with the
+ * same timestamp as the event.
+ *
+ * <p>The output is a {@link DataStream} with the {@link TaskResult} of the 
run followed by the
+ * {@link org.apache.flink.streaming.api.watermark.Watermark}.
+ */
+public class RewriteDataFiles {
+  static final String PLANNER_TASK_NAME = "RDF Planner";
+  static final String REWRITE_TASK_NAME = "Rewrite";
+  static final String COMMIT_TASK_NAME = "Rewrite commit";
+  static final String AGGREGATOR_TASK_NAME = "Rewrite aggregator";
+
+  private RewriteDataFiles() {}
+
+  /** Creates the builder for a stream which rewrites data files for the 
table. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends 
MaintenanceTaskBuilder<RewriteDataFiles.Builder> {
+    private boolean partialProgressEnabled = false;
+    private int partialProgressMaxCommits = 10;
+    private final Map<String, String> rewriteOptions = 
Maps.newHashMapWithExpectedSize(6);
+    private long maxRewriteBytes = Long.MAX_VALUE;
+
+    /**
+     * Allows committing compacted data files in batches. For more details 
description see {@link
+     * org.apache.iceberg.actions.RewriteDataFiles#PARTIAL_PROGRESS_ENABLED}.
+     *
+     * @param newPartialProgressEnabled to enable partial commits
+     */
+    public Builder partialProgressEnabled(boolean newPartialProgressEnabled) {
+      this.partialProgressEnabled = newPartialProgressEnabled;
+      return this;
+    }
+
+    /**
+     * Configures the size of batches if {@link #partialProgressEnabled}. For 
more details
+     * description see {@link
+     * 
org.apache.iceberg.actions.RewriteDataFiles#PARTIAL_PROGRESS_MAX_COMMITS}.
+     *
+     * @param newPartialProgressMaxCommits to target number of the commits per 
run
+     */
+    public Builder partialProgressMaxCommits(int newPartialProgressMaxCommits) 
{
+      this.partialProgressMaxCommits = newPartialProgressMaxCommits;
+      return this;
+    }
+
+    /**
+     * Configures the maximum byte size of the rewrites for one scheduled 
compaction. This could be
+     * used to limit the resources used by the compaction.
+     *
+     * @param newMaxRewriteBytes to limit the size of the rewrites
+     */
+    public Builder maxRewriteBytes(long newMaxRewriteBytes) {
+      this.maxRewriteBytes = newMaxRewriteBytes;
+      return this;
+    }
+
+    /**
+     * Configures the target file size. For more details description see {@link
+     * org.apache.iceberg.actions.RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
+     *
+     * @param targetFileSizeBytes target file size
+     */
+    public Builder targetFileSizeBytes(long targetFileSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, 
String.valueOf(targetFileSizeBytes));
+      return this;
+    }
+
+    /**
+     * Configures the min file size considered for rewriting. For more details 
description see
+     * {@link SizeBasedFileRewriter#MIN_FILE_SIZE_BYTES}.
+     *
+     * @param minFileSizeBytes min file size
+     */
+    public Builder minFileSizeBytes(long minFileSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, 
String.valueOf(minFileSizeBytes));
+      return this;
+    }
+
+    /**
+     * Configures the max file size considered for rewriting. For more details 
description see
+     * {@link SizeBasedFileRewriter#MAX_FILE_SIZE_BYTES}.
+     *
+     * @param maxFileSizeBytes max file size
+     */
+    public Builder maxFileSizeBytes(long maxFileSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, 
String.valueOf(maxFileSizeBytes));
+      return this;
+    }
+
+    /**
+     * Configures the minimum file number after a rewrite is always initiated. 
For more details
+     * description see {@link SizeBasedFileRewriter#MIN_INPUT_FILES}.
+     *
+     * @param minInputFiles min file number
+     */
+    public Builder minInputFiles(int minInputFiles) {
+      this.rewriteOptions.put(SizeBasedFileRewriter.MIN_INPUT_FILES, 
String.valueOf(minInputFiles));
+      return this;
+    }
+
+    /**
+     * Configures the minimum delete file number for a file after a rewrite is 
always initiated. For
+     * more details description see {@link 
SizeBasedDataRewriter#DELETE_FILE_THRESHOLD}.
+     *
+     * @param deleteFileThreshold min delete file number
+     */
+    public Builder deleteFileThreshold(int deleteFileThreshold) {
+      this.rewriteOptions.put(
+          SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, 
String.valueOf(deleteFileThreshold));
+      return this;
+    }
+
+    /**
+     * Every other option is overridden, and all the files are rewritten.
+     *
+     * @param rewriteAll enables a full rewrite
+     */
+    public Builder rewriteAll(boolean rewriteAll) {
+      this.rewriteOptions.put(SizeBasedFileRewriter.REWRITE_ALL, 
String.valueOf(rewriteAll));
+      return this;
+    }
+
+    /**
+     * Configures the group size for rewriting. For more details description 
see {@link
+     * SizeBasedDataRewriter#MAX_FILE_GROUP_SIZE_BYTES}.
+     *
+     * @param maxFileGroupSizeBytes file group size for rewrite
+     */
+    public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewriter.MAX_FILE_GROUP_SIZE_BYTES, 
String.valueOf(maxFileGroupSizeBytes));
+      return this;
+    }
+
+    @Override
+    DataStream<TaskResult> append(DataStream<Trigger> trigger) {
+      SingleOutputStreamOperator<DataFileRewritePlanner.PlannedGroup> planned =
+          trigger
+              .process(
+                  new DataFileRewritePlanner(
+                      tableName(),
+                      taskName(),
+                      index(),
+                      tableLoader(),
+                      partialProgressEnabled ? partialProgressMaxCommits : 1,
+                      maxRewriteBytes,
+                      rewriteOptions))
+              .name(operatorName(PLANNER_TASK_NAME))
+              .uid(PLANNER_TASK_NAME + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .forceNonParallel();
+
+      SingleOutputStreamOperator<DataFileRewriteExecutor.ExecutedGroup> 
rewritten =
+          planned
+              .rebalance()
+              .process(new DataFileRewriteExecutor(tableName(), taskName(), 
index()))
+              .name(operatorName(REWRITE_TASK_NAME))
+              .uid(REWRITE_TASK_NAME + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .setParallelism(parallelism());
+
+      SingleOutputStreamOperator<Trigger> updated =
+          rewritten
+              .transform(
+                  operatorName(COMMIT_TASK_NAME),
+                  TypeInformation.of(Trigger.class),
+                  new DataFileRewriteCommitter(tableName(), taskName(), 
index(), tableLoader()))
+              .uid(COMMIT_TASK_NAME + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .forceNonParallel();
+
+      return trigger
+          .union(updated)
+          .connect(
+              planned
+                  .getSideOutput(TaskResultAggregator.ERROR_STREAM)

Review Comment:
   Also wondering whether watermarks flow through the side output streams. I assume 
yes.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteExecutor.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import 
org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.FileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes a rewrite for a single {@link PlannedGroup}. Reads the files with 
the standard {@link
+ * FileScanTaskReader}, so the delete files are considered, and writes using 
the {@link
+ * TaskWriterFactory}. The output is an {@link ExecutedGroup}.
+ */
+@Internal
+public class DataFileRewriteExecutor
+    extends ProcessFunction<PlannedGroup, 
DataFileRewriteExecutor.ExecutedGroup> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteExecutor.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+
+  private transient int subTaskId;
+  private transient int attemptId;
+  private transient Counter errorCounter;
+
+  public DataFileRewriteExecutor(String tableName, String taskName, int 
taskIndex) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+  }
+
+  @Override
+  public void open(Configuration parameters) {
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+    this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
+  }
+
+  @Override
+  public void processElement(PlannedGroup value, Context ctx, 
Collector<ExecutedGroup> out)
+      throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          LogUtil.MESSAGE_PREFIX + "Rewriting files {} from {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().info(),
+          value.group().rewrittenFiles());
+    } else {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Rewriting {} files",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().rewrittenFiles().size());

Review Comment:
   Why don't we supply `value.group().info()` here too? The only 
difference would then be the complete list of `rewrittenFiles`.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner.RewritePlanResult;
+import org.apache.iceberg.actions.SizeBasedDataRewriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Plans the rewrite groups using the {@link RewriteFileGroupPlanner}. The 
input is the {@link
+ * Trigger}, the output is zero, or some {@link PlannedGroup}s.
+ */
+@Internal
+public class DataFileRewritePlanner
+    extends ProcessFunction<Trigger, DataFileRewritePlanner.PlannedGroup> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewritePlanner.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final int partialProgressMaxCommits;
+  private final long maxRewriteBytes;
+  private final Map<String, String> rewriterOptions;
+  private transient SizeBasedDataRewriter rewriter;
+  private transient RewriteFileGroupPlanner planner;
+  private transient Counter errorCounter;
+
+  public DataFileRewritePlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      int newPartialProgressMaxCommits,
+      long maxRewriteBytes,
+      Map<String, String> rewriterOptions) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(rewriterOptions, "Options map should no be 
null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.partialProgressMaxCommits = newPartialProgressMaxCommits;
+    this.maxRewriteBytes = maxRewriteBytes;
+    this.rewriterOptions = rewriterOptions;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    tableLoader.open();
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.rewriter =
+        new SizeBasedDataRewriter(tableLoader.loadTable()) {
+          @Override
+          public Set<DataFile> rewrite(List<FileScanTask> group) {
+            // We use the rewriter only for bin-packing the file groups to 
compact
+            throw new UnsupportedOperationException("Should not be called");
+          }
+        };
+
+    rewriter.init(rewriterOptions);
+    this.planner = new RewriteFileGroupPlanner(rewriter, RewriteJobOrder.NONE);
+  }
+
+  @Override
+  public void processElement(Trigger value, Context ctx, 
Collector<PlannedGroup> out)
+      throws Exception {
+    LOG.debug(
+        LogUtil.MESSAGE_PREFIX + "Creating rewrite plan",
+        tableName,
+        taskName,
+        taskIndex,
+        ctx.timestamp());
+    try {
+      SerializableTable table =

Review Comment:
   The table can be loaded once and then just refreshed to get the latest state. 
There is a lot of existing code using `refresh`.
   
   This way, we can also close the `TableLoader` immediately in the `open` method 
after the `Table` is loaded.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteExecutor.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import 
org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.FileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes a rewrite for a single {@link PlannedGroup}. Reads the files with 
the standard {@link
+ * FileScanTaskReader}, so the delete files are considered, and writes using 
the {@link
+ * TaskWriterFactory}. The output is an {@link ExecutedGroup}.
+ */
+@Internal
+public class DataFileRewriteExecutor
+    extends ProcessFunction<PlannedGroup, 
DataFileRewriteExecutor.ExecutedGroup> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteExecutor.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+
+  private transient int subTaskId;
+  private transient int attemptId;
+  private transient Counter errorCounter;
+
+  public DataFileRewriteExecutor(String tableName, String taskName, int 
taskIndex) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+  }
+
+  @Override
+  public void open(Configuration parameters) {
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+    this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
+  }
+
+  @Override
+  public void processElement(PlannedGroup value, Context ctx, 
Collector<ExecutedGroup> out)
+      throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          LogUtil.MESSAGE_PREFIX + "Rewriting files {} from {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().info(),
+          value.group().rewrittenFiles());
+    } else {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Rewriting {} files",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().rewrittenFiles().size());
+    }
+
+    try (TaskWriter<RowData> writer = writerFor(value)) {
+      try (DataIterator<RowData> iterator = readerFor(value)) {
+        while (iterator.hasNext()) {
+          writer.write(iterator.next());
+        }
+
+        Set<DataFile> dataFiles = Sets.newHashSet(writer.dataFiles());
+        value.group().setOutputFiles(dataFiles);
+        out.collect(
+            new ExecutedGroup(
+                value.table().currentSnapshot().snapshotId(),
+                value.groupsPerCommit(),
+                value.group()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              LogUtil.MESSAGE_PREFIX + "Rewritten files {} from {} to {}",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              value.group().info(),
+              value.group().rewrittenFiles(),
+              value.group().addedFiles());
+        } else {
+          LOG.info(
+              LogUtil.MESSAGE_PREFIX + "Rewritten {} files to {} files",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              value.group().rewrittenFiles().size(),
+              value.group().addedFiles().size());
+        }
+      } catch (Exception ex) {
+        LOG.info(
+            LogUtil.MESSAGE_PREFIX + "Exception rewriting datafile group {}",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp(),
+            value.group(),
+            ex);
+        ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
+        errorCounter.inc();
+        abort(writer, ctx.timestamp());
+      }
+    } catch (Exception ex) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception creating compaction writer for 
group {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group(),
+          ex);
+      ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
+      errorCounter.inc();
+    }
+  }
+
+  private TaskWriter<RowData> writerFor(PlannedGroup value) {
+    String formatString =
+        PropertyUtil.propertyAsString(
+            value.table().properties(),
+            TableProperties.DEFAULT_FILE_FORMAT,
+            TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    RowDataTaskWriterFactory factory =
+        new RowDataTaskWriterFactory(
+            value.table(),
+            FlinkSchemaUtil.convert(value.table().schema()),
+            value.splitSize(),
+            FileFormat.fromString(formatString),
+            value.table().properties(),
+            null,
+            false);
+    factory.initialize(subTaskId, attemptId);

Review Comment:
   I feel the `RowDataTaskWriterFactory` could be created just once, in the 
`open` method.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commits the rewrite changes using {@link RewriteDataFilesCommitManager}. 
The input is a {@link
+ * DataFileRewriteExecutor.ExecutedGroup}. Only {@link Watermark} is emitted 
which is chained to
+ * {@link TaskResultAggregator} input 1.
+ */
+@Internal
+public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<DataFileRewriteExecutor.ExecutedGroup, 
Trigger> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteCommitter.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+
+  private transient Table table;
+  private transient Long startingSnapshotId;
+  private transient Set<RewriteFileGroup> inProgress;
+  private transient ListState<Long> startingSnapshotIdState;
+  private transient ListState<RewriteFileGroup> inProgressState;
+  private transient CommitService commitService;
+  private transient int processed;
+  private transient Counter errorCounter;
+  private transient Counter addedDataFileNumCounter;
+  private transient Counter addedDataFileSizeCounter;
+  private transient Counter removedDataFileNumCounter;
+  private transient Counter removedDataFileSizeCounter;
+
+  public DataFileRewriteCommitter(
+      String tableName, String taskName, int taskIndex, TableLoader 
tableLoader) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.errorCounter = 
taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
+    this.addedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC);
+    this.addedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC);
+    this.removedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC);
+    this.removedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC);
+
+    this.startingSnapshotIdState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-snapshot-id", 
BasicTypeInfo.LONG_TYPE_INFO));
+    this.inProgressState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-in-progress", 
TypeInformation.of(RewriteFileGroup.class)));
+
+    this.startingSnapshotId = null;
+    Iterable<Long> snapshotIdIterable = startingSnapshotIdState.get();
+    if (snapshotIdIterable != null) {
+      Iterator<Long> snapshotIdIterator = snapshotIdIterable.iterator();
+      if (snapshotIdIterator.hasNext()) {
+        this.startingSnapshotId = snapshotIdIterator.next();
+      }
+    }
+
+    // Has to be concurrent since it is accessed by the CommitService from 
another thread

Review Comment:
   I don't see `RewriteFileGroup` implementing `hashCode` and `equals`. Is it 
safe to use in a `Set`? Does it just use object identity?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteExecutor.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import 
org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.FileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes a rewrite for a single {@link PlannedGroup}. Reads the files with 
the standard {@link
+ * FileScanTaskReader}, so the delete files are considered, and writes using 
the {@link
+ * TaskWriterFactory}. The output is an {@link ExecutedGroup}.
+ */
+@Internal
+public class DataFileRewriteExecutor
+    extends ProcessFunction<PlannedGroup, 
DataFileRewriteExecutor.ExecutedGroup> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteExecutor.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+
+  private transient int subTaskId;
+  private transient int attemptId;
+  private transient Counter errorCounter;
+
+  public DataFileRewriteExecutor(String tableName, String taskName, int 
taskIndex) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+  }
+
+  @Override
+  public void open(Configuration parameters) {
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+    this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
+  }
+
+  @Override
+  public void processElement(PlannedGroup value, Context ctx, 
Collector<ExecutedGroup> out)
+      throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          LogUtil.MESSAGE_PREFIX + "Rewriting files {} from {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().info(),
+          value.group().rewrittenFiles());
+    } else {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Rewriting {} files",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().rewrittenFiles().size());
+    }
+
+    try (TaskWriter<RowData> writer = writerFor(value)) {
+      try (DataIterator<RowData> iterator = readerFor(value)) {
+        while (iterator.hasNext()) {
+          writer.write(iterator.next());
+        }
+
+        Set<DataFile> dataFiles = Sets.newHashSet(writer.dataFiles());
+        value.group().setOutputFiles(dataFiles);
+        out.collect(
+            new ExecutedGroup(
+                value.table().currentSnapshot().snapshotId(),
+                value.groupsPerCommit(),
+                value.group()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              LogUtil.MESSAGE_PREFIX + "Rewritten files {} from {} to {}",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              value.group().info(),
+              value.group().rewrittenFiles(),
+              value.group().addedFiles());
+        } else {
+          LOG.info(
+              LogUtil.MESSAGE_PREFIX + "Rewritten {} files to {} files",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              value.group().rewrittenFiles().size(),
+              value.group().addedFiles().size());
+        }
+      } catch (Exception ex) {
+        LOG.info(
+            LogUtil.MESSAGE_PREFIX + "Exception rewriting datafile group {}",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp(),
+            value.group(),
+            ex);
+        ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
+        errorCounter.inc();
+        abort(writer, ctx.timestamp());
+      }
+    } catch (Exception ex) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception creating compaction writer for 
group {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group(),
+          ex);
+      ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
+      errorCounter.inc();
+    }
+  }
+
+  private TaskWriter<RowData> writerFor(PlannedGroup value) {
+    String formatString =
+        PropertyUtil.propertyAsString(
+            value.table().properties(),
+            TableProperties.DEFAULT_FILE_FORMAT,
+            TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    RowDataTaskWriterFactory factory =
+        new RowDataTaskWriterFactory(
+            value.table(),
+            FlinkSchemaUtil.convert(value.table().schema()),
+            value.splitSize(),
+            FileFormat.fromString(formatString),
+            value.table().properties(),
+            null,
+            false);
+    factory.initialize(subTaskId, attemptId);
+    return factory.create();
+  }
+
+  private DataIterator<RowData> readerFor(PlannedGroup value) {
+    RowDataFileScanTaskReader reader =
+        new RowDataFileScanTaskReader(
+            value.table().schema(),
+            value.table().schema(),
+            PropertyUtil.propertyAsString(value.table().properties(), 
DEFAULT_NAME_MAPPING, null),
+            false,

Review Comment:
   can't we hardcode `caseSensitive` to `false` here?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteExecutor.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import 
org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.FileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes a rewrite for a single {@link PlannedGroup}. Reads the files with 
the standard {@link
+ * FileScanTaskReader}, so the delete files are considered, and writes using 
the {@link
+ * TaskWriterFactory}. The output is an {@link ExecutedGroup}.
+ */
+@Internal
+public class DataFileRewriteExecutor
+    extends ProcessFunction<PlannedGroup, 
DataFileRewriteExecutor.ExecutedGroup> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteExecutor.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+
+  private transient int subTaskId;
+  private transient int attemptId;
+  private transient Counter errorCounter;
+
+  public DataFileRewriteExecutor(String tableName, String taskName, int 
taskIndex) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+  }
+
+  @Override
+  public void open(Configuration parameters) {
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+    this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
+  }
+
+  @Override
+  public void processElement(PlannedGroup value, Context ctx, 
Collector<ExecutedGroup> out)
+      throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          LogUtil.MESSAGE_PREFIX + "Rewriting files {} from {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().info(),
+          value.group().rewrittenFiles());
+    } else {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Rewriting {} files",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().rewrittenFiles().size());
+    }
+
+    try (TaskWriter<RowData> writer = writerFor(value)) {
+      try (DataIterator<RowData> iterator = readerFor(value)) {
+        while (iterator.hasNext()) {
+          writer.write(iterator.next());
+        }
+
+        Set<DataFile> dataFiles = Sets.newHashSet(writer.dataFiles());
+        value.group().setOutputFiles(dataFiles);
+        out.collect(
+            new ExecutedGroup(
+                value.table().currentSnapshot().snapshotId(),
+                value.groupsPerCommit(),
+                value.group()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              LogUtil.MESSAGE_PREFIX + "Rewritten files {} from {} to {}",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              value.group().info(),
+              value.group().rewrittenFiles(),
+              value.group().addedFiles());
+        } else {
+          LOG.info(
+              LogUtil.MESSAGE_PREFIX + "Rewritten {} files to {} files",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              value.group().rewrittenFiles().size(),
+              value.group().addedFiles().size());
+        }
+      } catch (Exception ex) {
+        LOG.info(
+            LogUtil.MESSAGE_PREFIX + "Exception rewriting datafile group {}",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp(),
+            value.group(),
+            ex);
+        ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
+        errorCounter.inc();
+        abort(writer, ctx.timestamp());
+      }
+    } catch (Exception ex) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception creating compaction writer for 
group {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group(),
+          ex);
+      ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
+      errorCounter.inc();
+    }
+  }
+
+  private TaskWriter<RowData> writerFor(PlannedGroup value) {
+    String formatString =

Review Comment:
   I know some write configs are not applicable to maintenance writes (like 
upsertMode, equalityFieldIds), but some (like fileFormat, writeProperties) 
probably should look beyond table properties?
   ```
   flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
   ```
   
   



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner;
+import org.apache.iceberg.actions.RewriteFileGroupPlanner.RewritePlanResult;
+import org.apache.iceberg.actions.SizeBasedDataRewriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.math.IntMath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Plans the rewrite groups using the {@link RewriteFileGroupPlanner}. The 
input is the {@link
+ * Trigger}, the output is zero, or some {@link PlannedGroup}s.
+ */
+@Internal
+public class DataFileRewritePlanner
+    extends ProcessFunction<Trigger, DataFileRewritePlanner.PlannedGroup> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewritePlanner.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final int partialProgressMaxCommits;
+  private final long maxRewriteBytes;
+  private final Map<String, String> rewriterOptions;
+  private transient SizeBasedDataRewriter rewriter;
+  private transient RewriteFileGroupPlanner planner;
+  private transient Counter errorCounter;
+
+  public DataFileRewritePlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      int newPartialProgressMaxCommits,
+      long maxRewriteBytes,
+      Map<String, String> rewriterOptions) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(rewriterOptions, "Options map should no be 
null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.partialProgressMaxCommits = newPartialProgressMaxCommits;
+    this.maxRewriteBytes = maxRewriteBytes;
+    this.rewriterOptions = rewriterOptions;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    tableLoader.open();
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.rewriter =
+        new SizeBasedDataRewriter(tableLoader.loadTable()) {
+          @Override
+          public Set<DataFile> rewrite(List<FileScanTask> group) {
+            // We use the rewriter only for bin-packing the file groups to 
compact
+            throw new UnsupportedOperationException("Should not be called");
+          }
+        };
+
+    rewriter.init(rewriterOptions);
+    this.planner = new RewriteFileGroupPlanner(rewriter, RewriteJobOrder.NONE);
+  }
+
+  @Override
+  public void processElement(Trigger value, Context ctx, 
Collector<PlannedGroup> out)
+      throws Exception {
+    LOG.debug(
+        LogUtil.MESSAGE_PREFIX + "Creating rewrite plan",
+        tableName,
+        taskName,
+        taskIndex,
+        ctx.timestamp());
+    try {
+      SerializableTable table =
+          (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
+      if (table.currentSnapshot() == null) {
+        LOG.info(
+            LogUtil.MESSAGE_PREFIX + "Nothing to plan for in an empty table",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp());
+        return;
+      }
+
+      RewritePlanResult plan =
+          planner.plan(
+              table, Expressions.alwaysTrue(), 
table.currentSnapshot().snapshotId(), false);
+
+      long rewriteBytes = 0;
+      int totalGroupCount = 0;
+      List<RewriteFileGroup> groups = 
plan.fileGroups().collect(Collectors.toList());
+      ListIterator<RewriteFileGroup> iter = groups.listIterator();
+      while (iter.hasNext()) {
+        RewriteFileGroup group = iter.next();
+        if (rewriteBytes + group.sizeInBytes() > maxRewriteBytes) {
+          // Keep going, maybe some other group might fit in
+          LOG.info(
+              LogUtil.MESSAGE_PREFIX + "Skipping group {} as max rewrite size 
reached",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              group);
+          iter.remove();
+        } else {
+          rewriteBytes += group.sizeInBytes();
+          ++totalGroupCount;
+        }
+      }
+
+      int groupsPerCommit =
+          IntMath.divide(totalGroupCount, partialProgressMaxCommits, 
RoundingMode.CEILING);
+
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Rewrite plan created {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          groups);
+
+      for (RewriteFileGroup group : groups) {
+        LOG.debug(
+            LogUtil.MESSAGE_PREFIX + "Emitting {}",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp(),
+            group);
+        out.collect(
+            new PlannedGroup(
+                table, groupsPerCommit, 
rewriter.splitSize(group.sizeInBytes()), group));
+      }
+    } catch (Exception e) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception planning data file rewrite 
groups",

Review Comment:
   nit: maybe `Failed to plan ...`



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteExecutor.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import 
org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.FileScanTaskReader;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes a rewrite for a single {@link PlannedGroup}. Reads the files with 
the standard {@link
+ * FileScanTaskReader}, so the delete files are considered, and writes using 
the {@link
+ * TaskWriterFactory}. The output is an {@link ExecutedGroup}.
+ */
+@Internal
+public class DataFileRewriteExecutor
+    extends ProcessFunction<PlannedGroup, 
DataFileRewriteExecutor.ExecutedGroup> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteExecutor.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+
+  private transient int subTaskId;
+  private transient int attemptId;
+  private transient Counter errorCounter;
+
+  public DataFileRewriteExecutor(String tableName, String taskName, int 
taskIndex) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+  }
+
+  @Override
+  public void open(Configuration parameters) {
+    this.errorCounter =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex)
+            .counter(TableMaintenanceMetrics.ERROR_COUNTER);
+
+    this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+    this.attemptId = getRuntimeContext().getTaskInfo().getAttemptNumber();
+  }
+
+  @Override
+  public void processElement(PlannedGroup value, Context ctx, 
Collector<ExecutedGroup> out)
+      throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          LogUtil.MESSAGE_PREFIX + "Rewriting files {} from {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().info(),
+          value.group().rewrittenFiles());
+    } else {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Rewriting {} files",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group().rewrittenFiles().size());
+    }
+
+    try (TaskWriter<RowData> writer = writerFor(value)) {
+      try (DataIterator<RowData> iterator = readerFor(value)) {
+        while (iterator.hasNext()) {
+          writer.write(iterator.next());
+        }
+
+        Set<DataFile> dataFiles = Sets.newHashSet(writer.dataFiles());
+        value.group().setOutputFiles(dataFiles);
+        out.collect(
+            new ExecutedGroup(
+                value.table().currentSnapshot().snapshotId(),
+                value.groupsPerCommit(),
+                value.group()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              LogUtil.MESSAGE_PREFIX + "Rewritten files {} from {} to {}",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              value.group().info(),
+              value.group().rewrittenFiles(),
+              value.group().addedFiles());
+        } else {
+          LOG.info(
+              LogUtil.MESSAGE_PREFIX + "Rewritten {} files to {} files",
+              tableName,
+              taskName,
+              taskIndex,
+              ctx.timestamp(),
+              value.group().rewrittenFiles().size(),
+              value.group().addedFiles().size());
+        }
+      } catch (Exception ex) {
+        LOG.info(
+            LogUtil.MESSAGE_PREFIX + "Exception rewriting datafile group {}",
+            tableName,
+            taskName,
+            taskIndex,
+            ctx.timestamp(),
+            value.group(),
+            ex);
+        ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
+        errorCounter.inc();
+        abort(writer, ctx.timestamp());
+      }
+    } catch (Exception ex) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception creating compaction writer for 
group {}",
+          tableName,
+          taskName,
+          taskIndex,
+          ctx.timestamp(),
+          value.group(),
+          ex);
+      ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
+      errorCounter.inc();
+    }
+  }
+
+  private TaskWriter<RowData> writerFor(PlannedGroup value) {
+    String formatString =
+        PropertyUtil.propertyAsString(
+            value.table().properties(),
+            TableProperties.DEFAULT_FILE_FORMAT,
+            TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    RowDataTaskWriterFactory factory =
+        new RowDataTaskWriterFactory(
+            value.table(),
+            FlinkSchemaUtil.convert(value.table().schema()),
+            value.splitSize(),
+            FileFormat.fromString(formatString),
+            value.table().properties(),
+            null,
+            false);
+    factory.initialize(subTaskId, attemptId);
+    return factory.create();
+  }
+
+  private DataIterator<RowData> readerFor(PlannedGroup value) {
+    RowDataFileScanTaskReader reader =
+        new RowDataFileScanTaskReader(
+            value.table().schema(),
+            value.table().schema(),
+            PropertyUtil.propertyAsString(value.table().properties(), 
DEFAULT_NAME_MAPPING, null),
+            false,
+            Collections.emptyList());
+    return new DataIterator<>(
+        reader,
+        new BaseCombinedScanTask(value.group().fileScans()),
+        value.table().io(),
+        value.table().encryption());
+  }
+
+  private void abort(TaskWriter<RowData> writer, long timestamp) {
+    try {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Aborting rewrite for (subTaskId {}, 
attemptId {})",
+          tableName,
+          taskName,
+          taskIndex,
+          timestamp,
+          subTaskId,
+          attemptId);
+      writer.abort();
+      LOG.info(

Review Comment:
   nit: probably don't need this log line



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commits the rewrite changes using {@link RewriteDataFilesCommitManager}. 
The input is a {@link
+ * DataFileRewriteExecutor.ExecutedGroup}. Only {@link Watermark} is emitted 
which is chained to
+ * {@link TaskResultAggregator} input 1.
+ */
+@Internal
+public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<DataFileRewriteExecutor.ExecutedGroup, 
Trigger> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteCommitter.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+
+  private transient Table table;
+  private transient Long startingSnapshotId;
+  private transient Set<RewriteFileGroup> inProgress;
+  private transient ListState<Long> startingSnapshotIdState;
+  private transient ListState<RewriteFileGroup> inProgressState;
+  private transient CommitService commitService;
+  private transient int processed;
+  private transient Counter errorCounter;
+  private transient Counter addedDataFileNumCounter;
+  private transient Counter addedDataFileSizeCounter;
+  private transient Counter removedDataFileNumCounter;
+  private transient Counter removedDataFileSizeCounter;
+
+  public DataFileRewriteCommitter(
+      String tableName, String taskName, int taskIndex, TableLoader 
tableLoader) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.errorCounter = 
taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
+    this.addedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC);
+    this.addedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC);
+    this.removedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC);
+    this.removedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC);
+
+    this.startingSnapshotIdState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-snapshot-id", 
BasicTypeInfo.LONG_TYPE_INFO));
+    this.inProgressState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-in-progress", 
TypeInformation.of(RewriteFileGroup.class)));
+
+    this.startingSnapshotId = null;
+    Iterable<Long> snapshotIdIterable = startingSnapshotIdState.get();
+    if (snapshotIdIterable != null) {
+      Iterator<Long> snapshotIdIterator = snapshotIdIterable.iterator();
+      if (snapshotIdIterator.hasNext()) {
+        this.startingSnapshotId = snapshotIdIterator.next();
+      }
+    }
+
+    // Has to be concurrent since it is accessed by the CommitService from 
another thread
+    this.inProgress = Sets.newConcurrentHashSet();
+    Iterable<RewriteFileGroup> inProgressIterable = inProgressState.get();
+    if (inProgressIterable != null) {
+      for (RewriteFileGroup group : inProgressIterable) {
+        inProgress.add(group);
+      }
+    }
+
+    commitInProgress(System.currentTimeMillis());
+
+    this.processed = 0;
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+
+    startingSnapshotIdState.clear();
+    if (startingSnapshotId != null) {
+      startingSnapshotIdState.add(startingSnapshotId);
+    }
+
+    inProgressState.clear();
+    for (RewriteFileGroup group : inProgress) {
+      inProgressState.add(group);
+    }
+  }
+
+  @Override
+  public void 
processElement(StreamRecord<DataFileRewriteExecutor.ExecutedGroup> 
streamRecord) {
+    DataFileRewriteExecutor.ExecutedGroup executedGroup = 
streamRecord.getValue();
+    try {
+      if (commitService == null) {
+        this.startingSnapshotId = executedGroup.snapshotId();
+        this.commitService =
+            createCommitService(streamRecord.getTimestamp(), 
executedGroup.groupsPerCommit());
+      }
+
+      commitService.offer(executedGroup.group());
+      inProgress.add(executedGroup.group());
+      ++processed;
+    } catch (Exception e) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception processing {}",
+          tableName,
+          taskName,
+          taskIndex,
+          streamRecord.getTimestamp(),
+          executedGroup,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    try {
+      if (commitService != null) {
+        commitService.close();
+        if (processed != commitService.results().size()) {
+          throw new RuntimeException(
+              String.format(
+                  Locale.ROOT,
+                  LogUtil.MESSAGE_FORMAT_PREFIX + "From %d commits only %d 
were unsuccessful",
+                  tableName,
+                  taskName,
+                  taskIndex,
+                  mark.getTimestamp(),
+                  processed,
+                  commitService.results().size()));
+        }
+      }
+
+      table.refresh();
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Successfully completed data file 
compaction to {}",
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp(),
+          table.currentSnapshot().snapshotId());
+    } catch (Exception e) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception closing commit service",
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp(),
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+
+    // Cleanup
+    this.commitService = null;
+    this.startingSnapshotId = null;
+    this.processed = 0;
+    inProgress.clear();
+
+    super.processWatermark(mark);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (commitService != null) {
+      commitService.close();
+    }
+  }
+
+  private CommitService createCommitService(long timestamp, int 
groupsPerCommit) {
+    FlinkRewriteDataFilesCommitManager commitManager =
+        new FlinkRewriteDataFilesCommitManager(table, startingSnapshotId, 
timestamp);
+    CommitService service = commitManager.service(groupsPerCommit);
+    service.start();
+    return service;
+  }
+
+  private void commitInProgress(long timestamp) {
+    if (!inProgress.isEmpty()) {
+      CommitService service = null;
+      try {
+        service = createCommitService(timestamp, inProgress.size());
+        inProgress.forEach(service::offer);
+      } catch (Exception e) {
+        LOG.info(

Review Comment:
   nit: should this be logged at error level?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.Map;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.iceberg.actions.SizeBasedDataRewriter;
+import org.apache.iceberg.actions.SizeBasedFileRewriter;
+import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteCommitter;
+import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteExecutor;
+import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner;
+import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Creates the data file rewriter data stream. Which runs a single iteration 
of the task for every
+ * {@link Trigger} event.
+ *
+ * <p>The input is a {@link DataStream} with {@link Trigger} events and every 
event should be
+ * immediately followed by a {@link 
org.apache.flink.streaming.api.watermark.Watermark} with the
+ * same timestamp as the event.
+ *
+ * <p>The output is a {@link DataStream} with the {@link TaskResult} of the 
run followed by the
+ * {@link org.apache.flink.streaming.api.watermark.Watermark}.
+ */
+public class RewriteDataFiles {
+  static final String PLANNER_TASK_NAME = "RDF Planner";
+  static final String REWRITE_TASK_NAME = "Rewrite";
+  static final String COMMIT_TASK_NAME = "Rewrite commit";
+  static final String AGGREGATOR_TASK_NAME = "Rewrite aggregator";
+
+  private RewriteDataFiles() {}
+
+  /** Creates the builder for a stream which rewrites data files for the 
table. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends 
MaintenanceTaskBuilder<RewriteDataFiles.Builder> {
+    private boolean partialProgressEnabled = false;
+    private int partialProgressMaxCommits = 10;
+    private final Map<String, String> rewriteOptions = 
Maps.newHashMapWithExpectedSize(6);
+    private long maxRewriteBytes = Long.MAX_VALUE;
+
+    /**
+     * Allows committing compacted data files in batches. For more details 
description see {@link
+     * org.apache.iceberg.actions.RewriteDataFiles#PARTIAL_PROGRESS_ENABLED}.
+     *
+     * @param newPartialProgressEnabled to enable partial commits
+     */
+    public Builder partialProgressEnabled(boolean newPartialProgressEnabled) {
+      this.partialProgressEnabled = newPartialProgressEnabled;
+      return this;
+    }
+
+    /**
+     * Configures the size of batches if {@link #partialProgressEnabled}. For 
more details
+     * description see {@link
+     * 
org.apache.iceberg.actions.RewriteDataFiles#PARTIAL_PROGRESS_MAX_COMMITS}.
+     *
+     * @param newPartialProgressMaxCommits to target number of the commits per 
run
+     */
+    public Builder partialProgressMaxCommits(int newPartialProgressMaxCommits) 
{
+      this.partialProgressMaxCommits = newPartialProgressMaxCommits;
+      return this;
+    }
+
+    /**
+     * Configures the maximum byte size of the rewrites for one scheduled 
compaction. This could be
+     * used to limit the resources used by the compaction.
+     *
+     * @param newMaxRewriteBytes to limit the size of the rewrites
+     */
+    public Builder maxRewriteBytes(long newMaxRewriteBytes) {
+      this.maxRewriteBytes = newMaxRewriteBytes;
+      return this;
+    }
+
+    /**
+     * Configures the target file size. For more details description see {@link
+     * org.apache.iceberg.actions.RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
+     *
+     * @param targetFileSizeBytes target file size
+     */
+    public Builder targetFileSizeBytes(long targetFileSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, 
String.valueOf(targetFileSizeBytes));
+      return this;
+    }
+
+    /**
+     * Configures the min file size considered for rewriting. For more details 
description see
+     * {@link SizeBasedFileRewriter#MIN_FILE_SIZE_BYTES}.
+     *
+     * @param minFileSizeBytes min file size
+     */
+    public Builder minFileSizeBytes(long minFileSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, 
String.valueOf(minFileSizeBytes));
+      return this;
+    }
+
+    /**
+     * Configures the max file size considered for rewriting. For more details 
description see
+     * {@link SizeBasedFileRewriter#MAX_FILE_SIZE_BYTES}.
+     *
+     * @param maxFileSizeBytes max file size
+     */
+    public Builder maxFileSizeBytes(long maxFileSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, 
String.valueOf(maxFileSizeBytes));
+      return this;
+    }
+
+    /**
+     * Configures the minimum file number after a rewrite is always initiated. 
For more details
+     * description see {@link SizeBasedFileRewriter#MIN_INPUT_FILES}.
+     *
+     * @param minInputFiles min file number
+     */
+    public Builder minInputFiles(int minInputFiles) {
+      this.rewriteOptions.put(SizeBasedFileRewriter.MIN_INPUT_FILES, 
String.valueOf(minInputFiles));
+      return this;
+    }
+
+    /**
+     * Configures the minimum delete file number for a file after a rewrite is 
always initiated. For
+     * more details description see {@link 
SizeBasedDataRewriter#DELETE_FILE_THRESHOLD}.
+     *
+     * @param deleteFileThreshold min delete file number
+     */
+    public Builder deleteFileThreshold(int deleteFileThreshold) {
+      this.rewriteOptions.put(
+          SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, 
String.valueOf(deleteFileThreshold));
+      return this;
+    }
+
+    /**
+     * Every other option is overridden, and all the files are rewritten.
+     *
+     * @param rewriteAll enables a full rewrite
+     */
+    public Builder rewriteAll(boolean rewriteAll) {
+      this.rewriteOptions.put(SizeBasedFileRewriter.REWRITE_ALL, 
String.valueOf(rewriteAll));
+      return this;
+    }
+
+    /**
+     * Configures the group size for rewriting. For more details description 
see {@link
+     * SizeBasedDataRewriter#MAX_FILE_GROUP_SIZE_BYTES}.
+     *
+     * @param maxFileGroupSizeBytes file group size for rewrite
+     */
+    public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) {
+      this.rewriteOptions.put(
+          SizeBasedFileRewriter.MAX_FILE_GROUP_SIZE_BYTES, 
String.valueOf(maxFileGroupSizeBytes));
+      return this;
+    }
+
+    @Override
+    DataStream<TaskResult> append(DataStream<Trigger> trigger) {
+      SingleOutputStreamOperator<DataFileRewritePlanner.PlannedGroup> planned =
+          trigger
+              .process(
+                  new DataFileRewritePlanner(
+                      tableName(),
+                      taskName(),
+                      index(),
+                      tableLoader(),
+                      partialProgressEnabled ? partialProgressMaxCommits : 1,
+                      maxRewriteBytes,
+                      rewriteOptions))
+              .name(operatorName(PLANNER_TASK_NAME))
+              .uid(PLANNER_TASK_NAME + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .forceNonParallel();
+
+      SingleOutputStreamOperator<DataFileRewriteExecutor.ExecutedGroup> 
rewritten =
+          planned
+              .rebalance()
+              .process(new DataFileRewriteExecutor(tableName(), taskName(), 
index()))
+              .name(operatorName(REWRITE_TASK_NAME))
+              .uid(REWRITE_TASK_NAME + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .setParallelism(parallelism());
+
+      SingleOutputStreamOperator<Trigger> updated =
+          rewritten
+              .transform(
+                  operatorName(COMMIT_TASK_NAME),
+                  TypeInformation.of(Trigger.class),
+                  new DataFileRewriteCommitter(tableName(), taskName(), 
index(), tableLoader()))
+              .uid(COMMIT_TASK_NAME + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .forceNonParallel();
+
+      return trigger
+          .union(updated)

Review Comment:
   I mean, why do we need the original `trigger` stream to be unioned here?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commits the rewrite changes using {@link RewriteDataFilesCommitManager}. 
The input is a {@link
+ * DataFileRewriteExecutor.ExecutedGroup}. Only {@link Watermark} is emitted 
which is chained to
+ * {@link TaskResultAggregator} input 1.
+ */
+@Internal
+public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<DataFileRewriteExecutor.ExecutedGroup, 
Trigger> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteCommitter.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+
+  private transient Table table;
+  private transient Long startingSnapshotId;
+  private transient Set<RewriteFileGroup> inProgress;
+  private transient ListState<Long> startingSnapshotIdState;
+  private transient ListState<RewriteFileGroup> inProgressState;
+  private transient CommitService commitService;
+  private transient int processed;
+  private transient Counter errorCounter;
+  private transient Counter addedDataFileNumCounter;
+  private transient Counter addedDataFileSizeCounter;
+  private transient Counter removedDataFileNumCounter;
+  private transient Counter removedDataFileSizeCounter;
+
+  public DataFileRewriteCommitter(
+      String tableName, String taskName, int taskIndex, TableLoader 
tableLoader) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.errorCounter = 
taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
+    this.addedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC);
+    this.addedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC);
+    this.removedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC);
+    this.removedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC);
+
+    this.startingSnapshotIdState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-snapshot-id", 
BasicTypeInfo.LONG_TYPE_INFO));
+    this.inProgressState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-in-progress", 
TypeInformation.of(RewriteFileGroup.class)));
+
+    this.startingSnapshotId = null;
+    Iterable<Long> snapshotIdIterable = startingSnapshotIdState.get();
+    if (snapshotIdIterable != null) {
+      Iterator<Long> snapshotIdIterator = snapshotIdIterable.iterator();
+      if (snapshotIdIterator.hasNext()) {
+        this.startingSnapshotId = snapshotIdIterator.next();
+      }
+    }
+
+    // Has to be concurrent since it is accessed by the CommitService from 
another thread
+    this.inProgress = Sets.newConcurrentHashSet();
+    Iterable<RewriteFileGroup> inProgressIterable = inProgressState.get();
+    if (inProgressIterable != null) {
+      for (RewriteFileGroup group : inProgressIterable) {
+        inProgress.add(group);
+      }
+    }
+
+    commitInProgress(System.currentTimeMillis());
+
+    this.processed = 0;
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+
+    startingSnapshotIdState.clear();
+    if (startingSnapshotId != null) {
+      startingSnapshotIdState.add(startingSnapshotId);
+    }
+
+    inProgressState.clear();
+    for (RewriteFileGroup group : inProgress) {
+      inProgressState.add(group);
+    }
+  }
+
+  @Override
+  public void 
processElement(StreamRecord<DataFileRewriteExecutor.ExecutedGroup> 
streamRecord) {
+    DataFileRewriteExecutor.ExecutedGroup executedGroup = 
streamRecord.getValue();
+    try {
+      if (commitService == null) {
+        this.startingSnapshotId = executedGroup.snapshotId();
+        this.commitService =
+            createCommitService(streamRecord.getTimestamp(), 
executedGroup.groupsPerCommit());
+      }
+
+      commitService.offer(executedGroup.group());
+      inProgress.add(executedGroup.group());
+      ++processed;
+    } catch (Exception e) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception processing {}",
+          tableName,
+          taskName,
+          taskIndex,
+          streamRecord.getTimestamp(),
+          executedGroup,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    try {
+      if (commitService != null) {
+        commitService.close();
+        if (processed != commitService.results().size()) {
+          throw new RuntimeException(
+              String.format(
+                  Locale.ROOT,
+                  LogUtil.MESSAGE_FORMAT_PREFIX + "From %d commits only %d 
were unsuccessful",
+                  tableName,
+                  taskName,
+                  taskIndex,
+                  mark.getTimestamp(),
+                  processed,
+                  commitService.results().size()));
+        }
+      }
+
+      table.refresh();
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Successfully completed data file 
compaction to {}",
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp(),
+          table.currentSnapshot().snapshotId());
+    } catch (Exception e) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception closing commit service",
+          tableName,
+          taskName,
+          taskIndex,
+          mark.getTimestamp(),
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+
+    // Cleanup
+    this.commitService = null;
+    this.startingSnapshotId = null;
+    this.processed = 0;
+    inProgress.clear();
+
+    super.processWatermark(mark);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (commitService != null) {
+      commitService.close();
+    }
+  }
+
+  private CommitService createCommitService(long timestamp, int 
groupsPerCommit) {
+    FlinkRewriteDataFilesCommitManager commitManager =
+        new FlinkRewriteDataFilesCommitManager(table, startingSnapshotId, 
timestamp);
+    CommitService service = commitManager.service(groupsPerCommit);
+    service.start();
+    return service;
+  }
+
+  private void commitInProgress(long timestamp) {
+    if (!inProgress.isEmpty()) {
+      CommitService service = null;
+      try {
+        service = createCommitService(timestamp, inProgress.size());
+        inProgress.forEach(service::offer);
+      } catch (Exception e) {
+        LOG.info(
+            LogUtil.MESSAGE_PREFIX + "Failed committing pending groups {}",
+            tableName,
+            taskName,
+            taskIndex,
+            timestamp,
+            inProgress,
+            e);
+      } finally {
+        inProgress.clear();
+        if (service != null) {
+          try {
+            service.close();
+          } catch (Exception e) {
+            LOG.warn(

Review Comment:
   nit: should this be logged at error level instead of `warn`?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteCommitter.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
+import org.apache.iceberg.actions.RewriteDataFilesCommitManager.CommitService;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commits the rewrite changes using {@link RewriteDataFilesCommitManager}. 
The input is a {@link
+ * DataFileRewriteExecutor.ExecutedGroup}. Only {@link Watermark} is emitted 
which is chained to
+ * {@link TaskResultAggregator} input 1.
+ */
+@Internal
+public class DataFileRewriteCommitter extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<DataFileRewriteExecutor.ExecutedGroup, 
Trigger> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRewriteCommitter.class);
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+
+  private transient Table table;
+  private transient Long startingSnapshotId;
+  private transient Set<RewriteFileGroup> inProgress;
+  private transient ListState<Long> startingSnapshotIdState;
+  private transient ListState<RewriteFileGroup> inProgressState;
+  private transient CommitService commitService;
+  private transient int processed;
+  private transient Counter errorCounter;
+  private transient Counter addedDataFileNumCounter;
+  private transient Counter addedDataFileSizeCounter;
+  private transient Counter removedDataFileNumCounter;
+  private transient Counter removedDataFileSizeCounter;
+
+  public DataFileRewriteCommitter(
+      String tableName, String taskName, int taskIndex, TableLoader 
tableLoader) {
+    Preconditions.checkNotNull(tableName, "Table name should no be null");
+    Preconditions.checkNotNull(taskName, "Task name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.errorCounter = 
taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER);
+    this.addedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC);
+    this.addedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC);
+    this.removedDataFileNumCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_NUM_METRIC);
+    this.removedDataFileSizeCounter =
+        
taskMetricGroup.counter(TableMaintenanceMetrics.REMOVED_DATA_FILE_SIZE_METRIC);
+
+    this.startingSnapshotIdState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-snapshot-id", 
BasicTypeInfo.LONG_TYPE_INFO));
+    this.inProgressState =
+        context
+            .getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "data-file-updater-in-progress", 
TypeInformation.of(RewriteFileGroup.class)));
+
+    this.startingSnapshotId = null;
+    Iterable<Long> snapshotIdIterable = startingSnapshotIdState.get();
+    if (snapshotIdIterable != null) {
+      Iterator<Long> snapshotIdIterator = snapshotIdIterable.iterator();
+      if (snapshotIdIterator.hasNext()) {
+        this.startingSnapshotId = snapshotIdIterator.next();
+      }
+    }
+
+    // Has to be concurrent since it is accessed by the CommitService from 
another thread
+    this.inProgress = Sets.newConcurrentHashSet();
+    Iterable<RewriteFileGroup> inProgressIterable = inProgressState.get();
+    if (inProgressIterable != null) {
+      for (RewriteFileGroup group : inProgressIterable) {
+        inProgress.add(group);
+      }
+    }
+
+    commitInProgress(System.currentTimeMillis());
+
+    this.processed = 0;
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+
+    startingSnapshotIdState.clear();
+    if (startingSnapshotId != null) {
+      startingSnapshotIdState.add(startingSnapshotId);
+    }
+
+    inProgressState.clear();
+    for (RewriteFileGroup group : inProgress) {
+      inProgressState.add(group);
+    }
+  }
+
+  @Override
+  public void 
processElement(StreamRecord<DataFileRewriteExecutor.ExecutedGroup> 
streamRecord) {
+    DataFileRewriteExecutor.ExecutedGroup executedGroup = 
streamRecord.getValue();
+    try {
+      if (commitService == null) {
+        this.startingSnapshotId = executedGroup.snapshotId();
+        this.commitService =
+            createCommitService(streamRecord.getTimestamp(), 
executedGroup.groupsPerCommit());
+      }
+
+      commitService.offer(executedGroup.group());
+      inProgress.add(executedGroup.group());
+      ++processed;
+    } catch (Exception e) {
+      LOG.info(
+          LogUtil.MESSAGE_PREFIX + "Exception processing {}",
+          tableName,
+          taskName,
+          taskIndex,
+          streamRecord.getTimestamp(),
+          executedGroup,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      errorCounter.inc();
+    }
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    try {
+      if (commitService != null) {
+        commitService.close();
+        if (processed != commitService.results().size()) {
+          throw new RuntimeException(
+              String.format(
+                  Locale.ROOT,
+                  LogUtil.MESSAGE_FORMAT_PREFIX + "From %d commits only %d 
were unsuccessful",
+                  tableName,
+                  taskName,
+                  taskIndex,
+                  mark.getTimestamp(),
+                  processed,
+                  commitService.results().size()));
+        }
+      }
+
+      table.refresh();

Review Comment:
   Is this `table.refresh()` call needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to