stevenzwu commented on code in PR #11144: URL: https://github.com/apache/iceberg/pull/11144#discussion_r1763635282
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/AsyncDeleteFiles.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.function.Predicate; +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Delete the files using the {@link FileIO}. */ +@Internal +public class AsyncDeleteFiles extends RichAsyncFunction<String, Boolean> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncDeleteFiles.class); + public static final Predicate<Collection<Boolean>> FAILED_PREDICATE = new FailedPredicate(); + + private final String name; + private final FileIO io; + private final int workerPoolSize; + private final String tableName; + + private transient ExecutorService workerPool; + private transient Counter failedCounter; + private transient Counter succeededCounter; + + public AsyncDeleteFiles(String name, TableLoader tableLoader, int workerPoolSize) { + Preconditions.checkNotNull(name, "Name should no be null"); + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + + this.name = name; + tableLoader.open(); + Table table = tableLoader.loadTable(); + this.io = table.io(); + this.workerPoolSize = workerPoolSize; + this.tableName = table.name(); + } + + @Override + public void open(Configuration parameters) throws Exception { + this.failedCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER); + this.succeededCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER); + + this.workerPool = Review Comment: this is where I meant we should consider using the shared thread pool.
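For illustration, a minimal sketch of the shared-pool variant the comment refers to, assuming the process-wide pool from Iceberg's `ThreadPools.getWorkerPool()` (sized by `iceberg.worker.num-threads`) is acceptable for delete traffic:

```java
@Override
public void open(Configuration parameters) throws Exception {
  // ... counter setup unchanged ...

  // Reuse the process-wide Iceberg worker pool instead of creating a dedicated
  // pool per operator instance. The shared pool is a singleton, so close() must
  // not shut it down (unlike a pool created with ThreadPools.newWorkerPool).
  this.workerPool = ThreadPools.getWorkerPool();
}
```

The trade-off: a dedicated pool isolates delete traffic per table, while the shared pool caps the total thread count across all maintenance operators in the task manager.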
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could + * be removed in the {@link #DELETE_STREAM} side output. 
+ */ +public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> { + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class); + public static final OutputTag<String> DELETE_STREAM = + new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING); + + private final TableLoader tableLoader; + private final Long maxSnapshotAgeMs; + private final Integer retainLast; + private final int plannerPoolSize; + private transient ExecutorService plannerPool; + private transient Table table; + + public ExpireSnapshotsProcessor( + TableLoader tableLoader, Long maxSnapshotAgeMs, Integer retainLast, int plannerPoolSize) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + + this.tableLoader = tableLoader; + this.maxSnapshotAgeMs = maxSnapshotAgeMs; + this.retainLast = retainLast; + this.plannerPoolSize = plannerPoolSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + tableLoader.open(); + this.table = tableLoader.loadTable(); + this.plannerPool = ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize); + } + + @Override + public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> out) + throws Exception { + try { + table.refresh(); + ExpireSnapshots expireSnapshots = table.expireSnapshots(); + if (maxSnapshotAgeMs != null) { + expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - maxSnapshotAgeMs); + } + + if (retainLast != null) { + expireSnapshots = expireSnapshots.retainLast(retainLast); + } + + AtomicLong deleteFileCounter = new AtomicLong(0L); + expireSnapshots + .planWith(plannerPool) + .deleteWith( + file -> { + ctx.output(DELETE_STREAM, file); + deleteFileCounter.incrementAndGet(); + }) + .cleanExpiredFiles(true) + .commit(); + + LOG.info( + "Successfully finished expiring snapshots for {} at {}. Scheduled {} files for delete.", + table, + ctx.timestamp(), + deleteFileCounter.get()); + out.collect( + new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList())); + } catch (Exception e) { + LOG.info("Exception expiring snapshots for {} at {}", table, ctx.timestamp(), e); Review Comment: should this be logged at error level, e.g. `LOG.error("Failed to expire snapshots for {} at {}", table, ctx.timestamp(), e)`? nit: error messages typically start with something like `Failed to ` or `Cannot ` ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; + private static final long DELETE_TIMEOUT_MS = 10000L; + private static final int DELETE_ATTEMPT_NUM = 10; + private static final String EXECUTOR_TASK_NAME = "ES Executor"; + @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file"; + + private ExpireSnapshots() { + // Do not instantiate directly + } + + /** Creates the builder for creating a stream which expires snapshots for the table. */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> { + private Duration maxSnapshotAge = null; + private Integer retainLast = null; + private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value(); + private int deleteAttemptNum = DELETE_ATTEMPT_NUM; + private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value(); + + /** + * The snapshots newer than this age will not be removed. + * + * @param newMaxSnapshotAge of the snapshots to be removed + * @return for chained calls + */ + public Builder maxSnapshotAge(Duration newMaxSnapshotAge) { + this.maxSnapshotAge = newMaxSnapshotAge; + return this; + } + + /** + * The minimum {@link org.apache.iceberg.Snapshot}s to retain. For more details description see + * {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}. + * + * @param newRetainLast number of snapshots to retain + * @return for chained calls + */ + public Builder retainLast(int newRetainLast) { + this.retainLast = newRetainLast; + return this; + } + + /** + * The worker pool size used to calculate the files to delete. + * + * @param newPlanningWorkerPoolSize for planning files to delete + * @return for chained calls + */ + public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) { + this.planningWorkerPoolSize = newPlanningWorkerPoolSize; + return this; + } + + /** + * The number of retries on the failed delete attempts. + * + * @param newDeleteAttemptNum number of retries + * @return for chained calls + */ + public Builder deleteAttemptNum(int newDeleteAttemptNum) { + this.deleteAttemptNum = newDeleteAttemptNum; + return this; + } + + /** + * The worker pool size used for deleting files. 
+ * + * @param newDeleteWorkerPoolSize for scanning + * @return for chained calls + */ + public Builder deleteWorkerPoolSize(int newDeleteWorkerPoolSize) { + this.deleteWorkerPoolSize = newDeleteWorkerPoolSize; + return this; + } + + @Override + DataStream<TaskResult> append(DataStream<Trigger> trigger) { + Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null"); + + SingleOutputStreamOperator<TaskResult> result = + trigger + .process( + new ExpireSnapshotsProcessor( + tableLoader(), + maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(), + retainLast, + planningWorkerPoolSize)) + .name(EXECUTOR_TASK_NAME) + .uid("expire-snapshots-" + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + + AsyncRetryStrategy<Boolean> retryStrategy = + new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder<Boolean>( + deleteAttemptNum, + DELETE_INITIAL_DELAY_MS, + DELETE_MAX_RETRY_DELAY_MS, + DELETE_BACKOFF_MULTIPLIER) + .ifResult(AsyncDeleteFiles.FAILED_PREDICATE) + .build(); + + AsyncDataStream.unorderedWaitWithRetry( + result.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM).rebalance(), + new AsyncDeleteFiles(name(), tableLoader(), deleteWorkerPoolSize), + DELETE_TIMEOUT_MS, + TimeUnit.MILLISECONDS, + deleteWorkerPoolSize, + retryStrategy) + .name(DELETE_FILES_TASK_NAME) + .uid("delete-expired-files-" + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .setParallelism(parallelism()); + + // Deleting the files is asynchronous, so we ignore the results when calculating the return Review Comment: nit: `ignore the results` can be clarified. maybe sth like this ``` Ignore the async file deletion boolean result and return the DataStream<TaskResult> directly ``` ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; + private static final long DELETE_TIMEOUT_MS = 10000L; + private static final int DELETE_ATTEMPT_NUM = 10; + private static final String EXECUTOR_TASK_NAME = "ES Executor"; + @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file"; + + private ExpireSnapshots() { + // Do not instantiate directly + } + + /** Creates the builder for creating a stream which expires snapshots for the table. */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> { + private Duration maxSnapshotAge = null; + private Integer retainLast = null; + private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value(); + private int deleteAttemptNum = DELETE_ATTEMPT_NUM; + private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value(); + + /** + * The snapshots newer than this age will not be removed. + * + * @param newMaxSnapshotAge of the snapshots to be removed + * @return for chained calls Review Comment: remove the `@return` line; the return value is too obvious to be worth documenting. this applies to all builder methods ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; + private static final long DELETE_TIMEOUT_MS = 10000L; + private static final int DELETE_ATTEMPT_NUM = 10; + private static final String EXECUTOR_TASK_NAME = "ES Executor"; + @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file"; + + private ExpireSnapshots() { + // Do not instantiate directly + } + + /** Creates the builder for creating a stream which expires snapshots for the table. */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> { + private Duration maxSnapshotAge = null; + private Integer retainLast = null; + private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value(); + private int deleteAttemptNum = DELETE_ATTEMPT_NUM; + private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value(); + + /** + * The snapshots newer than this age will not be removed. Review Comment: nit: be more direct. either `The snapshots newer than this age will be retained` or `The snapshots older than this age will be removed` ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java: ########## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. */ +public class TableMaintenance { + private static final String TASK_NAME_FORMAT = "%s [%d]"; + static final String SOURCE_NAME = "Monitor source"; + static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager"; + static final String LOCK_REMOVER_TASK_NAME = "Lock remover"; + + private TableMaintenance() { + // Do not instantiate directly + } + + /** + * Use when the change stream is already provided, like in the {@link + * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}. + * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forChangeStream( + DataStream<TableChange> changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(changeStream, tableLoader, lockFactory); + } + + /** + * Creates the default monitor source for collecting the table changes and returns a builder for + * the maintenance stream. 
+ * + * @param env used to register the monitor source + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forTable( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(env, tableLoader, lockFactory); + } + + public static class Builder { + private final StreamExecutionEnvironment env; + private final DataStream<TableChange> inputStream; + private final TableLoader tableLoader; + private final List<MaintenanceTaskBuilder<?>> taskBuilders; + private final TriggerLockFactory lockFactory; + + private String uidSuffix = "TableMaintenance-" + UUID.randomUUID(); + private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + private Duration rateLimit = Duration.ofMillis(1); + private Duration lockCheckDelay = Duration.ofSeconds(30); + private Integer parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private int maxReadBack = 100; + + private Builder( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + this.env = env; + this.inputStream = null; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + private Builder( + DataStream<TableChange> inputStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + this.env = null; + this.inputStream = inputStream; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + /** + * The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid. + * + * @param newUidSuffix for the transformations + * @return for chained calls + */ + public Builder uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + * @return for chained calls + */ + public Builder slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return this; + } + + /** + * Limits the firing frequency for the task triggers. + * + * @param newRateLimit firing frequency + * @return for chained calls + */ + public Builder rateLimit(Duration newRateLimit) { + Preconditions.checkNotNull(rateLimit.toMillis() > 0, "Rate limit should be greater than 0"); + this.rateLimit = newRateLimit; + return this; + } + + /** + * Sets the delay for checking lock availability when a concurrent run is detected. + * + * @param newLockCheckDelay lock checking frequency + * @return for chained calls + */ + public Builder lockCheckDelay(Duration newLockCheckDelay) { + this.lockCheckDelay = newLockCheckDelay; + return this; + } + + /** + * Sets the global parallelism of maintenance tasks. Could be overwritten by the {@link + * MaintenanceTaskBuilder#parallelism(int)}. 
+ * + * @param newParallelism task parallelism + * @return for chained calls + */ + public Builder parallelism(int newParallelism) { + this.parallelism = newParallelism; + return this; + } + + /** + * Maximum number of snapshots checked when started with an embedded {@link MonitorSource} at + * the first time. Only available when the {@link + * TableMaintenance#forTable(StreamExecutionEnvironment, TableLoader, TriggerLockFactory)} is + * used. + * + * @param newMaxReadBack snapshots to consider when initializing + * @return for chained calls + */ + public Builder maxReadBack(int newMaxReadBack) { + Preconditions.checkArgument( + inputStream == null, "Can't set maxReadBack when change stream is provided"); + this.maxReadBack = newMaxReadBack; + return this; + } + + /** + * Adds a specific task with the given schedule. + * + * @param task to add + * @return for chained calls + */ + public Builder add(MaintenanceTaskBuilder<?> task) { + taskBuilders.add(task); + return this; + } + + /** Builds the task graph for the maintenance tasks. */ + public void append() { + Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task"); + Preconditions.checkNotNull(uidSuffix, "Uid suffix should no be null"); + + List<String> taskNames = Lists.newArrayListWithCapacity(taskBuilders.size()); + List<TriggerEvaluator> evaluators = Lists.newArrayListWithCapacity(taskBuilders.size()); + for (int i = 0; i < taskBuilders.size(); ++i) { + taskNames.add(nameFor(taskBuilders.get(i), i)); + evaluators.add(taskBuilders.get(i).evaluator()); + } + + DataStream<Trigger> triggers = + DataStreamUtils.reinterpretAsKeyedStream(changeStream(), unused -> true) + .process( + new TriggerManager( + tableLoader, + lockFactory, + taskNames, + evaluators, + rateLimit.toMillis(), + lockCheckDelay.toMillis())) + .name(TRIGGER_MANAGER_TASK_NAME) + .uid("trigger-manager-" + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel() + .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy()) + .name("Watermark Assigner") + .uid("watermark-assigner-" + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + + // Add the specific tasks + DataStream<TaskResult> unioned = null; + for (int i = 0; i < taskBuilders.size(); ++i) { + int finalIndex = i; + DataStream<Trigger> filtered = + triggers + .filter(t -> t.taskId() != null && t.taskId() == finalIndex) + .name("Filter " + i) Review Comment: again, might be good to make name and uid the same as we did for v2 sink ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> { + private int index; + private String name; + private TableLoader tableLoader; + private String uidSuffix = null; + private String slotSharingGroup = null; + private Integer parallelism = null; + private TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder(); + + abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream); + + /** + * After a given number of Iceberg table commits since the last run, starts the downstream job. + * + * @param commitCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnCommitCount(int commitCount) { + triggerEvaluator.commitCount(commitCount); + return (T) this; + } + + /** + * After a given number of new data files since the last run, starts the downstream job. + * + * @param dataFileCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnDataFileCount(int dataFileCount) { + triggerEvaluator.dataFileCount(dataFileCount); + return (T) this; + } + + /** + * After a given aggregated data file size since the last run, starts the downstream job. + * + * @param dataFileSizeInBytes after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnDataFileSize(long dataFileSizeInBytes) { + triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes); + return (T) this; + } + + /** + * After a given number of new positional delete files since the last run, starts the downstream + * job. + * + * @param posDeleteFileCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) { + triggerEvaluator.posDeleteFileCount(posDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new positional delete records since the last run, starts the downstream + * job. + * + * @param posDeleteRecordCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) { + triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount); + return (T) this; + } + + /** + * After a given number of new equality delete files since the last run, starts the downstream + * job. + * + * @param eqDeleteFileCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) { + triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new equality delete records since the last run, starts the downstream + * job. 
+ * + * @param eqDeleteRecordCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) { + triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount); + return (T) this; + } + + /** + * After a given time since the last run, starts the downstream job. + * + * @param interval after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnInterval(Duration interval) { + triggerEvaluator.timeout(interval); + return (T) this; + } + + /** + * The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid. + * + * @param newUidSuffix for the transformations + * @return for chained calls + */ + public T uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return (T) this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + * @return for chained calls + */ + public T slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return (T) this; + } + + /** + * Sets the parallelism for the stream. + * + * @param newParallelism the required parallelism + * @return for chained calls + */ + public T parallelism(int newParallelism) { + this.parallelism = newParallelism; + return (T) this; + } + + @Internal + int id() { + return index; + } + + @Internal + String name() { + return name; + } + + @Internal + TableLoader tableLoader() { + return tableLoader; + } + + @Internal + String uidSuffix() { + return uidSuffix; + } + + @Internal + String slotSharingGroup() { + return slotSharingGroup; + } + + @Internal + Integer parallelism() { + return parallelism; + } + + @Internal + TriggerEvaluator evaluator() { + return triggerEvaluator.build(); + } + + @Internal + DataStream<TaskResult> append( + DataStream<Trigger> sourceStream, + int maintenanceTaskIndex, + String maintainanceTaskName, + TableLoader newTableLoader, + String mainUidSuffix, + String mainSlotSharingGroup, + int mainParallelism) { + Preconditions.checkArgument( + parallelism == null || parallelism == -1 || parallelism > 0, Review Comment: this assertion condition doesn't seem correct. it should assert on the input arg `mainParallelism`. also should `mainParallelism` be non-primitive `Integer`? if yes, we should only need to assert two conditions of `mainParallelism == null || mainParallelism > 0`. ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could + * be removed in the {@link #DELETE_STREAM} side output. + */ +public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> { + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class); + public static final OutputTag<String> DELETE_STREAM = + new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING); + + private final TableLoader tableLoader; + private final Long maxSnapshotAgeMs; + private final Integer retainLast; + private final int plannerPoolSize; + private transient ExecutorService plannerPool; + private transient Table table; + + public ExpireSnapshotsProcessor( + TableLoader tableLoader, Long maxSnapshotAgeMs, Integer retainLast, int plannerPoolSize) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + + this.tableLoader = tableLoader; + this.maxSnapshotAgeMs = maxSnapshotAgeMs; + this.retainLast = retainLast; + this.plannerPoolSize = plannerPoolSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + tableLoader.open(); + this.table = tableLoader.loadTable(); + this.plannerPool = ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize); + } + + @Override + public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> out) + throws Exception { + try { + table.refresh(); + ExpireSnapshots expireSnapshots = table.expireSnapshots(); + if (maxSnapshotAgeMs != null) { + expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - maxSnapshotAgeMs); + } + + if (retainLast != null) { + expireSnapshots = expireSnapshots.retainLast(retainLast); + } + + AtomicLong deleteFileCounter = new AtomicLong(0L); + expireSnapshots + .planWith(plannerPool) + .deleteWith( + file -> { + ctx.output(DELETE_STREAM, file); + deleteFileCounter.incrementAndGet(); + }) + .cleanExpiredFiles(true) Review Comment: maybe we should add Javadoc to the `ExpireSnapshots` class that expired files are always deleted ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could + * be removed in the {@link #DELETE_STREAM} side output. + */ +public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> { + private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class); + public static final OutputTag<String> DELETE_STREAM = + new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING); + + private final TableLoader tableLoader; + private final Long maxSnapshotAgeMs; + private final Integer retainLast; + private final int plannerPoolSize; + private transient ExecutorService plannerPool; + private transient Table table; + + public ExpireSnapshotsProcessor( + TableLoader tableLoader, Long maxSnapshotAgeMs, Integer retainLast, int plannerPoolSize) { + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + + this.tableLoader = tableLoader; + this.maxSnapshotAgeMs = maxSnapshotAgeMs; + this.retainLast = retainLast; + this.plannerPoolSize = plannerPoolSize; + } + + @Override + public void open(Configuration parameters) throws Exception { + tableLoader.open(); + this.table = tableLoader.loadTable(); + this.plannerPool = ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize); + } + + @Override + public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> out) + throws Exception { + try { + table.refresh(); + ExpireSnapshots expireSnapshots = table.expireSnapshots(); + if (maxSnapshotAgeMs != null) { + expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - maxSnapshotAgeMs); + } + + if (retainLast != null) { + expireSnapshots = expireSnapshots.retainLast(retainLast); + } + + AtomicLong deleteFileCounter = new AtomicLong(0L); + expireSnapshots + .planWith(plannerPool) + .deleteWith( + file -> { + ctx.output(DELETE_STREAM, file); + deleteFileCounter.incrementAndGet(); + }) + .cleanExpiredFiles(true) + .commit(); + + LOG.info( + "Successfully finished expiring snapshots for {} at {}. 
Scheduled {} files for delete.", + table, + ctx.timestamp(), + deleteFileCounter.get()); + out.collect( + new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList())); + } catch (Exception e) { + LOG.info("Exception expiring snapshots for {} at {}", table, ctx.timestamp(), e); + out.collect( + new TaskResult(trigger.taskId(), trigger.timestamp(), false, Lists.newArrayList(e))); Review Comment: `TaskResult` has ` List<Exception> exceptions`. wondering in what scenario we would have a list of exceptions to propagate? ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; Review Comment: maybe add a comment to explain the multiplier choice of `1.5`. the more common choice is `2`. ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the filenames which could + * be removed in the {@link #DELETE_STREAM} side output. + */ +public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> { Review Comment: this can be marked as `@Internal` or package private if we can agree to remove the sub packages ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; + private static final long DELETE_TIMEOUT_MS = 10000L; Review Comment: I am wondering if we need this timeout. FileIO typically has a timeout config internally. If we are configuring another timeout here, we may have inconsistency. E.g., a lower value here would cause premature declaration of failure. Should we just rely on the underlying FileIO timeouts? ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; + private static final long DELETE_TIMEOUT_MS = 10000L; + private static final int DELETE_ATTEMPT_NUM = 10; + private static final String EXECUTOR_TASK_NAME = "ES Executor"; + @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file"; + + private ExpireSnapshots() { + // Do not instantiate directly + } + + /** Creates the builder for creating a stream which expires snapshots for the table. */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> { + private Duration maxSnapshotAge = null; + private Integer retainLast = null; + private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value(); + private int deleteAttemptNum = DELETE_ATTEMPT_NUM; + private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value(); + + /** + * The snapshots newer than this age will not be removed. + * + * @param newMaxSnapshotAge of the snapshots to be removed + * @return for chained calls + */ + public Builder maxSnapshotAge(Duration newMaxSnapshotAge) { + this.maxSnapshotAge = newMaxSnapshotAge; + return this; + } + + /** + * The minimum {@link org.apache.iceberg.Snapshot}s to retain. For more details description see + * {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}. + * + * @param newRetainLast number of snapshots to retain + * @return for chained calls + */ + public Builder retainLast(int newRetainLast) { + this.retainLast = newRetainLast; + return this; + } + + /** + * The worker pool size used to calculate the files to delete. + * + * @param newPlanningWorkerPoolSize for planning files to delete + * @return for chained calls + */ + public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) { + this.planningWorkerPoolSize = newPlanningWorkerPoolSize; + return this; + } + + /** + * The number of retries on the failed delete attempts. + * + * @param newDeleteAttemptNum number of retries + * @return for chained calls + */ + public Builder deleteAttemptNum(int newDeleteAttemptNum) { + this.deleteAttemptNum = newDeleteAttemptNum; + return this; + } + + /** + * The worker pool size used for deleting files. 
+ * + * @param newDeleteWorkerPoolSize for scanning + * @return for chained calls + */ + public Builder deleteWorkerPoolSize(int newDeleteWorkerPoolSize) { + this.deleteWorkerPoolSize = newDeleteWorkerPoolSize; + return this; + } + + @Override + DataStream<TaskResult> append(DataStream<Trigger> trigger) { + Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null"); + + SingleOutputStreamOperator<TaskResult> result = + trigger + .process( + new ExpireSnapshotsProcessor( + tableLoader(), + maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(), + retainLast, + planningWorkerPoolSize)) + .name(EXECUTOR_TASK_NAME) + .uid("expire-snapshots-" + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + + AsyncRetryStrategy<Boolean> retryStrategy = + new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder<Boolean>( + deleteAttemptNum, + DELETE_INITIAL_DELAY_MS, + DELETE_MAX_RETRY_DELAY_MS, + DELETE_BACKOFF_MULTIPLIER) + .ifResult(AsyncDeleteFiles.FAILED_PREDICATE) + .build(); + + AsyncDataStream.unorderedWaitWithRetry( + result.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM).rebalance(), + new AsyncDeleteFiles(name(), tableLoader(), deleteWorkerPoolSize), + DELETE_TIMEOUT_MS, + TimeUnit.MILLISECONDS, + deleteWorkerPoolSize, + retryStrategy) + .name(DELETE_FILES_TASK_NAME) + .uid("delete-expired-files-" + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .setParallelism(parallelism()); Review Comment: parallelism is non-primitive `Integer` and can be null. probably only call `setParallelism` if not null. ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; + private static final long DELETE_TIMEOUT_MS = 10000L; + private static final int DELETE_ATTEMPT_NUM = 10; + private static final String EXECUTOR_TASK_NAME = "ES Executor"; Review Comment: It is unclear what `ES` means; please use the full name. Also, we can probably set the operator name and uid the same. Right now, the name is set to this constant; at a minimum, it should include the `uidSuffix`. ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
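(On the operator-naming comment above.) A sketch of keeping `name()` and `uid()` in sync while spelling out the task name; the `expire-snapshots-executor` identifier and the helper are only illustrations of "full name plus `uidSuffix`", not the PR's final choice:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;

final class NamedTransforms {
  private NamedTransforms() {
  }

  // Derive a single identifier and use it for both name() and uid(), so the
  // web UI label and the state-restore uid never diverge and both carry the
  // user-provided suffix.
  static <I, O> SingleOutputStreamOperator<O> processNamed(
      DataStream<I> input, ProcessFunction<I, O> function, String baseName, String uidSuffix) {
    String id = baseName + "-" + uidSuffix;
    return input.process(function).name(id).uid(id);
  }
}
```

With something like `processNamed(trigger, processor, "expire-snapshots-executor", uidSuffix())`, the cryptic `ES Executor` label disappears and any rename stays in one place.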
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; + private static final long DELETE_TIMEOUT_MS = 10000L; + private static final int DELETE_ATTEMPT_NUM = 10; + private static final String EXECUTOR_TASK_NAME = "ES Executor"; + @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file"; Review Comment: nit: this seems more like an operator name, not maintenance task name. again, would be simpler to just set the operator uid and name the same. that is what we do in the v2 sink ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; + private static final long DELETE_TIMEOUT_MS = 10000L; + private static final int DELETE_ATTEMPT_NUM = 10; + private static final String EXECUTOR_TASK_NAME = "ES Executor"; + @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file"; + + private ExpireSnapshots() { + // Do not instantiate directly + } + + /** Creates the builder for creating a stream which expires snapshots for the table. */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> { + private Duration maxSnapshotAge = null; + private Integer retainLast = null; + private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value(); + private int deleteAttemptNum = DELETE_ATTEMPT_NUM; + private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value(); + + /** + * The snapshots newer than this age will not be removed. + * + * @param newMaxSnapshotAge of the snapshots to be removed + * @return for chained calls + */ + public Builder maxSnapshotAge(Duration newMaxSnapshotAge) { + this.maxSnapshotAge = newMaxSnapshotAge; + return this; + } + + /** + * The minimum {@link org.apache.iceberg.Snapshot}s to retain. For more details description see Review Comment: nit: should be `The minimum number of ... to retain`. also use import for the linked class? ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies; +import org.apache.iceberg.SystemConfigs; +import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class ExpireSnapshots { + private static final long DELETE_INITIAL_DELAY_MS = 10L; + private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L; + private static final double DELETE_BACKOFF_MULTIPLIER = 1.5; + private static final long DELETE_TIMEOUT_MS = 10000L; + private static final int DELETE_ATTEMPT_NUM = 10; + private static final String EXECUTOR_TASK_NAME = "ES Executor"; + @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file"; + + private ExpireSnapshots() { + // Do not instantiate directly + } + + /** Creates the builder for creating a stream which expires snapshots for the table. */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> { + private Duration maxSnapshotAge = null; + private Integer retainLast = null; + private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value(); + private int deleteAttemptNum = DELETE_ATTEMPT_NUM; + private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value(); + + /** + * The snapshots newer than this age will not be removed. + * + * @param newMaxSnapshotAge of the snapshots to be removed + * @return for chained calls + */ + public Builder maxSnapshotAge(Duration newMaxSnapshotAge) { + this.maxSnapshotAge = newMaxSnapshotAge; + return this; + } + + /** + * The minimum {@link org.apache.iceberg.Snapshot}s to retain. For more details description see + * {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}. + * + * @param newRetainLast number of snapshots to retain + * @return for chained calls + */ + public Builder retainLast(int newRetainLast) { + this.retainLast = newRetainLast; + return this; + } + + /** + * The worker pool size used to calculate the files to delete. + * + * @param newPlanningWorkerPoolSize for planning files to delete + * @return for chained calls + */ + public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) { + this.planningWorkerPoolSize = newPlanningWorkerPoolSize; + return this; + } + + /** + * The number of retries on the failed delete attempts. + * + * @param newDeleteAttemptNum number of retries + * @return for chained calls + */ + public Builder deleteAttemptNum(int newDeleteAttemptNum) { Review Comment: nit: maybe `deleteRetryNum` to be more clear it is retry. 
kind of like `COMMIT_NUM_RETRIES` from `TableProperties` ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java: ########## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. */ +public class TableMaintenance { + private static final String TASK_NAME_FORMAT = "%s [%d]"; + static final String SOURCE_NAME = "Monitor source"; + static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager"; + static final String LOCK_REMOVER_TASK_NAME = "Lock remover"; + + private TableMaintenance() { + // Do not instantiate directly + } + + /** + * Use when the change stream is already provided, like in the {@link + * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}. 
+ * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forChangeStream( + DataStream<TableChange> changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(changeStream, tableLoader, lockFactory); + } + + /** + * Creates the default monitor source for collecting the table changes and returns a builder for Review Comment: nit: is this more clear? ``` Use this for standalone maintenance job. It creates a monitor source that detect table changes and build the maintenance pipelines afterwards. ``` ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java: ########## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. */ +public class TableMaintenance { + private static final String TASK_NAME_FORMAT = "%s [%d]"; + static final String SOURCE_NAME = "Monitor source"; + static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager"; + static final String LOCK_REMOVER_TASK_NAME = "Lock remover"; + + private TableMaintenance() { + // Do not instantiate directly + } + + /** + * Use when the change stream is already provided, like in the {@link + * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}. + * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forChangeStream( + DataStream<TableChange> changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(changeStream, tableLoader, lockFactory); + } + + /** + * Creates the default monitor source for collecting the table changes and returns a builder for + * the maintenance stream. 
+ * + * @param env used to register the monitor source + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forTable( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(env, tableLoader, lockFactory); + } + + public static class Builder { + private final StreamExecutionEnvironment env; + private final DataStream<TableChange> inputStream; + private final TableLoader tableLoader; + private final List<MaintenanceTaskBuilder<?>> taskBuilders; + private final TriggerLockFactory lockFactory; + + private String uidSuffix = "TableMaintenance-" + UUID.randomUUID(); + private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + private Duration rateLimit = Duration.ofMillis(1); Review Comment: is `1 ms` a good default? ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java: ########## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. */ +public class TableMaintenance { + private static final String TASK_NAME_FORMAT = "%s [%d]"; + static final String SOURCE_NAME = "Monitor source"; + static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager"; + static final String LOCK_REMOVER_TASK_NAME = "Lock remover"; + + private TableMaintenance() { + // Do not instantiate directly + } + + /** + * Use when the change stream is already provided, like in the {@link + * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}. + * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forChangeStream( + DataStream<TableChange> changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(changeStream, tableLoader, lockFactory); + } + + /** + * Creates the default monitor source for collecting the table changes and returns a builder for + * the maintenance stream. 
+ * + * @param env used to register the monitor source + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forTable( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(env, tableLoader, lockFactory); + } + + public static class Builder { + private final StreamExecutionEnvironment env; + private final DataStream<TableChange> inputStream; + private final TableLoader tableLoader; + private final List<MaintenanceTaskBuilder<?>> taskBuilders; + private final TriggerLockFactory lockFactory; + + private String uidSuffix = "TableMaintenance-" + UUID.randomUUID(); + private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + private Duration rateLimit = Duration.ofMillis(1); + private Duration lockCheckDelay = Duration.ofSeconds(30); Review Comment: is `30s` a good default? is that based on the estimated average of task run time? ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java: ########## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. */ +public class TableMaintenance { + private static final String TASK_NAME_FORMAT = "%s [%d]"; + static final String SOURCE_NAME = "Monitor source"; + static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager"; + static final String LOCK_REMOVER_TASK_NAME = "Lock remover"; + + private TableMaintenance() { + // Do not instantiate directly + } + + /** + * Use when the change stream is already provided, like in the {@link + * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}. + * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forChangeStream( + DataStream<TableChange> changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(changeStream, tableLoader, lockFactory); + } + + /** + * Creates the default monitor source for collecting the table changes and returns a builder for + * the maintenance stream. 
+ * + * @param env used to register the monitor source + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forTable( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(env, tableLoader, lockFactory); + } + + public static class Builder { + private final StreamExecutionEnvironment env; + private final DataStream<TableChange> inputStream; + private final TableLoader tableLoader; + private final List<MaintenanceTaskBuilder<?>> taskBuilders; + private final TriggerLockFactory lockFactory; + + private String uidSuffix = "TableMaintenance-" + UUID.randomUUID(); + private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + private Duration rateLimit = Duration.ofMillis(1); + private Duration lockCheckDelay = Duration.ofSeconds(30); + private Integer parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private int maxReadBack = 100; + + private Builder( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + this.env = env; + this.inputStream = null; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + private Builder( + DataStream<TableChange> inputStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + this.env = null; + this.inputStream = inputStream; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + /** + * The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid. + * + * @param newUidSuffix for the transformations + * @return for chained calls + */ + public Builder uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + * @return for chained calls + */ + public Builder slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return this; + } + + /** + * Limits the firing frequency for the task triggers. + * + * @param newRateLimit firing frequency + * @return for chained calls + */ + public Builder rateLimit(Duration newRateLimit) { + Preconditions.checkNotNull(rateLimit.toMillis() > 0, "Rate limit should be greater than 0"); + this.rateLimit = newRateLimit; + return this; + } + + /** + * Sets the delay for checking lock availability when a concurrent run is detected. + * + * @param newLockCheckDelay lock checking frequency + * @return for chained calls + */ + public Builder lockCheckDelay(Duration newLockCheckDelay) { + this.lockCheckDelay = newLockCheckDelay; + return this; + } + + /** + * Sets the global parallelism of maintenance tasks. Could be overwritten by the {@link Review Comment: nit: global -> default? 
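Separate from the wording nit, the quoted `rateLimit` setter looks buggy: it passes a boolean to `Preconditions.checkNotNull` and validates the current `rateLimit` field rather than the incoming argument. A corrected sketch of the same method, assuming it drops into the quoted `Builder`:

```java
// Validate the *new* value with checkArgument (boolean condition) rather
// than checkNotNull (reference check on an autoboxed Boolean).
public Builder rateLimit(Duration newRateLimit) {
  Preconditions.checkArgument(
      newRateLimit != null && newRateLimit.toMillis() > 0,
      "Rate limit should be greater than 0");
  this.rateLimit = newRateLimit;
  return this;
}
```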
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/AsyncDeleteFiles.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.function.Predicate; +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.ThreadPools; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Delete the files using the {@link FileIO}. 
*/ +@Internal +public class AsyncDeleteFiles extends RichAsyncFunction<String, Boolean> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncDeleteFiles.class); + public static final Predicate<Collection<Boolean>> FAILED_PREDICATE = new FailedPredicate(); + + private final String name; + private final FileIO io; + private final int workerPoolSize; + private final String tableName; + + private transient ExecutorService workerPool; + private transient Counter failedCounter; + private transient Counter succeededCounter; + + public AsyncDeleteFiles(String name, TableLoader tableLoader, int workerPoolSize) { + Preconditions.checkNotNull(name, "Name should no be null"); + Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + + this.name = name; + tableLoader.open(); + Table table = tableLoader.loadTable(); + this.io = table.io(); + this.workerPoolSize = workerPoolSize; + this.tableName = table.name(); + } + + @Override + public void open(Configuration parameters) throws Exception { + this.failedCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER); + this.succeededCounter = + getRuntimeContext() + .getMetricGroup() + .addGroup(TableMaintenanceMetrics.GROUP_KEY, name) + .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER); + + this.workerPool = + ThreadPools.newWorkerPool(tableName + "-" + name + "-async-delete-files", workerPoolSize); + } + + @Override + public void asyncInvoke(String fileName, ResultFuture<Boolean> resultFuture) { + workerPool.execute( + () -> { + try { + LOG.info("Deleting file: {} with {}", fileName, name); + io.deleteFile(fileName); Review Comment: > It is usually not a good practice to block a checkpoint with long running tasks. I'm a bit concerned because of the delay caused by executing the deletes on checkpoint. I see no difference with `IcebergStreamWriter#prepareSnapshotPreBarrier`, which does the file flush and upload in the synchronous part of checkpoint. > I agree that the other FileIO implementations could be faster with the bulk delete Bulk deletes are not only a lot faster. they can also avoid potential throttling from S3. We should definitely leverage bulk deletes whenever it is applicable. ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java: ########## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
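(On the bulk-delete point above.) Iceberg `FileIO` implementations that batch deletes implement `org.apache.iceberg.io.SupportsBulkOperations`. A hedged sketch of preferring that path when available; the helper name and the eager batching are assumptions for illustration:

```java
import java.util.List;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;

final class FileDeletes {
  private FileDeletes() {
  }

  // Prefer one batched call (fewer round trips, less chance of S3
  // throttling); fall back to per-file deletes for FileIO implementations
  // without bulk support.
  static void deleteAll(FileIO io, List<String> paths) {
    if (io instanceof SupportsBulkOperations) {
      ((SupportsBulkOperations) io).deleteFiles(paths);
    } else {
      paths.forEach(io::deleteFile);
    }
  }
}
```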
+ */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerLockFactory; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. */ +public class TableMaintenance { + private static final String TASK_NAME_FORMAT = "%s [%d]"; + static final String SOURCE_NAME = "Monitor source"; + static final String TRIGGER_MANAGER_TASK_NAME = "Trigger manager"; + static final String LOCK_REMOVER_TASK_NAME = "Lock remover"; + + private TableMaintenance() { + // Do not instantiate directly + } + + /** + * Use when the change stream is already provided, like in the {@link + * org.apache.iceberg.flink.sink.IcebergSink#addPostCommitTopology(DataStream)}. + * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forChangeStream( + DataStream<TableChange> changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(changeStream, tableLoader, lockFactory); + } + + /** + * Creates the default monitor source for collecting the table changes and returns a builder for + * the maintenance stream. 
+ * + * @param env used to register the monitor source + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forTable( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(env, tableLoader, lockFactory); + } + + public static class Builder { + private final StreamExecutionEnvironment env; + private final DataStream<TableChange> inputStream; + private final TableLoader tableLoader; + private final List<MaintenanceTaskBuilder<?>> taskBuilders; + private final TriggerLockFactory lockFactory; + + private String uidSuffix = "TableMaintenance-" + UUID.randomUUID(); + private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + private Duration rateLimit = Duration.ofMillis(1); + private Duration lockCheckDelay = Duration.ofSeconds(30); + private Integer parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private int maxReadBack = 100; + + private Builder( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + this.env = env; + this.inputStream = null; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + private Builder( + DataStream<TableChange> inputStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + this.env = null; + this.inputStream = inputStream; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + /** + * The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid. + * + * @param newUidSuffix for the transformations + * @return for chained calls + */ + public Builder uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + * @return for chained calls + */ + public Builder slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return this; + } + + /** + * Limits the firing frequency for the task triggers. + * + * @param newRateLimit firing frequency + * @return for chained calls + */ + public Builder rateLimit(Duration newRateLimit) { + Preconditions.checkNotNull(rateLimit.toMillis() > 0, "Rate limit should be greater than 0"); + this.rateLimit = newRateLimit; + return this; + } + + /** + * Sets the delay for checking lock availability when a concurrent run is detected. + * + * @param newLockCheckDelay lock checking frequency + * @return for chained calls + */ + public Builder lockCheckDelay(Duration newLockCheckDelay) { + this.lockCheckDelay = newLockCheckDelay; + return this; + } + + /** + * Sets the global parallelism of maintenance tasks. Could be overwritten by the {@link + * MaintenanceTaskBuilder#parallelism(int)}. 
+ * + * @param newParallelism task parallelism + * @return for chained calls + */ + public Builder parallelism(int newParallelism) { + this.parallelism = newParallelism; + return this; + } + + /** + * Maximum number of snapshots checked when started with an embedded {@link MonitorSource} at + * the first time. Only available when the {@link + * TableMaintenance#forTable(StreamExecutionEnvironment, TableLoader, TriggerLockFactory)} is + * used. + * + * @param newMaxReadBack snapshots to consider when initializing + * @return for chained calls + */ + public Builder maxReadBack(int newMaxReadBack) { + Preconditions.checkArgument( + inputStream == null, "Can't set maxReadBack when change stream is provided"); + this.maxReadBack = newMaxReadBack; + return this; + } + + /** + * Adds a specific task with the given schedule. + * + * @param task to add + * @return for chained calls + */ + public Builder add(MaintenanceTaskBuilder<?> task) { + taskBuilders.add(task); + return this; + } + + /** Builds the task graph for the maintenance tasks. */ + public void append() { + Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task"); + Preconditions.checkNotNull(uidSuffix, "Uid suffix should no be null"); + + List<String> taskNames = Lists.newArrayListWithCapacity(taskBuilders.size()); + List<TriggerEvaluator> evaluators = Lists.newArrayListWithCapacity(taskBuilders.size()); + for (int i = 0; i < taskBuilders.size(); ++i) { + taskNames.add(nameFor(taskBuilders.get(i), i)); + evaluators.add(taskBuilders.get(i).evaluator()); + } + + DataStream<Trigger> triggers = + DataStreamUtils.reinterpretAsKeyedStream(changeStream(), unused -> true) + .process( + new TriggerManager( + tableLoader, + lockFactory, + taskNames, + evaluators, + rateLimit.toMillis(), + lockCheckDelay.toMillis())) + .name(TRIGGER_MANAGER_TASK_NAME) + .uid("trigger-manager-" + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel() + .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy()) + .name("Watermark Assigner") + .uid("watermark-assigner-" + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + + // Add the specific tasks + DataStream<TaskResult> unioned = null; + for (int i = 0; i < taskBuilders.size(); ++i) { + int finalIndex = i; + DataStream<Trigger> filtered = + triggers + .filter(t -> t.taskId() != null && t.taskId() == finalIndex) + .name("Filter " + i) + .forceNonParallel() + .uid("filter-" + i + "-" + uidSuffix) + .slotSharingGroup(slotSharingGroup); + MaintenanceTaskBuilder<?> builder = taskBuilders.get(i); + DataStream<TaskResult> result = + builder.append( + filtered, + i, + taskNames.get(i), + tableLoader, + uidSuffix, + slotSharingGroup, + parallelism); + if (unioned == null) { + unioned = result; + } else { + unioned = unioned.union(result); + } + } + + // Add the LockRemover to the end + unioned + .global() Review Comment: do we need `global` with the `forceNonParallel` below? ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.stream; + +import java.time.Duration; +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.TaskResult; +import org.apache.iceberg.flink.maintenance.operator.Trigger; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> { + private int index; + private String name; + private TableLoader tableLoader; + private String uidSuffix = null; + private String slotSharingGroup = null; + private Integer parallelism = null; + private TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder(); + + abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream); + + /** + * After a given number of Iceberg table commits since the last run, starts the downstream job. + * + * @param commitCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnCommitCount(int commitCount) { + triggerEvaluator.commitCount(commitCount); + return (T) this; + } + + /** + * After a given number of new data files since the last run, starts the downstream job. + * + * @param dataFileCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnDataFileCount(int dataFileCount) { + triggerEvaluator.dataFileCount(dataFileCount); + return (T) this; + } + + /** + * After a given aggregated data file size since the last run, starts the downstream job. + * + * @param dataFileSizeInBytes after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnDataFileSize(long dataFileSizeInBytes) { + triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes); + return (T) this; + } + + /** + * After a given number of new positional delete files since the last run, starts the downstream + * job. + * + * @param posDeleteFileCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) { + triggerEvaluator.posDeleteFileCount(posDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new positional delete records since the last run, starts the downstream + * job. + * + * @param posDeleteRecordCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) { + triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount); + return (T) this; + } + + /** + * After a given number of new equality delete files since the last run, starts the downstream + * job. 
+ * + * @param eqDeleteFileCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) { + triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new equality delete records since the last run, starts the downstream + * job. + * + * @param eqDeleteRecordCount after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) { + triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount); + return (T) this; + } + + /** + * After a given time since the last run, starts the downstream job. + * + * @param interval after the downstream job should be started + * @return for chained calls + */ + public T scheduleOnInterval(Duration interval) { + triggerEvaluator.timeout(interval); + return (T) this; + } + + /** + * The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid. + * + * @param newUidSuffix for the transformations + * @return for chained calls + */ + public T uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return (T) this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + * @return for chained calls + */ + public T slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return (T) this; + } + + /** + * Sets the parallelism for the stream. + * + * @param newParallelism the required parallelism + * @return for chained calls + */ + public T parallelism(int newParallelism) { + this.parallelism = newParallelism; + return (T) this; + } + + @Internal + int id() { + return index; + } + + @Internal + String name() { + return name; + } + + @Internal + TableLoader tableLoader() { + return tableLoader; + } + + @Internal + String uidSuffix() { + return uidSuffix; + } + + @Internal + String slotSharingGroup() { + return slotSharingGroup; + } + + @Internal + Integer parallelism() { + return parallelism; + } + + @Internal + TriggerEvaluator evaluator() { + return triggerEvaluator.build(); + } + + @Internal + DataStream<TaskResult> append( + DataStream<Trigger> sourceStream, + int maintenanceTaskIndex, + String maintainanceTaskName, + TableLoader newTableLoader, + String mainUidSuffix, + String mainSlotSharingGroup, + int mainParallelism) { + Preconditions.checkArgument( + parallelism == null || parallelism == -1 || parallelism > 0, + "Parallelism should be left to default (-1/null) or greater than 0"); + Preconditions.checkNotNull(maintainanceTaskName, "Name should not be null"); + Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null"); + + this.index = maintenanceTaskIndex; + this.name = maintainanceTaskName; + this.tableLoader = newTableLoader; + + if (uidSuffix == null) { Review Comment: wondering if we need both `uidSuffix` and `mainUidSuffix`. can we just use `mainUidSuffix`? ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java: ##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<T>> {

Review Comment:
   I am not saying we shouldn't. But it is usually good to keep them private first so that we are free to evolve the class. Maybe wait until the need is clear. Use Spark as an example: the `BaseSparkAction` is package private.

########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java: ##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;

Review Comment:
   > It is hard to understand what is a public API and what should only be used internally. Ideally, this should be achieved via Java scoping (public, package private, private, etc.)

   > I agree that `stream` might not be the best name, maybe `api` would be better, but I would try to separate out the classes used by the user from the classes only used by the implementation.

   This is not the tradition/style that the Iceberg connectors (Spark, Flink) have been following. If we are going to have an `api` package, it should be the one under `org.apache.iceberg.flink.api`. It seems complicated to have an `api` sub-package under every sub-namespace like `org.apache.iceberg.flink.source/sink/data/maintenance/actions`. Anyway, my preference is not to introduce an `api` sub-package. But we can also try to get other people's feedback.
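For reference, the public surface under discussion composes roughly as follows. This is a sketch only: `forTable`, the lock-factory argument, `add`, and `append` are assumed from this PR's `TableMaintenance` and `ExpireSnapshots` classes and are not verified against the full diff.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl"); // hypothetical location

    TableMaintenance.forTable(env, tableLoader, lockFactory) // lockFactory: an assumed lock-factory instance
        .uidSuffix("tbl-maintenance")                        // the job-level (main) uid suffix
        .add(
            ExpireSnapshots.builder()                        // a MaintenanceTaskBuilder subclass
                .scheduleOnCommitCount(10)
                .slotSharingGroup("maintenance")             // keep task resources separate from the main job
                .parallelism(2))
        .append();                                           // wires the maintenance flow into env

    env.execute("Maintenance: tbl");

Whichever way the package question is resolved, this entry point plus the builder setters are the user-facing part; the operator package classes stay `@Internal`.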
--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org