pvary commented on code in PR #10179:
URL: https://github.com/apache/iceberg/pull/10179#discussion_r1596970595

@@ -0,0 +1,774 @@
+package org.apache.iceberg.flink.sink;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteConf;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.sink.committer.SinkAggregator;
+import org.apache.iceberg.flink.sink.committer.SinkCommittable;
+import org.apache.iceberg.flink.sink.committer.SinkCommittableSerializer;
+import org.apache.iceberg.flink.sink.committer.SinkCommitter;
+import org.apache.iceberg.flink.sink.committer.WriteResultSerializer;
+import org.apache.iceberg.flink.sink.writer.IcebergSinkWriter;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+ * Flink v2 sink offers different hooks to insert custom topologies into the sink. We will use the
+ * following:
+ *
+ * <ul>
+ *   <li>{@link SupportsPreWriteTopology} which redistributes the data to the 
writers based on the
+ *       {@link DistributionMode}
+ *   <li>{@link org.apache.flink.api.connector.sink2.SinkWriter} which writes 
data files, and
+ *       generates a {@link SinkCommittable} to store the {@link 
+ *   <li>{@link SupportsPreCommitTopology} which we use to place the {@link SinkAggregator} which
+ *       merges the individual {@link 
org.apache.flink.api.connector.sink2.SinkWriter}'s {@link
+ *       org.apache.iceberg.io.WriteResult}s to a single {@link
+ *       org.apache.iceberg.flink.sink.DeltaManifests}
+ *   <li>{@link SinkCommitter} which stores the incoming {@link
+ *       org.apache.iceberg.flink.sink.DeltaManifests}s in state for recovery, 
and commits them to
+ *       the Iceberg table
+ *   <li>{@link SupportsPostCommitTopology} we could use for incremental 
compaction later. This is
+ *       not implemented yet.
+ * </ul>
+ *
+ * The job graph looks like below:
+ *
+ * <pre>{@code
+ *                            Flink sink
+ *               
+ *               |                                                             
+ * +-------+     | +----------+                               +-------------+  
    +---------------+ |
+ * | Map 1 | ==> | | writer 1 |                               | committer 1 | 
---> | post commit 1 | |
+ * +-------+     | +----------+                               +-------------+  
    +---------------+ |
+ *               |             \                             /                
\                      |
+ *               |              \                           /                  
\                     |
+ *               |               \                         /                   
 \                    |
+ * +-------+     | +----------+   \ +-------------------+ /   +-------------+  
  \ +---------------+ |
+ * | Map 2 | ==> | | writer 2 | --->| commit aggregator |     | committer 2 |  
    | post commit 2 | |
+ * +-------+     | +----------+     +-------------------+     +-------------+  
    +---------------+ |
+ *               |                                             Commit only on  
+ *               |                                             committer 1     
+ *               
+ * }</pre>
+ */
+public class IcebergSink
+    implements Sink<RowData>,
+        SupportsPreWriteTopology<RowData>,
+        SupportsCommitter<SinkCommittable>,
+        SupportsPreCommitTopology<WriteResult, SinkCommittable>,
+        SupportsPostCommitTopology<SinkCommittable> {
+  private Table initTable;
+  private final TableLoader tableLoader;
+  private final Map<String, String> snapshotProperties;
+  private String uidPrefix;
+  private Committer<SinkCommittable> sinkCommitter;
+  private final String sinkId;
+  private transient IcebergSink.Builder builder;
+  private Map<String, String> writeProperties;
+  private RowType flinkRowType;
+  private SerializableSupplier<Table> tableSupplier;
+  private transient FlinkWriteConf flinkWriteConf;
+  private List<Integer> equalityFieldIds;
+  private boolean upsertMode;
+  private FileFormat dataFileFormat;
+  private long targetDataFileSize;
+  private String branch;
+  private boolean overwriteMode;
+  private int workerPoolSize;
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergSink.class);
+  /**
+   * Creates a sink from the given builder. Only the builder reference, the table loader, the
+   * snapshot properties and a freshly generated sink id are set here; most other fields are
+   * assigned afterwards by {@link Builder#build()}.
+   *
+   * @param builder the builder holding the user-supplied configuration
+   */
+  private IcebergSink(IcebergSink.Builder builder) {
+    this.builder = builder;
+    this.tableLoader = builder.tableLoader();
+    // Note: this is the builder's own map instance, not a copy.
+    this.snapshotProperties = builder.snapshotProperties;
+    // A random UUID is generated every time a sink is created. It is used to separate files
+    // generated by different sinks writing the same table, and it is also used to generate the
+    // aggregator operator name.
+    this.sinkId = UUID.randomUUID().toString();
+  }
+  @Override
+  public SinkWriter<RowData> createWriter(InitContext context) throws 
IOException {
+    RowDataTaskWriterFactory taskWriterFactory =
+        new RowDataTaskWriterFactory(
+            tableSupplier,
+            flinkRowType,
+            targetDataFileSize,
+            dataFileFormat,
+            writeProperties,
+            equalityFieldIds,
+            upsertMode);
+    IcebergStreamWriterMetrics metrics =
+        new IcebergStreamWriterMetrics(context.metricGroup(), 
+    return new IcebergSinkWriter(
+        tableSupplier,
+        taskWriterFactory,
+        metrics,
+        context.getSubtaskId(),
+        context.getAttemptNumber());
+  }
+  @Override
+  public Committer<SinkCommittable> createCommitter(CommitterInitContext 
+      throws IOException {
+    return sinkCommitter != null
+        ? sinkCommitter
+        : new SinkCommitter(
+            tableLoader, branch, snapshotProperties, overwriteMode, 
workerPoolSize, sinkId);
+  }
+  @Override
+  public SimpleVersionedSerializer<SinkCommittable> getCommittableSerializer() 
+    return new SinkCommittableSerializer();
+  }
+  @Override
+  public void 
committables) {
+    // TODO Support small file compaction
+  }
+  /**
+   * Pre-write hook of the sink: hands the input stream to {@code distributeDataStream} and
+   * returns whatever stream that helper produces.
+   *
+   * <p>NOTE(review): {@code distributeDataStream} is not visible in this hunk; presumably it
+   * redistributes records according to the configured {@link DistributionMode} - confirm.
+   *
+   * @param inputDataStream the stream of rows arriving at the sink
+   * @return the stream that is fed to the writers
+   */
+  @Override
+  public DataStream<RowData> addPreWriteTopology(DataStream<RowData> 
inputDataStream) {
+    return distributeDataStream(inputDataStream);
+  }
+  @Override
+  public DataStream<CommittableMessage<SinkCommittable>> addPreCommitTopology(
+      DataStream<CommittableMessage<WriteResult>> writeResults) {
+    TypeInformation<CommittableMessage<SinkCommittable>> typeInformation =
+        CommittableMessageTypeInfo.of(this::getCommittableSerializer);
+    return writeResults
+        .global()
+        .transform(
+            prefixIfNotNull(uidPrefix, initTable.name() + "-" + sinkId + 
+            typeInformation,
+            new SinkAggregator(tableLoader, sinkId))
+        .uid(prefixIfNotNull(uidPrefix, "pre-commit-topology"))
+        .setParallelism(1)
+        .setMaxParallelism(1)
+        .global();
+  }
+  /**
+   * Returns the serializer for {@link WriteResult} objects.
+   *
+   * @return a new {@link WriteResultSerializer} instance on every call
+   */
+  @Override
+  public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {
+    return new WriteResultSerializer();
+  }
+  public static class Builder {
+    private Function<String, DataStream<RowData>> inputCreator = null;
+    private TableLoader tableLoader;
+    private Table table;
+    private TableSchema tableSchema;
+    private List<String> equalityFieldColumns = null;
+    private String uidPrefix = null;
+    private final Map<String, String> snapshotProperties = Maps.newHashMap();
+    private ReadableConfig readableConfig = new Configuration();
+    private final Map<String, String> writeOptions = Maps.newHashMap();
+    private Builder() {}
+    /**
+     * Uses the given {@link RowData} stream directly as the sink input. The uid prefix passed to
+     * the input creator is ignored because no extra operator is added in front of the sink.
+     *
+     * @param newRowDataInput the stream to write
+     * @return this builder
+     */
+    private IcebergSink.Builder forRowData(DataStream<RowData> 
newRowDataInput) {
+      this.inputCreator = ignored -> newRowDataInput;
+      return this;
+    }
+    private IcebergSink.Builder forRow(DataStream<Row> input, TableSchema 
inputTableSchema) {
+      RowType rowType = (RowType) 
+      DataType[] fieldDataTypes = inputTableSchema.getFieldDataTypes();
+      DataFormatConverters.RowConverter rowConverter =
+          new DataFormatConverters.RowConverter(fieldDataTypes);
+      return this.forMapperOutputType(
+              input, rowConverter::toInternal, 
+          .tableSchema(inputTableSchema);
+    }
+    private <T> IcebergSink.Builder forMapperOutputType(
+        DataStream<T> input, MapFunction<T, RowData> mapper, 
TypeInformation<RowData> outputType) {
+      this.inputCreator =
+          newUidPrefix -> {
+            // Input stream order is crucial for some situations (e.g. in the CDC case). Therefore, we
+            // need to set the parallelism of the map operator to the same value as its input, to keep
+            // the map operator chained to its input and avoid the default rebalance.
+            SingleOutputStreamOperator<RowData> inputStream =
+                input.map(mapper, 
+            if (newUidPrefix != null) {
+              inputStream.name(operatorName(newUidPrefix)).uid(newUidPrefix + 
+            }
+            return inputStream;
+          };
+      return this;
+    }
+    /**
+     * This iceberg {@link Table} instance is used for initializing {@link 
+     * which will write all the records into {@link DataFile}s and emit them 
to downstream operator.
+     * Providing a table would avoid so many table loading from each separate 
+     *
+     * @param newTable the loaded iceberg table instance.
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder table(Table newTable) {
+      this.table = newTable;
+      return this;
+    }
+    /**
+     * The table loader is used for loading tables in {@link SinkCommitter} 
lazily, we need this
+     * loader because {@link Table} is not serializable and could not just use 
the loaded table from
+     * Builder#table in the remote task manager.
+     *
+     * @param newTableLoader to load iceberg table inside tasks.
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder tableLoader(TableLoader newTableLoader) {
+      this.tableLoader = newTableLoader;
+      return this;
+    }
+    /**
+     * Package-private accessor for the configured table loader.
+     *
+     * @return the loader set through {@link #tableLoader(TableLoader)}, or {@code null} if none
+     *     was set
+     */
+    TableLoader tableLoader() {
+      return tableLoader;
+    }
+    /**
+     * Set the write properties for IcebergSink. View the supported properties 
in {@link
+     * FlinkWriteOptions}
+     */
+    public IcebergSink.Builder set(String property, String value) {
+      writeOptions.put(property, value);
+      return this;
+    }
+    /**
+     * Set the write properties for IcebergSink. View the supported properties 
in {@link
+     * FlinkWriteOptions}
+     */
+    public IcebergSink.Builder setAll(Map<String, String> properties) {
+      writeOptions.putAll(properties);
+      return this;
+    }
+    /**
+     * Sets the Flink {@link TableSchema} of the input. {@link #build()} passes it, together with
+     * the Iceberg table schema, to {@code toFlinkRowType} to derive the row type used by the sink.
+     *
+     * @param newTableSchema the Flink schema of the incoming rows; may be left unset
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder tableSchema(TableSchema newTableSchema) {
+      this.tableSchema = newTableSchema;
+      return this;
+    }
+    public IcebergSink.Builder overwrite(boolean newOverwrite) {
+      writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), 
+      return this;
+    }
+    /**
+     * Replaces the Flink configuration held by this builder; the default is an empty {@link
+     * Configuration}.
+     *
+     * <p>NOTE(review): presumably this config is handed to {@link FlinkWriteConf} in {@link
+     * #build()} - confirm against the full source.
+     *
+     * @param config the Flink configuration to use
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder flinkConf(ReadableConfig config) {
+      this.readableConfig = config;
+      return this;
+    }
+    /**
+     * Configure the write {@link DistributionMode} that the IcebergSink will use. Currently, Flink
+     * supports {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
+     *
+     * @param mode to specify the write distribution mode.
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder distributionMode(DistributionMode mode) {
+      Preconditions.checkArgument(
+          !DistributionMode.RANGE.equals(mode),
+          "Flink does not support 'range' write distribution mode now.");
+      if (mode != null) {
+        writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), 
+      }
+      return this;
+    }
+    /**
+     * Configuring the write parallel number for iceberg stream writer.
+     *
+     * @param newWriteParallelism the number of parallel iceberg stream writer.
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder writeParallelism(int newWriteParallelism) {
+      writeOptions.put(
+          FlinkWriteOptions.WRITE_PARALLELISM.key(), 
+      return this;
+    }
+    /**
+     * All INSERT/UPDATE_AFTER events from input stream will be transformed to 
UPSERT events, which
+     * means it will DELETE the old records and then INSERT the new records. 
In partitioned table,
+     * the partition fields should be a subset of equality fields, otherwise 
the old row that
+     * located in partition-A could not be deleted by the new row that located 
in partition-B.
+     *
+     * @param enabled indicate whether it should transform all 
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder upsert(boolean enabled) {
+      writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), 
+      return this;
+    }
+    /**
+     * Configuring the equality field columns for iceberg table that accept 
CDC or UPSERT events.
+     *
+     * @param columns defines the iceberg table's key.
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder equalityFieldColumns(List<String> columns) {
+      this.equalityFieldColumns = columns;
+      return this;
+    }
+    /**
+     * Set the uid prefix for IcebergSink operators. Note that IcebergSink internally consists of
+     * multiple operators (like writer, committer, aggregator). The actual operator uid will be
+     * the prefix followed by a suffix, like "uidPrefix-writer". <br>
+     * <br>
+     * If provided, this prefix is also applied to operator names. <br>
+     * <br>
+     * Flink auto generates operator uid if not set explicitly. It is a 
recommended <a
+     * 
+     * best-practice to set uid for all operators</a> before deploying to 
production. Flink has an
+     * option to {@code pipeline.auto-generate-uid=false} to disable 
auto-generation and force
+     * explicit setting of all operator uid. <br>
+     * <br>
+     * Be careful with setting this for an existing job, because now we are 
changing the operator
+     * uid from an auto-generated one to this new value. When deploying the 
change with a
+     * checkpoint, Flink won't be able to restore the previous IcebergSink 
operator state (more
+     * specifically the committer operator state). You need to use {@code 
+     * to ignore the previous sink state. During restore IcebergSink state is 
used to check if last
+     * commit was actually successful or not. {@code --allowNonRestoredState} 
can lead to data loss
+     * if the Iceberg commit failed in the last completed checkpoint.
+     *
+     * @param newPrefix prefix for Flink sink operator uid and name
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder uidPrefix(String newPrefix) {
+      this.uidPrefix = newPrefix;
+      return this;
+    }
+    /**
+     * Adds all given entries to the snapshot properties of the sink. Entries with a key that is
+     * already present replace the previous value.
+     *
+     * @param properties the snapshot properties to add; must not be {@code null}
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder setSnapshotProperties(Map<String, String> 
properties) {
+      snapshotProperties.putAll(properties);
+      return this;
+    }
+    /**
+     * Adds a single entry to the snapshot properties of the sink, replacing any previous value
+     * stored under the same key.
+     *
+     * @param property the snapshot property name
+     * @param value the snapshot property value
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder setSnapshotProperty(String property, String 
value) {
+      snapshotProperties.put(property, value);
+      return this;
+    }
+    /**
+     * Sets the target branch by storing it in the write options under the {@link
+     * FlinkWriteOptions#BRANCH} key.
+     *
+     * @param branch the name of the branch to write to
+     * @return {@link IcebergSink.Builder} to connect the iceberg table.
+     */
+    public IcebergSink.Builder toBranch(String branch) {
+      writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
+      return this;
+    }
+    /**
+     * Builds an operator name from the given suffix.
+     *
+     * @param suffix the operator specific part of the name
+     * @return {@code uidPrefix + "-" + suffix} when a uid prefix is set, otherwise the bare suffix
+     */
+    private String operatorName(String suffix) {
+      return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
+    }
+    @VisibleForTesting
+    List<Integer> checkAndGetEqualityFieldIds() {
+      List<Integer> equalityFieldIds = 
+      if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) {
+        Set<Integer> equalityFieldSet =
+            Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
+        for (String column : equalityFieldColumns) {
+          org.apache.iceberg.types.Types.NestedField field = 
+          Preconditions.checkNotNull(
+              field,
+              "Missing required equality field column '%s' in table schema %s",
+              column,
+              table.schema());
+          equalityFieldSet.add(field.fieldId());
+        }
+        if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
+          LOG.warn(
+              "The configured equality field column IDs {} are not matched 
with the schema identifier field IDs"
+                  + " {}, use job specified equality field columns as the 
equality fields by default.",
+              equalityFieldSet,
+              table.schema().identifierFieldIds());
+        }
+        equalityFieldIds = Lists.newArrayList(equalityFieldSet);
+      }
+      return equalityFieldIds;
+    }
+    public IcebergSink build() {
+      IcebergSink sink = new IcebergSink(this);
+      // validations
+      Preconditions.checkArgument(
+          inputCreator != null,
+          "Please use forRowData() or forMapperOutputType() to initialize the 
input DataStream.");
+      Preconditions.checkNotNull(tableLoader(), "Table loader shouldn't be 
+      // Set the table if it is not yet set in the builder, so we can do the 
equalityId checks
+      table(checkAndGetTable(tableLoader(), table));
+      // Validate the equality fields and partition fields if we enable the 
upsert mode.
+      sink.equalityFieldIds = checkAndGetEqualityFieldIds();
+      // Init the `flinkWriteConf` here, so we can do the checks
+      sink.flinkWriteConf = new FlinkWriteConf(table, writeOptions, 
+      sink.branch = sink.flinkWriteConf.branch();
+      sink.overwriteMode = sink.flinkWriteConf.overwriteMode();
+      sink.workerPoolSize = sink.flinkWriteConf.workerPoolSize();
+      if (sink.flinkWriteConf.upsertMode()) {
+        Preconditions.checkState(
+            !sink.flinkWriteConf.overwriteMode(),
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT 
data stream.");
+        Preconditions.checkState(
+            !sink.equalityFieldIds.isEmpty(),
+            "Equality field columns shouldn't be empty when configuring to use 
UPSERT data stream.");
+        if (!table.spec().isUnpartitioned()) {
+          for (PartitionField partitionField : table.spec().fields()) {
+            Preconditions.checkState(
+                sink.equalityFieldIds.contains(partitionField.sourceId()),
+                "In UPSERT mode, partition field '%s' should be included in 
equality fields: '%s'",
+                partitionField,
+                equalityFieldColumns);
+          }
+        }
+      }
+      sink.writeProperties =
+          writeProperties(table, sink.flinkWriteConf.dataFileFormat(), 
+      sink.flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+      Duration tableRefreshInterval = 
+      if (tableRefreshInterval != null) {
+        sink.tableSupplier =
+            new CachingTableSupplier(
+                (SerializableTable) table, tableLoader(), 
+      } else {
+        sink.tableSupplier = new SimpleTableSupplier((SerializableTable) 
+      }
+      sink.initTable = sink.tableSupplier.get();
+      sink.uidPrefix = Optional.ofNullable(uidPrefix).orElse("");
+      // FlinkWriteConf properties needed to be set separately, so we do not 
have to serialize the
+      // full conf
+      sink.upsertMode = sink.flinkWriteConf.upsertMode();
+      sink.dataFileFormat = sink.flinkWriteConf.dataFileFormat();
+      sink.targetDataFileSize = sink.flinkWriteConf.targetDataFileSize();
+      return sink;
+    }
+    /**
+     * Append the iceberg sink operators to write records to iceberg table.
+     *
+     * @return {@link DataStreamSink} for sink.
+     */
+    public DataStreamSink<RowData> append() {
+      IcebergSink sink = build();
+      // The input creator handles a null prefix itself (no name/uid is set on the mapper then).
+      DataStream<RowData> rowDataInput = inputCreator.apply(uidPrefix);
+      DataStreamSink<RowData> rowDataDataStreamSink = rowDataInput.sinkTo(sink);
+      // Only set an explicit uid when the user provided a prefix. Concatenating a null prefix
+      // would produce the literal uid "null-sink"; leaving the uid unset lets Flink auto-generate
+      // one, as described in the uidPrefix documentation.
+      if (uidPrefix != null) {
+        rowDataDataStreamSink.uid(uidPrefix + "-sink");
+      }
+      // Apply the configured write parallelism, if any; otherwise keep Flink's default.
+      if (sink.flinkWriteConf.writeParallelism() != null) {
+        rowDataDataStreamSink.setParallelism(sink.flinkWriteConf.writeParallelism());
+      }
+      return rowDataDataStreamSink;
+    }
+  }
+  private static Table checkAndGetTable(TableLoader tableLoader, Table table) {
+    if (table == null) {
+      if (!tableLoader.isOpen()) {
+        tableLoader.open();
+      }
+      try (TableLoader loader = tableLoader) {
+        return SerializableTable.copyOf(loader.loadTable());
+      } catch (IOException e) {
+        throw new UncheckedIOException(
+            "Failed to load iceberg table from table loader: " + tableLoader, 
+      }
+    }
+    return SerializableTable.copyOf(table);
+  }
+  private static RowType toFlinkRowType(Schema schema, TableSchema 
requestedSchema) {
+    if (requestedSchema != null) {
+      // Convert the flink schema to iceberg schema firstly, then reassign ids 
to match the existing
+      // iceberg schema.
+      Schema writeSchema = 
TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+      TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
+      // We use this flink schema to read values from RowData. The flink's 
+      // be promoted to iceberg INTEGER, that means if we use iceberg's table 
schema to read TINYINT
+      // (backend by 1 'byte'), we will read 4 bytes rather than 1 byte, it 
will mess up the byte
+      // array in

Review Comment:
   nit: formatting

