stevenzwu commented on code in PR #11144:
URL: https://github.com/apache/iceberg/pull/11144#discussion_r1763635282


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/AsyncDeleteFiles.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Predicate;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Delete the files using the {@link FileIO}. */
+@Internal
+public class AsyncDeleteFiles extends RichAsyncFunction<String, Boolean> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncDeleteFiles.class);
+  public static final Predicate<Collection<Boolean>> FAILED_PREDICATE = new FailedPredicate();
+
+  private final String name;
+  private final FileIO io;
+  private final int workerPoolSize;
+  private final String tableName;
+
+  private transient ExecutorService workerPool;
+  private transient Counter failedCounter;
+  private transient Counter succeededCounter;
+
+  public AsyncDeleteFiles(String name, TableLoader tableLoader, int workerPoolSize) {
+    Preconditions.checkNotNull(name, "Name should not be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+
+    this.name = name;
+    tableLoader.open();
+    Table table = tableLoader.loadTable();
+    this.io = table.io();
+    this.workerPoolSize = workerPoolSize;
+    this.tableName = table.name();
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.failedCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+            .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER);
+    this.succeededCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+            .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER);
+
+    this.workerPool =

Review Comment:
   this is where I meant to ask if we should use the shared thread pool.
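   
   A sketch of what that could look like (untested; assumes the shared JVM-wide delete pool from `org.apache.iceberg.util.ThreadPools` is acceptable here instead of a dedicated per-operator pool):
   ```
   @Override
   public void open(Configuration parameters) throws Exception {
     // ... metric setup as above ...
     // reuse the shared delete worker pool instead of creating a new one
     this.workerPool = ThreadPools.getDeleteWorkerPool();
   }
   ```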



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could
+ * be removed in the {@link #DELETE_STREAM} side output.
+ */
+public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class);
+  public static final OutputTag<String> DELETE_STREAM =
+      new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING);
+
+  private final TableLoader tableLoader;
+  private final Long maxSnapshotAgeMs;
+  private final Integer retainLast;
+  private final int plannerPoolSize;
+  private transient ExecutorService plannerPool;
+  private transient Table table;
+
+  public ExpireSnapshotsProcessor(
+      TableLoader tableLoader, Long maxSnapshotAgeMs, Integer retainLast, int plannerPoolSize) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+
+    this.tableLoader = tableLoader;
+    this.maxSnapshotAgeMs = maxSnapshotAgeMs;
+    this.retainLast = retainLast;
+    this.plannerPoolSize = plannerPoolSize;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+    this.plannerPool = ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize);
+  }
+
+  @Override
+  public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> out)
+      throws Exception {
+    try {
+      table.refresh();
+      ExpireSnapshots expireSnapshots = table.expireSnapshots();
+      if (maxSnapshotAgeMs != null) {
+        expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - maxSnapshotAgeMs);
+      }
+
+      if (retainLast != null) {
+        expireSnapshots = expireSnapshots.retainLast(retainLast);
+      }
+
+      AtomicLong deleteFileCounter = new AtomicLong(0L);
+      expireSnapshots
+          .planWith(plannerPool)
+          .deleteWith(
+              file -> {
+                ctx.output(DELETE_STREAM, file);
+                deleteFileCounter.incrementAndGet();
+              })
+          .cleanExpiredFiles(true)
+          .commit();
+
+      LOG.info(
+          "Successfully finished expiring snapshots for {} at {}. Scheduled {} files for delete.",
+          table,
+          ctx.timestamp(),
+          deleteFileCounter.get());
+      out.collect(
+          new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList()));
+    } catch (Exception e) {
+      LOG.info("Exception expiring snapshots for {} at {}", table, ctx.timestamp(), e);

Review Comment:
   error level here?
   
   nit: error msg typically starts with sth like `Failed to ` or `Cannot `
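   
   e.g. sth like:
   ```
   LOG.error("Failed to expire snapshots for {} at {}", table, ctx.timestamp(), e);
   ```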



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
+  private static final long DELETE_TIMEOUT_MS = 10000L;
+  private static final int DELETE_ATTEMPT_NUM = 10;
+  private static final String EXECUTOR_TASK_NAME = "ES Executor";
+  @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";
+
+  private ExpireSnapshots() {
+    // Do not instantiate directly
+  }
+
+  /** Creates the builder for creating a stream which expires snapshots for the table. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
+    private Duration maxSnapshotAge = null;
+    private Integer retainLast = null;
+    private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
+    private int deleteAttemptNum = DELETE_ATTEMPT_NUM;
+    private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();
+
+    /**
+     * The snapshots newer than this age will not be removed.
+     *
+     * @param newMaxSnapshotAge of the snapshots to be removed
+     * @return for chained calls
+     */
+    public Builder maxSnapshotAge(Duration newMaxSnapshotAge) {
+      this.maxSnapshotAge = newMaxSnapshotAge;
+      return this;
+    }
+
+    /**
+     * The minimum {@link org.apache.iceberg.Snapshot}s to retain. For a more detailed description see
+     * {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}.
+     *
+     * @param newRetainLast number of snapshots to retain
+     * @return for chained calls
+     */
+    public Builder retainLast(int newRetainLast) {
+      this.retainLast = newRetainLast;
+      return this;
+    }
+
+    /**
+     * The worker pool size used to calculate the files to delete.
+     *
+     * @param newPlanningWorkerPoolSize for planning files to delete
+     * @return for chained calls
+     */
+    public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) {
+      this.planningWorkerPoolSize = newPlanningWorkerPoolSize;
+      return this;
+    }
+
+    /**
+     * The number of retries on the failed delete attempts.
+     *
+     * @param newDeleteAttemptNum number of retries
+     * @return for chained calls
+     */
+    public Builder deleteAttemptNum(int newDeleteAttemptNum) {
+      this.deleteAttemptNum = newDeleteAttemptNum;
+      return this;
+    }
+
+    /**
+     * The worker pool size used for deleting files.
+     *
+     * @param newDeleteWorkerPoolSize for deleting files
+     * @return for chained calls
+     */
+    public Builder deleteWorkerPoolSize(int newDeleteWorkerPoolSize) {
+      this.deleteWorkerPoolSize = newDeleteWorkerPoolSize;
+      return this;
+    }
+
+    @Override
+    DataStream<TaskResult> append(DataStream<Trigger> trigger) {
+      Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null");
+
+      SingleOutputStreamOperator<TaskResult> result =
+          trigger
+              .process(
+                  new ExpireSnapshotsProcessor(
+                      tableLoader(),
+                      maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
+                      retainLast,
+                      planningWorkerPoolSize))
+              .name(EXECUTOR_TASK_NAME)
+              .uid("expire-snapshots-" + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .forceNonParallel();
+
+      AsyncRetryStrategy<Boolean> retryStrategy =
+          new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder<Boolean>(
+                  deleteAttemptNum,
+                  DELETE_INITIAL_DELAY_MS,
+                  DELETE_MAX_RETRY_DELAY_MS,
+                  DELETE_BACKOFF_MULTIPLIER)
+              .ifResult(AsyncDeleteFiles.FAILED_PREDICATE)
+              .build();
+
+      AsyncDataStream.unorderedWaitWithRetry(
+              result.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM).rebalance(),
+              new AsyncDeleteFiles(name(), tableLoader(), deleteWorkerPoolSize),
+              DELETE_TIMEOUT_MS,
+              TimeUnit.MILLISECONDS,
+              deleteWorkerPoolSize,
+              retryStrategy)
+          .name(DELETE_FILES_TASK_NAME)
+          .uid("delete-expired-files-" + uidSuffix())
+          .slotSharingGroup(slotSharingGroup())
+          .setParallelism(parallelism());
+
+      // Deleting the files is asynchronous, so we ignore the results when calculating the return

Review Comment:
   nit: `ignore the results` can be clarified. maybe sth like this
   ```
   Ignore the async file deletion boolean result and return the DataStream<TaskResult> directly
   ```



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
+  private static final long DELETE_TIMEOUT_MS = 10000L;
+  private static final int DELETE_ATTEMPT_NUM = 10;
+  private static final String EXECUTOR_TASK_NAME = "ES Executor";
+  @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";
+
+  private ExpireSnapshots() {
+    // Do not instantiate directly
+  }
+
+  /** Creates the builder for creating a stream which expires snapshots for the table. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
+    private Duration maxSnapshotAge = null;
+    private Integer retainLast = null;
+    private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
+    private int deleteAttemptNum = DELETE_ATTEMPT_NUM;
+    private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();
+
+    /**
+     * The snapshots newer than this age will not be removed.
+     *
+     * @param newMaxSnapshotAge of the snapshots to be removed
+     * @return for chained calls

Review Comment:
   remove the `@return` line; it seems too obvious to document. this applies to all builder methods
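   
   e.g. the Javadoc above would then shrink to:
   ```
   /**
    * The snapshots newer than this age will not be removed.
    *
    * @param newMaxSnapshotAge of the snapshots to be removed
    */
   ```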



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
+  private static final long DELETE_TIMEOUT_MS = 10000L;
+  private static final int DELETE_ATTEMPT_NUM = 10;
+  private static final String EXECUTOR_TASK_NAME = "ES Executor";
+  @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";
+
+  private ExpireSnapshots() {
+    // Do not instantiate directly
+  }
+
+  /** Creates the builder for creating a stream which expires snapshots for the table. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
+    private Duration maxSnapshotAge = null;
+    private Integer retainLast = null;
+    private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
+    private int deleteAttemptNum = DELETE_ATTEMPT_NUM;
+    private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();
+
+    /**
+     * The snapshots newer than this age will not be removed.

Review Comment:
   nit: more direct. either `The snapshots newer than this age will be retained` or `The snapshots older than this age will be removed`



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.LockRemover;
+import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
+import org.apache.iceberg.flink.maintenance.operator.TableChange;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory;
+import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/** Creates the table maintenance graph. */
+public class TableMaintenance {
+  private static final String TASK_NAME_FORMAT = "%s [%d]";
+  static final String SOURCE_NAME = "Monitor source";
+  static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager";
+  static final String LOCK_REMOVER_TASK_NAME = "Lock remover";
+
+  private TableMaintenance() {
+    // Do not instantiate directly
+  }
+
+  /**
+   * Use when the change stream is already provided, like in the {@link
+   * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}.
+   *
+   * @param changeStream the table changes
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forChangeStream(
+      DataStream<TableChange> changeStream,
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(changeStream, "The change stream should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(changeStream, tableLoader, lockFactory);
+  }
+
+  /**
+   * Creates the default monitor source for collecting the table changes and returns a builder for
+   * the maintenance stream.
+   *
+   * @param env used to register the monitor source
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forTable(
+      StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(env, tableLoader, lockFactory);
+  }
+
+  public static class Builder {
+    private final StreamExecutionEnvironment env;
+    private final DataStream<TableChange> inputStream;
+    private final TableLoader tableLoader;
+    private final List<MaintenanceTaskBuilder<?>> taskBuilders;
+    private final TriggerLockFactory lockFactory;
+
+    private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
+    private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
+    private Duration rateLimit = Duration.ofMillis(1);
+    private Duration lockCheckDelay = Duration.ofSeconds(30);
+    private Integer parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+    private int maxReadBack = 100;
+
+    private Builder(
+        StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
+      this.env = env;
+      this.inputStream = null;
+      this.tableLoader = tableLoader;
+      this.lockFactory = lockFactory;
+      this.taskBuilders = Lists.newArrayListWithCapacity(4);
+    }
+
+    private Builder(
+        DataStream<TableChange> inputStream,
+        TableLoader tableLoader,
+        TriggerLockFactory lockFactory) {
+      this.env = null;
+      this.inputStream = inputStream;
+      this.tableLoader = tableLoader;
+      this.lockFactory = lockFactory;
+      this.taskBuilders = Lists.newArrayListWithCapacity(4);
+    }
+
+    /**
+     * The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
+     *
+     * @param newUidSuffix for the transformations
+     * @return for chained calls
+     */
+    public Builder uidSuffix(String newUidSuffix) {
+      this.uidSuffix = newUidSuffix;
+      return this;
+    }
+
+    /**
+     * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the
+     * generated stream. Could be used to separate the resources used by this task.
+     *
+     * @param newSlotSharingGroup to be used for the operators
+     * @return for chained calls
+     */
+    public Builder slotSharingGroup(String newSlotSharingGroup) {
+      this.slotSharingGroup = newSlotSharingGroup;
+      return this;
+    }
+
+    /**
+     * Limits the firing frequency for the task triggers.
+     *
+     * @param newRateLimit firing frequency
+     * @return for chained calls
+     */
+    public Builder rateLimit(Duration newRateLimit) {
+      Preconditions.checkArgument(newRateLimit.toMillis() > 0, "Rate limit should be greater than 0");
+      this.rateLimit = newRateLimit;
+      return this;
+    }
+
+    /**
+     * Sets the delay for checking lock availability when a concurrent run is detected.
+     *
+     * @param newLockCheckDelay lock checking frequency
+     * @return for chained calls
+     */
+    public Builder lockCheckDelay(Duration newLockCheckDelay) {
+      this.lockCheckDelay = newLockCheckDelay;
+      return this;
+    }
+
+    /**
+     * Sets the global parallelism of maintenance tasks. Could be overwritten by the {@link
+     * MaintenanceTaskBuilder#parallelism(int)}.
+     *
+     * @param newParallelism task parallelism
+     * @return for chained calls
+     */
+    public Builder parallelism(int newParallelism) {
+      this.parallelism = newParallelism;
+      return this;
+    }
+
+    /**
+     * Maximum number of snapshots checked when started with an embedded {@link MonitorSource} at
+     * the first time. Only available when the {@link
+     * TableMaintenance#forTable(StreamExecutionEnvironment, TableLoader, TriggerLockFactory)} is
+     * used.
+     *
+     * @param newMaxReadBack snapshots to consider when initializing
+     * @return for chained calls
+     */
+    public Builder maxReadBack(int newMaxReadBack) {
+      Preconditions.checkArgument(
+          inputStream == null, "Can't set maxReadBack when change stream is provided");
+      this.maxReadBack = newMaxReadBack;
+      return this;
+    }
+
+    /**
+     * Adds a specific task with the given schedule.
+     *
+     * @param task to add
+     * @return for chained calls
+     */
+    public Builder add(MaintenanceTaskBuilder<?> task) {
+      taskBuilders.add(task);
+      return this;
+    }
+
+    /** Builds the task graph for the maintenance tasks. */
+    public void append() {
+      Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task");
+      Preconditions.checkNotNull(uidSuffix, "Uid suffix should not be null");
+
+      List<String> taskNames = Lists.newArrayListWithCapacity(taskBuilders.size());
+      List<TriggerEvaluator> evaluators = Lists.newArrayListWithCapacity(taskBuilders.size());
+      for (int i = 0; i < taskBuilders.size(); ++i) {
+        taskNames.add(nameFor(taskBuilders.get(i), i));
+        evaluators.add(taskBuilders.get(i).evaluator());
+      }
+
+      DataStream<Trigger> triggers =
+          DataStreamUtils.reinterpretAsKeyedStream(changeStream(), unused -> true)
+              .process(
+                  new TriggerManager(
+                      tableLoader,
+                      lockFactory,
+                      taskNames,
+                      evaluators,
+                      rateLimit.toMillis(),
+                      lockCheckDelay.toMillis()))
+              .name(TRIGGER_MANAGER_TASK_NAME)
+              .uid("trigger-manager-" + uidSuffix)
+              .slotSharingGroup(slotSharingGroup)
+              .forceNonParallel()
+              .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
+              .name("Watermark Assigner")
+              .uid("watermark-assigner-" + uidSuffix)
+              .slotSharingGroup(slotSharingGroup)
+              .forceNonParallel();
+
+      // Add the specific tasks
+      DataStream<TaskResult> unioned = null;
+      for (int i = 0; i < taskBuilders.size(); ++i) {
+        int finalIndex = i;
+        DataStream<Trigger> filtered =
+            triggers
+                .filter(t -> t.taskId() != null && t.taskId() == finalIndex)
+                .name("Filter " + i)

Review Comment:
   again, might be good to make name and uid the same as we did for v2 sink
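   
   a sketch of one way to do it (hypothetical naming, untested):
   ```
   String filterName = "filter-" + i;
   DataStream<Trigger> filtered =
       triggers
           .filter(t -> t.taskId() != null && t.taskId() == finalIndex)
           .name(filterName)
           .uid(filterName + "-" + uidSuffix)
           // ... rest of the chain unchanged
   ```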



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> {
+  private int index;
+  private String name;
+  private TableLoader tableLoader;
+  private String uidSuffix = null;
+  private String slotSharingGroup = null;
+  private Integer parallelism = null;
+  private TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder();
+
+  abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream);
+
+  /**
+   * After a given number of Iceberg table commits since the last run, starts the downstream job.
+   *
+   * @param commitCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnCommitCount(int commitCount) {
+    triggerEvaluator.commitCount(commitCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new data files since the last run, starts the downstream job.
+   *
+   * @param dataFileCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnDataFileCount(int dataFileCount) {
+    triggerEvaluator.dataFileCount(dataFileCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given aggregated data file size since the last run, starts the downstream job.
+   *
+   * @param dataFileSizeInBytes after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnDataFileSize(long dataFileSizeInBytes) {
+    triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new positional delete files since the last run, starts the downstream
+   * job.
+   *
+   * @param posDeleteFileCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) {
+    triggerEvaluator.posDeleteFileCount(posDeleteFileCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new positional delete records since the last run, starts the downstream
+   * job.
+   *
+   * @param posDeleteRecordCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) {
+    triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new equality delete files since the last run, starts the downstream
+   * job.
+   *
+   * @param eqDeleteFileCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) {
+    triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new equality delete records since the last run, starts the downstream
+   * job.
+   *
+   * @param eqDeleteRecordCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) {
+    triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given time since the last run, starts the downstream job.
+   *
+   * @param interval after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnInterval(Duration interval) {
+    triggerEvaluator.timeout(interval);
+    return (T) this;
+  }
+
+  /**
+   * The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
+   *
+   * @param newUidSuffix for the transformations
+   * @return for chained calls
+   */
+  public T uidSuffix(String newUidSuffix) {
+    this.uidSuffix = newUidSuffix;
+    return (T) this;
+  }
+
+  /**
+   * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the
+   * generated stream. Could be used to separate the resources used by this task.
+   *
+   * @param newSlotSharingGroup to be used for the operators
+   * @return for chained calls
+   */
+  public T slotSharingGroup(String newSlotSharingGroup) {
+    this.slotSharingGroup = newSlotSharingGroup;
+    return (T) this;
+  }
+
+  /**
+   * Sets the parallelism for the stream.
+   *
+   * @param newParallelism the required parallelism
+   * @return for chained calls
+   */
+  public T parallelism(int newParallelism) {
+    this.parallelism = newParallelism;
+    return (T) this;
+  }
+
+  @Internal
+  int id() {
+    return index;
+  }
+
+  @Internal
+  String name() {
+    return name;
+  }
+
+  @Internal
+  TableLoader tableLoader() {
+    return tableLoader;
+  }
+
+  @Internal
+  String uidSuffix() {
+    return uidSuffix;
+  }
+
+  @Internal
+  String slotSharingGroup() {
+    return slotSharingGroup;
+  }
+
+  @Internal
+  Integer parallelism() {
+    return parallelism;
+  }
+
+  @Internal
+  TriggerEvaluator evaluator() {
+    return triggerEvaluator.build();
+  }
+
+  @Internal
+  DataStream<TaskResult> append(
+      DataStream<Trigger> sourceStream,
+      int maintenanceTaskIndex,
+      String maintenanceTaskName,
+      TableLoader newTableLoader,
+      String mainUidSuffix,
+      String mainSlotSharingGroup,
+      int mainParallelism) {
+    Preconditions.checkArgument(
+        parallelism == null || parallelism == -1 || parallelism > 0,

Review Comment:
   this assertion condition doesn't seem correct. it should assert on the input 
arg `mainParallelism`.
   
   also should `mainParallelism` be non-primitive `Integer`? if yes, we should 
only need to assert two conditions of `mainParallelism == null || 
mainParallelism  > 0`.
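   
   i.e. sth like (assuming `mainParallelism` is changed to `Integer`):
   ```
   Preconditions.checkArgument(
       mainParallelism == null || mainParallelism > 0,
       "Parallelism should be positive when set");
   ```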



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could
+ * be removed in the {@link #DELETE_STREAM} side output.
+ */
+public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class);
+  public static final OutputTag<String> DELETE_STREAM =
+      new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING);
+
+  private final TableLoader tableLoader;
+  private final Long maxSnapshotAgeMs;
+  private final Integer retainLast;
+  private final int plannerPoolSize;
+  private transient ExecutorService plannerPool;
+  private transient Table table;
+
+  public ExpireSnapshotsProcessor(
+      TableLoader tableLoader, Long maxSnapshotAgeMs, Integer retainLast, int plannerPoolSize) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+
+    this.tableLoader = tableLoader;
+    this.maxSnapshotAgeMs = maxSnapshotAgeMs;
+    this.retainLast = retainLast;
+    this.plannerPoolSize = plannerPoolSize;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+    this.plannerPool = ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize);
+  }
+
+  @Override
+  public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> out)
+      throws Exception {
+    try {
+      table.refresh();
+      ExpireSnapshots expireSnapshots = table.expireSnapshots();
+      if (maxSnapshotAgeMs != null) {
+        expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - maxSnapshotAgeMs);
+      }
+
+      if (retainLast != null) {
+        expireSnapshots = expireSnapshots.retainLast(retainLast);
+      }
+
+      AtomicLong deleteFileCounter = new AtomicLong(0L);
+      expireSnapshots
+          .planWith(plannerPool)
+          .deleteWith(
+              file -> {
+                ctx.output(DELETE_STREAM, file);
+                deleteFileCounter.incrementAndGet();
+              })
+          .cleanExpiredFiles(true)

Review Comment:
   maybe we should add Javadoc to the `ExpireSnapshots` class that expired files are always deleted
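   
   e.g. sth like this on the stream `ExpireSnapshots` class (sketch):
   ```
   /**
    * Creates the stream which expires snapshots for the table. Note that the expired data and
    * metadata files are always removed, i.e. {@code cleanExpiredFiles(true)} is used internally.
    */
   public class ExpireSnapshots {
   ```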



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could
+ * be removed in the {@link #DELETE_STREAM} side output.
+ */
+public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> {
+  private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class);
+  public static final OutputTag<String> DELETE_STREAM =
+      new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING);
+
+  private final TableLoader tableLoader;
+  private final Long maxSnapshotAgeMs;
+  private final Integer retainLast;
+  private final int plannerPoolSize;
+  private transient ExecutorService plannerPool;
+  private transient Table table;
+
+  public ExpireSnapshotsProcessor(
+      TableLoader tableLoader, Long maxSnapshotAgeMs, Integer retainLast, int plannerPoolSize) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+
+    this.tableLoader = tableLoader;
+    this.maxSnapshotAgeMs = maxSnapshotAgeMs;
+    this.retainLast = retainLast;
+    this.plannerPoolSize = plannerPoolSize;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+    this.plannerPool = ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize);
+  }
+
+  @Override
+  public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> out)
+      throws Exception {
+    try {
+      table.refresh();
+      ExpireSnapshots expireSnapshots = table.expireSnapshots();
+      if (maxSnapshotAgeMs != null) {
+        expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - maxSnapshotAgeMs);
+      }
+
+      if (retainLast != null) {
+        expireSnapshots = expireSnapshots.retainLast(retainLast);
+      }
+
+      AtomicLong deleteFileCounter = new AtomicLong(0L);
+      expireSnapshots
+          .planWith(plannerPool)
+          .deleteWith(
+              file -> {
+                ctx.output(DELETE_STREAM, file);
+                deleteFileCounter.incrementAndGet();
+              })
+          .cleanExpiredFiles(true)
+          .commit();
+
+      LOG.info(
+          "Successfully finished expiring snapshots for {} at {}. Scheduled {} files for delete.",
+          table,
+          ctx.timestamp(),
+          deleteFileCounter.get());
+      out.collect(
+          new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList()));
+    } catch (Exception e) {
+      LOG.info("Exception expiring snapshots for {} at {}", table, ctx.timestamp(), e);
+      out.collect(
+          new TaskResult(trigger.taskId(), trigger.timestamp(), false, Lists.newArrayList(e)));

Review Comment:
   `TaskResult` has `List<Exception> exceptions`. wondering what scenario would we have a list of exceptions to propagate?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;

Review Comment:
   maybe add some comment to explain multiplier choice of `1.5`. more common choice is `2`.
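   
   e.g. a short comment on the constant, sth like:
   ```
   // 1.5 backs off more gently than the common 2x multiplier, so retries of
   // transient delete failures stay frequent while the delay still grows
   private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
   ```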



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could
+ * be removed in the {@link #DELETE_STREAM} side output.
+ */
+public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> {

Review Comment:
   this can be marked as `@Internal` or package private if we can agree to remove the sub packages
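   
   i.e. sth like (would also need the `org.apache.flink.annotation.Internal` import):
   ```
   @Internal
   public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> {
   ```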



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
+  private static final long DELETE_TIMEOUT_MS = 10000L;

Review Comment:
   I am wondering if we need this timeout. FileIO typically has a timeout config internally. If we are configuring another timeout here, we may have inconsistency. E.g., a lower value here would cause premature declaration of failure. Should we just rely on the underlying FileIO timeouts?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
+  private static final long DELETE_TIMEOUT_MS = 10000L;
+  private static final int DELETE_ATTEMPT_NUM = 10;
+  private static final String EXECUTOR_TASK_NAME = "ES Executor";
+  @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";
+
+  private ExpireSnapshots() {
+    // Do not instantiate directly
+  }
+
+  /** Creates the builder for creating a stream which expires snapshots for the table. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
+    private Duration maxSnapshotAge = null;
+    private Integer retainLast = null;
+    private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
+    private int deleteAttemptNum = DELETE_ATTEMPT_NUM;
+    private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();
+
+    /**
+     * The snapshots newer than this age will not be removed.
+     *
+     * @param newMaxSnapshotAge of the snapshots to be removed
+     * @return for chained calls
+     */
+    public Builder maxSnapshotAge(Duration newMaxSnapshotAge) {
+      this.maxSnapshotAge = newMaxSnapshotAge;
+      return this;
+    }
+
+    /**
+     * The minimum {@link org.apache.iceberg.Snapshot}s to retain. For more details description see
+     * {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}.
+     *
+     * @param newRetainLast number of snapshots to retain
+     * @return for chained calls
+     */
+    public Builder retainLast(int newRetainLast) {
+      this.retainLast = newRetainLast;
+      return this;
+    }
+
+    /**
+     * The worker pool size used to calculate the files to delete.
+     *
+     * @param newPlanningWorkerPoolSize for planning files to delete
+     * @return for chained calls
+     */
+    public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) {
+      this.planningWorkerPoolSize = newPlanningWorkerPoolSize;
+      return this;
+    }
+
+    /**
+     * The number of retries on the failed delete attempts.
+     *
+     * @param newDeleteAttemptNum number of retries
+     * @return for chained calls
+     */
+    public Builder deleteAttemptNum(int newDeleteAttemptNum) {
+      this.deleteAttemptNum = newDeleteAttemptNum;
+      return this;
+    }
+
+    /**
+     * The worker pool size used for deleting files.
+     *
+     * @param newDeleteWorkerPoolSize for scanning
+     * @return for chained calls
+     */
+    public Builder deleteWorkerPoolSize(int newDeleteWorkerPoolSize) {
+      this.deleteWorkerPoolSize = newDeleteWorkerPoolSize;
+      return this;
+    }
+
+    @Override
+    DataStream<TaskResult> append(DataStream<Trigger> trigger) {
+      Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null");
+
+      SingleOutputStreamOperator<TaskResult> result =
+          trigger
+              .process(
+                  new ExpireSnapshotsProcessor(
+                      tableLoader(),
+                      maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
+                      retainLast,
+                      planningWorkerPoolSize))
+              .name(EXECUTOR_TASK_NAME)
+              .uid("expire-snapshots-" + uidSuffix())
+              .slotSharingGroup(slotSharingGroup())
+              .forceNonParallel();
+
+      AsyncRetryStrategy<Boolean> retryStrategy =
+          new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder<Boolean>(
+                  deleteAttemptNum,
+                  DELETE_INITIAL_DELAY_MS,
+                  DELETE_MAX_RETRY_DELAY_MS,
+                  DELETE_BACKOFF_MULTIPLIER)
+              .ifResult(AsyncDeleteFiles.FAILED_PREDICATE)
+              .build();
+
+      AsyncDataStream.unorderedWaitWithRetry(
+              result.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM).rebalance(),
+              new AsyncDeleteFiles(name(), tableLoader(), deleteWorkerPoolSize),
+              DELETE_TIMEOUT_MS,
+              TimeUnit.MILLISECONDS,
+              deleteWorkerPoolSize,
+              retryStrategy)
+          .name(DELETE_FILES_TASK_NAME)
+          .uid("delete-expired-files-" + uidSuffix())
+          .slotSharingGroup(slotSharingGroup())
+          .setParallelism(parallelism());

Review Comment:
   `parallelism` is a non-primitive `Integer` and can be null; probably only call `setParallelism` when it is not null.
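   A sketch of the null-safe variant (`deleteStream`, `deleteFunction`, `timeoutMs`, `capacity` and `retryStrategy` stand in for the arguments already built above):
   
   ```java
   SingleOutputStreamOperator<Boolean> deletes =
       AsyncDataStream.unorderedWaitWithRetry(
               deleteStream, deleteFunction, timeoutMs, TimeUnit.MILLISECONDS, capacity, retryStrategy)
           .name(DELETE_FILES_TASK_NAME)
           .uid("delete-expired-files-" + uidSuffix())
           .slotSharingGroup(slotSharingGroup());
   
   if (parallelism() != null) {
     // only override the environment default when a value was explicitly configured
     deletes.setParallelism(parallelism());
   }
   ```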



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
+  private static final long DELETE_TIMEOUT_MS = 10000L;
+  private static final int DELETE_ATTEMPT_NUM = 10;
+  private static final String EXECUTOR_TASK_NAME = "ES Executor";

Review Comment:
   It is unclear what `ES` means; please use the full name. Also, we can probably set the operator name and uid to the same value. Right now the name is set to this constant; at the very least it should include the `uidSuffix`.
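   For example (a sketch; the spelled-out name is only a suggestion):
   
   ```java
   // Hypothetical helper: derive the operator name and uid from the same
   // human-readable base so the two always stay in sync.
   static String maintenanceOperatorId(String baseName, String uidSuffix) {
     return baseName + "-" + uidSuffix;
   }
   
   // usage: String id = maintenanceOperatorId("Expire Snapshots Executor", uidSuffix());
   //        result.name(id).uid(id);
   ```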



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
+  private static final long DELETE_TIMEOUT_MS = 10000L;
+  private static final int DELETE_ATTEMPT_NUM = 10;
+  private static final String EXECUTOR_TASK_NAME = "ES Executor";
+  @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";

Review Comment:
   nit: this seems more like an operator name, not a maintenance task name. Again, it would be simpler to just set the operator uid and name to the same value; that is what we do in the v2 sink.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
+  private static final long DELETE_TIMEOUT_MS = 10000L;
+  private static final int DELETE_ATTEMPT_NUM = 10;
+  private static final String EXECUTOR_TASK_NAME = "ES Executor";
+  @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";
+
+  private ExpireSnapshots() {
+    // Do not instantiate directly
+  }
+
+  /** Creates the builder for creating a stream which expires snapshots for the table. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
+    private Duration maxSnapshotAge = null;
+    private Integer retainLast = null;
+    private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
+    private int deleteAttemptNum = DELETE_ATTEMPT_NUM;
+    private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();
+
+    /**
+     * The snapshots newer than this age will not be removed.
+     *
+     * @param newMaxSnapshotAge of the snapshots to be removed
+     * @return for chained calls
+     */
+    public Builder maxSnapshotAge(Duration newMaxSnapshotAge) {
+      this.maxSnapshotAge = newMaxSnapshotAge;
+      return this;
+    }
+
+    /**
+     * The minimum {@link org.apache.iceberg.Snapshot}s to retain. For more details description see

Review Comment:
   nit: should be `The minimum number of ... to retain`.
   
   Also, use an import for the linked class?
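   A sketch of the reworded javadoc (note: `org.apache.iceberg.ExpireSnapshots` likely has to stay fully qualified in the `{@link}`, since its simple name clashes with the enclosing `ExpireSnapshots` class here):
   
   ```java
   import org.apache.iceberg.Snapshot;
   
   /**
    * The minimum number of {@link Snapshot}s to retain. For a more detailed description see
    * {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}.
    */
   ```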



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
+  private static final long DELETE_TIMEOUT_MS = 10000L;
+  private static final int DELETE_ATTEMPT_NUM = 10;
+  private static final String EXECUTOR_TASK_NAME = "ES Executor";
+  @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";
+
+  private ExpireSnapshots() {
+    // Do not instantiate directly
+  }
+
+  /** Creates the builder for creating a stream which expires snapshots for the table. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
+    private Duration maxSnapshotAge = null;
+    private Integer retainLast = null;
+    private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
+    private int deleteAttemptNum = DELETE_ATTEMPT_NUM;
+    private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();
+
+    /**
+     * The snapshots newer than this age will not be removed.
+     *
+     * @param newMaxSnapshotAge of the snapshots to be removed
+     * @return for chained calls
+     */
+    public Builder maxSnapshotAge(Duration newMaxSnapshotAge) {
+      this.maxSnapshotAge = newMaxSnapshotAge;
+      return this;
+    }
+
+    /**
+     * The minimum {@link org.apache.iceberg.Snapshot}s to retain. For more details description see
+     * {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}.
+     *
+     * @param newRetainLast number of snapshots to retain
+     * @return for chained calls
+     */
+    public Builder retainLast(int newRetainLast) {
+      this.retainLast = newRetainLast;
+      return this;
+    }
+
+    /**
+     * The worker pool size used to calculate the files to delete.
+     *
+     * @param newPlanningWorkerPoolSize for planning files to delete
+     * @return for chained calls
+     */
+    public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) {
+      this.planningWorkerPoolSize = newPlanningWorkerPoolSize;
+      return this;
+    }
+
+    /**
+     * The number of retries on the failed delete attempts.
+     *
+     * @param newDeleteAttemptNum number of retries
+     * @return for chained calls
+     */
+    public Builder deleteAttemptNum(int newDeleteAttemptNum) {

Review Comment:
   nit: maybe `deleteRetryNum` to make it clearer that it is a retry count, kind of like `COMMIT_NUM_RETRIES` from `TableProperties`.
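   A sketch of the suggested rename (behavior unchanged; the matching field rename is implied):
   
   ```java
   /**
    * The number of retries for failed delete attempts.
    *
    * @param newDeleteRetryNum number of retries
    * @return for chained calls
    */
   public Builder deleteRetryNum(int newDeleteRetryNum) {
     this.deleteRetryNum = newDeleteRetryNum;
     return this;
   }
   ```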



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.LockRemover;
+import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
+import org.apache.iceberg.flink.maintenance.operator.TableChange;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory;
+import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/** Creates the table maintenance graph. */
+public class TableMaintenance {
+  private static final String TASK_NAME_FORMAT = "%s [%d]";
+  static final String SOURCE_NAME = "Monitor source";
+  static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager";
+  static final String LOCK_REMOVER_TASK_NAME = "Lock remover";
+
+  private TableMaintenance() {
+    // Do not instantiate directly
+  }
+
+  /**
+   * Use when the change stream is already provided, like in the {@link
+   * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}.
+   *
+   * @param changeStream the table changes
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forChangeStream(
+      DataStream<TableChange> changeStream,
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(changeStream, "The change stream should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(changeStream, tableLoader, lockFactory);
+  }
+
+  /**
+   * Creates the default monitor source for collecting the table changes and returns a builder for

Review Comment:
   nit: is this clearer?
   
   ```
   Use this for a standalone maintenance job. It creates a monitor source that detects table changes and builds the maintenance pipelines afterwards.
   ```



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.LockRemover;
+import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
+import org.apache.iceberg.flink.maintenance.operator.TableChange;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory;
+import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/** Creates the table maintenance graph. */
+public class TableMaintenance {
+  private static final String TASK_NAME_FORMAT = "%s [%d]";
+  static final String SOURCE_NAME = "Monitor source";
+  static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager";
+  static final String LOCK_REMOVER_TASK_NAME = "Lock remover";
+
+  private TableMaintenance() {
+    // Do not instantiate directly
+  }
+
+  /**
+   * Use when the change stream is already provided, like in the {@link
+   * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}.
+   *
+   * @param changeStream the table changes
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forChangeStream(
+      DataStream<TableChange> changeStream,
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(changeStream, "The change stream should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(changeStream, tableLoader, lockFactory);
+  }
+
+  /**
+   * Creates the default monitor source for collecting the table changes and returns a builder for
+   * the maintenance stream.
+   *
+   * @param env used to register the monitor source
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forTable(
+      StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(env, tableLoader, lockFactory);
+  }
+
+  public static class Builder {
+    private final StreamExecutionEnvironment env;
+    private final DataStream<TableChange> inputStream;
+    private final TableLoader tableLoader;
+    private final List<MaintenanceTaskBuilder<?>> taskBuilders;
+    private final TriggerLockFactory lockFactory;
+
+    private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
+    private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
+    private Duration rateLimit = Duration.ofMillis(1);

Review Comment:
   Is `1 ms` a good default?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.LockRemover;
+import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
+import org.apache.iceberg.flink.maintenance.operator.TableChange;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory;
+import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/** Creates the table maintenance graph. */
+public class TableMaintenance {
+  private static final String TASK_NAME_FORMAT = "%s [%d]";
+  static final String SOURCE_NAME = "Monitor source";
+  static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager";
+  static final String LOCK_REMOVER_TASK_NAME = "Lock remover";
+
+  private TableMaintenance() {
+    // Do not instantiate directly
+  }
+
+  /**
+   * Use when the change stream is already provided, like in the {@link
+   * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}.
+   *
+   * @param changeStream the table changes
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forChangeStream(
+      DataStream<TableChange> changeStream,
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(changeStream, "The change stream should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(changeStream, tableLoader, lockFactory);
+  }
+
+  /**
+   * Creates the default monitor source for collecting the table changes and returns a builder for
+   * the maintenance stream.
+   *
+   * @param env used to register the monitor source
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forTable(
+      StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(env, tableLoader, lockFactory);
+  }
+
+  public static class Builder {
+    private final StreamExecutionEnvironment env;
+    private final DataStream<TableChange> inputStream;
+    private final TableLoader tableLoader;
+    private final List<MaintenanceTaskBuilder<?>> taskBuilders;
+    private final TriggerLockFactory lockFactory;
+
+    private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
+    private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
+    private Duration rateLimit = Duration.ofMillis(1);
+    private Duration lockCheckDelay = Duration.ofSeconds(30);

Review Comment:
   Is `30s` a good default? Is that based on the estimated average task run time?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.LockRemover;
+import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
+import org.apache.iceberg.flink.maintenance.operator.TableChange;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory;
+import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/** Creates the table maintenance graph. */
+public class TableMaintenance {
+  private static final String TASK_NAME_FORMAT = "%s [%d]";
+  static final String SOURCE_NAME = "Monitor source";
+  static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager";
+  static final String LOCK_REMOVER_TASK_NAME = "Lock remover";
+
+  private TableMaintenance() {
+    // Do not instantiate directly
+  }
+
+  /**
+   * Use when the change stream is already provided, like in the {@link
+   * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}.
+   *
+   * @param changeStream the table changes
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forChangeStream(
+      DataStream<TableChange> changeStream,
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(changeStream, "The change stream should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(changeStream, tableLoader, lockFactory);
+  }
+
+  /**
+   * Creates the default monitor source for collecting the table changes and returns a builder for
+   * the maintenance stream.
+   *
+   * @param env used to register the monitor source
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forTable(
+      StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(env, tableLoader, lockFactory);
+  }
+
+  public static class Builder {
+    private final StreamExecutionEnvironment env;
+    private final DataStream<TableChange> inputStream;
+    private final TableLoader tableLoader;
+    private final List<MaintenanceTaskBuilder<?>> taskBuilders;
+    private final TriggerLockFactory lockFactory;
+
+    private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
+    private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
+    private Duration rateLimit = Duration.ofMillis(1);
+    private Duration lockCheckDelay = Duration.ofSeconds(30);
+    private Integer parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+    private int maxReadBack = 100;
+
+    private Builder(
+        StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
+      this.env = env;
+      this.inputStream = null;
+      this.tableLoader = tableLoader;
+      this.lockFactory = lockFactory;
+      this.taskBuilders = Lists.newArrayListWithCapacity(4);
+    }
+
+    private Builder(
+        DataStream<TableChange> inputStream,
+        TableLoader tableLoader,
+        TriggerLockFactory lockFactory) {
+      this.env = null;
+      this.inputStream = inputStream;
+      this.tableLoader = tableLoader;
+      this.lockFactory = lockFactory;
+      this.taskBuilders = Lists.newArrayListWithCapacity(4);
+    }
+
+    /**
+     * The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
+     *
+     * @param newUidSuffix for the transformations
+     * @return for chained calls
+     */
+    public Builder uidSuffix(String newUidSuffix) {
+      this.uidSuffix = newUidSuffix;
+      return this;
+    }
+
+    /**
+     * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the
+     * generated stream. Could be used to separate the resources used by this task.
+     *
+     * @param newSlotSharingGroup to be used for the operators
+     * @return for chained calls
+     */
+    public Builder slotSharingGroup(String newSlotSharingGroup) {
+      this.slotSharingGroup = newSlotSharingGroup;
+      return this;
+    }
+
+    /**
+     * Limits the firing frequency for the task triggers.
+     *
+     * @param newRateLimit firing frequency
+     * @return for chained calls
+     */
+    public Builder rateLimit(Duration newRateLimit) {
+      Preconditions.checkArgument(newRateLimit.toMillis() > 0, "Rate limit should be greater than 0");
+      this.rateLimit = newRateLimit;
+      return this;
+    }
+
+    /**
+     * Sets the delay for checking lock availability when a concurrent run is detected.
+     *
+     * @param newLockCheckDelay lock checking frequency
+     * @return for chained calls
+     */
+    public Builder lockCheckDelay(Duration newLockCheckDelay) {
+      this.lockCheckDelay = newLockCheckDelay;
+      return this;
+    }
+
+    /**
+     * Sets the global parallelism of maintenance tasks. Could be overwritten by the {@link

Review Comment:
   nit: global -> default?



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/AsyncDeleteFiles.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Predicate;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Delete the files using the {@link FileIO}. */
+@Internal
+public class AsyncDeleteFiles extends RichAsyncFunction<String, Boolean> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncDeleteFiles.class);
+  public static final Predicate<Collection<Boolean>> FAILED_PREDICATE = new FailedPredicate();
+
+  private final String name;
+  private final FileIO io;
+  private final int workerPoolSize;
+  private final String tableName;
+
+  private transient ExecutorService workerPool;
+  private transient Counter failedCounter;
+  private transient Counter succeededCounter;
+
+  public AsyncDeleteFiles(String name, TableLoader tableLoader, int workerPoolSize) {
+    Preconditions.checkNotNull(name, "Name should not be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+
+    this.name = name;
+    tableLoader.open();
+    Table table = tableLoader.loadTable();
+    this.io = table.io();
+    this.workerPoolSize = workerPoolSize;
+    this.tableName = table.name();
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.failedCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+            .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER);
+    this.succeededCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+            .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER);
+
+    this.workerPool =
+        ThreadPools.newWorkerPool(tableName + "-" + name + "-async-delete-files", workerPoolSize);
+  }
+
+  @Override
+  public void asyncInvoke(String fileName, ResultFuture<Boolean> resultFuture) {
+    workerPool.execute(
+        () -> {
+          try {
+            LOG.info("Deleting file: {} with {}", fileName, name);
+            io.deleteFile(fileName);

Review Comment:
   > It is usually not a good practice to block a checkpoint with long running tasks. I'm a bit concerned because of the delay caused by executing the deletes on checkpoint.
   
   I see no difference with `IcebergStreamWriter#prepareSnapshotPreBarrier`, which does the file flush and upload in the synchronous part of the checkpoint.
   
   >  I agree that the other FileIO implementations could be faster with the bulk delete
   
   Bulk deletes are not only a lot faster; they can also avoid potential throttling from S3. We should definitely leverage bulk deletes whenever applicable.
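   A minimal sketch of what leveraging bulk deletes could look like (batching left to the caller; `SupportsBulkOperations` is implemented by e.g. `S3FileIO`):
   
   ```java
   import java.util.List;
   import org.apache.iceberg.io.FileIO;
   import org.apache.iceberg.io.SupportsBulkOperations;
   
   class BulkAwareDeleter {
     // Issue one bulk request when the FileIO supports it; otherwise fall back
     // to per-file deletes.
     static void deleteBatch(FileIO io, List<String> paths) {
       if (io instanceof SupportsBulkOperations) {
         ((SupportsBulkOperations) io).deleteFiles(paths);
       } else {
         paths.forEach(io::deleteFile);
       }
     }
   }
   ```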



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.LockRemover;
+import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
+import org.apache.iceberg.flink.maintenance.operator.TableChange;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory;
+import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/** Creates the table maintenance graph. */
+public class TableMaintenance {
+  private static final String TASK_NAME_FORMAT = "%s [%d]";
+  static final String SOURCE_NAME = "Monitor source";
+  static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager";
+  static final String LOCK_REMOVER_TASK_NAME = "Lock remover";
+
+  private TableMaintenance() {
+    // Do not instantiate directly
+  }
+
+  /**
+   * Use when the change stream is already provided, like in the {@link
+   * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}.
+   *
+   * @param changeStream the table changes
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forChangeStream(
+      DataStream<TableChange> changeStream,
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(changeStream, "The change stream should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(changeStream, tableLoader, lockFactory);
+  }
+
+  /**
+   * Creates the default monitor source for collecting the table changes and returns a builder for
+   * the maintenance stream.
+   *
+   * @param env used to register the monitor source
+   * @param tableLoader used for accessing the table
+   * @param lockFactory used for preventing concurrent task runs
+   * @return builder for the maintenance stream
+   */
+  public static Builder forTable(
+      StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
+    Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
+
+    return new Builder(env, tableLoader, lockFactory);
+  }
+
+  public static class Builder {
+    private final StreamExecutionEnvironment env;
+    private final DataStream<TableChange> inputStream;
+    private final TableLoader tableLoader;
+    private final List<MaintenanceTaskBuilder<?>> taskBuilders;
+    private final TriggerLockFactory lockFactory;
+
+    private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
+    private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
+    private Duration rateLimit = Duration.ofMillis(1);
+    private Duration lockCheckDelay = Duration.ofSeconds(30);
+    private Integer parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+    private int maxReadBack = 100;
+
+    private Builder(
+        StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
+      this.env = env;
+      this.inputStream = null;
+      this.tableLoader = tableLoader;
+      this.lockFactory = lockFactory;
+      this.taskBuilders = Lists.newArrayListWithCapacity(4);
+    }
+
+    private Builder(
+        DataStream<TableChange> inputStream,
+        TableLoader tableLoader,
+        TriggerLockFactory lockFactory) {
+      this.env = null;
+      this.inputStream = inputStream;
+      this.tableLoader = tableLoader;
+      this.lockFactory = lockFactory;
+      this.taskBuilders = Lists.newArrayListWithCapacity(4);
+    }
+
+    /**
+     * The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
+     *
+     * @param newUidSuffix for the transformations
+     * @return for chained calls
+     */
+    public Builder uidSuffix(String newUidSuffix) {
+      this.uidSuffix = newUidSuffix;
+      return this;
+    }
+
+    /**
+     * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the
+     * generated stream. Could be used to separate the resources used by this task.
+     *
+     * @param newSlotSharingGroup to be used for the operators
+     * @return for chained calls
+     */
+    public Builder slotSharingGroup(String newSlotSharingGroup) {
+      this.slotSharingGroup = newSlotSharingGroup;
+      return this;
+    }
+
+    /**
+     * Limits the firing frequency for the task triggers.
+     *
+     * @param newRateLimit firing frequency
+     * @return for chained calls
+     */
+    public Builder rateLimit(Duration newRateLimit) {
+      Preconditions.checkArgument(newRateLimit.toMillis() > 0, "Rate limit should be greater than 0");
+      this.rateLimit = newRateLimit;
+      return this;
+    }
+
+    /**
+     * Sets the delay for checking lock availability when a concurrent run is detected.
+     *
+     * @param newLockCheckDelay lock checking frequency
+     * @return for chained calls
+     */
+    public Builder lockCheckDelay(Duration newLockCheckDelay) {
+      this.lockCheckDelay = newLockCheckDelay;
+      return this;
+    }
+
+    /**
+     * Sets the global parallelism of maintenance tasks. Could be overwritten by the {@link
+     * MaintenanceTaskBuilder#parallelism(int)}.
+     *
+     * @param newParallelism task parallelism
+     * @return for chained calls
+     */
+    public Builder parallelism(int newParallelism) {
+      this.parallelism = newParallelism;
+      return this;
+    }
+
+    /**
+     * Maximum number of snapshots checked when started with an embedded {@link MonitorSource} at
+     * the first time. Only available when the {@link
+     * TableMaintenance#forTable(StreamExecutionEnvironment, TableLoader, TriggerLockFactory)} is
+     * used.
+     *
+     * @param newMaxReadBack snapshots to consider when initializing
+     * @return for chained calls
+     */
+    public Builder maxReadBack(int newMaxReadBack) {
+      Preconditions.checkArgument(
+          inputStream == null, "Can't set maxReadBack when change stream is provided");
+      this.maxReadBack = newMaxReadBack;
+      return this;
+    }
+
+    /**
+     * Adds a specific task with the given schedule.
+     *
+     * @param task to add
+     * @return for chained calls
+     */
+    public Builder add(MaintenanceTaskBuilder<?> task) {
+      taskBuilders.add(task);
+      return this;
+    }
+
+    /** Builds the task graph for the maintenance tasks. */
+    public void append() {
+      Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task");
+      Preconditions.checkNotNull(uidSuffix, "Uid suffix should not be null");
+
+      List<String> taskNames = Lists.newArrayListWithCapacity(taskBuilders.size());
+      List<TriggerEvaluator> evaluators = Lists.newArrayListWithCapacity(taskBuilders.size());
+      for (int i = 0; i < taskBuilders.size(); ++i) {
+        taskNames.add(nameFor(taskBuilders.get(i), i));
+        evaluators.add(taskBuilders.get(i).evaluator());
+      }
+
+      DataStream<Trigger> triggers =
+          DataStreamUtils.reinterpretAsKeyedStream(changeStream(), unused -> true)
+              .process(
+                  new TriggerManager(
+                      tableLoader,
+                      lockFactory,
+                      taskNames,
+                      evaluators,
+                      rateLimit.toMillis(),
+                      lockCheckDelay.toMillis()))
+              .name(TRIGGER_MANAGER_TASK_NAME)
+              .uid("trigger-manager-" + uidSuffix)
+              .slotSharingGroup(slotSharingGroup)
+              .forceNonParallel()
+              .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
+              .name("Watermark Assigner")
+              .uid("watermark-assigner-" + uidSuffix)
+              .slotSharingGroup(slotSharingGroup)
+              .forceNonParallel();
+
+      // Add the specific tasks
+      DataStream<TaskResult> unioned = null;
+      for (int i = 0; i < taskBuilders.size(); ++i) {
+        int finalIndex = i;
+        DataStream<Trigger> filtered =
+            triggers
+                .filter(t -> t.taskId() != null && t.taskId() == finalIndex)
+                .name("Filter " + i)
+                .forceNonParallel()
+                .uid("filter-" + i + "-" + uidSuffix)
+                .slotSharingGroup(slotSharingGroup);
+        MaintenanceTaskBuilder<?> builder = taskBuilders.get(i);
+        DataStream<TaskResult> result =
+            builder.append(
+                filtered,
+                i,
+                taskNames.get(i),
+                tableLoader,
+                uidSuffix,
+                slotSharingGroup,
+                parallelism);
+        if (unioned == null) {
+          unioned = result;
+        } else {
+          unioned = unioned.union(result);
+        }
+      }
+
+      // Add the LockRemover to the end
+      unioned
+          .global()

Review Comment:
   do we need `global` with the `forceNonParallel` below?
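
   For reference, a minimal runnable sketch (with a stand-in map operator, not
   the PR's LockRemover) of what each call controls: `forceNonParallel()` pins
   the receiving operator to a single subtask, so all records end up there even
   without `global()`; `global()` only makes the partitioning explicit.

   ```java
   import org.apache.flink.api.common.functions.MapFunction;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   public class GlobalVsForceNonParallel {
     public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       DataStream<Long> source = env.fromSequence(0, 99);

       source
           .global() // partitioner: send every record to subtask 0 of the next operator
           .map(
               new MapFunction<Long, Long>() {
                 @Override
                 public Long map(Long value) {
                   return value; // stand-in for the real downstream operator
                 }
               })
           .forceNonParallel() // the receiving operator has parallelism 1 regardless
           .print();

       env.execute("global-vs-forceNonParallel");
     }
   }
   ```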



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<T>> {
+  private int index;
+  private String name;
+  private TableLoader tableLoader;
+  private String uidSuffix = null;
+  private String slotSharingGroup = null;
+  private Integer parallelism = null;
+  private TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder();
+
+  abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream);
+
+  /**
+   * After a given number of Iceberg table commits since the last run, starts the downstream job.
+   *
+   * @param commitCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnCommitCount(int commitCount) {
+    triggerEvaluator.commitCount(commitCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new data files since the last run, starts the downstream job.
+   *
+   * @param dataFileCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnDataFileCount(int dataFileCount) {
+    triggerEvaluator.dataFileCount(dataFileCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given aggregated data file size since the last run, starts the downstream job.
+   *
+   * @param dataFileSizeInBytes after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnDataFileSize(long dataFileSizeInBytes) {
+    triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new positional delete files since the last run, starts the downstream
+   * job.
+   *
+   * @param posDeleteFileCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) {
+    triggerEvaluator.posDeleteFileCount(posDeleteFileCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new positional delete records since the last run, starts the
+   * downstream job.
+   *
+   * @param posDeleteRecordCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) {
+    triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new equality delete files since the last run, starts the downstream
+   * job.
+   *
+   * @param eqDeleteFileCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) {
+    triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new equality delete records since the last run, starts the
+   * downstream job.
+   *
+   * @param eqDeleteRecordCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) {
+    triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given time since the last run, starts the downstream job.
+   *
+   * @param interval after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnInterval(Duration interval) {
+    triggerEvaluator.timeout(interval);
+    return (T) this;
+  }
+
+  /**
+   * The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
+   *
+   * @param newUidSuffix for the transformations
+   * @return for chained calls
+   */
+  public T uidSuffix(String newUidSuffix) {
+    this.uidSuffix = newUidSuffix;
+    return (T) this;
+  }
+
+  /**
+   * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the
+   * generated stream. Could be used to separate the resources used by this task.
+   *
+   * @param newSlotSharingGroup to be used for the operators
+   * @return for chained calls
+   */
+  public T slotSharingGroup(String newSlotSharingGroup) {
+    this.slotSharingGroup = newSlotSharingGroup;
+    return (T) this;
+  }
+
+  /**
+   * Sets the parallelism for the stream.
+   *
+   * @param newParallelism the required parallelism
+   * @return for chained calls
+   */
+  public T parallelism(int newParallelism) {
+    this.parallelism = newParallelism;
+    return (T) this;
+  }
+
+  @Internal
+  int id() {
+    return index;
+  }
+
+  @Internal
+  String name() {
+    return name;
+  }
+
+  @Internal
+  TableLoader tableLoader() {
+    return tableLoader;
+  }
+
+  @Internal
+  String uidSuffix() {
+    return uidSuffix;
+  }
+
+  @Internal
+  String slotSharingGroup() {
+    return slotSharingGroup;
+  }
+
+  @Internal
+  Integer parallelism() {
+    return parallelism;
+  }
+
+  @Internal
+  TriggerEvaluator evaluator() {
+    return triggerEvaluator.build();
+  }
+
+  @Internal
+  DataStream<TaskResult> append(
+      DataStream<Trigger> sourceStream,
+      int maintenanceTaskIndex,
+      String maintenanceTaskName,
+      TableLoader newTableLoader,
+      String mainUidSuffix,
+      String mainSlotSharingGroup,
+      int mainParallelism) {
+    Preconditions.checkArgument(
+        parallelism == null || parallelism == -1 || parallelism > 0,
+        "Parallelism should be left to default (-1/null) or greater than 0");
+    Preconditions.checkNotNull(maintenanceTaskName, "Name should not be null");
+    Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null");
+
+    this.index = maintenanceTaskIndex;
+    this.name = maintenanceTaskName;
+    this.tableLoader = newTableLoader;
+
+    if (uidSuffix == null) {

Review Comment:
   wondering if we need both `uidSuffix` and `mainUidSuffix`. can we just use 
`mainUidSuffix`?
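
   If the intent is only a fallback (task-level override, job-level default),
   then conceptually it reduces to a single effective value — a sketch of the
   presumed logic, since the diff is truncated at the `if (uidSuffix == null)`
   branch:

   ```java
   // Assumption about the truncated branch, not the PR code as written:
   String effectiveUidSuffix = uidSuffix != null ? uidSuffix : mainUidSuffix;
   ```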



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<T>> {

Review Comment:
   I am not saying we shouldn't. But it is usually good to keep them private 
first so that we are free to evolve the class. Maybe wait until the need is 
clear.
   
   Use Spark as an example. the `BaseSparkAction` is package private.
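
   A sketch of the narrower scoping, in case it helps: dropping `public` keeps
   the class package-private, mirroring how `BaseSparkAction` stays
   package-private in Spark (the `self()` method is a hypothetical placeholder
   body):

   ```java
   // Visible only inside org.apache.iceberg.flink.maintenance.stream,
   // so the class is free to evolve until the public surface is settled.
   abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<T>> {
     abstract T self(); // hypothetical placeholder body
   }
   ```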



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;

Review Comment:
   >  It is hard to understand what is a public API, and what should only be used 
internally.
   
   Ideally, this should be achieved via Java scoping (public, package private, 
private etc.)
   
   > I agree that the stream might not be the best name, maybe api would be 
better, but I would try to separate out the classes used by the user from the 
classes only used by the implementation.
   
   This is not the tradition/style that Iceberg connectors (Spark, Flink) have 
been following. If we are going to have `api` package, it should be one under 
`org.apache.iceberg.flink.api`. It seems complicated to have an `api` sub-package 
under every sub-namespace like 
`org.apache.iceberg.flink.source/sink/data/maintenance/actions`. Anyway, my 
preference is not to introduce `api` sub package. But we can also try to get 
other people's feedbacks.
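
   For context, the intended user-facing flow is already narrow — a hedged
   usage sketch stitched together from the builder methods quoted above (the
   `ExpireSnapshots` task builder name is an assumption, not confirmed by this
   diff):

   ```java
   TableMaintenance.forTable(env, tableLoader, lockFactory)
       .uidSuffix("my-maintenance")      // Builder#uidSuffix
       .rateLimit(Duration.ofMinutes(1)) // Builder#rateLimit
       .add(ExpireSnapshots.builder().scheduleOnCommitCount(10)) // hypothetical task
       .append();                        // builds the maintenance task graph
   ```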


