stevenzwu commented on code in PR #11144:
URL: https://github.com/apache/iceberg/pull/11144#discussion_r1765317845


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> 
{

Review Comment:
   Not just `append`: there are a lot of public methods in this class.
   
   If we have any doubt about whether users would extend this class, we can 
delay the decision until a real ask comes forward. It is trivial to make a 
non-public class public later, but once a class is public, it is more 
difficult to change or evolve the contract.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
+import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class ExpireSnapshots {
+  private static final long DELETE_INITIAL_DELAY_MS = 10L;
+  private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
+  private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
+  private static final long DELETE_TIMEOUT_MS = 10000L;
+  private static final int DELETE_PLANNING_WORKER_POOL_SIZE_DEFAULT = 10;
+  private static final int DELETE_ATTEMPT_NUM = 10;
+  private static final int DELETE_WORKER_POOL_SIZE_DEFAULT = 10;
+  private static final String EXECUTOR_TASK_NAME = "ES Executor";
+  @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";
+
+  private ExpireSnapshots() {
+    // Do not instantiate directly
+  }
+
+  /** Creates the builder for creating a stream which expires snapshots for 
the table. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends 
MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
+    private Duration minAge = null;
+    private Integer retainLast = null;
+    private int planningWorkerPoolSize = 
DELETE_PLANNING_WORKER_POOL_SIZE_DEFAULT;
+    private int deleteAttemptNum = DELETE_ATTEMPT_NUM;
+    private int deleteWorkerPoolSize = DELETE_WORKER_POOL_SIZE_DEFAULT;
+
+    /**
+     * The snapshots newer than this age will not be removed.
+     *
+     * @param newMinAge of the files to be removed
+     * @return for chained calls
+     */
+    public Builder minAge(Duration newMinAge) {
+      this.minAge = newMinAge;
+      return this;
+    }
+
+    /**
+     * The minimum {@link org.apache.iceberg.Snapshot}s to retain. For more 
details description see
+     * {@link org.apache.iceberg.ExpireSnapshots#retainLast(int)}.
+     *
+     * @param newRetainLast number of snapshots to retain
+     * @return for chained calls
+     */
+    public Builder retainLast(int newRetainLast) {
+      this.retainLast = newRetainLast;
+      return this;
+    }
+
+    /**
+     * The worker pool size used to calculate the files to delete.
+     *
+     * @param newPlanningWorkerPoolSize for planning files to delete
+     * @return for chained calls
+     */
+    public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) {

Review Comment:
   Sharing a thread pool is not necessarily a bad thing: it can limit 
concurrent I/O. E.g., we may not want too many threads performing scan 
planning, which can be memory intensive.
   
   Deletes have a low memory footprint, so having separate pools is probably 
less of a concern. But it is probably good to keep an eye on the number of 
HTTP connections.
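
   A minimal sketch of how a shared, bounded pool caps concurrent work. `SharedPoolDemo` and `peakConcurrency` are hypothetical names that are not in the PR, a plain `Executors.newFixedThreadPool` stands in for whatever pool the job would actually use, and the sleep is a placeholder for planning or delete I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class; not part of the PR. */
class SharedPoolDemo {
  /** Runs taskCount simulated I/O tasks on one shared pool and returns the peak number running at once. */
  static int peakConcurrency(int poolSize, int taskCount) {
    ExecutorService sharedPool = Executors.newFixedThreadPool(poolSize);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < taskCount; i++) {
        futures.add(
            sharedPool.submit(
                () -> {
                  // Record how many tasks are in flight right now.
                  int now = running.incrementAndGet();
                  peak.accumulateAndGet(now, Math::max);
                  try {
                    Thread.sleep(5); // stand-in for planning or delete I/O
                  } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                  } finally {
                    running.decrementAndGet();
                  }
                }));
      }
      // Wait for every task so the peak value is final.
      for (Future<?> future : futures) {
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      sharedPool.shutdown();
    }
    return peak.get();
  }
}
```

   With a pool of 3, `peakConcurrency(3, 20)` never reports more than 3 tasks in flight, no matter how many tasks are queued. Two separate pools of 3 would allow up to 6.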



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;

Review Comment:
   Sure, it is fine to have subpackages. We will just continue to use the 
`@Internal` annotation as we have been doing.
   
   But I don't know if we have a convention of using an `api` subpackage in 
connector modules like Flink, Kafka, or Spark.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.stream;
+
+import java.time.Duration;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.TaskResult;
+import org.apache.iceberg.flink.maintenance.operator.Trigger;
+import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> 
{
+  private int id;
+  private String name;
+  private TableLoader tableLoader;
+  private String uidPrefix = null;
+  private String slotSharingGroup = null;
+  private Integer parallelism = null;
+  private TriggerEvaluator.Builder triggerEvaluator = new 
TriggerEvaluator.Builder();
+
+  abstract DataStream<TaskResult> buildInternal(DataStream<Trigger> 
sourceStream);
+
+  /**
+   * After a given number of Iceberg table commits since the last run, starts 
the downstream job.
+   *
+   * @param commitCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnCommitCount(int commitCount) {
+    triggerEvaluator.commitCount(commitCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new data files since the last run, starts the 
downstream job.
+   *
+   * @param dataFileCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnDataFileCount(int dataFileCount) {
+    triggerEvaluator.dataFileCount(dataFileCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given aggregated data file size since the last run, starts the 
downstream job.
+   *
+   * @param dataFileSizeInBytes after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnDataFileSize(long dataFileSizeInBytes) {
+    triggerEvaluator.dataFileSizeInBytes(dataFileSizeInBytes);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new positional delete files since the last run, 
starts the downstream
+   * job.
+   *
+   * @param posDeleteFileCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T schedulerOnPosDeleteFileCount(int posDeleteFileCount) {
+    triggerEvaluator.posDeleteFileCount(posDeleteFileCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new positional delete records since the last run, 
starts the downstream
+   * job.
+   *
+   * @param posDeleteRecordCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T schedulerOnPosDeleteRecordCount(long posDeleteRecordCount) {
+    triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new equality delete files since the last run, 
starts the downstream
+   * job.
+   *
+   * @param eqDeleteFileCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnEqDeleteFileCount(int eqDeleteFileCount) {
+    triggerEvaluator.eqDeleteFileCount(eqDeleteFileCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given number of new equality delete records since the last run, 
starts the downstream
+   * job.
+   *
+   * @param eqDeleteRecordCount after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) {
+    triggerEvaluator.eqDeleteRecordCount(eqDeleteRecordCount);
+    return (T) this;
+  }
+
+  /**
+   * After a given time since the last run, starts the downstream job.
+   *
+   * @param time after the downstream job should be started
+   * @return for chained calls
+   */
+  public T scheduleOnTime(Duration time) {
+    triggerEvaluator.timeout(time);
+    return (T) this;
+  }
+
+  /**
+   * The prefix used for the generated {@link 
org.apache.flink.api.dag.Transformation}'s uid.
+   *
+   * @param newUidPrefix for the transformations
+   * @return for chained calls
+   */
+  public T uidPrefix(String newUidPrefix) {
+    this.uidPrefix = newUidPrefix;
+    return (T) this;
+  }
+
+  /**
+   * The {@link SingleOutputStreamOperator#slotSharingGroup(String)} for all 
the operators of the
+   * generated stream. Could be used to separate the resources used by this 
task.
+   *
+   * @param newSlotSharingGroup to be used for the operators
+   * @return for chained calls
+   */
+  public T slotSharingGroup(String newSlotSharingGroup) {
+    this.slotSharingGroup = newSlotSharingGroup;
+    return (T) this;
+  }
+
+  /**
+   * Sets the parallelism for the stream.
+   *
+   * @param newParallelism the required parallelism
+   * @return for chained calls
+   */
+  public T parallelism(int newParallelism) {
+    this.parallelism = newParallelism;
+    return (T) this;
+  }
+
+  @Internal

Review Comment:
   I thought these getters were needed by child classes extending this one, 
so wouldn't the protected scope be needed?
   
   But as discussed in another comment, if this class is not exposed as 
public, then package-private would be fine too.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Calls the {@link ExpireSnapshots} to remove the old snapshots and emits the 
filenames which could
+ * be removed in the {@link #DELETE_STREAM} side output.
+ */
+public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, 
TaskResult> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsProcessor.class);
+  public static final OutputTag<String> DELETE_STREAM =
+      new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING);
+
+  private final TableLoader tableLoader;
+  private final Long maxSnapshotAgeMs;
+  private final Integer retainLast;
+  private final int plannerPoolSize;
+  private transient ExecutorService plannerPool;
+  private transient Table table;
+
+  public ExpireSnapshotsProcessor(
+      TableLoader tableLoader, Long maxSnapshotAgeMs, Integer retainLast, int 
plannerPoolSize) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+
+    this.tableLoader = tableLoader;
+    this.maxSnapshotAgeMs = maxSnapshotAgeMs;
+    this.retainLast = retainLast;
+    this.plannerPoolSize = plannerPoolSize;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    tableLoader.open();
+    this.table = tableLoader.loadTable();
+    this.plannerPool = ThreadPools.newWorkerPool(table.name() + 
"-table--planner", plannerPoolSize);
+  }
+
+  @Override
+  public void processElement(Trigger trigger, Context ctx, 
Collector<TaskResult> out)
+      throws Exception {
+    try {
+      table.refresh();
+      ExpireSnapshots expireSnapshots = table.expireSnapshots();
+      if (maxSnapshotAgeMs != null) {
+        expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - 
maxSnapshotAgeMs);
+      }
+
+      if (retainLast != null) {
+        expireSnapshots = expireSnapshots.retainLast(retainLast);
+      }
+
+      AtomicLong deleteFileCounter = new AtomicLong(0L);
+      expireSnapshots
+          .planWith(plannerPool)
+          .deleteWith(
+              file -> {
+                ctx.output(DELETE_STREAM, file);
+                deleteFileCounter.incrementAndGet();
+              })
+          .cleanExpiredFiles(true)
+          .commit();
+
+      LOG.info(
+          "Successfully finished expiring snapshots for {} at {}. Scheduled {} files for delete.",
+          table,
+          ctx.timestamp(),
+          deleteFileCounter.get());
+      out.collect(
+          new TaskResult(trigger.taskId(), trigger.timestamp(), true, 
Collections.emptyList()));
+    } catch (Exception e) {
+      LOG.info("Exception expiring snapshots for {} at {}", table, 
ctx.timestamp(), e);
+      out.collect(
+          new TaskResult(trigger.taskId(), trigger.timestamp(), false, 
Lists.newArrayList(e)));

Review Comment:
   Hmm, I am still not quite following. Each operator subtask emits a 
`TaskResult`. Each `TaskResult` should only contain one exception, right?
   
   I didn't see the exceptions being used downstream. If the `success` boolean 
flag is good enough for downstream, maybe we can remove the exceptions from 
`TaskResult`, as stack traces can be large.
   
   BTW, `TaskResult` is not marked as `Serializable`.
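
   A minimal sketch of a result type that carries only the `success` flag and is `Serializable`. `SlimTaskResult`, its field names, and its field types are assumptions for illustration and are not the PR's `TaskResult`. The round trip uses plain Java serialization only to show that the type can be serialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical slimmed-down result; names and types are assumptions, not the PR's TaskResult. */
class SlimTaskResult implements Serializable {
  private static final long serialVersionUID = 1L;

  private final int taskId;
  private final long startEpoch;
  private final boolean success;

  SlimTaskResult(int taskId, long startEpoch, boolean success) {
    this.taskId = taskId;
    this.startEpoch = startEpoch;
    this.success = success;
  }

  int taskId() {
    return taskId;
  }

  long startEpoch() {
    return startEpoch;
  }

  boolean success() {
    return success;
  }

  /** Serializes and deserializes the result with plain Java serialization. */
  static SlimTaskResult roundTrip(SlimTaskResult result) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(result);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SlimTaskResult) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

   In this sketch the failure details would be logged where the exception is caught, so no stack trace travels with the record.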



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/AsyncDeleteFiles.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Predicate;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Delete the files using the {@link FileIO}. */
+@Internal
+public class AsyncDeleteFiles extends RichAsyncFunction<String, Boolean> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncDeleteFiles.class);
+  public static final Predicate<Collection<Boolean>> FAILED_PREDICATE = new 
FailedPredicate();
+
+  private final String name;
+  private final FileIO io;
+  private final int workerPoolSize;
+  private final String tableName;
+
+  private transient ExecutorService workerPool;
+  private transient Counter failedCounter;
+  private transient Counter succeededCounter;
+
+  public AsyncDeleteFiles(String name, TableLoader tableLoader, int 
workerPoolSize) {
+    Preconditions.checkNotNull(name, "Name should no be null");
+    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+
+    this.name = name;
+    tableLoader.open();
+    Table table = tableLoader.loadTable();
+    this.io = table.io();
+    this.workerPoolSize = workerPoolSize;
+    this.tableName = table.name();
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.failedCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+            .counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER);
+    this.succeededCounter =
+        getRuntimeContext()
+            .getMetricGroup()
+            .addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
+            .counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER);
+
+    this.workerPool =
+        ThreadPools.newWorkerPool(tableName + "-" + name + 
"-async-delete-files", workerPoolSize);
+  }
+
+  @Override
+  public void asyncInvoke(String fileName, ResultFuture<Boolean> resultFuture) 
{
+    workerPool.execute(
+        () -> {
+          try {
+            LOG.info("Deleting file: {} with {}", fileName, name);
+            io.deleteFile(fileName);

Review Comment:
   There is really no fundamental difference whether we use 
`prepareSnapshotPreBarrier` or not. If we use the bulk delete API, the 
checkpoint barrier can come right after a bulk delete request, with or without 
`prepareSnapshotPreBarrier`. The main thing is that batch delete requests have 
higher latency.
   
   The async model can increase throughput with single-file deletion, but it is 
overall inefficient and more expensive. It also can't avoid the potential 
throttling problem.
   
   The change does not seem to have been pushed to the PR yet. I imagine 
`deleteBatchSize` is used for buffering (like the target file size for a 
rolling file writer). It can be set to 1,000, which is the S3 limit: wait until 
1,000 files are buffered or a checkpoint barrier arrives.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

