stevenzwu commented on code in PR #10484:
URL: https://github.com/apache/iceberg/pull/10484#discussion_r1698852703


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link
+ * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}.
+ */
+@Internal
+public class TagBasedLockFactory implements TriggerLockFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TagBasedLockFactory.class);
+  private static final String RUNNING_TAG = "__flink_maintenance_running";
+  private static final String RECOVERING_TAG = 
"__flink_maintenance_recovering";
+  private static final int CHANGE_ATTEMPTS = 3;
+
+  private final TableLoader tableLoader;
+  private transient Table table;
+
+  public TagBasedLockFactory(TableLoader tableLoader) {
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void open() {
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+  }
+
+  @Override
+  public TriggerLockFactory.Lock createLock() {
+    return new Lock(table, RUNNING_TAG);
+  }
+
+  @Override
+  public TriggerLockFactory.Lock createRecoveryLock() {
+    return new Lock(table, RECOVERING_TAG);
+  }
+
+  @Override
+  public void close() throws IOException {
+    tableLoader.close();
+  }
+
+  public static class Lock implements TriggerLockFactory.Lock {
+    private final Table table;
+    private final String lockKey;
+
+    public Lock(Table table, String lockKey) {
+      Preconditions.checkNotNull(table, "Table should not be null");
+      Preconditions.checkNotNull(lockKey, "Lock key should not be null");
+      this.table = table;
+      this.lockKey = lockKey;
+    }
+
+    /**
+     * The lock will be acquired by jobs with creating a new tag. A new empty 
commit is added for a
+     * table without snapshots.
+     *
+     * @return <code>true</code> if the lock is acquired by this operator
+     */
+    @Override
+    public boolean tryLock() {
+      if (isHeld()) {
+        LOG.info("Lock is already held");
+        return false;
+      }
+
+      if (table.currentSnapshot() == null) {
+        // Create an empty commit
+        table.newFastAppend().commit();
+        LOG.info("Empty table, new empty commit added for using tags");
+      }
+
+      try {
+        Tasks.foreach(1)
+            .retry(CHANGE_ATTEMPTS)
+            .stopOnFailure()
+            .throwFailureWhenFinished()
+            .run(
+                unused -> {
+                  table.refresh();
+                  ManageSnapshots manage = table.manageSnapshots();
+                  manage.createTag(lockKey, 
table.currentSnapshot().snapshotId());
+                  manage.commit();
+                  LOG.debug("Lock created");
+                });
+      } catch (Exception e) {
+        LOG.info("Concurrent lock created. Stop concurrent maintenance jobs.", 
e);

Review Comment:
   nit: this error msg doesn't seem correct. It just failed to acquire the lock; it doesn't actually `stop` anything.
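
   Something along these lines might read more accurately (just a wording suggestion, not part of the PR):

   ```java
   // hypothetical rewording of the log line above
   LOG.info("Failed to acquire lock. Another maintenance run probably holds it.", e);
   ```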



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link
+ * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}.
+ */
+@Internal
+public class TagBasedLockFactory implements TriggerLockFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TagBasedLockFactory.class);
+  private static final String RUNNING_TAG = "__flink_maintenance_running";
+  private static final String RECOVERING_TAG = 
"__flink_maintenance_recovering";
+  private static final int CHANGE_ATTEMPTS = 3;
+
+  private final TableLoader tableLoader;
+  private transient Table table;
+
+  public TagBasedLockFactory(TableLoader tableLoader) {
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void open() {
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+  }
+
+  @Override
+  public TriggerLockFactory.Lock createLock() {
+    return new Lock(table, RUNNING_TAG);
+  }
+
+  @Override
+  public TriggerLockFactory.Lock createRecoveryLock() {
+    return new Lock(table, RECOVERING_TAG);
+  }
+
+  @Override
+  public void close() throws IOException {
+    tableLoader.close();
+  }
+
+  public static class Lock implements TriggerLockFactory.Lock {
+    private final Table table;
+    private final String lockKey;
+
+    public Lock(Table table, String lockKey) {
+      Preconditions.checkNotNull(table, "Table should not be null");
+      Preconditions.checkNotNull(lockKey, "Lock key should not be null");
+      this.table = table;
+      this.lockKey = lockKey;
+    }
+
+    /**
+     * The lock will be acquired by jobs with creating a new tag. A new empty 
commit is added for a
+     * table without snapshots.
+     *
+     * @return <code>true</code> if the lock is acquired by this operator
+     */
+    @Override
+    public boolean tryLock() {
+      if (isHeld()) {
+        LOG.info("Lock is already held");
+        return false;
+      }
+
+      if (table.currentSnapshot() == null) {
+        // Create an empty commit
+        table.newFastAppend().commit();
+        LOG.info("Empty table, new empty commit added for using tags");
+      }
+
+      try {
+        Tasks.foreach(1)
+            .retry(CHANGE_ATTEMPTS)

Review Comment:
   wondering if retry is safe here. Could the lock/unlock action be initiated by multiple threads/operator subtasks?
   
   here is one scenario I am thinking about:
   
   - t1: operator 1 calls unlock; the tag is actually removed by the catalog service/backend, but the response fails to get back due to a network issue
   - t2: operator 2 calls lock and creates the tag successfully
   - t3: operator 1 retries the unlock and removes the tag that operator 2 just created
   
   If the above scenario can't happen, let's call out the constraints in the Javadoc of this class. One possible guard is sketched after this comment.
   
   also, if the tag is already removed, there is no need to retry. but I guess this is not a correctness problem, just a minor behavior improvement. don't know if we can easily distinguish this case from other errors.
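
   A rough sketch of what I mean, assuming the unlock path remembers the snapshot id it tagged (`unlockIfStillOwned` and `lockedSnapshotId` are made-up names, not part of this PR):

   ```java
   // Sketch only: make a retried unlock safe by removing the tag only if it still
   // points at the snapshot we tagged when taking the lock. Uses the same imports
   // as TagBasedLockFactory (Table, SnapshotRef, ManageSnapshots).
   void unlockIfStillOwned(Table table, String lockKey, long lockedSnapshotId) {
     table.refresh();
     SnapshotRef ref = table.refs().get(lockKey);
     if (ref == null) {
       // Tag already gone (an earlier unlock attempt actually succeeded); nothing to retry.
       return;
     }

     if (ref.snapshotId() != lockedSnapshotId) {
       // The tag was re-created by another job after our lock; leave it alone.
       return;
     }

     ManageSnapshots manage = table.manageSnapshots();
     manage.removeTag(lockKey);
     manage.commit();
   }
   ```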



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} 
messages which are
+ * calculated based on the incoming {@link TableChange} messages. The 
TriggerManager keeps track of
+ * the changes since the last run of the Maintenance Tasks and triggers a new 
run based on the
+ * result of the {@link TriggerEvaluator}.
+ *
+ * <p>The TriggerManager prevents overlapping Maintenance Task runs using 
{@link
+ * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}.
+ *
+ * <p>The TriggerManager should run as a global operator. {@link 
KeyedProcessFunction} is used, so
+ * the timer functions are available, but the key is not used.
+ */
+@Internal
+class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, 
Trigger>
+    implements CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List<String> taskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunTriggeredCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private transient ValueState<Long> nextEvaluationTimeState;
+  private transient ListState<TableChange> accumulatedChangesState;
+  private transient ListState<Long> lastTriggerTimesState;
+  private transient Long nextEvaluationTime;
+  private transient List<TableChange> accumulatedChanges;
+  private transient List<Long> lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient boolean shouldRestoreTasks = false;
+  private transient boolean inited = false;
+  // To keep the task scheduling fair we keep the last triggered task position 
in memory.
+  // If we find a task to trigger, then we run it, but after it is finished, 
we start from the given
+  // position to prevent "starvation" of the tasks.
+  // When there is nothing to trigger, we start from the beginning, as the 
order of the tasks might
+  // be important (RewriteDataFiles first, and then RewriteManifestFiles later)
+  private transient int startsFrom = 0;
+
+  TriggerManager(
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory,
+      List<String> taskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
+    Preconditions.checkArgument(
+        evaluators != null && !evaluators.isEmpty(), "Invalid evaluators: null 
or empty");
+    Preconditions.checkArgument(
+        taskNames.size() == evaluators.size(), "Provide a name and evaluator 
for all of the tasks");
+    Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should 
be at least 1.");
+    Preconditions.checkArgument(
+        lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 
ms.");
+
+    this.tableLoader = tableLoader;
+    this.lockFactory = lockFactory;
+    this.taskNames = taskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.rateLimiterTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+    this.concurrentRunTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED);
+    this.nothingToTriggerCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+    this.triggerCounters =
+        taskNames.stream()
+            .map(
+                name ->
+                    getRuntimeContext()
+                        .getMetricGroup()
+                        .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+                        .counter(TableMaintenanceMetrics.TRIGGERED))
+            .collect(Collectors.toList());
+
+    this.nextEvaluationTimeState =
+        getRuntimeContext()
+            .getState(new 
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+    this.accumulatedChangesState =
+        getRuntimeContext()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "triggerManagerAccumulatedChange", 
TypeInformation.of(TableChange.class)));
+    this.lastTriggerTimesState =
+        getRuntimeContext()
+            .getListState(new 
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    tableLoader.open();
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    if (inited) {
+      // Only store state if initialized
+      nextEvaluationTimeState.update(nextEvaluationTime);
+      accumulatedChangesState.update(accumulatedChanges);
+      lastTriggerTimesState.update(lastTriggerTimes);
+    }
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+    LOG.info("Initializing state restored: {}", context.isRestored());
+    lockFactory.open();
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+    if (context.isRestored()) {
+      shouldRestoreTasks = true;
+    }
+  }
+
+  @Override
+  public void processElement(TableChange change, Context ctx, 
Collector<Trigger> out)
+      throws Exception {
+    init(out, ctx.timerService());
+
+    accumulatedChanges.forEach(tableChange -> tableChange.merge(change));
+
+    long current = ctx.timerService().currentProcessingTime();
+    Long nextTime = nextEvaluationTime;
+    if (nextTime == null) {
+      checkAndFire(current, ctx.timerService(), out);
+    } else {
+      LOG.info(
+          "Trigger manager rate limiter triggered current: {}, next: {}, 
accumulated changes: {}",
+          current,
+          nextTime,
+          accumulatedChanges);
+      rateLimiterTriggeredCounter.inc();
+    }
+  }
+
+  @Override
+  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> 
out) throws Exception {
+    init(out, ctx.timerService());
+    this.nextEvaluationTime = null;
+    checkAndFire(ctx.timerService().currentProcessingTime(), 
ctx.timerService(), out);
+  }
+
+  @Override
+  public void close() throws IOException {
+    tableLoader.close();
+    lockFactory.close();
+  }
+
+  private void checkAndFire(long current, TimerService timerService, 
Collector<Trigger> out) {
+    if (shouldRestoreTasks) {
+      if (recoveryLock.isHeld()) {
+        // Recovered tasks in progress. Skip trigger check
+        LOG.debug("The cleanup lock is still held at {}", current);
+        schedule(timerService, current + lockCheckDelayMs);
+        return;
+      } else {
+        LOG.info("The cleanup is finished at {}", current);
+        shouldRestoreTasks = false;
+      }
+    }
+
+    Integer taskToStart =
+        nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, 
startsFrom);
+    if (taskToStart == null) {
+      // Nothing to execute
+      if (startsFrom == 0) {
+        nothingToTriggerCounter.inc();
+        LOG.debug("Nothing to execute at {} for collected: {}", current, 
accumulatedChanges);
+      }
+
+      // Next time start from the beginning
+      startsFrom = 0;
+      return;
+    }
+
+    if (lock.tryLock()) {
+      TableChange change = accumulatedChanges.get(taskToStart);
+      SerializableTable table =
+          (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
+      out.collect(Trigger.create(current, table, taskToStart));
+      LOG.debug("Fired event with time: {}, collected: {} for {}", current, 
change, table.name());
+      triggerCounters.get(taskToStart).inc();
+      accumulatedChanges.set(taskToStart, TableChange.empty());
+      lastTriggerTimes.set(taskToStart, current);
+      schedule(timerService, current + minFireDelayMs);
+      startsFrom = taskToStart + 1;
+    } else {
+      // The lock is already held by someone
+      LOG.info("Delaying task on failed lock check: {}", current);
+
+      startsFrom = taskToStart;
+      concurrentRunTriggeredCounter.inc();
+      schedule(timerService, current + lockCheckDelayMs);
+    }
+
+    timerService.registerProcessingTimeTimer(nextEvaluationTime);
+  }
+
+  private void schedule(TimerService timerService, long time) {
+    this.nextEvaluationTime = time;
+    timerService.registerProcessingTimeTimer(time);
+  }
+
+  private static Integer nextTrigger(
+      List<TriggerEvaluator> evaluators,
+      List<TableChange> changes,
+      List<Long> lastTriggerTimes,
+      long currentTime,
+      int startPos) {
+    int normalizedStartingPos = startPos % evaluators.size();
+    int current = normalizedStartingPos;
+    do {
+      if (evaluators
+          .get(current)
+          .check(changes.get(current), lastTriggerTimes.get(current), 
currentTime)) {
+        return current;
+      }
+
+      current = (current + 1) % evaluators.size();
+    } while (current != normalizedStartingPos);
+
+    return null;
+  }
+
+  private void init(Collector<Trigger> out, TimerService timerService) throws 
Exception {
+    if (!inited) {
+      long current = timerService.currentProcessingTime();
+
+      // Initialize from state
+      this.nextEvaluationTime = nextEvaluationTimeState.value();
+      this.accumulatedChanges = 
Lists.newArrayList(accumulatedChangesState.get());
+      this.lastTriggerTimes = Lists.newArrayList(lastTriggerTimesState.get());
+
+      // Initialize if the state was empty
+      if (accumulatedChanges.isEmpty()) {
+        for (int i = 0; i < evaluators.size(); ++i) {
+          accumulatedChanges.add(TableChange.empty());
+          lastTriggerTimes.add(current);
+        }
+      }
+
+      if (shouldRestoreTasks) {
+        // When the job state is restored, there could be ongoing tasks.
+        // To prevent collision with the new triggers the following is done:
+        //  - add a cleanup lock

Review Comment:
   nit: for consistency, maybe `acquire the recovery lock`



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link
+ * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}.
+ */

Review Comment:
   let's call out the intended scope/limitation of the tag-based locking: it mainly only works in the single-job scenario. while we are clarifying that, maybe also call out that running multiple Flink compaction jobs against the same table is not recommended. A possible wording is sketched below.
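
   Something along these lines in the class Javadoc (wording is just a suggestion):

   ```java
   /**
    * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link
    * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}.
    *
    * <p>Intended for the single-job scenario: only one maintenance job should use this lock for a
    * given table. Running multiple Flink jobs that compact the same table is not recommended and
    * is not protected by this lock.
    */
   ```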



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} 
messages which are
+ * calculated based on the incoming {@link TableChange} messages. The 
TriggerManager keeps track of
+ * the changes since the last run of the Maintenance Tasks and triggers a new 
run based on the
+ * result of the {@link TriggerEvaluator}.
+ *
+ * <p>The TriggerManager prevents overlapping Maintenance Task runs using 
{@link
+ * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}.
+ *
+ * <p>The TriggerManager should run as a global operator. {@link 
KeyedProcessFunction} is used, so
+ * the timer functions are available, but the key is not used.
+ */
+@Internal
+class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, 
Trigger>
+    implements CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List<String> taskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunTriggeredCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private transient ValueState<Long> nextEvaluationTimeState;
+  private transient ListState<TableChange> accumulatedChangesState;
+  private transient ListState<Long> lastTriggerTimesState;
+  private transient Long nextEvaluationTime;
+  private transient List<TableChange> accumulatedChanges;
+  private transient List<Long> lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient boolean isCleanUp = false;
+  private transient boolean inited = false;
+  private transient int startsFrom = 0;
+
+  TriggerManager(
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory,
+      List<String> taskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
+    Preconditions.checkArgument(
+        evaluators != null && !evaluators.isEmpty(), "Evaluators should not be 
empty");
+    Preconditions.checkArgument(
+        taskNames.size() == evaluators.size(), "Provide a name and evaluator 
for all of the tasks");
+    Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should 
be at least 1.");
+    Preconditions.checkArgument(
+        lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 
1ms.");
+
+    this.tableLoader = tableLoader;
+    this.lockFactory = lockFactory;
+    this.taskNames = taskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.rateLimiterTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+    this.concurrentRunTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED);
+    this.nothingToTriggerCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+    this.triggerCounters =
+        taskNames.stream()
+            .map(
+                name ->
+                    getRuntimeContext()
+                        .getMetricGroup()
+                        .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+                        .counter(TableMaintenanceMetrics.TRIGGERED))
+            .collect(Collectors.toList());
+
+    this.nextEvaluationTimeState =
+        getRuntimeContext()
+            .getState(new 
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+    this.accumulatedChangesState =
+        getRuntimeContext()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "triggerManagerAccumulatedChange", 
TypeInformation.of(TableChange.class)));
+    this.lastTriggerTimesState =
+        getRuntimeContext()
+            .getListState(new 
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    tableLoader.open();
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    if (inited) {
+      // Only store state if initialized
+      nextEvaluationTimeState.update(nextEvaluationTime);
+      accumulatedChangesState.update(accumulatedChanges);
+      lastTriggerTimesState.update(lastTriggerTimes);
+    }
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+    LOG.info("Initializing state restored: {}", context.isRestored());
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+    if (context.isRestored()) {
+      isCleanUp = true;
+    }
+  }
+
+  @Override
+  public void processElement(TableChange change, Context ctx, 
Collector<Trigger> out)
+      throws Exception {
+    init(out, ctx.timerService());
+
+    accumulatedChanges.forEach(tableChange -> tableChange.merge(change));
+
+    long current = ctx.timerService().currentProcessingTime();
+    Long nextTime = nextEvaluationTime;
+    if (nextTime == null) {
+      checkAndFire(current, ctx.timerService(), out);
+    } else {
+      LOG.info(
+          "Trigger manager rate limiter triggered current: {}, next: {}, 
accumulated changes: {}",
+          current,
+          nextTime,
+          accumulatedChanges);
+      rateLimiterTriggeredCounter.inc();
+    }
+  }
+
+  @Override
+  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> 
out) throws Exception {
+    init(out, ctx.timerService());
+    this.nextEvaluationTime = null;
+    checkAndFire(ctx.timerService().currentProcessingTime(), 
ctx.timerService(), out);
+  }
+
+  private void checkAndFire(long current, TimerService timerService, 
Collector<Trigger> out) {
+    if (isCleanUp) {
+      if (recoveryLock.isHeld()) {
+        LOG.debug("The cleanup lock is still held at {}", current);
+        schedule(timerService, current + lockCheckDelayMs);
+        return;
+      } else {
+        LOG.info("The cleanup is finished at {}", current);
+        isCleanUp = false;
+      }
+    }
+
+    Integer taskToStart =
+        nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, 
startsFrom);
+    if (taskToStart == null) {
+      // Nothing to execute
+      if (startsFrom == 0) {
+        nothingToTriggerCounter.inc();
+        LOG.debug("Nothing to execute at {} for collected: {}", current, 
accumulatedChanges);
+      }
+
+      startsFrom = 0;
+      return;
+    }
+
+    if (lock.tryLock()) {
+      TableChange change = accumulatedChanges.get(taskToStart);
+      SerializableTable table =
+          (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
+      out.collect(Trigger.create(current, table, taskToStart));
+      LOG.debug("Fired event with time: {}, collected: {} for {}", current, 
change, table.name());
+      triggerCounters.get(taskToStart).inc();
+      accumulatedChanges.set(taskToStart, TableChange.empty());
+      lastTriggerTimes.set(taskToStart, current);
+      schedule(timerService, current + minFireDelayMs);
+      startsFrom = taskToStart + 1;
+    } else {
+      // The lock is already held by someone
+      LOG.info("Delaying task on failed lock check: {}", current);
+
+      startsFrom = taskToStart;
+      concurrentRunTriggeredCounter.inc();
+      schedule(timerService, current + lockCheckDelayMs);
+    }
+
+    timerService.registerProcessingTimeTimer(nextEvaluationTime);
+  }
+
+  private void schedule(TimerService timerService, long time) {
+    this.nextEvaluationTime = time;

Review Comment:
   I saw 4 places where `schedule()` is called in `TriggerManager`:
   
   1. init: `if (shouldRestoreTasks)`
   2. checkAndFire: `if (recoveryLock.isHeld())`
   3. checkAndFire: `if (lock.tryLock())` and its `else` branch (these two are mutually exclusive)
   
   1 and 2 can co-exist. 1 and 3 also seem like they can happen together, so we may end up registering overlapping processing-time timers; see the sketch after this list.
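
   If overlapping timers are the concern, one hedged sketch (field and method names mirror this PR; not a definitive fix) would be to drop the previously registered timer before scheduling the next evaluation:

   ```java
   // Sketch only: keep at most one pending evaluation timer by de-registering the
   // previously scheduled processing-time timer before registering the new one.
   private void schedule(TimerService timerService, long time) {
     if (nextEvaluationTime != null) {
       timerService.deleteProcessingTimeTimer(nextEvaluationTime);
     }

     this.nextEvaluationTime = time;
     timerService.registerProcessingTimeTimer(time);
   }
   ```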



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** . */
+@Internal
+class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, 
Trigger>
+    implements CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List<String> taskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunTriggeredCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private transient ValueState<Long> nextEvaluationTime;
+  private transient ListState<TableChange> accumulatedChanges;
+  private transient ListState<Long> lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient boolean isCleanUp = false;
+  private transient boolean inited = false;
+  private transient int startsFrom = 0;
+
+  TriggerManager(
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory,
+      List<String> taskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
+    Preconditions.checkArgument(
+        evaluators != null && !evaluators.isEmpty(), "Evaluators should not be 
empty");
+    Preconditions.checkArgument(
+        taskNames.size() == evaluators.size(), "Provide a name and evaluator 
for all of the tasks");
+    Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should 
be at least 1.");
+    Preconditions.checkArgument(
+        lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1.");
+
+    this.tableLoader = tableLoader;
+    this.lockFactory = lockFactory;
+    this.taskNames = taskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.rateLimiterTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+    this.concurrentRunTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED);
+    this.nothingToTriggerCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+    this.triggerCounters =
+        taskNames.stream()
+            .map(
+                name ->
+                    getRuntimeContext()
+                        .getMetricGroup()
+                        .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+                        .counter(TableMaintenanceMetrics.TRIGGERED))
+            .collect(Collectors.toList());
+
+    this.nextEvaluationTime =
+        getRuntimeContext()
+            .getState(new 
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+    this.accumulatedChanges =
+        getRuntimeContext()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "triggerManagerAccumulatedChange", 
TypeInformation.of(TableChange.class)));
+    this.lastTriggerTimes =
+        getRuntimeContext()
+            .getListState(new 
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    tableLoader.open();
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) {
+    LOG.info("Initializing state restored: {}", context.isRestored());
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+    if (context.isRestored()) {
+      isCleanUp = true;
+    }
+  }
+
+  @Override
+  public void processElement(TableChange change, Context ctx, 
Collector<Trigger> out)
+      throws Exception {
+    init(out, ctx.timerService());
+
+    long current = ctx.timerService().currentProcessingTime();
+    Long nextTime = nextEvaluationTime.value();
+
+    // Add the new changes to the already accumulated ones
+    List<TableChange> accumulated = 
Lists.newArrayList(accumulatedChanges.get());
+    accumulated.forEach(tableChange -> tableChange.merge(change));
+    accumulatedChanges.update(accumulated);
+
+    if (nextTime == null) {
+      checkAndFire(current, ctx.timerService(), out);
+    } else {
+      LOG.info(

Review Comment:
   oh. I didn't mean to "run the checkAndFire when the nextTime is larger than the current". anyway, I guess it is not important in this else section.



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link
+ * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}.
+ */
+@Internal
+public class TagBasedLockFactory implements TriggerLockFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TagBasedLockFactory.class);
+  private static final String RUNNING_TAG = "__flink_maintenance_running";
+  private static final String RECOVERING_TAG = 
"__flink_maintenance_recovering";
+  private static final int CHANGE_ATTEMPTS = 3;
+
+  private final TableLoader tableLoader;
+  private transient Table table;
+
+  public TagBasedLockFactory(TableLoader tableLoader) {
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public void open() {
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+  }
+
+  @Override
+  public TriggerLockFactory.Lock createLock() {
+    return new Lock(table, RUNNING_TAG);
+  }
+
+  @Override
+  public TriggerLockFactory.Lock createRecoveryLock() {
+    return new Lock(table, RECOVERING_TAG);
+  }
+
+  @Override
+  public void close() throws IOException {
+    tableLoader.close();
+  }
+
+  public static class Lock implements TriggerLockFactory.Lock {
+    private final Table table;
+    private final String lockKey;
+
+    public Lock(Table table, String lockKey) {
+      Preconditions.checkNotNull(table, "Table should not be null");
+      Preconditions.checkNotNull(lockKey, "Lock key should not be null");
+      this.table = table;
+      this.lockKey = lockKey;
+    }
+
+    /**
+     * The lock will be acquired by jobs with creating a new tag. A new empty 
commit is added for a
+     * table without snapshots.
+     *
+     * @return <code>true</code> if the lock is acquired by this operator
+     */
+    @Override
+    public boolean tryLock() {
+      if (isHeld()) {
+        LOG.info("Lock is already held");
+        return false;
+      }
+
+      if (table.currentSnapshot() == null) {
+        // Create an empty commit
+        table.newFastAppend().commit();
+        LOG.info("Empty table, new empty commit added for using tags");
+      }
+
+      try {
+        Tasks.foreach(1)
+            .retry(CHANGE_ATTEMPTS)

Review Comment:
   also, how do we deal with orphaned locks where unlock failed (e.g. due to a network issue with the catalog service)? One out-of-band cleanup option is sketched below.
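
   As a sketch only (assuming someone can run a small snippet against the table out of band; `tableLoader` here is whatever loader the job uses), a stale lock tag could be dropped manually:

   ```java
   // Sketch: administratively remove a lock tag that was left behind after a failed unlock.
   // "__flink_maintenance_running" is the RUNNING_TAG constant from this PR.
   Table table = tableLoader.loadTable();
   table.refresh();
   if (table.refs().containsKey("__flink_maintenance_running")) {
     table.manageSnapshots()
         .removeTag("__flink_maintenance_running")
         .commit();
   }
   ```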



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} 
messages which are
+ * calculated based on the incoming {@link TableChange} messages. The 
TriggerManager keeps track of
+ * the changes since the last run of the Maintenance Tasks and triggers a new 
run based on the
+ * result of the {@link TriggerEvaluator}.
+ *
+ * <p>The TriggerManager prevents overlapping Maintenance Task runs using 
{@link
+ * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}.
+ *
+ * <p>The TriggerManager should run as a global operator. {@link 
KeyedProcessFunction} is used, so
+ * the timer functions are available, but the key is not used.
+ */
+@Internal
+class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, 
Trigger>
+    implements CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List<String> taskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunTriggeredCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private transient ValueState<Long> nextEvaluationTimeState;
+  private transient ListState<TableChange> accumulatedChangesState;
+  private transient ListState<Long> lastTriggerTimesState;
+  private transient Long nextEvaluationTime;
+  private transient List<TableChange> accumulatedChanges;
+  private transient List<Long> lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient boolean shouldRestoreTasks = false;
+  private transient boolean inited = false;
+  // To keep the task scheduling fair we keep the last triggered task position 
in memory.
+  // If we find a task to trigger, then we run it, but after it is finished, 
we start from the given
+  // position to prevent "starvation" of the tasks.
+  // When there is nothing to trigger, we start from the beginning, as the 
order of the tasks might
+  // be important (RewriteDataFiles first, and then RewriteManifestFiles later)
+  private transient int startsFrom = 0;
+
+  TriggerManager(
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory,
+      List<String> taskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+    Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
+    Preconditions.checkArgument(
+        evaluators != null && !evaluators.isEmpty(), "Invalid evaluators: null 
or empty");
+    Preconditions.checkArgument(
+        taskNames.size() == evaluators.size(), "Provide a name and evaluator 
for all of the tasks");
+    Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should 
be at least 1.");
+    Preconditions.checkArgument(
+        lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 
ms.");
+
+    this.tableLoader = tableLoader;
+    this.lockFactory = lockFactory;
+    this.taskNames = taskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.rateLimiterTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+    this.concurrentRunTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED);
+    this.nothingToTriggerCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+    this.triggerCounters =
+        taskNames.stream()
+            .map(
+                name ->
+                    getRuntimeContext()
+                        .getMetricGroup()
+                        .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+                        .counter(TableMaintenanceMetrics.TRIGGERED))
+            .collect(Collectors.toList());
+
+    this.nextEvaluationTimeState =
+        getRuntimeContext()
+            .getState(new 
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+    this.accumulatedChangesState =
+        getRuntimeContext()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "triggerManagerAccumulatedChange", 
TypeInformation.of(TableChange.class)));
+    this.lastTriggerTimesState =
+        getRuntimeContext()
+            .getListState(new 
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    tableLoader.open();
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    if (inited) {
+      // Only store state if initialized
+      nextEvaluationTimeState.update(nextEvaluationTime);
+      accumulatedChangesState.update(accumulatedChanges);
+      lastTriggerTimesState.update(lastTriggerTimes);
+    }
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+    LOG.info("Initializing state restored: {}", context.isRestored());
+    lockFactory.open();
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+    if (context.isRestored()) {
+      shouldRestoreTasks = true;
+    }
+  }
+
+  @Override
+  public void processElement(TableChange change, Context ctx, 
Collector<Trigger> out)
+      throws Exception {
+    init(out, ctx.timerService());
+
+    accumulatedChanges.forEach(tableChange -> tableChange.merge(change));
+
+    long current = ctx.timerService().currentProcessingTime();
+    Long nextTime = nextEvaluationTime;
+    if (nextTime == null) {
+      checkAndFire(current, ctx.timerService(), out);
+    } else {
+      LOG.info(
+          "Trigger manager rate limiter triggered current: {}, next: {}, 
accumulated changes: {}",
+          current,
+          nextTime,
+          accumulatedChanges);
+      rateLimiterTriggeredCounter.inc();
+    }
+  }
+
+  @Override
+  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> 
out) throws Exception {
+    init(out, ctx.timerService());
+    this.nextEvaluationTime = null;
+    checkAndFire(ctx.timerService().currentProcessingTime(), 
ctx.timerService(), out);
+  }
+
+  @Override
+  public void close() throws IOException {
+    tableLoader.close();
+    lockFactory.close();
+  }
+
+  private void checkAndFire(long current, TimerService timerService, 
Collector<Trigger> out) {
+    if (shouldRestoreTasks) {
+      if (recoveryLock.isHeld()) {
+        // Recovered tasks in progress. Skip trigger check
+        LOG.debug("The cleanup lock is still held at {}", current);
+        schedule(timerService, current + lockCheckDelayMs);
+        return;
+      } else {
+        LOG.info("The cleanup is finished at {}", current);
+        shouldRestoreTasks = false;
+      }
+    }
+
+    Integer taskToStart =
+        nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, 
startsFrom);
+    if (taskToStart == null) {
+      // Nothing to execute
+      if (startsFrom == 0) {
+        nothingToTriggerCounter.inc();
+        LOG.debug("Nothing to execute at {} for collected: {}", current, 
accumulatedChanges);
+      }
+
+      // Next time start from the beginning
+      startsFrom = 0;
+      return;
+    }
+
+    if (lock.tryLock()) {
+      TableChange change = accumulatedChanges.get(taskToStart);
+      SerializableTable table =
+          (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
+      out.collect(Trigger.create(current, table, taskToStart));
+      LOG.debug("Fired event with time: {}, collected: {} for {}", current, 
change, table.name());
+      triggerCounters.get(taskToStart).inc();
+      accumulatedChanges.set(taskToStart, TableChange.empty());
+      lastTriggerTimes.set(taskToStart, current);
+      schedule(timerService, current + minFireDelayMs);
+      startsFrom = taskToStart + 1;
+    } else {
+      // The lock is already held by someone

Review Comment:
   which other operator subtask (besides the TriggerManager here) can acquire the lock?



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} 
messages which are
+ * calculated based on the incoming {@link TableChange} messages. The 
TriggerManager keeps track of
+ * the changes since the last run of the Maintenance Tasks and triggers a new 
run based on the
+ * result of the {@link TriggerEvaluator}.
+ *
+ * <p>The TriggerManager prevents overlapping Maintenance Task runs using 
{@link
+ * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}.
+ *
+ * <p>The TriggerManager should run as a global operator. {@link 
KeyedProcessFunction} is used, so
+ * the timer functions are available, but the key is not used.
+ */
+@Internal
+class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, 
Trigger>
+    implements CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List<String> taskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunTriggeredCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private transient ValueState<Long> nextEvaluationTimeState;
+  private transient ListState<TableChange> accumulatedChangesState;
+  private transient ListState<Long> lastTriggerTimesState;
+  private transient Long nextEvaluationTime;
+  private transient List<TableChange> accumulatedChanges;
+  private transient List<Long> lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient boolean shouldRestoreTasks = false;
+  private transient boolean inited = false;
+  // To keep the task scheduling fair we keep the last triggered task position 
in memory.
+  // If we find a task to trigger, then we run it, but after it is finished, 
we start from the given
+  // position to prevent "starvation" of the tasks.
+  // When there is nothing to trigger, we start from the beginning, as the 
order of the tasks might
+  // be important (RewriteDataFiles first, and then RewriteManifestFiles later)
+  private transient int startsFrom = 0;
+
+  TriggerManager(
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory,
+      List<String> taskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+    Preconditions.checkNotNull(lockFactory, "Lock factory should not be null");
+    Preconditions.checkArgument(
+        evaluators != null && !evaluators.isEmpty(), "Invalid evaluators: null 
or empty");
+    Preconditions.checkArgument(
+        taskNames.size() == evaluators.size(), "Provide a name and evaluator 
for all of the tasks");
+    Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should 
be at least 1.");
+    Preconditions.checkArgument(
+        lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 
ms.");
+
+    this.tableLoader = tableLoader;
+    this.lockFactory = lockFactory;
+    this.taskNames = taskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.rateLimiterTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+    this.concurrentRunTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED);
+    this.nothingToTriggerCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+    this.triggerCounters =
+        taskNames.stream()
+            .map(
+                name ->
+                    getRuntimeContext()
+                        .getMetricGroup()
+                        .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+                        .counter(TableMaintenanceMetrics.TRIGGERED))
+            .collect(Collectors.toList());
+
+    this.nextEvaluationTimeState =
+        getRuntimeContext()
+            .getState(new 
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+    this.accumulatedChangesState =
+        getRuntimeContext()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "triggerManagerAccumulatedChange", 
TypeInformation.of(TableChange.class)));
+    this.lastTriggerTimesState =
+        getRuntimeContext()
+            .getListState(new 
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    tableLoader.open();
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    if (inited) {
+      // Only store state if initialized
+      nextEvaluationTimeState.update(nextEvaluationTime);
+      accumulatedChangesState.update(accumulatedChanges);
+      lastTriggerTimesState.update(lastTriggerTimes);
+    }
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+    LOG.info("Initializing state restored: {}", context.isRestored());
+    lockFactory.open();
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+    if (context.isRestored()) {
+      shouldRestoreTasks = true;
+    }
+  }
+
+  @Override
+  public void processElement(TableChange change, Context ctx, 
Collector<Trigger> out)
+      throws Exception {
+    init(out, ctx.timerService());
+
+    accumulatedChanges.forEach(tableChange -> tableChange.merge(change));
+
+    long current = ctx.timerService().currentProcessingTime();
+    Long nextTime = nextEvaluationTime;
+    if (nextTime == null) {
+      checkAndFire(current, ctx.timerService(), out);
+    } else {
+      LOG.info(
+          "Trigger manager rate limiter triggered current: {}, next: {}, 
accumulated changes: {}",
+          current,
+          nextTime,
+          accumulatedChanges);
+      rateLimiterTriggeredCounter.inc();
+    }
+  }
+
+  @Override
+  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> 
out) throws Exception {
+    init(out, ctx.timerService());
+    this.nextEvaluationTime = null;
+    checkAndFire(ctx.timerService().currentProcessingTime(), 
ctx.timerService(), out);
+  }
+
+  @Override
+  public void close() throws IOException {
+    tableLoader.close();
+    lockFactory.close();
+  }
+
+  private void checkAndFire(long current, TimerService timerService, 
Collector<Trigger> out) {
+    if (shouldRestoreTasks) {
+      if (recoveryLock.isHeld()) {
+        // Recovered tasks in progress. Skip trigger check
+        LOG.debug("The cleanup lock is still held at {}", current);
+        schedule(timerService, current + lockCheckDelayMs);
+        return;
+      } else {
+        LOG.info("The cleanup is finished at {}", current);
+        shouldRestoreTasks = false;
+      }
+    }
+
+    Integer taskToStart =
+        nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, 
startsFrom);
+    if (taskToStart == null) {
+      // Nothing to execute
+      if (startsFrom == 0) {
+        nothingToTriggerCounter.inc();
+        LOG.debug("Nothing to execute at {} for collected: {}", current, 
accumulatedChanges);
+      }
+
+      // Next time start from the beginning
+      startsFrom = 0;
+      return;
+    }
+
+    if (lock.tryLock()) {
+      TableChange change = accumulatedChanges.get(taskToStart);
+      SerializableTable table =
+          (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
+      out.collect(Trigger.create(current, table, taskToStart));
+      LOG.debug("Fired event with time: {}, collected: {} for {}", current, 
change, table.name());
+      triggerCounters.get(taskToStart).inc();
+      accumulatedChanges.set(taskToStart, TableChange.empty());
+      lastTriggerTimes.set(taskToStart, current);
+      schedule(timerService, current + minFireDelayMs);
+      startsFrom = taskToStart + 1;
+    } else {
+      // The lock is already held by someone
+      LOG.info("Delaying task on failed lock check: {}", current);
+
+      startsFrom = taskToStart;
+      concurrentRunTriggeredCounter.inc();
+      schedule(timerService, current + lockCheckDelayMs);
+    }
+
+    timerService.registerProcessingTimeTimer(nextEvaluationTime);
+  }
+
+  private void schedule(TimerService timerService, long time) {
+    this.nextEvaluationTime = time;
+    timerService.registerProcessingTimeTimer(time);
+  }
+
+  private static Integer nextTrigger(
+      List<TriggerEvaluator> evaluators,
+      List<TableChange> changes,
+      List<Long> lastTriggerTimes,
+      long currentTime,
+      int startPos) {
+    int normalizedStartingPos = startPos % evaluators.size();
+    int current = normalizedStartingPos;
+    do {
+      if (evaluators
+          .get(current)
+          .check(changes.get(current), lastTriggerTimes.get(current), 
currentTime)) {
+        return current;
+      }
+
+      current = (current + 1) % evaluators.size();
+    } while (current != normalizedStartingPos);
+
+    return null;
+  }
+
+  private void init(Collector<Trigger> out, TimerService timerService) throws 
Exception {
+    if (!inited) {
+      long current = timerService.currentProcessingTime();
+
+      // Initialize from state
+      this.nextEvaluationTime = nextEvaluationTimeState.value();

Review Comment:
   it is probably cleaner to do lines 296-306 in the `initializeState` method, inside the `if (context.isRestored())` block



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Map;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link
+ * ManageSnapshots#removeTag(String)} based lock implementation for {@link 
TriggerLockFactory}.
+ */
+@Internal
+public class TagBasedLockFactory implements TriggerLockFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TagBasedLockFactory.class);
+  private static final String RUNNING_TAG = "__flink_maintenance_running";
+  private static final String RECOVERING_TAG = 
"__flink_maintenance_recovering";
+  private static final int CHANGE_ATTEMPTS = 3;
+
+  private final TableLoader tableLoader;
+
+  public TagBasedLockFactory(TableLoader tableLoader) {
+    this.tableLoader = tableLoader;
+  }
+
+  @Override
+  public TriggerLockFactory.Lock createLock() {
+    return new Lock(tableLoader, RUNNING_TAG);
+  }
+
+  @Override
+  public TriggerLockFactory.Lock createRecoveryLock() {
+    return new Lock(tableLoader, RECOVERING_TAG);
+  }
+
+  public static class Lock implements TriggerLockFactory.Lock {
+    private final Table table;
+    private final String lockKey;
+
+    public Lock(TableLoader tableLoader, String lockKey) {
+      tableLoader.open();
+      this.table = tableLoader.loadTable();
+      this.lockKey = lockKey;
+    }
+
+    /**
+     * The lock will be acquired by jobs with creating a new tag. A new empty 
commit is added for a
+     * table without snapshots.
+     *
+     * @return <code>true</code> if the lock is acquired by this operator
+     */
+    @Override
+    public boolean tryLock() {
+      table.refresh();
+      Map<String, SnapshotRef> refs = table.refs();
+      if (isHeld()) {
+        LOG.info("Lock is already held by someone: {}", refs.keySet());
+        return false;
+      }
+
+      if (table.currentSnapshot() == null) {
+        // Create an empty commit

Review Comment:
   alright fair enough



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** . */
+@Internal
+class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, 
Trigger>
+    implements CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List<String> taskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunTriggeredCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private transient ValueState<Long> nextEvaluationTime;
+  private transient ListState<TableChange> accumulatedChanges;
+  private transient ListState<Long> lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient boolean isCleanUp = false;
+  private transient boolean inited = false;
+  private transient int startsFrom = 0;
+
+  TriggerManager(
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory,
+      List<String> taskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+    Preconditions.checkNotNull(lockFactory, "Lock factory should not be null");
+    Preconditions.checkArgument(
+        evaluators != null && !evaluators.isEmpty(), "Evaluators should not be 
empty");
+    Preconditions.checkArgument(
+        taskNames.size() == evaluators.size(), "Provide a name and evaluator 
for all of the tasks");
+    Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should 
be at least 1.");
+    Preconditions.checkArgument(
+        lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1.");
+
+    this.tableLoader = tableLoader;
+    this.lockFactory = lockFactory;
+    this.taskNames = taskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.rateLimiterTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+    this.concurrentRunTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED);
+    this.nothingToTriggerCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+    this.triggerCounters =
+        taskNames.stream()
+            .map(
+                name ->
+                    getRuntimeContext()
+                        .getMetricGroup()
+                        .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+                        .counter(TableMaintenanceMetrics.TRIGGERED))
+            .collect(Collectors.toList());
+
+    this.nextEvaluationTime =
+        getRuntimeContext()
+            .getState(new 
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+    this.accumulatedChanges =
+        getRuntimeContext()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "triggerManagerAccumulatedChange", 
TypeInformation.of(TableChange.class)));
+    this.lastTriggerTimes =
+        getRuntimeContext()
+            .getListState(new 
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    tableLoader.open();
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) {
+    LOG.info("Initializing state restored: {}", context.isRestored());
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+    if (context.isRestored()) {
+      isCleanUp = true;
+    }
+  }
+
+  @Override
+  public void processElement(TableChange change, Context ctx, 
Collector<Trigger> out)
+      throws Exception {
+    init(out, ctx.timerService());
+
+    long current = ctx.timerService().currentProcessingTime();
+    Long nextTime = nextEvaluationTime.value();
+
+    // Add the new changes to the already accumulated ones
+    List<TableChange> accumulated = 
Lists.newArrayList(accumulatedChanges.get());
+    accumulated.forEach(tableChange -> tableChange.merge(change));
+    accumulatedChanges.update(accumulated);
+
+    if (nextTime == null) {
+      checkAndFire(current, ctx.timerService(), out);
+    } else {
+      LOG.info(
+          "Trigger manager rate limiter triggered current: {}, next: {}, 
accumulated changes: {}",
+          current,
+          nextTime,
+          accumulated);
+      rateLimiterTriggeredCounter.inc();
+    }
+  }
+
+  @Override
+  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> 
out) throws Exception {
+    init(out, ctx.timerService());
+    nextEvaluationTime.clear();
+    checkAndFire(ctx.timerService().currentProcessingTime(), 
ctx.timerService(), out);
+  }
+
+  private void checkAndFire(long current, TimerService timerService, 
Collector<Trigger> out)
+      throws Exception {
+    if (isCleanUp) {
+      if (recoveryLock.isHeld()) {
+        LOG.debug("The cleanup lock is still held at {}", current);
+        schedule(timerService, current + lockCheckDelayMs);
+        return;
+      } else {
+        LOG.info("The cleanup is finished at {}", current);
+        isCleanUp = false;
+      }
+    }
+
+    List<TableChange> changes = Lists.newArrayList(accumulatedChanges.get());
+    List<Long> times = Lists.newArrayList(lastTriggerTimes.get());
+    Integer taskToStart = nextTrigger(evaluators, changes, times, current, 
startsFrom);
+    if (taskToStart == null) {
+      // Nothing to execute
+      if (startsFrom == 0) {
+        nothingToTriggerCounter.inc();
+        LOG.debug("Nothing to execute at {} for collected: {}", current, 
changes);
+      }
+
+      startsFrom = 0;
+      return;
+    }
+
+    if (lock.tryLock()) {
+      TableChange change = changes.get(taskToStart);
+      SerializableTable table =
+          (SerializableTable) 
SerializableTable.copyOf(tableLoader.loadTable());
+      out.collect(Trigger.create(current, table, taskToStart));
+      LOG.debug("Fired event with time: {}, collected: {} for {}", current, 
change, table.name());
+      triggerCounters.get(taskToStart).inc();
+      changes.set(taskToStart, TableChange.empty());
+      accumulatedChanges.update(changes);
+      schedule(timerService, current + minFireDelayMs);
+      startsFrom = taskToStart + 1;

Review Comment:
   I was saying to perform the mod operation here to make sure `startsFrom` is
always a valid position. This way, if we log this value, it will show the
correct value.
   
   ```
   startsFrom = (taskToStart + 1) % evaluators.size();
   ```
   
   instead of in the `nextTrigger` method
   ```
   int normalizedStartingPos = startPos % evaluators.size();
   ```
   
   This shouldn't affect fairness, right?
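   
   With the mod moved to the call sites, the loop in `nextTrigger` could start
   from an already-valid position, roughly like this (a sketch only, reusing the
   code from this diff and assuming the callers always pass a normalized value):
   
   ```
   int current = startPos; // already normalized by the callers
   do {
     if (evaluators
         .get(current)
         .check(changes.get(current), lastTriggerTimes.get(current), currentTime)) {
       return current;
     }
   
     current = (current + 1) % evaluators.size();
   } while (current != startPos);
   
   return null;
   ```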



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} 
messages which are
+ * calculated based on the incoming {@link TableChange} messages. The 
TriggerManager keeps track of
+ * the changes since the last run of the Maintenance Tasks and triggers a new 
run based on the
+ * result of the {@link TriggerEvaluator}.
+ *
+ * <p>The TriggerManager prevents overlapping Maintenance Task runs using 
{@link
+ * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}.
+ *
+ * <p>The TriggerManager should run as a global operator. {@link 
KeyedProcessFunction} is used, so
+ * the timer functions are available, but the key is not used.
+ */
+@Internal
+class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, 
Trigger>
+    implements CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List<String> taskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunTriggeredCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private transient ValueState<Long> nextEvaluationTimeState;
+  private transient ListState<TableChange> accumulatedChangesState;
+  private transient ListState<Long> lastTriggerTimesState;
+  private transient Long nextEvaluationTime;
+  private transient List<TableChange> accumulatedChanges;
+  private transient List<Long> lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient boolean shouldRestoreTasks = false;
+  private transient boolean inited = false;
+  // To keep the task scheduling fair we keep the last triggered task position 
in memory.
+  // If we find a task to trigger, then we run it, but after it is finished, 
we start from the given
+  // position to prevent "starvation" of the tasks.
+  // When there is nothing to trigger, we start from the beginning, as the 
order of the tasks might
+  // be important (RewriteDataFiles first, and then RewriteManifestFiles later)
+  private transient int startsFrom = 0;
+
+  TriggerManager(
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory,
+      List<String> taskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+    Preconditions.checkNotNull(lockFactory, "Lock factory should not be null");
+    Preconditions.checkArgument(
+        evaluators != null && !evaluators.isEmpty(), "Invalid evaluators: null 
or empty");
+    Preconditions.checkArgument(
+        taskNames.size() == evaluators.size(), "Provide a name and evaluator 
for all of the tasks");
+    Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should 
be at least 1.");
+    Preconditions.checkArgument(
+        lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 
ms.");
+
+    this.tableLoader = tableLoader;
+    this.lockFactory = lockFactory;
+    this.taskNames = taskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.rateLimiterTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+    this.concurrentRunTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED);
+    this.nothingToTriggerCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+    this.triggerCounters =
+        taskNames.stream()
+            .map(
+                name ->
+                    getRuntimeContext()
+                        .getMetricGroup()
+                        .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+                        .counter(TableMaintenanceMetrics.TRIGGERED))
+            .collect(Collectors.toList());
+
+    this.nextEvaluationTimeState =
+        getRuntimeContext()
+            .getState(new 
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+    this.accumulatedChangesState =
+        getRuntimeContext()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "triggerManagerAccumulatedChange", 
TypeInformation.of(TableChange.class)));
+    this.lastTriggerTimesState =
+        getRuntimeContext()
+            .getListState(new 
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    tableLoader.open();
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    if (inited) {
+      // Only store state if initialized
+      nextEvaluationTimeState.update(nextEvaluationTime);
+      accumulatedChangesState.update(accumulatedChanges);
+      lastTriggerTimesState.update(lastTriggerTimes);
+    }
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+    LOG.info("Initializing state restored: {}", context.isRestored());
+    lockFactory.open();
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+    if (context.isRestored()) {
+      shouldRestoreTasks = true;
+    }
+  }
+
+  @Override
+  public void processElement(TableChange change, Context ctx, 
Collector<Trigger> out)
+      throws Exception {
+    init(out, ctx.timerService());
+
+    accumulatedChanges.forEach(tableChange -> tableChange.merge(change));
+
+    long current = ctx.timerService().currentProcessingTime();
+    Long nextTime = nextEvaluationTime;
+    if (nextTime == null) {
+      checkAndFire(current, ctx.timerService(), out);
+    } else {
+      LOG.info(
+          "Trigger manager rate limiter triggered current: {}, next: {}, 
accumulated changes: {}",
+          current,
+          nextTime,
+          accumulatedChanges);
+      rateLimiterTriggeredCounter.inc();
+    }
+  }
+
+  @Override
+  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> 
out) throws Exception {
+    init(out, ctx.timerService());
+    this.nextEvaluationTime = null;
+    checkAndFire(ctx.timerService().currentProcessingTime(), 
ctx.timerService(), out);
+  }
+
+  @Override
+  public void close() throws IOException {
+    tableLoader.close();
+    lockFactory.close();
+  }
+
+  private void checkAndFire(long current, TimerService timerService, 
Collector<Trigger> out) {
+    if (shouldRestoreTasks) {
+      if (recoveryLock.isHeld()) {
+        // Recovered tasks in progress. Skip trigger check
+        LOG.debug("The cleanup lock is still held at {}", current);

Review Comment:
   nit: for consistency, should it be `recovery lock` in the log message?
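   
   For example (just an illustration of the suggested wording, using the
   variables already present in this diff):
   
   ```
   LOG.debug("The recovery lock is still held at {}", current);
   ```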



##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} 
messages which are
+ * calculated based on the incoming {@link TableChange} messages. The 
TriggerManager keeps track of
+ * the changes since the last run of the Maintenance Tasks and triggers a new 
run based on the
+ * result of the {@link TriggerEvaluator}.
+ *
+ * <p>The TriggerManager prevents overlapping Maintenance Task runs using 
{@link
+ * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}.
+ *
+ * <p>The TriggerManager should run as a global operator. {@link 
KeyedProcessFunction} is used, so
+ * the timer functions are available, but the key is not used.
+ */
+@Internal
+class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, 
Trigger>
+    implements CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+
+  private final TableLoader tableLoader;
+  private final TriggerLockFactory lockFactory;
+  private final List<String> taskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunTriggeredCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private transient ValueState<Long> nextEvaluationTimeState;
+  private transient ListState<TableChange> accumulatedChangesState;
+  private transient ListState<Long> lastTriggerTimesState;
+  private transient Long nextEvaluationTime;
+  private transient List<TableChange> accumulatedChanges;
+  private transient List<Long> lastTriggerTimes;
+  private transient TriggerLockFactory.Lock lock;
+  private transient TriggerLockFactory.Lock recoveryLock;
+  private transient boolean isCleanUp = false;
+  private transient boolean inited = false;
+  private transient int startsFrom = 0;
+
+  TriggerManager(
+      TableLoader tableLoader,
+      TriggerLockFactory lockFactory,
+      List<String> taskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
+    Preconditions.checkNotNull(lockFactory, "Lock factory should not be null");
+    Preconditions.checkArgument(
+        evaluators != null && !evaluators.isEmpty(), "Evaluators should not be 
empty");
+    Preconditions.checkArgument(
+        taskNames.size() == evaluators.size(), "Provide a name and evaluator 
for all of the tasks");
+    Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should 
be at least 1.");
+    Preconditions.checkArgument(
+        lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 
1ms.");
+
+    this.tableLoader = tableLoader;
+    this.lockFactory = lockFactory;
+    this.taskNames = taskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.rateLimiterTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
+    this.concurrentRunTriggeredCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED);
+    this.nothingToTriggerCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(
+                TableMaintenanceMetrics.GROUP_KEY, 
TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
+            .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
+    this.triggerCounters =
+        taskNames.stream()
+            .map(
+                name ->
+                    getRuntimeContext()
+                        .getMetricGroup()
+                        .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+                        .counter(TableMaintenanceMetrics.TRIGGERED))
+            .collect(Collectors.toList());
+
+    this.nextEvaluationTimeState =
+        getRuntimeContext()
+            .getState(new 
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+    this.accumulatedChangesState =
+        getRuntimeContext()
+            .getListState(
+                new ListStateDescriptor<>(
+                    "triggerManagerAccumulatedChange", 
TypeInformation.of(TableChange.class)));
+    this.lastTriggerTimesState =
+        getRuntimeContext()
+            .getListState(new 
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    tableLoader.open();
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    if (inited) {
+      // Only store state if initialized
+      nextEvaluationTimeState.update(nextEvaluationTime);
+      accumulatedChangesState.update(accumulatedChanges);
+      lastTriggerTimesState.update(lastTriggerTimes);
+    }
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+    LOG.info("Initializing state restored: {}", context.isRestored());
+    this.lock = lockFactory.createLock();
+    this.recoveryLock = lockFactory.createRecoveryLock();
+    if (context.isRestored()) {
+      isCleanUp = true;
+    }
+  }
+
+  @Override
+  public void processElement(TableChange change, Context ctx, 
Collector<Trigger> out)
+      throws Exception {
+    init(out, ctx.timerService());
+
+    accumulatedChanges.forEach(tableChange -> tableChange.merge(change));
+
+    long current = ctx.timerService().currentProcessingTime();
+    Long nextTime = nextEvaluationTime;

Review Comment:
   `nextTime` is only used in the `else` branch, where `checkAndFire` is not called
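   
   Reading this as a hint that the local can be inlined, a rough sketch using the
   names from this diff (illustration only, not a drop-in change):
   
   ```
   if (nextEvaluationTime == null) {
     checkAndFire(current, ctx.timerService(), out);
   } else {
     LOG.info(
         "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}",
         current,
         nextEvaluationTime,
         accumulatedChanges);
     rateLimiterTriggeredCounter.inc();
   }
   ```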


