gyfora commented on code in PR #8653:
URL: https://github.com/apache/iceberg/pull/8653#discussion_r1345500193
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -18,72 +18,39 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
-import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
-import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
-import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.time.Duration;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DistributionMode;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkWriteConf;
-import org.apache.iceberg.flink.FlinkWriteOptions;
-import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.flink.sink.committer.IcebergFilesCommitter;
+import org.apache.iceberg.flink.sink.writer.IcebergStreamWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.SerializableSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FlinkSink {
-  private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
+public class FlinkSink extends SinkBase {

Review Comment:
   Isn't this going to cause backward compatibility issues with the current FlinkSink instances? Maybe I am wrong, but it means that users can only upgrade the Iceberg version if they rebuild their job graph (re-serialize operators, etc.).
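For context on the compatibility question: Flink maps restored savepoint state to operators by uid, so jobs that pin explicit uids on the sink operators have a better chance of surviving internal refactors. A minimal sketch, assuming the builder keeps its current `forRowData(...)`, `tableLoader(...)`, and `uidPrefix(...)` API:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class StableUidSink {
  // Pins an explicit uid prefix so state from a savepoint taken with the old
  // FlinkSink can still be mapped onto the sink operators after an upgrade.
  public static void append(DataStream<RowData> rows, TableLoader tableLoader) {
    FlinkSink.forRowData(rows)
        .tableLoader(tableLoader)
        .uidPrefix("iceberg-sink") // keep this stable across Iceberg versions
        .append();
  }
}
```

Stable uids only cover state mapping, though; if the Java-serialized operator classes change their field layout (for example by gaining a new superclass), a restore can still fail, which is the scenario the comment above is raising.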
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/committer/CommonCommitter.java:
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.committer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the common commit related tasks which are needed for both the {@link
+ * org.apache.iceberg.flink.sink.FlinkSink} (SinkV1) and the {@link
+ * org.apache.iceberg.flink.sink.IcebergSink} (SinkV2).
+ *
+ * <p>The object is serialized and sent to the {@link IcebergFilesCommitter} (SinkV1) and to the
+ * {@link SinkV2Aggregator}, {@link SinkV2Committer} (SinkV2) to execute the related tasks. Before
+ * calling any of the public methods, the {@link #init(IcebergFilesCommitterMetrics)} method should
+ * be called, so the non-serializable attributes are initialized. The {@link
+ * #init(IcebergFilesCommitterMetrics)} is idempotent, so it could be called multiple times to
+ * accommodate the missing init feature in the {@link Committer}.
+ */
+public class CommonCommitter implements Serializable {

Review Comment:
   Similar to the Sink base class, maybe this should be called `CommitterBase`?
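The javadoc's idempotent `init()` contract is worth spelling out, since the SinkV2 `Committer` interface offers no dedicated open/init lifecycle hook. A minimal sketch of that pattern, with hypothetical field names rather than the PR's actual members:

```java
import java.io.Serializable;
import org.apache.iceberg.flink.TableLoader;

// Illustrative only: shows the idempotent init() pattern the javadoc above
// describes, not the real CommonCommitter implementation.
abstract class CommitterBaseSketch implements Serializable {
  private final TableLoader tableLoader; // serializable configuration
  private transient boolean initialized; // lost and rebuilt on deserialization

  CommitterBaseSketch(TableLoader tableLoader) {
    this.tableLoader = tableLoader;
  }

  // Called defensively at the start of every commit-path method, because
  // Committer has no init hook; repeated calls are cheap no-ops.
  protected void init() {
    if (initialized) {
      return;
    }
    tableLoader.open(); // recreate non-serializable state (table, executors, ...)
    this.initialized = true;
  }
}
```

After the first call the guard reduces `init()` to a single boolean check, so invoking it from every public method costs effectively nothing.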
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java:
##########
@@ -91,6 +91,12 @@ private FlinkConfigOptions() {}
           .defaultValue(false)
           .withDescription("Use the FLIP-27 based Iceberg source implementation.");
 
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_USE_FLIP143_SINK =

Review Comment:
   I think this config name is not very good. We should not include "flip" in it. How about calling it `use-v2-sink`?
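If the option were renamed as suggested, it could simply mirror the FLIP-27 source option visible in the diff above; the key and description below are hypothetical, following that existing style:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class FlinkConfigOptionsSketch {
  // Hypothetical rename per the review suggestion, modeled on the existing
  // FLIP-27 source option shown in the diff above.
  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_USE_V2_SINK =
      ConfigOptions.key("table.exec.iceberg.use-v2-sink")
          .booleanType()
          .defaultValue(false)
          .withDescription("Use the SinkV2 based Iceberg sink implementation.");
}
```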