Guosmilesmile commented on code in PR #12979: URL: https://github.com/apache/iceberg/pull/12979#discussion_r2104313253
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/CommittableToTableChangeConverter.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink; + +import java.io.IOException; +import java.util.List; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.core.io.SimpleVersionedSerialization; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.Table; +import 
org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CommittableToTableChangeConverter + extends ProcessFunction<CommittableMessage<IcebergCommittable>, TableChange> + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = + LoggerFactory.getLogger(CommittableToTableChangeConverter.class); + + private final TableLoader tableLoader; + private transient Table table; + private transient ListState<ManifestFile> manifestFilesToRemoveState; + private transient List<ManifestFile> manifestFilesToRemoveList; + private transient long lastCompletedCheckpointId = -1L; + private transient String flinkJobId; + + public CommittableToTableChangeConverter(TableLoader tableLoader) { + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); + this.tableLoader = tableLoader; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + this.manifestFilesToRemoveList = Lists.newArrayList(); + this.manifestFilesToRemoveState = + context + .getOperatorStateStore() + .getListState(new ListStateDescriptor<>("manifests-to-remove", ManifestFile.class)); + if (context.isRestored()) { + manifestFilesToRemoveList = Lists.newArrayList(manifestFilesToRemoveState.get()); + } + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + this.flinkJobId = getRuntimeContext().getJobId().toString(); + if (!tableLoader.isOpen()) { + tableLoader.open(); + } + this.table = tableLoader.loadTable(); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + 
manifestFilesToRemoveState.update(manifestFilesToRemoveList); + } + + @Override + public void processElement( + CommittableMessage<IcebergCommittable> value, + ProcessFunction<CommittableMessage<IcebergCommittable>, TableChange>.Context ctx, + Collector<TableChange> out) + throws Exception { + if (value instanceof CommittableWithLineage) { + CommittableWithLineage<IcebergCommittable> committable = + (CommittableWithLineage<IcebergCommittable>) value; + TableChange tableChange = convertToTableChange(committable.getCommittable()); + out.collect(tableChange); + } + } + + private TableChange convertToTableChange(IcebergCommittable icebergCommittable) + throws IOException { + if (icebergCommittable == null || icebergCommittable.manifest().length == 0) { + return TableChange.empty(); + } + + DeltaManifests deltaManifests = + SimpleVersionedSerialization.readVersionAndDeSerialize( + DeltaManifestsSerializer.INSTANCE, icebergCommittable.manifest()); + WriteResult writeResult = + FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()); + manifestFilesToRemoveList.addAll(deltaManifests.manifests()); Review Comment: I moved the deletion of temporary manifests into `CommittableToTableChangeConverter.java`. The logic remains consistent with the original implementation: 1. If a job fails before the data has been committed, the data will be committed again, and `CommittableToTableChangeConverter` will receive the corresponding committable and use it to delete the manifests. 2. For data that has already been committed, no committable is passed, so those manifests will remain. The current write process works as follows: the Flink writer generates temporary manifests, and the final manifests are generated during the two-phase commit. In the existing logic, temporary manifests are not cleaned up in case of failure: 1.
If an exception occurs between the commit and the manifest deletion, manifests that were successfully committed but not yet deleted are never handled, because of the `maxCheckpoint` check. https://github.com/apache/iceberg/blob/d32ba358c0976a58229ca62dc175e15b16fda619/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java#L126-L133 2. If the deletion phase fails, the error is only logged. https://github.com/apache/iceberg/blob/d32ba358c0976a58229ca62dc175e15b16fda619/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java#L138-L159 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org