stevenzwu commented on code in PR #10484: URL: https://github.com/apache/iceberg/pull/10484#discussion_r1695810956
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link + * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}. 
+ */ +@Internal +public class TagBasedLockFactory implements TriggerLockFactory { + private static final Logger LOG = LoggerFactory.getLogger(TagBasedLockFactory.class); + private static final String RUNNING_TAG = "__flink_maintenance_running"; + private static final String RECOVERING_TAG = "__flink_maintenance_recovering"; + private static final int CHANGE_ATTEMPTS = 3; + + private final TableLoader tableLoader; + + public TagBasedLockFactory(TableLoader tableLoader) { + this.tableLoader = tableLoader; + } + + @Override + public TriggerLockFactory.Lock createLock() { + return new Lock(tableLoader, RUNNING_TAG); + } + + @Override + public TriggerLockFactory.Lock createRecoveryLock() { + return new Lock(tableLoader, RECOVERING_TAG); + } + + public static class Lock implements TriggerLockFactory.Lock { + private final Table table; + private final String lockKey; + + public Lock(TableLoader tableLoader, String lockKey) { + tableLoader.open(); + this.table = tableLoader.loadTable(); Review Comment: should we load the table once in the factory constructor? lock object only needs to refresh table state in the tryLock and unlock methods. also it doesn't seem that `tableLoader` is closed. ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link + * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}. + */ +@Internal +public class TagBasedLockFactory implements TriggerLockFactory { + private static final Logger LOG = LoggerFactory.getLogger(TagBasedLockFactory.class); + private static final String RUNNING_TAG = "__flink_maintenance_running"; + private static final String RECOVERING_TAG = "__flink_maintenance_recovering"; + private static final int CHANGE_ATTEMPTS = 3; + + private final TableLoader tableLoader; + + public TagBasedLockFactory(TableLoader tableLoader) { + this.tableLoader = tableLoader; + } + + @Override + public TriggerLockFactory.Lock createLock() { + return new Lock(tableLoader, RUNNING_TAG); + } + + @Override + public TriggerLockFactory.Lock createRecoveryLock() { + return new Lock(tableLoader, RECOVERING_TAG); + } + + public static class Lock implements TriggerLockFactory.Lock { + private final Table table; + private final String lockKey; + + public Lock(TableLoader tableLoader, String lockKey) { + tableLoader.open(); + this.table = tableLoader.loadTable(); + 
this.lockKey = lockKey; + } + + /** + * The lock will be acquired by jobs with creating a new tag. A new empty commit is added for a + * table without snapshots. + * + * @return <code>true</code> if the lock is acquired by this operator + */ + @Override + public boolean tryLock() { + table.refresh(); + Map<String, SnapshotRef> refs = table.refs(); + if (isHeld()) { + LOG.info("Lock is already held by someone: {}", refs.keySet()); + return false; + } + + if (table.currentSnapshot() == null) { + // Create an empty commit Review Comment: if the table is empty, is there any need to run maintenance? wondering if returning false makes more sense here. is the purpose to avoid a wait loop on `tryLock` to be true? ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link + * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}. + */ +@Internal +public class TagBasedLockFactory implements TriggerLockFactory { + private static final Logger LOG = LoggerFactory.getLogger(TagBasedLockFactory.class); + private static final String RUNNING_TAG = "__flink_maintenance_running"; + private static final String RECOVERING_TAG = "__flink_maintenance_recovering"; + private static final int CHANGE_ATTEMPTS = 3; + + private final TableLoader tableLoader; + + public TagBasedLockFactory(TableLoader tableLoader) { + this.tableLoader = tableLoader; + } + + @Override + public TriggerLockFactory.Lock createLock() { + return new Lock(tableLoader, RUNNING_TAG); + } + + @Override + public TriggerLockFactory.Lock createRecoveryLock() { + return new Lock(tableLoader, RECOVERING_TAG); + } + + public static class Lock implements TriggerLockFactory.Lock { + private final Table table; + private final String lockKey; + + public Lock(TableLoader tableLoader, String lockKey) { + tableLoader.open(); + this.table = tableLoader.loadTable(); + this.lockKey = lockKey; + } + + /** + * The lock will be acquired by jobs with creating a new tag. A new empty commit is added for a + * table without snapshots. 
+ * + * @return <code>true</code> if the lock is acquired by this operator + */ + @Override + public boolean tryLock() { + table.refresh(); + Map<String, SnapshotRef> refs = table.refs(); + if (isHeld()) { + LOG.info("Lock is already held by someone: {}", refs.keySet()); Review Comment: nit: remove `someone`. I don't know if we need to print the refs keySet. it would imply all tags are locks ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link + * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}. 
+ */ +@Internal +public class TagBasedLockFactory implements TriggerLockFactory { + private static final Logger LOG = LoggerFactory.getLogger(TagBasedLockFactory.class); + private static final String RUNNING_TAG = "__flink_maintenance_running"; + private static final String RECOVERING_TAG = "__flink_maintenance_recovering"; + private static final int CHANGE_ATTEMPTS = 3; + + private final TableLoader tableLoader; + + public TagBasedLockFactory(TableLoader tableLoader) { + this.tableLoader = tableLoader; + } + + @Override + public TriggerLockFactory.Lock createLock() { + return new Lock(tableLoader, RUNNING_TAG); + } + + @Override + public TriggerLockFactory.Lock createRecoveryLock() { + return new Lock(tableLoader, RECOVERING_TAG); + } + + public static class Lock implements TriggerLockFactory.Lock { + private final Table table; + private final String lockKey; + + public Lock(TableLoader tableLoader, String lockKey) { + tableLoader.open(); + this.table = tableLoader.loadTable(); + this.lockKey = lockKey; + } + + /** + * The lock will be acquired by jobs with creating a new tag. A new empty commit is added for a + * table without snapshots. + * + * @return <code>true</code> if the lock is acquired by this operator + */ + @Override + public boolean tryLock() { + table.refresh(); + Map<String, SnapshotRef> refs = table.refs(); Review Comment: `refs` is retrieved both here and in the `isHeld` method. `table.refresh()` is also done in the `isHeld` method. should we remove line 76 and 77? 
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -83,7 +96,7 @@ class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); Preconditions.checkArgument( - lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1."); + lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1ms."); Review Comment: nit: `1 ms` with a space ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} messages which are + * calculated based on the incoming {@link TableChange} messages. The TriggerManager keeps track of + * the changes since the last run of the Maintenance Tasks and triggers a new run based on the + * result of the {@link TriggerEvaluator}. + * + * <p>The TriggerManager prevents overlapping Maintenance Task runs using {@link + * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}. + * + * <p>The TriggerManager should run as a global operator. {@link KeyedProcessFunction} is used, so + * the timer functions are available, but the key is not used. 
+ */ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTimeState; + private transient ListState<TableChange> accumulatedChangesState; + private transient ListState<Long> lastTriggerTimesState; + private transient Long nextEvaluationTime; + private transient List<TableChange> accumulatedChanges; + private transient List<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + 
lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1ms."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTimeState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChangesState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimesState = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (inited) { + // Only store state if initialized + 
nextEvaluationTimeState.update(nextEvaluationTime); + accumulatedChangesState.update(accumulatedChanges); + lastTriggerTimesState.update(lastTriggerTimes); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + LOG.info("Initializing state restored: {}", context.isRestored()); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if (context.isRestored()) { + isCleanUp = true; + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + init(out, ctx.timerService()); + + accumulatedChanges.forEach(tableChange -> tableChange.merge(change)); + + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime; Review Comment: do we need the local variable of `nextTime`? read `nextEvaluationTime` directly? ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link + * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}. + */ +@Internal +public class TagBasedLockFactory implements TriggerLockFactory { Review Comment: if it is hard to guarantee colocation, I would suggest we remove the JVM lock for now. Otherwise users may misuse it without knowing the problem. basically not expose the JVM lock until we can guarantee colocation. ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} messages which are + * calculated based on the incoming {@link TableChange} messages. The TriggerManager keeps track of + * the changes since the last run of the Maintenance Tasks and triggers a new run based on the + * result of the {@link TriggerEvaluator}. + * + * <p>The TriggerManager prevents overlapping Maintenance Task runs using {@link + * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}. + * + * <p>The TriggerManager should run as a global operator. {@link KeyedProcessFunction} is used, so + * the timer functions are available, but the key is not used. 
+ */ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTimeState; + private transient ListState<TableChange> accumulatedChangesState; + private transient ListState<Long> lastTriggerTimesState; + private transient Long nextEvaluationTime; + private transient List<TableChange> accumulatedChanges; + private transient List<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + 
lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1ms."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTimeState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChangesState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimesState = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (inited) { + // Only store state if initialized + 
nextEvaluationTimeState.update(nextEvaluationTime); + accumulatedChangesState.update(accumulatedChanges); + lastTriggerTimesState.update(lastTriggerTimes); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + LOG.info("Initializing state restored: {}", context.isRestored()); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if (context.isRestored()) { + isCleanUp = true; + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + init(out, ctx.timerService()); + + accumulatedChanges.forEach(tableChange -> tableChange.merge(change)); + + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime; + if (nextTime == null) { + checkAndFire(current, ctx.timerService(), out); + } else { + LOG.info( + "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}", + current, + nextTime, + accumulatedChanges); + rateLimiterTriggeredCounter.inc(); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) throws Exception { + init(out, ctx.timerService()); + this.nextEvaluationTime = null; + checkAndFire(ctx.timerService().currentProcessingTime(), ctx.timerService(), out); + } + + private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) { + if (isCleanUp) { + if (recoveryLock.isHeld()) { + LOG.debug("The cleanup lock is still held at {}", current); + schedule(timerService, current + lockCheckDelayMs); Review Comment: we could have multiple timers scheduled for this scenario if the recoveryLock is not released in multiple `checkAndFire` calls? 
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import 
org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** . */ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTime; + private transient ListState<TableChange> accumulatedChanges; + private transient ListState<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + 
Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTime = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChanges = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimes = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws 
Exception { + // Do nothing + } + + @Override + public void initializeState(FunctionInitializationContext context) { + LOG.info("Initializing state restored: {}", context.isRestored()); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if (context.isRestored()) { + isCleanUp = true; + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + init(out, ctx.timerService()); + + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime.value(); + + // Add the new changes to the already accumulated ones + List<TableChange> accumulated = Lists.newArrayList(accumulatedChanges.get()); + accumulated.forEach(tableChange -> tableChange.merge(change)); + accumulatedChanges.update(accumulated); + + if (nextTime == null) { + checkAndFire(current, ctx.timerService(), out); + } else { + LOG.info( + "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}", + current, + nextTime, + accumulated); + rateLimiterTriggeredCounter.inc(); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) throws Exception { + init(out, ctx.timerService()); + nextEvaluationTime.clear(); + checkAndFire(ctx.timerService().currentProcessingTime(), ctx.timerService(), out); + } + + private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) + throws Exception { + if (isCleanUp) { + if (recoveryLock.isHeld()) { + LOG.debug("The cleanup lock is still held at {}", current); + schedule(timerService, current + lockCheckDelayMs); + return; + } else { + LOG.info("The cleanup is finished at {}", current); + isCleanUp = false; + } + } + + List<TableChange> changes = Lists.newArrayList(accumulatedChanges.get()); + List<Long> times = Lists.newArrayList(lastTriggerTimes.get()); + Integer taskToStart = nextTrigger(evaluators, changes, times, current, 
startsFrom); + if (taskToStart == null) { + // Nothing to execute + if (startsFrom == 0) { + nothingToTriggerCounter.inc(); + LOG.debug("Nothing to execute at {} for collected: {}", current, changes); + } + + startsFrom = 0; Review Comment:
> When there is nothing to trigger, we start from the beginning, as the order of the tasks might be important (RewriteDataFiles first, and then RewriteManifestFiles later - in this case we don't want to rewrite manifest files which are removed by the RewriteDataFiles anyways)

Might it make sense to add this as a code comment?
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** . 
*/ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTime; + private transient ListState<TableChange> accumulatedChanges; + private transient ListState<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + 
this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTime = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChanges = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimes = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + // Do nothing + } + + @Override + public void initializeState(FunctionInitializationContext context) { + LOG.info("Initializing state restored: {}", context.isRestored()); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if 
(context.isRestored()) { + isCleanUp = true; + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + init(out, ctx.timerService()); + + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime.value(); + + // Add the new changes to the already accumulated ones + List<TableChange> accumulated = Lists.newArrayList(accumulatedChanges.get()); + accumulated.forEach(tableChange -> tableChange.merge(change)); + accumulatedChanges.update(accumulated); + + if (nextTime == null) { + checkAndFire(current, ctx.timerService(), out); + } else { + LOG.info( + "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}", + current, + nextTime, + accumulated); + rateLimiterTriggeredCounter.inc(); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) throws Exception { + init(out, ctx.timerService()); + nextEvaluationTime.clear(); + checkAndFire(ctx.timerService().currentProcessingTime(), ctx.timerService(), out); + } + + private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) + throws Exception { + if (isCleanUp) { + if (recoveryLock.isHeld()) { + LOG.debug("The cleanup lock is still held at {}", current); + schedule(timerService, current + lockCheckDelayMs); + return; + } else { + LOG.info("The cleanup is finished at {}", current); + isCleanUp = false; + } + } + + List<TableChange> changes = Lists.newArrayList(accumulatedChanges.get()); + List<Long> times = Lists.newArrayList(lastTriggerTimes.get()); + Integer taskToStart = nextTrigger(evaluators, changes, times, current, startsFrom); + if (taskToStart == null) { + // Nothing to execute + if (startsFrom == 0) { + nothingToTriggerCounter.inc(); + LOG.debug("Nothing to execute at {} for collected: {}", current, changes); + } + + startsFrom = 0; + return; + } + + if (lock.tryLock()) { + TableChange change = 
changes.get(taskToStart); + SerializableTable table = + (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()); + out.collect(Trigger.create(current, table, taskToStart)); + LOG.debug("Fired event with time: {}, collected: {} for {}", current, change, table.name()); + triggerCounters.get(taskToStart).inc(); + changes.set(taskToStart, TableChange.empty()); + accumulatedChanges.update(changes); + schedule(timerService, current + minFireDelayMs); + startsFrom = taskToStart + 1; Review Comment: where do we need to differentiate the 2 situations? ########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java: ########## @@ -0,0 +1,587 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.DUMMY_NAME; +import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME; +import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME_2; +import static org.apache.iceberg.flink.maintenance.operator.MetricConstants.CONCURRENT_RUN_TRIGGERED; +import static org.apache.iceberg.flink.maintenance.operator.MetricConstants.GROUP_VALUE_DEFAULT; +import static org.apache.iceberg.flink.maintenance.operator.MetricConstants.NOTHING_TO_TRIGGER; +import static org.apache.iceberg.flink.maintenance.operator.MetricConstants.RATE_LIMITER_TRIGGERED; +import static org.apache.iceberg.flink.maintenance.operator.MetricConstants.TRIGGERED; +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.List; +import java.util.stream.Stream; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import 
org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +class TestTriggerManager extends OperatorTestBase { + private static final long DELAY = 10L; + private static final String NAME_1 = "name1"; + private static final String NAME_2 = "name2"; + private long processingTime = 0L; + private TriggerLockFactory.Lock lock; + private TriggerLockFactory.Lock recoveringLock; + + @BeforeEach + void before() { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + lock = new TagBasedLockFactory(sql.tableLoader(TABLE_NAME)).createLock(); + recoveringLock = new TagBasedLockFactory(sql.tableLoader(TABLE_NAME)).createRecoveryLock(); + MetricsReporterFactoryForTests.reset(); + } + + @Test + void testCommitNumber() throws Exception { + TriggerManager manager = + manager( + sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().commitNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 1), 0); + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 2), 1); + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 3), 2); + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 10), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 1), 3); + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 1), 3); + + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 1), 4); + } + } + + @Test + void testFileNumber() throws Exception { + TriggerManager manager = + manager(sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().fileNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, new TableChange(1, 0, 0, 0, 0), 0); 
+ + addEventAndCheckResult(testHarness, new TableChange(1, 1, 0, 0, 0), 1); + addEventAndCheckResult(testHarness, new TableChange(0, 3, 0, 0, 0), 2); + addEventAndCheckResult(testHarness, new TableChange(5, 7, 0, 0, 0), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, new TableChange(1, 0, 0, 0, 0), 3); + addEventAndCheckResult(testHarness, new TableChange(0, 1, 0, 0, 0), 3); + + addEventAndCheckResult(testHarness, new TableChange(1, 0, 0, 0, 0), 4); + } + } + + @Test + void testFileSize() throws Exception { + TriggerManager manager = + manager(sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().fileSize(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, new TableChange(0, 0, 1, 0, 0), 0); + addEventAndCheckResult(testHarness, new TableChange(0, 0, 1, 1, 0), 1); + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 3, 0), 2); + addEventAndCheckResult(testHarness, new TableChange(0, 0, 5, 7, 0), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, new TableChange(0, 0, 1, 0, 0), 3); + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 1, 0), 3); + + addEventAndCheckResult(testHarness, new TableChange(0, 0, 1, 0, 0), 4); + } + } + + @Test + void testDeleteFileNumber() throws Exception { + TriggerManager manager = + manager( + sql.tableLoader(TABLE_NAME), + new TriggerEvaluator.Builder().deleteFileNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, new TableChange(3, 1, 0, 0, 0), 0); + addEventAndCheckResult(testHarness, new TableChange(0, 2, 0, 0, 0), 1); + addEventAndCheckResult(testHarness, new TableChange(0, 3, 0, 0, 0), 2); + addEventAndCheckResult(testHarness, new TableChange(0, 10, 0, 0, 0), 3); + + // No trigger in 
this case + addEventAndCheckResult(testHarness, new TableChange(0, 1, 0, 0, 0), 3); + addEventAndCheckResult(testHarness, new TableChange(0, 1, 0, 0, 0), 3); + + addEventAndCheckResult(testHarness, new TableChange(0, 1, 0, 0, 0), 4); + } + } + + @Test + void testTimeout() throws Exception { + TriggerManager manager = + manager( + sql.tableLoader(TABLE_NAME), + new TriggerEvaluator.Builder().timeout(Duration.ofSeconds(1)).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + TableChange event = new TableChange(1, 0, 0, 0, 1); + + // Wait for one trigger + testHarness.processElement(event, EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + // Wait for a second trigger + long newTime = EVENT_TIME + Duration.ofSeconds(1).toMillis(); + testHarness.setProcessingTime(newTime); + testHarness.processElement(event, newTime); + assertThat(testHarness.extractOutputValues()).hasSize(1); + } + } + + @Test + void testStateRestore() throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TriggerManager manager = manager(tableLoader); + OperatorSubtaskState state; + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + testHarness.processElement(new TableChange(1, 0, 0, 0, 1), EVENT_TIME); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + + state = testHarness.snapshot(1, EVENT_TIME); + } + + // Restore the state, write some more data, create a checkpoint, check the data which is written + manager = manager(tableLoader); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.initializeState(state); + testHarness.open(); + + // Arrives the first real change + testHarness.processElement(new TableChange(0, 0, 0, 0, 1), EVENT_TIME_2); + assertThat(testHarness.extractOutputValues()) 
+ .isEqualTo(Lists.newArrayList(Trigger.cleanUp(testHarness.getProcessingTime()))); + + recoveringLock.unlock(); + testHarness.setProcessingTime(EVENT_TIME_2); + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + @Test + void testMinFireDelay() throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TriggerManager manager = manager(tableLoader, DELAY, 1, false); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 2), 1); + long currentTime = testHarness.getProcessingTime(); + + // No new fire yet + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 2), 1); + + // Check that the trigger fired after the delay + testHarness.setProcessingTime(currentTime + DELAY); + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + @Test + void testLogCheckDelay() throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TriggerManager manager = manager(tableLoader, 1, DELAY, false); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 2), 1); + + // Create a lock to prevent execution, and check that there is no result + assertThat(lock.tryLock()).isTrue(); + addEventAndCheckResult(testHarness, new TableChange(0, 0, 0, 0, 2), 1); + long currentTime = testHarness.getProcessingTime(); + + // Remove the lock, and still no trigger + lock.unlock(); + assertThat(testHarness.extractOutputValues()).hasSize(1); + + // Check that the trigger fired after the delay + testHarness.setProcessingTime(currentTime + DELAY); + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCleanup(boolean cleanup) throws Exception { + // 
Simulate a lock remaining from a previous run + assertThat(lock.tryLock()).isTrue(); + + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TriggerManager manager = manager(tableLoader, 1, 1, cleanup); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + assertThat(lock.isHeld()).isEqualTo(!cleanup); + } + } + + /** + * Simulating recovery scenarios where there is a leftover table lock, and ongoing maintenance + * task. + * + * @param tableLock left on the table on job recovery + * @param runningTask is running and continues to run after job recovery + */ + @ParameterizedTest + @MethodSource("parametersForTestRecovery") + void testRecovery(boolean tableLock, boolean runningTask) throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TriggerManager manager = manager(tableLoader); + OperatorSubtaskState state; + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + state = testHarness.snapshot(1, EVENT_TIME); + } + + if (tableLock) { + assertThat(lock.tryLock()).isTrue(); + } + + manager = manager(tableLoader); + List<Trigger> expected = Lists.newArrayListWithExpectedSize(3); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.initializeState(state); + testHarness.open(); + + ++processingTime; + expected.add(Trigger.cleanUp(processingTime)); + testHarness.setProcessingTime(processingTime); + testHarness.processElement(new TableChange(0, 0, 0, 0, 2), processingTime); + assertThat(testHarness.extractOutputValues()).isEqualTo(expected); + + // Nothing happens until the recovery is finished + ++processingTime; + testHarness.setProcessingTime(processingTime); + assertThat(testHarness.extractOutputValues()).isEqualTo(expected); + + if (runningTask) { + lock.unlock(); + } + + // Still no results as the 
recovery is ongoing + ++processingTime; + testHarness.setProcessingTime(processingTime); + testHarness.processElement(new TableChange(0, 0, 0, 0, 2), processingTime); + assertThat(testHarness.extractOutputValues()).isEqualTo(expected); + + // All locks are removed when the recovery trigger is received by the lock cleaner + lock.unlock(); + recoveringLock.unlock(); + + // Emit only a single trigger + ++processingTime; + testHarness.setProcessingTime(processingTime); + testHarness.processElement(new TableChange(0, 0, 0, 0, 2), processingTime); + // Releasing lock will create a new snapshot, and we receive this in the trigger + expected.add( + Trigger.create( + processingTime, + (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()), + 0)); + assertThat(testHarness.extractOutputValues()).isEqualTo(expected); + } + } + + @Test + void testTriggerMetrics() throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + ManualSource<TableChange> source = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + CollectingSink<Trigger> sink = new CollectingSink<>(); + + TriggerManager manager = + new TriggerManager( + tableLoader, + new TagBasedLockFactory(tableLoader), + Lists.newArrayList(NAME_1, NAME_2), + Lists.newArrayList( + new TriggerEvaluator.Builder().commitNumber(2).build(), + new TriggerEvaluator.Builder().commitNumber(4).build()), + 1L, + 1L, + false); + source + .dataStream() + .keyBy(unused -> true) + .process(manager) + .name(DUMMY_NAME) + .forceNonParallel() + .sinkTo(sink); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // This one doesn't trigger - tests NOTHING_TO_TRIGGER + source.sendRecord(new TableChange(0, 0, 0L, 0L, 1)); + + Awaitility.await() + .until( + () -> { + Long notingCounter = + MetricsReporterFactoryForTests.counter( + DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." 
+ NOTHING_TO_TRIGGER); + return notingCounter != null && notingCounter.equals(1L); + }); + + // Trigger one of the tasks - tests TRIGGERED + source.sendRecord(new TableChange(0, 0, 0L, 0L, 1)); + // Wait until we receive the trigger + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + assertThat( + MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED)) + .isEqualTo(1L); + lock.unlock(); + + // Trigger both of the tasks - tests TRIGGERED + source.sendRecord(new TableChange(0, 0, 0L, 0L, 2)); + // Wait until we receive the trigger + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + lock.unlock(); + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + lock.unlock(); + assertThat( + MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED)) + .isEqualTo(2L); + assertThat( + MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_2 + "." + TRIGGERED)) + .isEqualTo(1L); + + // Final check all the counters + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder<String, Long>() + .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED, -1L) + .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_TRIGGERED, -1L) + .put(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED, 2L) + .put(DUMMY_NAME + "." + NAME_2 + "." + TRIGGERED, 1L) + .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." 
+ NOTHING_TO_TRIGGER, 1L) + .build()); + } finally { + closeJobClient(jobClient); + } + } + + @Test + void testRateLimiterMetrics() throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + ManualSource<TableChange> source = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + CollectingSink<Trigger> sink = new CollectingSink<>(); + + // High delay, so only triggered once + TriggerManager manager = manager(tableLoader, 1_000_000L, 1L, false); + source + .dataStream() + .keyBy(unused -> true) + .process(manager) + .name(DUMMY_NAME) + .forceNonParallel() + .sinkTo(sink); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // Start the first trigger + source.sendRecord(new TableChange(0, 0, 0L, 0L, 2)); + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + lock.unlock(); + + // The second trigger will be blocked + source.sendRecord(new TableChange(0, 0, 0L, 0L, 2)); + Awaitility.await() + .until( + () -> + MetricsReporterFactoryForTests.counter( + DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." 
+ RATE_LIMITER_TRIGGERED) + .equals(1L)); + + // Final check all the counters + assertCounters(1L, 0L); + } finally { + closeJobClient(jobClient); + } + } + + @Test + void testConcurrentRunMetrics() throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + ManualSource<TableChange> source = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + CollectingSink<Trigger> sink = new CollectingSink<>(); + + // High delay, so only triggered once + TriggerManager manager = manager(tableLoader, 1L, 1_000_000L, false); + source + .dataStream() + .keyBy(unused -> true) + .process(manager) + .name(DUMMY_NAME) + .forceNonParallel() + .sinkTo(sink); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // Start the first trigger - notice that we do not remove the lock after the trigger + source.sendRecord(new TableChange(0, 0, 0L, 0L, 2)); + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + + // The second trigger will be blocked by the lock + source.sendRecord(new TableChange(0, 0, 0L, 0L, 2)); + Awaitility.await() + .until( + () -> + MetricsReporterFactoryForTests.counter( + DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_TRIGGERED) + .equals(1L)); + + // Final check all the counters + assertCounters(0L, 1L); + } finally { + closeJobClient(jobClient); + } + } + + private static Stream<Arguments> parametersForTestRecovery() { + return Stream.of( + Arguments.of(true, false), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false)); + } + + private void assertCounters(long rateLimiterTrigger, long concurrentRunTrigger) { + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder<String, Long>() + .put( + DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED, + rateLimiterTrigger) + .put( + DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." 
+ CONCURRENT_RUN_TRIGGERED, + concurrentRunTrigger) + .put(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED, 1L) + .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER, 0L) + .build()); + } + + private KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> harness( + TriggerManager manager) throws Exception { + return new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedProcessOperator<>(manager), value -> true, Types.BOOLEAN); + } + + private void addEventAndCheckResult( + OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness, + TableChange event, + int expectedSize) + throws Exception { + ++processingTime; + testHarness.setProcessingTime(processingTime); + testHarness.processElement(event, processingTime); + assertThat(testHarness.extractOutputValues()).hasSize(expectedSize); + lock.unlock(); Review Comment: can we add those as code comment? ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} messages which are + * calculated based on the incoming {@link TableChange} messages. The TriggerManager keeps track of + * the changes since the last run of the Maintenance Tasks and triggers a new run based on the + * result of the {@link TriggerEvaluator}. + * + * <p>The TriggerManager prevents overlapping Maintenance Task runs using {@link + * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}. + * + * <p>The TriggerManager should run as a global operator. {@link KeyedProcessFunction} is used, so + * the timer functions are available, but the key is not used. 
+ */ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTimeState; + private transient ListState<TableChange> accumulatedChangesState; + private transient ListState<Long> lastTriggerTimesState; + private transient Long nextEvaluationTime; + private transient List<TableChange> accumulatedChanges; + private transient List<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + 
lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1ms."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTimeState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChangesState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimesState = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); Review Comment: where is tableLoader closed? 
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import 
org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} messages which are + * calculated based on the incoming {@link TableChange} messages. The TriggerManager keeps track of + * the changes since the last run of the Maintenance Tasks and triggers a new run based on the + * result of the {@link TriggerEvaluator}. + * + * <p>The TriggerManager prevents overlapping Maintenance Task runs using {@link + * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}. + * + * <p>The TriggerManager should run as a global operator. {@link KeyedProcessFunction} is used, so + * the timer functions are available, but the key is not used. + */ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTimeState; + private transient ListState<TableChange> accumulatedChangesState; + private transient ListState<Long> lastTriggerTimesState; + private transient Long nextEvaluationTime; + private transient List<TableChange> accumulatedChanges; + private transient List<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock 
lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1ms."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + 
this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTimeState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChangesState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimesState = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (inited) { + // Only store state if initialized + nextEvaluationTimeState.update(nextEvaluationTime); + accumulatedChangesState.update(accumulatedChanges); + lastTriggerTimesState.update(lastTriggerTimes); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + LOG.info("Initializing state restored: {}", context.isRestored()); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if (context.isRestored()) { + isCleanUp = true; + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + init(out, ctx.timerService()); + + accumulatedChanges.forEach(tableChange -> tableChange.merge(change)); + + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime; + if (nextTime == null) { + checkAndFire(current, ctx.timerService(), out); + } else { + LOG.info( + "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}", + current, + nextTime, + accumulatedChanges); + rateLimiterTriggeredCounter.inc(); + } + } + + @Override + public 
void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) throws Exception { + init(out, ctx.timerService()); + this.nextEvaluationTime = null; + checkAndFire(ctx.timerService().currentProcessingTime(), ctx.timerService(), out); + } + + private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) { + if (isCleanUp) { + if (recoveryLock.isHeld()) { + LOG.debug("The cleanup lock is still held at {}", current); + schedule(timerService, current + lockCheckDelayMs); + return; Review Comment: maybe add a comment like `Recovered tasks in progress. Skip trigger check`? ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} messages which are + * calculated based on the incoming {@link TableChange} messages. The TriggerManager keeps track of + * the changes since the last run of the Maintenance Tasks and triggers a new run based on the + * result of the {@link TriggerEvaluator}. + * + * <p>The TriggerManager prevents overlapping Maintenance Task runs using {@link + * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}. + * + * <p>The TriggerManager should run as a global operator. {@link KeyedProcessFunction} is used, so + * the timer functions are available, but the key is not used. 
+ */ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTimeState; + private transient ListState<TableChange> accumulatedChangesState; + private transient ListState<Long> lastTriggerTimesState; + private transient Long nextEvaluationTime; + private transient List<TableChange> accumulatedChanges; + private transient List<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + 
lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1ms."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTimeState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChangesState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimesState = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (inited) { + // Only store state if initialized + 
nextEvaluationTimeState.update(nextEvaluationTime); + accumulatedChangesState.update(accumulatedChanges); + lastTriggerTimesState.update(lastTriggerTimes); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + LOG.info("Initializing state restored: {}", context.isRestored()); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if (context.isRestored()) { + isCleanUp = true; + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + init(out, ctx.timerService()); + + accumulatedChanges.forEach(tableChange -> tableChange.merge(change)); + + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime; + if (nextTime == null) { + checkAndFire(current, ctx.timerService(), out); + } else { + LOG.info( + "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}", + current, + nextTime, + accumulatedChanges); + rateLimiterTriggeredCounter.inc(); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) throws Exception { + init(out, ctx.timerService()); + this.nextEvaluationTime = null; + checkAndFire(ctx.timerService().currentProcessingTime(), ctx.timerService(), out); + } + + private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) { + if (isCleanUp) { + if (recoveryLock.isHeld()) { + LOG.debug("The cleanup lock is still held at {}", current); + schedule(timerService, current + lockCheckDelayMs); + return; + } else { + LOG.info("The cleanup is finished at {}", current); + isCleanUp = false; Review Comment: reset `isCleanUp` to false when recovery is done? `isCleanUp` is only used once by the `init` method. why not just use the `inited` boolean? maybe `isCleanUp` naming doesn't help either. seems more like `shouldRestoreTasks`?
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/Trigger.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Objects; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +@Internal +class Trigger { + private final long timestamp; + private final SerializableTable table; + private final Integer taskId; + private final boolean isCleanUp; + + private Trigger(long timestamp, SerializableTable table, Integer taskId, boolean isCleanUp) { + this.timestamp = timestamp; + this.table = table; + this.taskId = taskId; Review Comment: ok. let's revisit this once we see later parts ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} messages which are + * calculated based on 
the incoming {@link TableChange} messages. The TriggerManager keeps track of + * the changes since the last run of the Maintenance Tasks and triggers a new run based on the + * result of the {@link TriggerEvaluator}. + * + * <p>The TriggerManager prevents overlapping Maintenance Task runs using {@link + * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}. + * + * <p>The TriggerManager should run as a global operator. {@link KeyedProcessFunction} is used, so + * the timer functions are available, but the key is not used. + */ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTimeState; + private transient ListState<TableChange> accumulatedChangesState; + private transient ListState<Long> lastTriggerTimesState; + private transient Long nextEvaluationTime; + private transient List<TableChange> accumulatedChanges; + private transient List<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long 
lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1ms."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTimeState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + 
this.accumulatedChangesState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimesState = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (inited) { + // Only store state if initialized + nextEvaluationTimeState.update(nextEvaluationTime); + accumulatedChangesState.update(accumulatedChanges); + lastTriggerTimesState.update(lastTriggerTimes); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + LOG.info("Initializing state restored: {}", context.isRestored()); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if (context.isRestored()) { + isCleanUp = true; + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + init(out, ctx.timerService()); + + accumulatedChanges.forEach(tableChange -> tableChange.merge(change)); + + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime; + if (nextTime == null) { + checkAndFire(current, ctx.timerService(), out); + } else { + LOG.info( + "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}", + current, + nextTime, + accumulatedChanges); + rateLimiterTriggeredCounter.inc(); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) throws Exception { + init(out, ctx.timerService()); + this.nextEvaluationTime = null; + checkAndFire(ctx.timerService().currentProcessingTime(), ctx.timerService(), out); + } + + private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) { + if (isCleanUp) { + if 
(recoveryLock.isHeld()) { + LOG.debug("The cleanup lock is still held at {}", current); + schedule(timerService, current + lockCheckDelayMs); + return; + } else { + LOG.info("The cleanup is finished at {}", current); + isCleanUp = false; + } + } + + Integer taskToStart = + nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, startsFrom); + if (taskToStart == null) { + // Nothing to execute + if (startsFrom == 0) { + nothingToTriggerCounter.inc(); + LOG.debug("Nothing to execute at {} for collected: {}", current, accumulatedChanges); + } + + startsFrom = 0; + return; + } + + if (lock.tryLock()) { + TableChange change = accumulatedChanges.get(taskToStart); + SerializableTable table = + (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()); + out.collect(Trigger.create(current, table, taskToStart)); + LOG.debug("Fired event with time: {}, collected: {} for {}", current, change, table.name()); + triggerCounters.get(taskToStart).inc(); + accumulatedChanges.set(taskToStart, TableChange.empty()); + lastTriggerTimes.set(taskToStart, current); + schedule(timerService, current + minFireDelayMs); + startsFrom = taskToStart + 1; + } else { + // The lock is already held by someone + LOG.info("Delaying task on failed lock check: {}", current); + + startsFrom = taskToStart; + concurrentRunTriggeredCounter.inc(); + schedule(timerService, current + lockCheckDelayMs); + } + + timerService.registerProcessingTimeTimer(nextEvaluationTime); + } + + private void schedule(TimerService timerService, long time) { + this.nextEvaluationTime = time; Review Comment: is the assumption here that there is only one timer scheduled at the most? ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 
TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} messages which are + * calculated based on the incoming {@link TableChange} messages. The TriggerManager keeps track of + * the changes since the last run of the Maintenance Tasks and triggers a new run based on the + * result of the {@link TriggerEvaluator}. + * + * <p>The TriggerManager prevents overlapping Maintenance Task runs using {@link + * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}. + * + * <p>The TriggerManager should run as a global operator. {@link KeyedProcessFunction} is used, so + * the timer functions are available, but the key is not used. + */ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTimeState; + private transient ListState<TableChange> accumulatedChangesState; + private transient ListState<Long> lastTriggerTimesState; + private transient Long nextEvaluationTime; + private transient List<TableChange> accumulatedChanges; + private transient List<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + 
TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); Review Comment: nit: Iceberg error msg tends to be in the style of `Invalid something: reason`. so here could be `Invalid evaluators: null or empty` ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** . 
*/ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTime; + private transient ListState<TableChange> accumulatedChanges; + private transient ListState<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + 
this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTime = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChanges = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimes = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + // Do nothing + } + + @Override + public void initializeState(FunctionInitializationContext context) { + LOG.info("Initializing state restored: {}", context.isRestored()); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if 
(context.isRestored()) { + isCleanUp = true; + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + init(out, ctx.timerService()); + + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime.value(); + + // Add the new changes to the already accumulated ones + List<TableChange> accumulated = Lists.newArrayList(accumulatedChanges.get()); + accumulated.forEach(tableChange -> tableChange.merge(change)); + accumulatedChanges.update(accumulated); + + if (nextTime == null) { + checkAndFire(current, ctx.timerService(), out); + } else { + LOG.info( Review Comment: didn't quite understand the race condition part ########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** . 
*/ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private final boolean clearLocks; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTime; + private transient ListState<TableChange> accumulatedChanges; + private transient ListState<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs, + boolean clearLocks) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1."); + + this.tableLoader = tableLoader; + 
this.lockFactory = lockFactory; + this.taskNames = taskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + this.clearLocks = clearLocks; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(MetricConstants.GROUP_KEY, MetricConstants.GROUP_VALUE_DEFAULT) + .counter(MetricConstants.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(MetricConstants.GROUP_KEY, MetricConstants.GROUP_VALUE_DEFAULT) + .counter(MetricConstants.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(MetricConstants.GROUP_KEY, MetricConstants.GROUP_VALUE_DEFAULT) + .counter(MetricConstants.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(MetricConstants.GROUP_KEY, name) + .counter(MetricConstants.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTime = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChanges = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimes = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + // Do nothing + } + + @Override + public void initializeState(FunctionInitializationContext context) { + LOG.info("Initializing state restored: {}, clearLocks: {}", context.isRestored(), clearLocks); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if 
(context.isRestored()) { + isCleanUp = true; + } else if (clearLocks) { + // Remove old lock if we are not restoring the job + lock.unlock(); + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime.value(); + init(out, ctx.timerService(), current); + + // Add the new changes to the already accumulated ones + List<TableChange> accumulated = Lists.newArrayList(accumulatedChanges.get()); + accumulated.forEach(tableChange -> tableChange.merge(change)); + accumulatedChanges.update(accumulated); + + if (nextTime == null) { + checkAndFire(current, ctx.timerService(), out); + } else { + LOG.info( + "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}", + current, + nextTime, + accumulated); + rateLimiterTriggeredCounter.inc(); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) throws Exception { + init(out, ctx.timerService(), timestamp); + nextEvaluationTime.clear(); + checkAndFire(ctx.timerService().currentProcessingTime(), ctx.timerService(), out); + } + + private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) + throws Exception { + if (isCleanUp) { + if (recoveryLock.isHeld()) { + LOG.debug("The cleanup lock is still held at {}", current); + schedule(timerService, current + lockCheckDelayMs); + return; + } else { + LOG.info("The cleanup is finished at {}", current); + isCleanUp = false; + } + } + + List<TableChange> changes = Lists.newArrayList(accumulatedChanges.get()); + List<Long> times = Lists.newArrayList(lastTriggerTimes.get()); + Integer taskToStart = nextTrigger(evaluators, changes, times, current, startsFrom); + if (taskToStart == null) { + // Nothing to execute + if (startsFrom == 0) { + nothingToTriggerCounter.inc(); + LOG.debug("Nothing to execute at {} for collected: 
{}", current, changes); + } + + startsFrom = 0; + return; + } + + if (lock.tryLock()) { + TableChange change = changes.get(taskToStart); + SerializableTable table = + (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()); + out.collect(Trigger.create(current, table, taskToStart)); + LOG.debug("Fired event with time: {}, collected: {} for {}", current, change, table.name()); + triggerCounters.get(taskToStart).inc(); + changes.set(taskToStart, TableChange.empty()); + accumulatedChanges.update(changes); + schedule(timerService, current + minFireDelayMs); + startsFrom = taskToStart + 1; + } else { + // The lock is already held by someone + LOG.info("Delaying task on failed lock check: {}", current); + + startsFrom = taskToStart; + concurrentRunTriggeredCounter.inc(); + schedule(timerService, current + lockCheckDelayMs); + } + + timerService.registerProcessingTimeTimer(nextEvaluationTime.value()); + } + + private void schedule(TimerService timerService, long time) throws IOException { + nextEvaluationTime.update(time); + timerService.registerProcessingTimeTimer(time); + } + + private static Integer nextTrigger( + List<TriggerEvaluator> evaluators, + List<TableChange> changes, + List<Long> lastTriggerTimes, + long currentTime, + int startPos) { + int normalizedStartingPos = startPos % evaluators.size(); + int current = normalizedStartingPos; + do { + if (evaluators + .get(current) + .check(changes.get(current), lastTriggerTimes.get(current), currentTime)) { + return current; + } + + current = (current + 1) % evaluators.size(); + } while (current != normalizedStartingPos); + + return null; + } + + private void init(Collector<Trigger> out, TimerService timerService, long current) + throws Exception { + if (!inited) { + // Initialize with empty changes and current timestamp + if (!accumulatedChanges.get().iterator().hasNext()) { + List<TableChange> changes = Lists.newArrayListWithCapacity(evaluators.size()); + List<Long> triggerTimes = 
Lists.newArrayListWithCapacity(evaluators.size()); + for (int i = 0; i < evaluators.size(); ++i) { + changes.add(TableChange.empty()); + triggerTimes.add(current); + } + + accumulatedChanges.update(changes); + lastTriggerTimes.update(triggerTimes); + } + + if (isCleanUp) { + // When the job state is restored, there could be ongoing tasks. + // To prevent collision with the new triggers the following is done: + // - add a cleanup lock + // - fire a clean-up trigger + // This ensures that the tasks of the previous trigger are executed, and the lock is removed + // in the end. + recoveryLock.tryLock(); Review Comment: Could you add the context as a code comment, since it is not immediately obvious? Also, if `tryLock` fails, is it still fine to send the `Trigger.cleanUp` event? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org