stevenzwu commented on code in PR #10484: URL: https://github.com/apache/iceberg/pull/10484#discussion_r1702043002
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TagBasedLockFactory.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iceberg table {@link ManageSnapshots#createTag(String, long)}/{@link + * ManageSnapshots#removeTag(String)} based lock implementation for {@link TriggerLockFactory}. + */ +@Internal +public class TagBasedLockFactory implements TriggerLockFactory { + private static final Logger LOG = LoggerFactory.getLogger(TagBasedLockFactory.class); + @VisibleForTesting static final String RUNNING_TAG_PREFIX = "__flink_maintenance_running_"; + @VisibleForTesting static final String RECOVERING_TAG_PREFIX = "__flink_maintenance_recovering_"; + private static final int CHANGE_ATTEMPTS = 3; + + private final TableLoader tableLoader; + private transient Table table; + + public TagBasedLockFactory(TableLoader tableLoader) { + this.tableLoader = tableLoader; + } + + @Override + public void open() { + tableLoader.open(); + this.table = tableLoader.loadTable(); + } + + @Override + public TriggerLockFactory.Lock createLock() { + return new Lock(table, RUNNING_TAG_PREFIX); + } + + @Override + public TriggerLockFactory.Lock createRecoveryLock() { + return new Lock(table, RECOVERING_TAG_PREFIX); + } + + @Override + public void close() throws IOException { + tableLoader.close(); + } + + public static class Lock implements TriggerLockFactory.Lock { + private final Table table; + private final String lockPrefix; + + public Lock(Table table, String lockPrefix) { + Preconditions.checkNotNull(table, "Table should not be null"); + Preconditions.checkNotNull(lockPrefix, "Lock key should not be null"); + this.table = table; + this.lockPrefix = lockPrefix; + } + + /** + * The lock will be acquired by jobs with creating a new tag. A new empty commit is added for a + * table without snapshots. + * + * @return <code>true</code> if the lock is acquired by this operator + */ + @Override + public boolean tryLock() { + if (isHeld()) { + LOG.info("Lock is already held. 
The relevant tag: {}", findLock()); + return false; + } + + if (table.currentSnapshot() == null) { + // Create an empty commit + table.newFastAppend().commit(); + LOG.info("Empty table, new empty commit added for using tags"); + } + + String lockKey = lockPrefix + UUID.randomUUID(); + try { + Tasks.foreach(1) + .retry(CHANGE_ATTEMPTS) + .stopOnFailure() + .throwFailureWhenFinished() + .run( + unused -> { + try { + table.refresh(); + ManageSnapshots manage = table.manageSnapshots(); + manage.createTag(lockKey, table.currentSnapshot().snapshotId()); + manage.commit(); + LOG.debug("Lock created"); + } catch (Exception e) { + if (!isHeld()) { + LOG.warn("Retrying lock after exception", e); + throw e; + } else { + LOG.debug("Lock created, hiding exception", e); + } + } + }); + } catch (Exception e) { + LOG.info("Concurrent lock created. Is there a concurrent maintenance job running?", e); + return false; + } + + return true; + } + + @Override + public void unlock() { + table.refresh(); + + Optional<String> lockKey = findLock(); + if (lockKey.isPresent()) { + Tasks.foreach(lockKey.get()) + .retry(CHANGE_ATTEMPTS) + .stopOnFailure() + .throwFailureWhenFinished() + .run( + key -> { + try { + table.refresh(); + ManageSnapshots manage = table.manageSnapshots(); + manage.removeTag(key); + manage.commit(); + } catch (Exception e) { + table.refresh(); + if (table.refs().containsKey(key)) { + LOG.warn("Retrying lock removal after exception", e); + throw e; + } else { + LOG.debug("Lock removed, hiding exception", e); + } + } + }); + LOG.debug("Lock removed"); + } else { + LOG.warn( + "Missing lock, can not remove. Found only the following tags: {}.", + table.refs().keySet()); + } + } + + @Override + public boolean isHeld() { + table.refresh(); + return findLock().isPresent(); + } + + private Optional<String> findLock() { + List<String> locks = + table.refs().keySet().stream() + .filter(tag -> tag.startsWith(lockPrefix)) + .collect(Collectors.toList()); + Preconditions.checkArgument(locks.size() < 2, "Invalid lock state: %s", locks); Review Comment: nit: should explain what is invalid here. maybe sth like `Invalid lock state: multiple locks found: %s` ########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java: ########## @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.DUMMY_NAME; +import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME; +import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME_2; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.GROUP_VALUE_DEFAULT; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.time.Duration; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Stream; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class TestTriggerManager extends OperatorTestBase { + private static final long DELAY = 10L; + private static final String NAME_1 = "name1"; + private static final String NAME_2 = "name2"; + private long processingTime = 0L; + private TriggerLockFactory lockFactory; + private TriggerLockFactory.Lock lock; + private TriggerLockFactory.Lock recoveringLock; + + @BeforeEach + void before() { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + this.lockFactory = new TagBasedLockFactory(sql.tableLoader(TABLE_NAME)); + lockFactory.open(); + this.lock = lockFactory.createLock(); + this.recoveringLock = lockFactory.createRecoveryLock(); + lock.unlock(); + recoveringLock.unlock(); + MetricsReporterFactoryForTests.reset(); + } + + @AfterEach + void after() throws IOException { + lockFactory.close(); + } + + @Test + void testCommitNumber() throws Exception { + TriggerManager manager = + manager( + sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().commitNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 0); + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1); + addEventAndCheckResult(testHarness, 
TableChange.builder().commitNum(3).build(), 2); + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(10).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 4); + } + } + + @Test + void testFileNumber() throws Exception { + TriggerManager manager = + manager(sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().fileNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileNum(1).build(), 0); + + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileNum(1).deleteFileNum(1).build(), 1); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(3).build(), 2); + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileNum(5).deleteFileNum(7).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().dataFileNum(1).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileNum(1).build(), 4); + } + } + + @Test + void testFileSize() throws Exception { + TriggerManager manager = + manager(sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().fileSize(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileSize(1L).build(), 0); + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileSize(1L).deleteFileSize(1L).build(), 1); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileSize(3L).build(), 2); + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileSize(5L).deleteFileSize(7L).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().dataFileSize(1L).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileSize(1L).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileSize(1L).build(), 4); + } + } + + @Test + void testDeleteFileNumber() throws Exception { + TriggerManager manager = + manager( + sql.tableLoader(TABLE_NAME), + new TriggerEvaluator.Builder().deleteFileNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileNum(3).deleteFileNum(1).build(), 0); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(2).build(), 1); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(3).build(), 2); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(10).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 4); + } + } + + @Test + void testTimeout() throws Exception { + 
TriggerManager manager = + manager( + sql.tableLoader(TABLE_NAME), + new TriggerEvaluator.Builder().timeout(Duration.ofSeconds(1)).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + TableChange event = TableChange.builder().dataFileSize(1).commitNum(1).build(); + + // Wait for some time + testHarness.processElement(event, EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + // Wait for the timeout to expire + long newTime = EVENT_TIME + Duration.ofSeconds(1).toMillis(); + testHarness.setProcessingTime(newTime); + testHarness.processElement(event, newTime);

Review Comment: oh, should this `processElement` call come right after line 186? I.e., check that there is no output immediately after the element, and one output after the timeout (clock advanced)?
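A minimal sketch of the ordering the comment suggests, reusing only the harness calls already present in this test (`processElement`, `setProcessingTime`, `extractOutputValues`); the exact assertion layout is an assumption, not code from the PR:

```java
// Process the element first, then assert that nothing has fired yet.
testHarness.processElement(event, EVENT_TIME);
assertThat(testHarness.extractOutputValues()).isEmpty();

// Advance the processing-time clock past the timeout; the timer alone
// should now produce exactly one trigger, without a second element.
long newTime = EVENT_TIME + Duration.ofSeconds(1).toMillis();
testHarness.setProcessingTime(newTime);
assertThat(testHarness.extractOutputValues()).hasSize(1);
```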
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} messages which are + * calculated based on the incoming {@link TableChange} messages. The TriggerManager keeps track of + * the changes since the last run of the Maintenance Tasks and triggers a new run based on the + * result of the {@link TriggerEvaluator}. + * + * <p>The TriggerManager prevents overlapping Maintenance Task runs using {@link + * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}. + * + * <p>The TriggerManager should run as a global operator. {@link KeyedProcessFunction} is used, so + * the timer functions are available, but the key is not used. + */ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTimeState; + private transient ListState<TableChange> accumulatedChangesState; + private transient ListState<Long> lastTriggerTimesState; + private transient Long nextEvaluationTime; + private transient List<TableChange> accumulatedChanges; + private transient List<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean isCleanUp = false; + private transient boolean inited = false; + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Evaluators should not be empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1ms."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + 
.counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTimeState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChangesState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimesState = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (inited) { + // Only store state if initialized + nextEvaluationTimeState.update(nextEvaluationTime); + accumulatedChangesState.update(accumulatedChanges); + lastTriggerTimesState.update(lastTriggerTimes); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + LOG.info("Initializing state restored: {}", context.isRestored()); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if (context.isRestored()) { + isCleanUp = true; + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + init(out, ctx.timerService()); + + accumulatedChanges.forEach(tableChange -> tableChange.merge(change)); + + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime; + if (nextTime == null) { + checkAndFire(current, ctx.timerService(), out); + } else { + LOG.info( + "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}", + current, + nextTime, + accumulatedChanges); + rateLimiterTriggeredCounter.inc(); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) throws Exception { + init(out, ctx.timerService()); + this.nextEvaluationTime = null; + checkAndFire(ctx.timerService().currentProcessingTime(), ctx.timerService(), out); + } + + private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) { + if (isCleanUp) { + if (recoveryLock.isHeld()) { + LOG.debug("The cleanup lock is still held at {}", current); + schedule(timerService, current + lockCheckDelayMs); + return; + } else { + LOG.info("The cleanup is finished at {}", current); + isCleanUp = false; + } + } + + Integer taskToStart = + nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, startsFrom); + if (taskToStart == null) { + // Nothing to execute + if (startsFrom == 0) { + nothingToTriggerCounter.inc(); + LOG.debug("Nothing to execute at {} for collected: {}", current, accumulatedChanges); + } + + startsFrom = 0; + return; + } + + if (lock.tryLock()) { + TableChange change = accumulatedChanges.get(taskToStart); + SerializableTable table = + (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()); + out.collect(Trigger.create(current, table, taskToStart)); + LOG.debug("Fired event with time: {}, collected: {} for {}", current, change, table.name()); + triggerCounters.get(taskToStart).inc(); + accumulatedChanges.set(taskToStart, TableChange.empty()); + lastTriggerTimes.set(taskToStart, current); + schedule(timerService, current + minFireDelayMs); + startsFrom = taskToStart + 1; + } else { + // The lock is already held by someone + LOG.info("Delaying task on failed lock check: {}", current); + + startsFrom = taskToStart; + 
concurrentRunTriggeredCounter.inc(); + schedule(timerService, current + lockCheckDelayMs); + } + + timerService.registerProcessingTimeTimer(nextEvaluationTime); + } + + private void schedule(TimerService timerService, long time) { + this.nextEvaluationTime = time;

Review Comment: adding the check in `TriggerManager.init` won't help; it happens first. we should add the `if (nextEvaluationTime == null)` check before every `schedule` call, in particular calls 2 and 3 in the `checkAndFire` method.
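A minimal sketch of the suggested guard at one of the `checkAndFire` call sites (which call sites count as 2 and 3, and whether the guard belongs at the call site rather than inside `schedule`, is my reading of the comment, not confirmed code):

```java
// Only arm a new processing-time timer when none is pending yet;
// an earlier deadline stays in effect, so we do not re-register a
// timer for the same evaluation round.
if (nextEvaluationTime == null) {
  schedule(timerService, current + lockCheckDelayMs);
}
```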
########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java: ########## @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.DUMMY_NAME; +import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME; +import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME_2; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.GROUP_VALUE_DEFAULT; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.time.Duration; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Stream; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class TestTriggerManager extends OperatorTestBase { + private static final long DELAY = 10L; + private static final String NAME_1 = "name1"; + private static final String NAME_2 = "name2"; + private long processingTime = 0L; + private TriggerLockFactory lockFactory; + private TriggerLockFactory.Lock lock; + private TriggerLockFactory.Lock recoveringLock; + + @BeforeEach + void before() { + sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME); + this.lockFactory = new TagBasedLockFactory(sql.tableLoader(TABLE_NAME)); + lockFactory.open(); + this.lock = lockFactory.createLock(); + this.recoveringLock = lockFactory.createRecoveryLock(); + lock.unlock(); + recoveringLock.unlock(); + MetricsReporterFactoryForTests.reset(); + } + + @AfterEach + void after() throws IOException { + lockFactory.close(); + } + + @Test + void testCommitNumber() throws Exception { + TriggerManager manager = + manager( + sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().commitNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 0); + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1); + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(3).build(), 2); + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(10).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 4); + } + } + + @Test + void testFileNumber() throws Exception { + TriggerManager manager = + manager(sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().fileNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileNum(1).build(), 0); + + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileNum(1).deleteFileNum(1).build(), 1); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(3).build(), 2); + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileNum(5).deleteFileNum(7).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().dataFileNum(1).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileNum(1).build(), 4); + } + } + + @Test + void testFileSize() throws Exception { + TriggerManager manager = + manager(sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().fileSize(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileSize(1L).build(), 0); + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileSize(1L).deleteFileSize(1L).build(), 1); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileSize(3L).build(), 2); + addEventAndCheckResult( + testHarness, 
TableChange.builder().dataFileSize(5L).deleteFileSize(7L).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().dataFileSize(1L).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileSize(1L).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileSize(1L).build(), 4); + } + } + + @Test + void testDeleteFileNumber() throws Exception { + TriggerManager manager = + manager( + sql.tableLoader(TABLE_NAME), + new TriggerEvaluator.Builder().deleteFileNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileNum(3).deleteFileNum(1).build(), 0); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(2).build(), 1); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(3).build(), 2); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(10).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 4); + } + } + + @Test + void testTimeout() throws Exception { + TriggerManager manager = + manager( + sql.tableLoader(TABLE_NAME), + new TriggerEvaluator.Builder().timeout(Duration.ofSeconds(1)).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + TableChange event = TableChange.builder().dataFileSize(1).commitNum(1).build(); + + // Wait for some time + testHarness.processElement(event, EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + // Wait for the timeout to expire + long newTime = EVENT_TIME + Duration.ofSeconds(1).toMillis(); + testHarness.setProcessingTime(newTime); + testHarness.processElement(event, newTime); + assertThat(testHarness.extractOutputValues()).hasSize(1); + + // Remove the lock to allow the next trigger + lock.unlock(); + + // Send a new event + testHarness.setProcessingTime(newTime + 1); + testHarness.processElement(event, newTime); + + // No trigger yet + assertThat(testHarness.extractOutputValues()).hasSize(1); + + // Send a new event + newTime += Duration.ofSeconds(1).toMillis(); + testHarness.setProcessingTime(newTime); + testHarness.processElement(event, newTime); + + // New trigger should arrive + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + @Test + void testStateRestore() throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TriggerManager manager = manager(tableLoader); + OperatorSubtaskState state; + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + testHarness.processElement( + TableChange.builder().dataFileSize(1).commitNum(1).build(), EVENT_TIME); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + + state = testHarness.snapshot(1, EVENT_TIME); + } + + // Restore the state, write some more data, create a checkpoint, check the data which is written + manager = manager(tableLoader); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.initializeState(state); 
+ testHarness.open(); + + // Arrives the first real change + testHarness.processElement(TableChange.builder().commitNum(1).build(), EVENT_TIME_2); + assertTriggers( + testHarness.extractOutputValues(), + Lists.newArrayList(Trigger.recovery(testHarness.getProcessingTime()))); + + // Remove the lock to allow the next trigger + recoveringLock.unlock(); + testHarness.setProcessingTime(EVENT_TIME_2); + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + @Test + void testMinFireDelay() throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TriggerManager manager = manager(tableLoader, DELAY, 1); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1); + long currentTime = testHarness.getProcessingTime(); + + // No new fire yet + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1); + + // Check that the trigger fired after the delay + testHarness.setProcessingTime(currentTime + DELAY); + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + @Test + void testLogCheckDelay() throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TriggerManager manager = manager(tableLoader, 1, DELAY); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1); + + // Create a lock to prevent execution, and check that there is no result + assertThat(lock.tryLock()).isTrue(); + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1); + long currentTime = testHarness.getProcessingTime(); + + // Remove the lock, and still no trigger + lock.unlock(); + assertThat(testHarness.extractOutputValues()).hasSize(1); + + // Check that the trigger fired after the delay + testHarness.setProcessingTime(currentTime + DELAY); + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + /** + * Simulating recovery scenarios where there is a leftover table lock, and ongoing maintenance + * task. 
+ * + * @param tableLock left on the table on job recovery + * @param runningTask is running and continues to run after job recovery + */ + @ParameterizedTest + @MethodSource("parametersForTestRecovery") + void testRecovery(boolean tableLock, boolean runningTask) throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + TriggerManager manager = manager(tableLoader); + OperatorSubtaskState state; + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + state = testHarness.snapshot(1, EVENT_TIME); + } + + if (tableLock) { + assertThat(lock.tryLock()).isTrue(); + } + + manager = manager(tableLoader); + List<Trigger> expected = Lists.newArrayListWithExpectedSize(3); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.initializeState(state); + testHarness.open(); + + ++processingTime; + expected.add(Trigger.recovery(processingTime)); + testHarness.setProcessingTime(processingTime); + testHarness.processElement(TableChange.builder().commitNum(2).build(), processingTime); + assertTriggers(testHarness.extractOutputValues(), expected); + + // Nothing happens until the recovery is finished + ++processingTime; + testHarness.setProcessingTime(processingTime); + assertTriggers(testHarness.extractOutputValues(), expected); + + if (runningTask) { + lock.unlock(); + } + + // Still no results as the recovery is ongoing + ++processingTime; + testHarness.setProcessingTime(processingTime); + testHarness.processElement(TableChange.builder().commitNum(2).build(), processingTime); + assertTriggers(testHarness.extractOutputValues(), expected); + + // All locks are removed when the recovery trigger is received by the lock cleaner + lock.unlock(); + recoveringLock.unlock(); + + // Emit only a single trigger + ++processingTime; + testHarness.setProcessingTime(processingTime); + testHarness.processElement(TableChange.builder().commitNum(2).build(), processingTime); + // Releasing lock will create a new snapshot, and we receive this in the trigger + expected.add( + Trigger.create( + processingTime, + (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()), + 0)); + assertTriggers(testHarness.extractOutputValues(), expected); + } + } + + @Test + void testTriggerMetrics() throws Exception { + TableLoader tableLoader = sql.tableLoader(TABLE_NAME); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + ManualSource<TableChange> source = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + CollectingSink<Trigger> sink = new CollectingSink<>(); + + TriggerManager manager = + new TriggerManager( + tableLoader, + lockFactory, + Lists.newArrayList(NAME_1, NAME_2), + Lists.newArrayList( + new TriggerEvaluator.Builder().commitNumber(2).build(), + new TriggerEvaluator.Builder().commitNumber(4).build()), + 1L, + 1L); + source + .dataStream() + .keyBy(unused -> true) + .process(manager) + .name(DUMMY_NAME) + .forceNonParallel() + .sinkTo(sink); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // This one doesn't trigger - tests NOTHING_TO_TRIGGER + source.sendRecord(TableChange.builder().commitNum(1).build()); + + Awaitility.await() + .until( + () -> { + Long notingCounter = + MetricsReporterFactoryForTests.counter( + DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." 
+ NOTHING_TO_TRIGGER); + return notingCounter != null && notingCounter.equals(1L); + }); + + // Trigger one of the tasks - tests TRIGGERED + source.sendRecord(TableChange.builder().commitNum(1).build()); + // Wait until we receive the trigger + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + assertThat( + MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED)) + .isEqualTo(1L); + lock.unlock(); + + // Trigger both of the tasks - tests TRIGGERED

Review Comment: oh, I missed one TableChange event; I thought there were only 3 commits.

########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTagBasedLockFactory.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.flink.maintenance.operator.TagBasedLockFactory.RUNNING_TAG_PREFIX; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.ManageSnapshots; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class TestTagBasedLockFactory extends TestLockFactoryBase { Review Comment:

> reusing the OperatorTestBase

It would just lead me to wonder why an operator is needed to test a lock factory. It is not a blocker from my side if you have a clear preference for this pattern.

> TableLoader is needed for the TagBasedLockFactory.

maybe use the existing `HadoopTableExtension`, which extends `HadoopCatalogExtension`?

> Also SQL is used to create the table, and add a snapshot to it.

maybe `GenericAppenderHelper` from iceberg-core?

########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java: ########## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} messages which are + * calculated based on the incoming {@link TableChange} messages. The TriggerManager keeps track of + * the changes since the last run of the Maintenance Tasks and triggers a new run based on the + * result of the {@link TriggerEvaluator}. + * + * <p>The TriggerManager prevents overlapping Maintenance Task runs using {@link + * org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory.Lock}. + * + * <p>The TriggerManager should run as a global operator. {@link KeyedProcessFunction} is used, so + * the timer functions are available, but the key is not used. 
+ */ +@Internal +class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger> + implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class); + + private final TableLoader tableLoader; + private final TriggerLockFactory lockFactory; + private final List<String> taskNames; + private final List<TriggerEvaluator> evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + private transient Counter rateLimiterTriggeredCounter; + private transient Counter concurrentRunTriggeredCounter; + private transient Counter nothingToTriggerCounter; + private transient List<Counter> triggerCounters; + private transient ValueState<Long> nextEvaluationTimeState; + private transient ListState<TableChange> accumulatedChangesState; + private transient ListState<Long> lastTriggerTimesState; + private transient Long nextEvaluationTime; + private transient List<TableChange> accumulatedChanges; + private transient List<Long> lastTriggerTimes; + private transient TriggerLockFactory.Lock lock; + private transient TriggerLockFactory.Lock recoveryLock; + private transient boolean shouldRestoreTasks = false; + private transient boolean inited = false; + // To keep the task scheduling fair we keep the last triggered task position in memory. + // If we find a task to trigger, then we run it, but after it is finished, we start from the given + // position to prevent "starvation" of the tasks. + // When there is nothing to trigger, we start from the beginning, as the order of the tasks might + // be important (RewriteDataFiles first, and then RewriteManifestFiles later) + private transient int startsFrom = 0; + + TriggerManager( + TableLoader tableLoader, + TriggerLockFactory lockFactory, + List<String> taskNames, + List<TriggerEvaluator> evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(lockFactory, "Lock factory should no be null"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Invalid evaluators: null or empty"); + Preconditions.checkArgument( + taskNames.size() == evaluators.size(), "Provide a name and evaluator for all of the tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 ms."); + + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskNames = taskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public void open(Configuration parameters) throws Exception { + this.rateLimiterTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunTriggeredCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED); + this.nothingToTriggerCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup( + TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT) + .counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = + taskNames.stream() + .map( + 
name -> + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.TRIGGERED)) + .collect(Collectors.toList()); + + this.nextEvaluationTimeState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + this.accumulatedChangesState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + this.lastTriggerTimesState = + getRuntimeContext() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + tableLoader.open(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (inited) { + // Only store state if initialized + nextEvaluationTimeState.update(nextEvaluationTime); + accumulatedChangesState.update(accumulatedChanges); + lastTriggerTimesState.update(lastTriggerTimes); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + LOG.info("Initializing state restored: {}", context.isRestored()); + lockFactory.open(); + this.lock = lockFactory.createLock(); + this.recoveryLock = lockFactory.createRecoveryLock(); + if (context.isRestored()) { + shouldRestoreTasks = true; + } + } + + @Override + public void processElement(TableChange change, Context ctx, Collector<Trigger> out) + throws Exception { + init(out, ctx.timerService()); + + accumulatedChanges.forEach(tableChange -> tableChange.merge(change)); + + long current = ctx.timerService().currentProcessingTime(); + Long nextTime = nextEvaluationTime; + if (nextTime == null) { + checkAndFire(current, ctx.timerService(), out); + } else { + LOG.info( + "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {}", + current, + nextTime, + accumulatedChanges); + rateLimiterTriggeredCounter.inc(); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out) throws Exception { + init(out, ctx.timerService()); + this.nextEvaluationTime = null; + checkAndFire(ctx.timerService().currentProcessingTime(), ctx.timerService(), out); + } + + @Override + public void close() throws IOException { + tableLoader.close(); + lockFactory.close(); + } + + private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) { + if (shouldRestoreTasks) { + if (recoveryLock.isHeld()) { + // Recovered tasks in progress. 
Skip trigger check + LOG.debug("The cleanup lock is still held at {}", current); + schedule(timerService, current + lockCheckDelayMs); + return; + } else { + LOG.info("The cleanup is finished at {}", current); + shouldRestoreTasks = false; + } + } + + Integer taskToStart = + nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, startsFrom); + if (taskToStart == null) { + // Nothing to execute + if (startsFrom == 0) { + nothingToTriggerCounter.inc(); + LOG.debug("Nothing to execute at {} for collected: {}", current, accumulatedChanges); + } + + // Next time start from the beginning + startsFrom = 0; + return; + } + + if (lock.tryLock()) { + TableChange change = accumulatedChanges.get(taskToStart); + SerializableTable table = + (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()); + out.collect(Trigger.create(current, table, taskToStart)); + LOG.debug("Fired event with time: {}, collected: {} for {}", current, change, table.name()); + triggerCounters.get(taskToStart).inc(); + accumulatedChanges.set(taskToStart, TableChange.empty()); + lastTriggerTimes.set(taskToStart, current); + schedule(timerService, current + minFireDelayMs); + startsFrom = taskToStart + 1; + } else { + // The lock is already held by someone + LOG.info("Delaying task on failed lock check: {}", current); + + startsFrom = taskToStart; + concurrentRunTriggeredCounter.inc(); + schedule(timerService, current + lockCheckDelayMs); + } + + timerService.registerProcessingTimeTimer(nextEvaluationTime); + } + + private void schedule(TimerService timerService, long time) { + this.nextEvaluationTime = time; + timerService.registerProcessingTimeTimer(time); + } + + private static Integer nextTrigger( + List<TriggerEvaluator> evaluators, + List<TableChange> changes, + List<Long> lastTriggerTimes, + long currentTime, + int startPos) { + int normalizedStartingPos = startPos % evaluators.size(); + int current = normalizedStartingPos; + do { + if (evaluators + .get(current) + .check(changes.get(current), lastTriggerTimes.get(current), currentTime)) { + return current; + } + + current = (current + 1) % evaluators.size(); + } while (current != normalizedStartingPos); + + return null; + } + + private void init(Collector<Trigger> out, TimerService timerService) throws Exception { + if (!inited) { + long current = timerService.currentProcessingTime(); + + // Initialize from state + this.nextEvaluationTime = nextEvaluationTimeState.value();

Review Comment: keyed state is only available after `keyBy`; I doubt it matters whether the state variable is constructed in the `open` method or the `initializeState` method. This should probably also be an operator list state with a single element. That way, we don't need the keyed stream. I remember we discussed the keyed stream before. Is this the reason/problem?

```
this.nextEvaluationTimeState =
    getRuntimeContext()
        .getState(new ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
```
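A minimal sketch of the operator-state alternative the comment suggests, hooked into the `CheckpointedFunction` methods that `TriggerManager` already implements; the descriptor name is kept from the PR, but the field type change (`ListState<Long>` instead of `ValueState<Long>`) and the exact wiring are assumptions:

```java
// In initializeState(FunctionInitializationContext context): operator list
// state holding at most one element does not require a keyed stream.
this.nextEvaluationTimeState =
    context
        .getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));

// In snapshotState(FunctionSnapshotContext context): store zero or one element.
nextEvaluationTimeState.update(
    nextEvaluationTime == null
        ? Lists.newArrayList()
        : Lists.newArrayList(nextEvaluationTime));
```

Note that processing-time timers still come from the keyed context of `KeyedProcessFunction`, which may be the reason the keyed stream was kept in the first place.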
########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/MetricsReporterFactoryForTests.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.MetricReporterFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class MetricsReporterFactoryForTests implements MetricReporterFactory { Review Comment: not sure I understand the question. I was asking whether we can replace this class with Flink's existing `CollectingMetricsReporter`. Its base class stores the metrics in Maps, like here:

```
public abstract class AbstractReporter implements MetricReporter, CharacterFilter {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected final Map<Gauge<?>, String> gauges = new HashMap<>();
    protected final Map<Counter, String> counters = new HashMap<>();
    protected final Map<Histogram, String> histograms = new HashMap<>();
    protected final Map<Meter, String> meters = new HashMap<>();
```

########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java: ########## @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManager.java:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.DUMMY_NAME;
+import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME;
+import static org.apache.iceberg.flink.maintenance.operator.ConstantsForTests.EVENT_TIME_2;
+import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_TRIGGERED;
+import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.GROUP_VALUE_DEFAULT;
+import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER;
+import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED;
+import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+class TestTriggerManager extends OperatorTestBase {
+  private static final long DELAY = 10L;
+  private static final String NAME_1 = "name1";
+  private static final String NAME_2 = "name2";
+  private long processingTime = 0L;
+  private TriggerLockFactory lockFactory;
+  private TriggerLockFactory.Lock lock;
+  private TriggerLockFactory.Lock recoveringLock;
+
+  @BeforeEach
+  void before() {
+    sql.exec("CREATE TABLE %s (id int, data varchar)", TABLE_NAME);
+    this.lockFactory = new TagBasedLockFactory(sql.tableLoader(TABLE_NAME));
+    lockFactory.open();
+    this.lock = lockFactory.createLock();
+    this.recoveringLock = lockFactory.createRecoveryLock();
+    lock.unlock();
+    recoveringLock.unlock();
+    MetricsReporterFactoryForTests.reset();
+  }
+
+  @AfterEach
+  void after() throws IOException {
+    lockFactory.close();
+  }
+
+  @Test
+  void testCommitNumber() throws Exception {
+    TriggerManager manager =
+        manager(
+            sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().commitNumber(3).build());
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.open();
+
+      addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 0);
+      addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1);
+      addEventAndCheckResult(testHarness,
TableChange.builder().commitNum(3).build(), 2); + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(10).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().commitNum(1).build(), 4); + } + } + + @Test + void testFileNumber() throws Exception { + TriggerManager manager = + manager(sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().fileNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileNum(1).build(), 0); + + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileNum(1).deleteFileNum(1).build(), 1); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(3).build(), 2); + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileNum(5).deleteFileNum(7).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().dataFileNum(1).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileNum(1).build(), 4); + } + } + + @Test + void testFileSize() throws Exception { + TriggerManager manager = + manager(sql.tableLoader(TABLE_NAME), new TriggerEvaluator.Builder().fileSize(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileSize(1L).build(), 0); + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileSize(1L).deleteFileSize(1L).build(), 1); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileSize(3L).build(), 2); + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileSize(5L).deleteFileSize(7L).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().dataFileSize(1L).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileSize(1L).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().dataFileSize(1L).build(), 4); + } + } + + @Test + void testDeleteFileNumber() throws Exception { + TriggerManager manager = + manager( + sql.tableLoader(TABLE_NAME), + new TriggerEvaluator.Builder().deleteFileNumber(3).build()); + try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness = + harness(manager)) { + testHarness.open(); + + addEventAndCheckResult( + testHarness, TableChange.builder().dataFileNum(3).deleteFileNum(1).build(), 0); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(2).build(), 1); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(3).build(), 2); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(10).build(), 3); + + // No trigger in this case + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 3); + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 3); + + addEventAndCheckResult(testHarness, TableChange.builder().deleteFileNum(1).build(), 4); + } + } + + @Test + void testTimeout() throws Exception { + 
TriggerManager manager =
+        manager(
+            sql.tableLoader(TABLE_NAME),
+            new TriggerEvaluator.Builder().timeout(Duration.ofSeconds(1)).build());
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.open();
+
+      TableChange event = TableChange.builder().dataFileSize(1).commitNum(1).build();
+
+      // Wait for some time
+      testHarness.processElement(event, EVENT_TIME);
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+
+      // Wait for the timeout to expire
+      long newTime = EVENT_TIME + Duration.ofSeconds(1).toMillis();
+      testHarness.setProcessingTime(newTime);
+      testHarness.processElement(event, newTime);
+      assertThat(testHarness.extractOutputValues()).hasSize(1);
+
+      // Remove the lock to allow the next trigger
+      lock.unlock();
+
+      // Send a new event
+      testHarness.setProcessingTime(newTime + 1);
+      testHarness.processElement(event, newTime);
+
+      // No trigger yet
+      assertThat(testHarness.extractOutputValues()).hasSize(1);
+
+      // Send a new event
+      newTime += Duration.ofSeconds(1).toMillis();
+      testHarness.setProcessingTime(newTime);
+      testHarness.processElement(event, newTime);
+
+      // New trigger should arrive
+      assertThat(testHarness.extractOutputValues()).hasSize(2);
+    }
+  }
+
+  @Test
+  void testStateRestore() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    TriggerManager manager = manager(tableLoader);
+    OperatorSubtaskState state;
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.open();
+
+      testHarness.processElement(
+          TableChange.builder().dataFileSize(1).commitNum(1).build(), EVENT_TIME);
+
+      assertThat(testHarness.extractOutputValues()).isEmpty();
+
+      state = testHarness.snapshot(1, EVENT_TIME);
+    }
+
+    // Restore the state, and check that the first incoming change emits a recovery trigger
+    manager = manager(tableLoader);
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.initializeState(state);
+      testHarness.open();
+
+      // The first real change arrives
+      testHarness.processElement(TableChange.builder().commitNum(1).build(), EVENT_TIME_2);
+      assertTriggers(
+          testHarness.extractOutputValues(),
+          Lists.newArrayList(Trigger.recovery(testHarness.getProcessingTime())));
+
+      // Remove the lock to allow the next trigger
+      recoveringLock.unlock();
+      testHarness.setProcessingTime(EVENT_TIME_2);
+      assertThat(testHarness.extractOutputValues()).hasSize(2);
+    }
+  }
+
+  @Test
+  void testMinFireDelay() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    TriggerManager manager = manager(tableLoader, DELAY, 1);
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.open();
+
+      addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1);
+      long currentTime = testHarness.getProcessingTime();
+
+      // No new fire yet
+      addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1);
+
+      // Check that the trigger fired after the delay
+      testHarness.setProcessingTime(currentTime + DELAY);
+      assertThat(testHarness.extractOutputValues()).hasSize(2);
+    }
+  }
+
+  @Test
+  void testLockCheckDelay() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    TriggerManager manager = manager(tableLoader, 1, DELAY);
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.open();
+
+      addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1);
+
+      // Acquire the lock to prevent execution, and check that there is no result
+      assertThat(lock.tryLock()).isTrue();
+      addEventAndCheckResult(testHarness, TableChange.builder().commitNum(2).build(), 1);
+      long currentTime = testHarness.getProcessingTime();
+
+      // Remove the lock, and still no trigger
+      lock.unlock();
+      assertThat(testHarness.extractOutputValues()).hasSize(1);
+
+      // Check that the trigger fired after the delay
+      testHarness.setProcessingTime(currentTime + DELAY);
+      assertThat(testHarness.extractOutputValues()).hasSize(2);
+    }
+  }
+
+  /**
+   * Simulates recovery scenarios where there is a leftover table lock and an ongoing maintenance
+   * task.
+   *
+   * @param tableLock whether a lock is left on the table on job recovery
+   * @param runningTask whether a task is running and continues to run after job recovery
+   */
+  @ParameterizedTest
+  @MethodSource("parametersForTestRecovery")
+  void testRecovery(boolean tableLock, boolean runningTask) throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+    TriggerManager manager = manager(tableLoader);
+    OperatorSubtaskState state;
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.open();
+      state = testHarness.snapshot(1, EVENT_TIME);
+    }
+
+    if (tableLock) {
+      assertThat(lock.tryLock()).isTrue();
+    }
+
+    manager = manager(tableLoader);
+    List<Trigger> expected = Lists.newArrayListWithExpectedSize(3);
+    try (KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> testHarness =
+        harness(manager)) {
+      testHarness.initializeState(state);
+      testHarness.open();
+
+      ++processingTime;
+      expected.add(Trigger.recovery(processingTime));
+      testHarness.setProcessingTime(processingTime);
+      testHarness.processElement(TableChange.builder().commitNum(2).build(), processingTime);
+      assertTriggers(testHarness.extractOutputValues(), expected);
+
+      // Nothing happens until the recovery is finished
+      ++processingTime;
+      testHarness.setProcessingTime(processingTime);
+      assertTriggers(testHarness.extractOutputValues(), expected);
+
+      if (runningTask) {
+        lock.unlock();
+      }
+
+      // Still no results as the recovery is ongoing
+      ++processingTime;
+      testHarness.setProcessingTime(processingTime);
+      testHarness.processElement(TableChange.builder().commitNum(2).build(), processingTime);
+      assertTriggers(testHarness.extractOutputValues(), expected);
+
+      // All locks are removed when the recovery trigger is received by the lock cleaner
+      lock.unlock();
+      recoveringLock.unlock();
+
+      // Emit only a single trigger
+      ++processingTime;
+      testHarness.setProcessingTime(processingTime);
+      testHarness.processElement(TableChange.builder().commitNum(2).build(), processingTime);
+      // Releasing the lock creates a new snapshot, and we receive it in the trigger
+      expected.add(
+          Trigger.create(
+              processingTime,
+              (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()),
+              0));
+      assertTriggers(testHarness.extractOutputValues(), expected);
+    }
+  }
+
+  @Test
+  void testTriggerMetrics() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    ManualSource<TableChange> source =
+        new ManualSource<>(env, TypeInformation.of(TableChange.class));
+    CollectingSink<Trigger> sink = new CollectingSink<>();
+
+    TriggerManager manager =
+        new TriggerManager(
+            tableLoader,
+            lockFactory,
+            Lists.newArrayList(NAME_1, NAME_2),
+            Lists.newArrayList(
+                new TriggerEvaluator.Builder().commitNumber(2).build(),
+                new TriggerEvaluator.Builder().commitNumber(4).build()),
+            1L,
+            1L);
+    source
+        .dataStream()
+        .keyBy(unused -> true)
+        .process(manager)
+        .name(DUMMY_NAME)
+        .forceNonParallel()
+        .sinkTo(sink);
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      // This one doesn't trigger - tests NOTHING_TO_TRIGGER
+      source.sendRecord(TableChange.builder().commitNum(1).build());
+
+      Awaitility.await()
+          .until(
+              () -> {
+                Long nothingCounter =
+                    MetricsReporterFactoryForTests.counter(
+                        DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER);
+                return nothingCounter != null && nothingCounter.equals(1L);
+              });
+
+      // Trigger one of the tasks - tests TRIGGERED
+      source.sendRecord(TableChange.builder().commitNum(1).build());
+      // Wait until we receive the trigger
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+      assertThat(
+              MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED))
+          .isEqualTo(1L);
+      lock.unlock();
+
+      // Trigger both of the tasks - tests TRIGGERED
+      source.sendRecord(TableChange.builder().commitNum(2).build());
+      // Wait until we receive the trigger
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+      lock.unlock();
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+      lock.unlock();
+      assertThat(
+              MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED))
+          .isEqualTo(2L);
+      assertThat(
+              MetricsReporterFactoryForTests.counter(DUMMY_NAME + "." + NAME_2 + "." + TRIGGERED))
+          .isEqualTo(1L);
+
+      // Final check of all the counters
+      MetricsReporterFactoryForTests.assertCounters(
+          new ImmutableMap.Builder<String, Long>()
+              .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED, -1L)
+              .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_TRIGGERED, -1L)
+              .put(DUMMY_NAME + "." + NAME_1 + "." + TRIGGERED, 2L)
+              .put(DUMMY_NAME + "." + NAME_2 + "." + TRIGGERED, 1L)
+              .put(DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + NOTHING_TO_TRIGGER, 1L)
+              .build());
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  @Test
+  void testRateLimiterMetrics() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    ManualSource<TableChange> source =
+        new ManualSource<>(env, TypeInformation.of(TableChange.class));
+    CollectingSink<Trigger> sink = new CollectingSink<>();
+
+    // High delay, so only triggered once
+    TriggerManager manager = manager(tableLoader, 1_000_000L, 1L);
+    source
+        .dataStream()
+        .keyBy(unused -> true)
+        .process(manager)
+        .name(DUMMY_NAME)
+        .forceNonParallel()
+        .sinkTo(sink);
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      // Start the first trigger
+      source.sendRecord(TableChange.builder().commitNum(2).build());
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+      // Remove the lock to allow the next trigger
+      lock.unlock();
+
+      // The second trigger will be blocked
+      source.sendRecord(TableChange.builder().commitNum(2).build());
+      Awaitility.await()
+          .until(
+              () ->
+                  MetricsReporterFactoryForTests.counter(
+                          DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + RATE_LIMITER_TRIGGERED)
+                      .equals(1L));
+
+      // Final check of all the counters
+      assertCounters(1L, 0L);
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  @Test
+  void testConcurrentRunMetrics() throws Exception {
+    TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    ManualSource<TableChange> source =
+        new ManualSource<>(env, TypeInformation.of(TableChange.class));
+    CollectingSink<Trigger> sink = new CollectingSink<>();
+
+    // High delay, so only triggered once
+    TriggerManager manager = manager(tableLoader, 1L, 1_000_000L);
+    source
+        .dataStream()
+        .keyBy(unused -> true)
+        .process(manager)
+        .name(DUMMY_NAME)
+        .forceNonParallel()
+        .sinkTo(sink);
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      // Start the first trigger - notice that we do not remove the lock after the trigger
+      source.sendRecord(TableChange.builder().commitNum(2).build());
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+      // The second trigger will be blocked by the lock
+      source.sendRecord(TableChange.builder().commitNum(2).build());
+      Awaitility.await()
+          .until(
+              () ->
+                  MetricsReporterFactoryForTests.counter(
+                          DUMMY_NAME + "." + GROUP_VALUE_DEFAULT + "." + CONCURRENT_RUN_TRIGGERED)

Review Comment:
   I am asking whether `concurrentRunTriggered` is the accurate metric name. It seems more like `concurrentRunThrottled` to me, because nothing is triggered in this case; the run is scheduled for a later lock check instead.
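   Roughly what I mean, as a hedged sketch (hypothetical field and metric name, not the PR's code):
   ```
   // Count runs that were throttled and re-scheduled for a later lock check,
   // rather than runs that were triggered.
   this.concurrentRunThrottledCounter =
       getRuntimeContext().getMetricGroup().counter("concurrentRunThrottled");
   ```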