stevenzwu commented on code in PR #10179:
URL: https://github.com/apache/iceberg/pull/10179#discussion_r1690550344


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.IcebergSink.Builder;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)

Review Comment:
   Flink module has been fully migrated to junit5. please rebase with latest 
main branch. we would need to convert all test here to junit5.
   https://github.com/apache/iceberg/pull/10770
   
   I am also going to submit another PR to refactor the sink test to reduce the 
number of combinations we are running and the run time. Maybe you can wait 
after that for the tests change.



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/committer/TestIcebergFlinkManifest.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.committer;
+
+import static 
org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.sink.DeltaManifests;
+import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.flink.sink.FlinkManifestUtil;
+import org.apache.iceberg.flink.sink.ManifestOutputFileFactory;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestIcebergFlinkManifest {

Review Comment:
   what is the purpose of this test class? what's the difference with 
`TestFlinkManifest`?



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/SinkTestUtil.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+public class SinkTestUtil {
+
+  private SinkTestUtil() {}
+
+  @SuppressWarnings("unchecked")
+  public static List<StreamElement> fromOutput(Collection<Object> elements) {
+    return elements.stream()
+        .map(
+            element -> {
+              if (element instanceof StreamRecord) {
+                return new StreamRecord<>(
+                    ((StreamRecord<CommittableMessage<?>>) 
element).getValue());
+              }
+              return (StreamElement) element;
+            })
+        .collect(Collectors.toList());
+  }
+
+  public static CommittableSummary<?> toCommittableSummary(StreamElement 
element) {

Review Comment:
   maybe rename the method to make the assertion part explicit like 
`extractAndAssertCommitableSummary`.
   
   see the `TestHelpers` util class e.g.



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/committer/TestIcebergCommitter.java:
##########
@@ -0,0 +1,1472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.committer;
+
+import static 
org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+import static org.apache.iceberg.flink.sink.SinkTestUtil.toCommittableSummary;
+import static 
org.apache.iceberg.flink.sink.SinkTestUtil.transformsToStreamElement;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
+import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.flink.sink.FlinkManifestUtil;
+import org.apache.iceberg.flink.sink.IcebergFilesCommitterMetrics;
+import org.apache.iceberg.flink.sink.IcebergSink;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ExtendWith(ParameterizedTestExtension.class)
+class TestIcebergCommitter extends TestBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestIcebergCommitter.class);
+  @TempDir File temporaryFolder;
+  @TempDir File flinkManifestFolder;
+  private Table table;
+  private TableLoader tableLoader;
+
+  @Parameter(index = 1)
+  private FileFormat format;
+
+  @Parameter(index = 2)
+  private Boolean isBatchMode;
+
+  @Parameter(index = 3)
+  private Boolean isCheckpointingEnabled;

Review Comment:
   do we need this dimension? batch has no checkpoint. streaming has checkpoint 
enabled. I believe streaming write won't work with checkpoint disabled.
   
   we need to be careful with the combination explosion of those parameterized 
tests, which leads to longer run time. this is especially important for test 
classes with many test methods like this one.



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/committer/TestIcebergCommitter.java:
##########
@@ -0,0 +1,1472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.committer;
+
+import static 
org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+import static org.apache.iceberg.flink.sink.SinkTestUtil.toCommittableSummary;
+import static 
org.apache.iceberg.flink.sink.SinkTestUtil.transformsToStreamElement;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
+import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.flink.sink.FlinkManifestUtil;
+import org.apache.iceberg.flink.sink.IcebergFilesCommitterMetrics;
+import org.apache.iceberg.flink.sink.IcebergSink;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ExtendWith(ParameterizedTestExtension.class)
+class TestIcebergCommitter extends TestBase {

Review Comment:
   is the test coverage same as `TestIcebergFilesCommitter`? any new tests we 
added for the v2 sink?



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.sink.IcebergSink.Builder;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)

Review Comment:
   I am also wondering if we can avoid the code duplication via 
paremterization. the constructing API is similar. Can we plug in different 
provider depends on the parameter? But if it turns out to be very difficult, we 
can go with the copy and paste.
   
   ```
   IcebergSink.forRowData(dataStream)
           .table(table)
           .tableLoader(tableLoader)
           .writeParallelism(parallelism)
           .append();
   ```
   
   e.g. for the source
   https://github.com/apache/iceberg/pull/10741/files



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/committer/TestIcebergCommitter.java:
##########
@@ -0,0 +1,1472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.committer;
+
+import static 
org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+import static org.apache.iceberg.flink.sink.SinkTestUtil.toCommittableSummary;
+import static 
org.apache.iceberg.flink.sink.SinkTestUtil.transformsToStreamElement;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
+import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.flink.sink.FlinkManifestUtil;
+import org.apache.iceberg.flink.sink.IcebergFilesCommitterMetrics;
+import org.apache.iceberg.flink.sink.IcebergSink;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ExtendWith(ParameterizedTestExtension.class)
+class TestIcebergCommitter extends TestBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestIcebergCommitter.class);
+  @TempDir File temporaryFolder;
+  @TempDir File flinkManifestFolder;
+  private Table table;
+  private TableLoader tableLoader;
+
+  @Parameter(index = 1)
+  private FileFormat format;

Review Comment:
   I know this is in `TestIcebergFilesCommitter`. I am wondering if it is 
necessary to test different data file formats (Avro, Orc, Parquet) for 
committer logic?



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/committer/TestIcebergCommitter.java:
##########
@@ -0,0 +1,1472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.committer;
+
+import static 
org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+import static org.apache.iceberg.flink.sink.SinkTestUtil.toCommittableSummary;
+import static 
org.apache.iceberg.flink.sink.SinkTestUtil.transformsToStreamElement;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
+import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.flink.sink.FlinkManifestUtil;
+import org.apache.iceberg.flink.sink.IcebergFilesCommitterMetrics;
+import org.apache.iceberg.flink.sink.IcebergSink;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ExtendWith(ParameterizedTestExtension.class)
+class TestIcebergCommitter extends TestBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestIcebergCommitter.class);
+  @TempDir File temporaryFolder;
+  @TempDir File flinkManifestFolder;
+  private Table table;
+  private TableLoader tableLoader;
+
+  @Parameter(index = 1)
+  private FileFormat format;
+
+  @Parameter(index = 2)
+  private Boolean isBatchMode;
+
+  @Parameter(index = 3)
+  private Boolean isCheckpointingEnabled;
+
+  @Parameter(index = 4)

Review Comment:
   move this to earlier position to align with `TestIcebergFilesCommitter`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to