stevenzwu commented on code in PR #11144: URL: https://github.com/apache/iceberg/pull/11144#discussion_r1813326096
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.time.Duration; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.operators.util.OperatorValidationUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +@Experimental +@SuppressWarnings("unchecked") +public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?>> { + private int index; + private String name; + private TableLoader tableLoader; + private String uidSuffix = null; + private String slotSharingGroup = null; + private Integer parallelism = null; + private final TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder(); + + abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream); 
+ + /** + * After a given number of Iceberg table commits since the last run, starts the downstream job. + * + * @param commitCount after the downstream job should be started + */ + public T scheduleOnCommitCount(int commitCount) { + triggerEvaluator.commitCount(commitCount); + return (T) this; + } + + /** + * After a given number of new data files since the last run, starts the downstream job. + * + * @param dataFileCount after the downstream job should be started + */ + public T scheduleOnDataFileCount(int dataFileCount) { + triggerEvaluator.dataFileCount(dataFileCount); + return (T) this; + } + + /** + * After a given aggregated data file size since the last run, starts the downstream job. + * + * @param dataFileSizeInBytes after the downstream job should be started + */ + public T scheduleOnDataFileSize(long dataFileSizeInBytes) { + triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes); + return (T) this; + } + + /** + * After a given number of new positional delete files since the last run, starts the downstream + * job. + * + * @param posDeleteFileCount after the downstream job should be started + */ + public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) { + triggerEvaluator.posDeleteFileCount(posDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new positional delete records since the last run, starts the downstream + * job. + * + * @param posDeleteRecordCount after the downstream job should be started + */ + public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) { + triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount); + return (T) this; + } + + /** + * After a given number of new equality delete files since the last run, starts the downstream + * job. 
+ * + * @param eqDeleteFileCount after the downstream job should be started + */ + public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) { + triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new equality delete records since the last run, starts the downstream + * job. + * + * @param eqDeleteRecordCount after the downstream job should be started + */ + public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) { + triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount); + return (T) this; + } + + /** + * After a given time since the last run, starts the downstream job. + * + * @param interval after the downstream job should be started + */ + public T scheduleOnInterval(Duration interval) { + triggerEvaluator.timeout(interval); + return (T) this; + } + + /** + * The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid. + * + * @param newUidSuffix for the transformations + */ + public T uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return (T) this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + */ + public T slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return (T) this; + } + + /** + * Sets the parallelism for the stream. 
+ * + * @param newParallelism the required parallelism + */ + public T parallelism(int newParallelism) { + OperatorValidationUtils.validateParallelism(newParallelism); + this.parallelism = newParallelism; + return (T) this; + } + + protected int index() { + return index; + } + + protected String name() { + return name; + } + + protected TableLoader tableLoader() { + return tableLoader; + } + + protected String uidSuffix() { + return uidSuffix; + } + + protected String slotSharingGroup() { + return slotSharingGroup; + } + + protected Integer parallelism() { + return parallelism; + } + + protected String operatorName(String operatorNameBase) { + return operatorNameBase + "[" + index() + "]"; + } + + TriggerEvaluator evaluator() { + return triggerEvaluator.build(); + } + + DataStream<TaskResult> append( + DataStream<Trigger> sourceStream, + int defaultTaskIndex, Review Comment: what does `default` mean here? should we just remove `default`? ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.api; + +import java.time.Duration; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.operators.util.OperatorValidationUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +@Experimental +@SuppressWarnings("unchecked") +public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?>> { + private int index; + private String name; + private TableLoader tableLoader; + private String uidSuffix = null; + private String slotSharingGroup = null; + private Integer parallelism = null; + private final TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder(); + + abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream); + + /** + * After a given number of Iceberg table commits since the last run, starts the downstream job. + * + * @param commitCount after the downstream job should be started + */ + public T scheduleOnCommitCount(int commitCount) { + triggerEvaluator.commitCount(commitCount); + return (T) this; + } + + /** + * After a given number of new data files since the last run, starts the downstream job. + * + * @param dataFileCount after the downstream job should be started + */ + public T scheduleOnDataFileCount(int dataFileCount) { + triggerEvaluator.dataFileCount(dataFileCount); + return (T) this; + } + + /** + * After a given aggregated data file size since the last run, starts the downstream job. 
+ * + * @param dataFileSizeInBytes after the downstream job should be started + */ + public T scheduleOnDataFileSize(long dataFileSizeInBytes) { + triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes); + return (T) this; + } + + /** + * After a given number of new positional delete files since the last run, starts the downstream + * job. + * + * @param posDeleteFileCount after the downstream job should be started + */ + public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) { + triggerEvaluator.posDeleteFileCount(posDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new positional delete records since the last run, starts the downstream + * job. + * + * @param posDeleteRecordCount after the downstream job should be started + */ + public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) { + triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount); + return (T) this; + } + + /** + * After a given number of new equality delete files since the last run, starts the downstream + * job. + * + * @param eqDeleteFileCount after the downstream job should be started + */ + public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) { + triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount); + return (T) this; + } + + /** + * After a given number of new equality delete records since the last run, starts the downstream + * job. + * + * @param eqDeleteRecordCount after the downstream job should be started + */ + public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) { + triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount); + return (T) this; + } + + /** + * After a given time since the last run, starts the downstream job. + * + * @param interval after the downstream job should be started + */ + public T scheduleOnInterval(Duration interval) { + triggerEvaluator.timeout(interval); + return (T) this; + } + + /** + * The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid. 
+ * + * @param newUidSuffix for the transformations + */ + public T uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return (T) this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + */ + public T slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return (T) this; + } + + /** + * Sets the parallelism for the stream. + * + * @param newParallelism the required parallelism + */ + public T parallelism(int newParallelism) { + OperatorValidationUtils.validateParallelism(newParallelism); + this.parallelism = newParallelism; + return (T) this; + } + + protected int index() { + return index; + } + + protected String name() { + return name; + } + + protected TableLoader tableLoader() { + return tableLoader; + } + + protected String uidSuffix() { + return uidSuffix; + } + + protected String slotSharingGroup() { + return slotSharingGroup; + } + + protected Integer parallelism() { + return parallelism; + } + + protected String operatorName(String operatorNameBase) { + return operatorNameBase + "[" + index() + "]"; + } + + TriggerEvaluator evaluator() { + return triggerEvaluator.build(); + } + + DataStream<TaskResult> append( + DataStream<Trigger> sourceStream, + int defaultTaskIndex, + String defaultTaskName, + TableLoader newTableLoader, + String mainUidSuffix, Review Comment: I still find the naming of `main` confusing here. should these be called `default`? ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.time.Duration; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor; +import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Deletes expired snapshots and the corresponding files. */ +public class ExpireSnapshots { + private static final int DELETE_BATCH_SIZE_DEFAULT = 1000; + private static final String EXECUTOR_OPERATOR_NAME = "Expire Snapshot"; + @VisibleForTesting static final String DELETE_FILES_OPERATOR_NAME = "Delete file"; + + private ExpireSnapshots() {} + + /** Creates the builder for creating a stream which expires snapshots for the table. 
*/ + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> { + private Duration maxSnapshotAge = null; + private Integer numSnapshots = null; + private Integer planningWorkerPoolSize; + private int deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT; + + /** + * The snapshots older than this age will be removed. + * + * @param newMaxSnapshotAge of the snapshots to be removed + */ + public Builder maxSnapshotAge(Duration newMaxSnapshotAge) { + this.maxSnapshotAge = newMaxSnapshotAge; + return this; + } + + /** + * The minimum number of {@link Snapshot}s to retain. For more details description see {@link + * org.apache.iceberg.ExpireSnapshots#retainLast(int)}. + * + * @param newNumSnapshots number of snapshots to retain + */ + public Builder retainLast(int newNumSnapshots) { + this.numSnapshots = newNumSnapshots; + return this; + } + + /** + * The worker pool size used to calculate the files to delete. If not set, the shared worker + * pool is used. + * + * @param newPlanningWorkerPoolSize for planning files to delete + */ + public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) { + this.planningWorkerPoolSize = newPlanningWorkerPoolSize; + return this; + } + + /** + * Size of the batch used to deleting the files. + * + * @param newDeleteBatchSize used for deleting + */ + public Builder deleteBatchSize(int newDeleteBatchSize) { + this.deleteBatchSize = newDeleteBatchSize; + return this; + } + + @Override + DataStream<TaskResult> append(DataStream<Trigger> trigger) { + Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null"); + + SingleOutputStreamOperator<TaskResult> result = + trigger + .process( + new ExpireSnapshotsProcessor( + tableLoader(), + maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(), + numSnapshots, + planningWorkerPoolSize)) + .name(operatorName(EXECUTOR_OPERATOR_NAME)) Review Comment: I still have some reservation here. 
If we have enabled maintenance for multiple tables in the same Flink job, we won't be able to tell from the Flink UI which table this operator is for. Flink operator metrics contain the operator name, so we also won't be able to distinguish the metrics of the operators for different tables. We should probably concatenate the operator name with the table name to distinguish them. ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.api; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.operators.util.OperatorValidationUtils; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.flink.sink.IcebergSink; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. 
*/ +public class TableMaintenance { + static final String SOURCE_OPERATOR_NAME = "Monitor source"; + static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager"; + static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner"; + static final String FILTER_OPERATOR_NAME_PREFIX = "Filter "; + static final String LOCK_REMOVER_OPERATOR_NAME = "Lock remover"; + + private TableMaintenance() {} + + /** + * Use when the change stream is already provided, like in the {@link + * IcebergSink#addPostCommitTopology(DataStream)}. + * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + @Internal + public static Builder forChangeStream( + DataStream<TableChange> changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(null, changeStream, tableLoader, lockFactory); + } + + /** + * Use this for standalone maintenance job. It creates a monitor source that detect table changes + * and build the maintenance pipelines afterwards. 
+ * + * @param env used to register the monitor source + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forTable( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(env, null, tableLoader, lockFactory); + } + + public static class Builder { + private final StreamExecutionEnvironment env; + private final DataStream<TableChange> inputStream; + private final TableLoader tableLoader; + private final List<MaintenanceTaskBuilder<?>> taskBuilders; + private final TriggerLockFactory lockFactory; + + private String uidSuffix = "TableMaintenance-" + UUID.randomUUID(); + private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + private Duration rateLimit = Duration.ofMinutes(1); + private Duration lockCheckDelay = Duration.ofSeconds(30); + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private int maxReadBack = 100; + + private Builder( + StreamExecutionEnvironment env, + DataStream<TableChange> inputStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + this.env = env; + this.inputStream = inputStream; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + /** + * The suffix used for the generated {@link Transformation}'s uid. 
+ * + * @param newUidSuffix for the transformations + */ + public Builder uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + */ + public Builder slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return this; + } + + /** + * Limits the firing frequency for the task triggers. + * + * @param newRateLimit firing frequency + */ + public Builder rateLimit(Duration newRateLimit) { + Preconditions.checkNotNull(rateLimit.toMillis() > 0, "Rate limit should be greater than 0"); + this.rateLimit = newRateLimit; + return this; + } + + /** + * Sets the delay for checking lock availability when a concurrent run is detected. + * + * @param newLockCheckDelay lock checking frequency + */ + public Builder lockCheckDelay(Duration newLockCheckDelay) { + this.lockCheckDelay = newLockCheckDelay; + return this; + } + + /** + * Sets the default parallelism of maintenance tasks. Could be overwritten by the {@link + * MaintenanceTaskBuilder#parallelism(int)}. + * + * @param newParallelism task parallelism + */ + public Builder parallelism(int newParallelism) { + OperatorValidationUtils.validateParallelism(newParallelism); + this.parallelism = newParallelism; + return this; + } + + /** + * Maximum number of snapshots checked when started with an embedded {@link MonitorSource} at + * the first time. Only available when the {@link + * TableMaintenance#forTable(StreamExecutionEnvironment, TableLoader, TriggerLockFactory)} is + * used. 
+ * + * @param newMaxReadBack snapshots to consider when initializing + */ + public Builder maxReadBack(int newMaxReadBack) { + Preconditions.checkArgument( + inputStream == null, "Can't set maxReadBack when change stream is provided"); + this.maxReadBack = newMaxReadBack; + return this; + } + + /** + * Adds a specific task with the given schedule. + * + * @param task to add + */ + public Builder add(MaintenanceTaskBuilder<?> task) { + taskBuilders.add(task); + return this; + } + + /** Builds the task graph for the maintenance tasks. */ + public void append() throws IOException { + Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task"); + Preconditions.checkNotNull(uidSuffix, "Uid suffix should no be null"); + + List<String> taskNames = Lists.newArrayListWithCapacity(taskBuilders.size()); + List<TriggerEvaluator> evaluators = Lists.newArrayListWithCapacity(taskBuilders.size()); + for (int i = 0; i < taskBuilders.size(); ++i) { + taskNames.add(nameFor(taskBuilders.get(i), i)); + evaluators.add(taskBuilders.get(i).evaluator()); + } + + try (TableLoader loader = tableLoader.clone()) { + DataStream<Trigger> triggers = + DataStreamUtils.reinterpretAsKeyedStream(changeStream(loader), unused -> true) + .process( + new TriggerManager( + loader, + lockFactory, + taskNames, + evaluators, + rateLimit.toMillis(), + lockCheckDelay.toMillis())) + .name(TRIGGER_MANAGER_OPERATOR_NAME) + .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel() + .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy()) + .name(WATERMARK_ASSIGNER_OPERATOR_NAME) + .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + + // Add the specific tasks + DataStream<TaskResult> unioned = null; + for (int i = 0; i < taskBuilders.size(); ++i) { + int finalIndex = i; + DataStream<Trigger> filtered = + triggers + .filter(t -> t.taskId() != null && t.taskId() == 
finalIndex) + .name(FILTER_OPERATOR_NAME_PREFIX + i) + .forceNonParallel() + .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix) + .slotSharingGroup(slotSharingGroup); + MaintenanceTaskBuilder<?> builder = taskBuilders.get(i); + DataStream<TaskResult> result = + builder.append( + filtered, i, taskNames.get(i), loader, uidSuffix, slotSharingGroup, parallelism); + if (unioned == null) { + unioned = result; + } else { + unioned = unioned.union(result); + } + } + + // Add the LockRemover to the end + unioned + .transform( + LOCK_REMOVER_OPERATOR_NAME, + TypeInformation.of(Void.class), + new LockRemover(lockFactory, taskNames)) + .forceNonParallel() + .uid("lock-remover-" + uidSuffix) + .slotSharingGroup(slotSharingGroup); + } + } + + private DataStream<TableChange> changeStream(TableLoader loader) { + if (inputStream == null) { + // Create a monitor source to provide the TableChange stream + MonitorSource source = + new MonitorSource( + loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack); + return env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME) + .uid(SOURCE_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + } else { + return inputStream.global(); + } + } + } + + private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskId) { + return String.format("%s [%d]", streamBuilder.getClass().getSimpleName(), taskId); Review Comment: why do we use class name? can't we use the `name()` getter from `MaintenanceTaskBuilder`? ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.operators.util.OperatorValidationUtils; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.flink.TableLoader; +import 
org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.flink.sink.IcebergSink; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. */ +public class TableMaintenance { + static final String SOURCE_OPERATOR_NAME = "Monitor source"; + static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager"; + static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner"; + static final String FILTER_OPERATOR_NAME_PREFIX = "Filter "; + static final String LOCK_REMOVER_OPERATOR_NAME = "Lock remover"; + + private TableMaintenance() {} + + /** + * Use when the change stream is already provided, like in the {@link + * IcebergSink#addPostCommitTopology(DataStream)}. + * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + @Internal + public static Builder forChangeStream( + DataStream<TableChange> changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(changeStream, tableLoader, lockFactory); + } + + /** + * Use this for standalone maintenance job. It creates a monitor source that detect table changes + * and build the maintenance pipelines afterwards. 
+ * + * @param env used to register the monitor source + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forTable( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(env, tableLoader, lockFactory); + } + + public static class Builder { + private final StreamExecutionEnvironment env; + private final DataStream<TableChange> inputStream; + private final TableLoader tableLoader; + private final List<MaintenanceTaskBuilder<?>> taskBuilders; + private final TriggerLockFactory lockFactory; + + private String uidSuffix = "TableMaintenance-" + UUID.randomUUID(); + private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + private Duration rateLimit = Duration.ofMinutes(1); + private Duration lockCheckDelay = Duration.ofSeconds(30); + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private int maxReadBack = 100; + + private Builder( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + this.env = env; + this.inputStream = null; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + private Builder( + DataStream<TableChange> inputStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + this.env = null; + this.inputStream = inputStream; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + /** + * The suffix used for the generated {@link Transformation}'s uid. 
+ * + * @param newUidSuffix for the transformations + */ + public Builder uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + */ + public Builder slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return this; + } + + /** + * Limits the firing frequency for the task triggers. + * + * @param newRateLimit firing frequency + */ + public Builder rateLimit(Duration newRateLimit) { + Preconditions.checkNotNull(rateLimit.toMillis() > 0, "Rate limit should be greater than 0"); + this.rateLimit = newRateLimit; + return this; + } + + /** + * Sets the delay for checking lock availability when a concurrent run is detected. + * + * @param newLockCheckDelay lock checking frequency + */ + public Builder lockCheckDelay(Duration newLockCheckDelay) { + this.lockCheckDelay = newLockCheckDelay; + return this; + } + + /** + * Sets the default parallelism of maintenance tasks. Could be overwritten by the {@link + * MaintenanceTaskBuilder#parallelism(int)}. + * + * @param newParallelism task parallelism + */ + public Builder parallelism(int newParallelism) { + OperatorValidationUtils.validateParallelism(newParallelism); + this.parallelism = newParallelism; + return this; + } + + /** + * Maximum number of snapshots checked when started with an embedded {@link MonitorSource} at + * the first time. Only available when the {@link + * TableMaintenance#forTable(StreamExecutionEnvironment, TableLoader, TriggerLockFactory)} is + * used. 
+ * + * @param newMaxReadBack snapshots to consider when initializing + */ + public Builder maxReadBack(int newMaxReadBack) { + Preconditions.checkArgument( + inputStream == null, "Can't set maxReadBack when change stream is provided"); + this.maxReadBack = newMaxReadBack; + return this; + } + + /** + * Adds a specific task with the given schedule. + * + * @param task to add + */ + public Builder add(MaintenanceTaskBuilder<?> task) { + taskBuilders.add(task); + return this; + } + + /** Builds the task graph for the maintenance tasks. */ + public void append() throws IOException { + Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task"); + Preconditions.checkNotNull(uidSuffix, "Uid suffix should no be null"); + + List<String> taskNames = Lists.newArrayListWithCapacity(taskBuilders.size()); + List<TriggerEvaluator> evaluators = Lists.newArrayListWithCapacity(taskBuilders.size()); + for (int i = 0; i < taskBuilders.size(); ++i) { + taskNames.add(nameFor(taskBuilders.get(i), i)); + evaluators.add(taskBuilders.get(i).evaluator()); + } + + try (TableLoader loader = tableLoader.clone()) { + DataStream<Trigger> triggers = + DataStreamUtils.reinterpretAsKeyedStream(changeStream(loader), unused -> true) + .process( + new TriggerManager( + loader, + lockFactory, + taskNames, + evaluators, + rateLimit.toMillis(), + lockCheckDelay.toMillis())) + .name(TRIGGER_MANAGER_OPERATOR_NAME) + .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel() + .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy()) + .name(WATERMARK_ASSIGNER_OPERATOR_NAME) + .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + + // Add the specific tasks + DataStream<TaskResult> unioned = null; + for (int i = 0; i < taskBuilders.size(); ++i) { + int finalIndex = i; Review Comment: I guess the question is why can't `i` be used directly? like `t.taskId() == taskIndex`. 
I would also probably rename `i` to `taskIndex` for readability, since this `for` loop is pretty long. ########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java: ########## @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.api; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.operators.util.OperatorValidationUtils; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.LockRemover; +import org.apache.iceberg.flink.maintenance.operator.MonitorSource; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; +import org.apache.iceberg.flink.maintenance.operator.TriggerManager; +import org.apache.iceberg.flink.sink.IcebergSink; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** Creates the table maintenance graph. 
*/ +public class TableMaintenance { + static final String SOURCE_OPERATOR_NAME = "Monitor source"; + static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager"; + static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner"; + static final String FILTER_OPERATOR_NAME_PREFIX = "Filter "; + static final String LOCK_REMOVER_OPERATOR_NAME = "Lock remover"; + + private TableMaintenance() {} + + /** + * Use when the change stream is already provided, like in the {@link + * IcebergSink#addPostCommitTopology(DataStream)}. + * + * @param changeStream the table changes + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + @Internal + public static Builder forChangeStream( + DataStream<TableChange> changeStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(changeStream, "The change stream should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(null, changeStream, tableLoader, lockFactory); + } + + /** + * Use this for a standalone maintenance job. It creates a monitor source that detects table changes + * and builds the maintenance pipelines afterwards. 
+ * + * @param env used to register the monitor source + * @param tableLoader used for accessing the table + * @param lockFactory used for preventing concurrent task runs + * @return builder for the maintenance stream + */ + public static Builder forTable( + StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); + + return new Builder(env, null, tableLoader, lockFactory); + } + + public static class Builder { + private final StreamExecutionEnvironment env; + private final DataStream<TableChange> inputStream; + private final TableLoader tableLoader; + private final List<MaintenanceTaskBuilder<?>> taskBuilders; + private final TriggerLockFactory lockFactory; + + private String uidSuffix = "TableMaintenance-" + UUID.randomUUID(); + private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; + private Duration rateLimit = Duration.ofMinutes(1); + private Duration lockCheckDelay = Duration.ofSeconds(30); + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private int maxReadBack = 100; + + private Builder( + StreamExecutionEnvironment env, + DataStream<TableChange> inputStream, + TableLoader tableLoader, + TriggerLockFactory lockFactory) { + this.env = env; + this.inputStream = inputStream; + this.tableLoader = tableLoader; + this.lockFactory = lockFactory; + this.taskBuilders = Lists.newArrayListWithCapacity(4); + } + + /** + * The suffix used for the generated {@link Transformation}'s uid. 
+ * + * @param newUidSuffix for the transformations + */ + public Builder uidSuffix(String newUidSuffix) { + this.uidSuffix = newUidSuffix; + return this; + } + + /** + * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all the operators of the + * generated stream. Could be used to separate the resources used by this task. + * + * @param newSlotSharingGroup to be used for the operators + */ + public Builder slotSharingGroup(String newSlotSharingGroup) { + this.slotSharingGroup = newSlotSharingGroup; + return this; + } + + /** + * Limits the firing frequency for the task triggers. + * + * @param newRateLimit firing frequency + */ + public Builder rateLimit(Duration newRateLimit) { + Preconditions.checkNotNull(rateLimit.toMillis() > 0, "Rate limit should be greater than 0"); + this.rateLimit = newRateLimit; + return this; + } + + /** + * Sets the delay for checking lock availability when a concurrent run is detected. + * + * @param newLockCheckDelay lock checking frequency + */ + public Builder lockCheckDelay(Duration newLockCheckDelay) { + this.lockCheckDelay = newLockCheckDelay; + return this; + } + + /** + * Sets the default parallelism of maintenance tasks. Could be overwritten by the {@link + * MaintenanceTaskBuilder#parallelism(int)}. + * + * @param newParallelism task parallelism + */ + public Builder parallelism(int newParallelism) { + OperatorValidationUtils.validateParallelism(newParallelism); + this.parallelism = newParallelism; + return this; + } + + /** + * Maximum number of snapshots checked when started with an embedded {@link MonitorSource} at + * the first time. Only available when the {@link + * TableMaintenance#forTable(StreamExecutionEnvironment, TableLoader, TriggerLockFactory)} is + * used. 
+ * + * @param newMaxReadBack snapshots to consider when initializing + */ + public Builder maxReadBack(int newMaxReadBack) { + Preconditions.checkArgument( + inputStream == null, "Can't set maxReadBack when change stream is provided"); + this.maxReadBack = newMaxReadBack; + return this; + } + + /** + * Adds a specific task with the given schedule. + * + * @param task to add + */ + public Builder add(MaintenanceTaskBuilder<?> task) { + taskBuilders.add(task); + return this; + } + + /** Builds the task graph for the maintenance tasks. */ + public void append() throws IOException { + Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task"); + Preconditions.checkNotNull(uidSuffix, "Uid suffix should no be null"); + + List<String> taskNames = Lists.newArrayListWithCapacity(taskBuilders.size()); + List<TriggerEvaluator> evaluators = Lists.newArrayListWithCapacity(taskBuilders.size()); + for (int i = 0; i < taskBuilders.size(); ++i) { + taskNames.add(nameFor(taskBuilders.get(i), i)); + evaluators.add(taskBuilders.get(i).evaluator()); + } + + try (TableLoader loader = tableLoader.clone()) { + DataStream<Trigger> triggers = + DataStreamUtils.reinterpretAsKeyedStream(changeStream(loader), unused -> true) + .process( + new TriggerManager( + loader, + lockFactory, + taskNames, + evaluators, + rateLimit.toMillis(), + lockCheckDelay.toMillis())) + .name(TRIGGER_MANAGER_OPERATOR_NAME) + .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel() + .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy()) + .name(WATERMARK_ASSIGNER_OPERATOR_NAME) + .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + + // Add the specific tasks + DataStream<TaskResult> unioned = null; + for (int i = 0; i < taskBuilders.size(); ++i) { + int finalIndex = i; + DataStream<Trigger> filtered = + triggers + .filter(t -> t.taskId() != null && t.taskId() == 
finalIndex) + .name(FILTER_OPERATOR_NAME_PREFIX + i) + .forceNonParallel() + .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix) + .slotSharingGroup(slotSharingGroup); + MaintenanceTaskBuilder<?> builder = taskBuilders.get(i); + DataStream<TaskResult> result = + builder.append( + filtered, i, taskNames.get(i), loader, uidSuffix, slotSharingGroup, parallelism); + if (unioned == null) { + unioned = result; + } else { + unioned = unioned.union(result); + } + } + + // Add the LockRemover to the end + unioned + .transform( + LOCK_REMOVER_OPERATOR_NAME, + TypeInformation.of(Void.class), + new LockRemover(lockFactory, taskNames)) + .forceNonParallel() + .uid("lock-remover-" + uidSuffix) + .slotSharingGroup(slotSharingGroup); + } + } + + private DataStream<TableChange> changeStream(TableLoader loader) { + if (inputStream == null) { + // Create a monitor source to provide the TableChange stream + MonitorSource source = + new MonitorSource( + loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack); + return env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME) + .uid(SOURCE_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + } else { + return inputStream.global(); + } + } + } + + private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskId) { Review Comment: `taskIndex` is probably more accurate than `taskId` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org