rodmeneses commented on code in PR #10179:
URL: https://github.com/apache/iceberg/pull/10179#discussion_r1714415965


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/committer/TestIcebergCommitter.java:
##########
@@ -0,0 +1,1452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.committer;
+
+import static 
org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+import static 
org.apache.iceberg.flink.sink.SinkTestUtil.extractAndAssertCommitableSummary;
+import static 
org.apache.iceberg.flink.sink.SinkTestUtil.transformsToStreamElement;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
+import org.apache.iceberg.flink.sink.FlinkManifestUtil;
+import org.apache.iceberg.flink.sink.IcebergFilesCommitterMetrics;
+import org.apache.iceberg.flink.sink.IcebergSink;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ExtendWith(ParameterizedTestExtension.class)
+class TestIcebergCommitter extends TestBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestIcebergCommitter.class);
+  @TempDir File temporaryFolder;
+  @TempDir File flinkManifestFolder;
+  private Table table;
+  private TableLoader tableLoader;
+
+  @Parameter(index = 1)
+  private Boolean isBatchMode;
+
+  @Parameter(index = 2)
+  private String branch;
+
+  private final String jobId = "jobId";
+  private final long dataFIleRowCount = 5L;
+
+  private final DataFile dataFileTest1 =
+      DataFiles.builder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-1.parquet")
+          .withFileSizeInBytes(0)
+          .withMetrics(
+              new Metrics(
+                  dataFIleRowCount,
+                  null, // no column sizes
+                  ImmutableMap.of(1, 5L), // value count
+                  ImmutableMap.of(1, 0L), // null count
+                  null,
+                  ImmutableMap.of(1, longToBuffer(0L)), // lower bounds
+                  ImmutableMap.of(1, longToBuffer(4L)) // upper bounds
+                  ))
+          .build();
+
+  private final DataFile dataFileTest2 =
+      DataFiles.builder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-2.parquet")
+          .withFileSizeInBytes(0)
+          .withMetrics(
+              new Metrics(
+                  dataFIleRowCount,
+                  null, // no column sizes
+                  ImmutableMap.of(1, 5L), // value count
+                  ImmutableMap.of(1, 0L), // null count
+                  null,
+                  ImmutableMap.of(1, longToBuffer(0L)), // lower bounds
+                  ImmutableMap.of(1, longToBuffer(4L)) // upper bounds
+                  ))
+          .build();
+
+  @SuppressWarnings("checkstyle:NestedForDepth")
+  @Parameters(name = "formatVersion={0} isStreaming={1}, branch={2}")
+  protected static List<Object> parameters() {
+    List<Object> parameters = Lists.newArrayList();
+    for (Boolean isBatchMode : new Boolean[] {true, false}) {
+      for (int formatVersion : new int[] {1, 2}) {
+        parameters.add(new Object[] {formatVersion, isBatchMode, 
SnapshotRef.MAIN_BRANCH});
+        parameters.add(new Object[] {formatVersion, isBatchMode, 
"test-branch"});
+      }
+    }
+    return parameters;
+  }
+
+  @BeforeEach
+  public void before() throws Exception {
+    String warehouse = temporaryFolder.getAbsolutePath();
+
+    String tablePath = warehouse.concat("/test");
+    assertThat(new File(tablePath).mkdir()).as("Should create the table path 
correctly.").isTrue();
+
+    Map<String, String> props =
+        ImmutableMap.of(
+            TableProperties.FORMAT_VERSION,
+            String.valueOf(formatVersion),
+            FLINK_MANIFEST_LOCATION,
+            flinkManifestFolder.getAbsolutePath(),
+            IcebergCommitter.MAX_CONTINUOUS_EMPTY_COMMITS,
+            "1");
+    table = SimpleDataUtil.createTable(tablePath, props, false);
+    tableLoader = TableLoader.fromHadoopTable(tablePath);
+  }
+
+  @TestTemplate
+  public void testCommitTxnWithoutDataFiles() throws Exception {
+    IcebergCommitter committer = getCommitter();
+    SimpleDataUtil.assertTableRows(table, Lists.newArrayList(), branch);
+    assertSnapshotSize(0);
+    assertMaxCommittedCheckpointId(jobId, -1);
+
+    for (long i = 1; i <= 3; i++) {
+      final Committer.CommitRequest<IcebergCommittable> commitRequest =
+          buildCommitRequestFor(jobId, i, Lists.newArrayList());
+      committer.commit(Lists.newArrayList(commitRequest));
+      assertMaxCommittedCheckpointId(jobId, i);
+      assertSnapshotSize((int) i);
+    }
+  }
+
+  @TestTemplate
+  public void testMxContinuousEmptyCommits() throws Exception {
+    
table.updateProperties().set(IcebergCommitter.MAX_CONTINUOUS_EMPTY_COMMITS, 
"3").commit();
+    IcebergCommitter committer = getCommitter();
+    for (int i = 1; i <= 9; i++) {
+      final Committer.CommitRequest<IcebergCommittable> commitRequest =
+          buildCommitRequestFor(jobId, i, Lists.newArrayList());
+      committer.commit(Lists.newArrayList(commitRequest));
+      assertFlinkManifests(0);
+      assertSnapshotSize(i / 3);
+    }
+  }
+
+  @TestTemplate
+  public void testCommitTxn() throws Exception {
+    IcebergCommitter committer = getCommitter();
+    assertSnapshotSize(0);
+    List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+    for (int i = 1; i <= 3; i++) {
+      RowData rowData = SimpleDataUtil.createRowData(i, "hello" + i);
+      DataFile dataFile = writeDataFile("data-" + i, 
ImmutableList.of(rowData));
+      rows.add(rowData);
+      WriteResult writeResult = of(dataFile);
+      final Committer.CommitRequest<IcebergCommittable> commitRequest =
+          buildCommitRequestFor(jobId, i, Lists.newArrayList(writeResult));
+      committer.commit(Lists.newArrayList(commitRequest));
+      assertFlinkManifests(0);
+      SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), 
branch);
+      assertSnapshotSize(i);
+      assertMaxCommittedCheckpointId(jobId, i);
+      Map<String, String> summary = SimpleDataUtil.latestSnapshot(table, 
branch).summary();
+      assertThat(summary)
+          .containsEntry(
+              "flink.test", 
"org.apache.iceberg.flink.sink.committer.TestIcebergCommitter")
+          .containsEntry("added-data-files", "1")
+          .containsEntry("flink.operator-id", "flink-sink")
+          .containsEntry("flink.job-id", "jobId");
+    }
+  }
+
+  @TestTemplate
+  public void testOrderedEventsBetweenCheckpoints() throws Exception {
+    // It's possible that two checkpoints happen in the following orders:
+    //   1. snapshotState for checkpoint#1;
+    //   2. snapshotState for checkpoint#2;
+    //   3. notifyCheckpointComplete for checkpoint#1;
+    //   4. notifyCheckpointComplete for checkpoint#2;
+    long timestamp = 0;
+
+    final OneInputStreamOperatorTestHarness<
+            CommittableMessage<IcebergCommittable>, 
CommittableMessage<IcebergCommittable>>
+        harness = getTestHarness();
+    harness.open();
+    assertMaxCommittedCheckpointId(jobId, -1L);
+
+    RowData row1 = SimpleDataUtil.createRowData(1, "hello");
+    DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
+
+    processElement(jobId, 1, harness, 1, "flink-sink", dataFile1);
+    assertMaxCommittedCheckpointId(jobId, -1L);
+
+    // 1. snapshotState for checkpoint#1
+    long firstCheckpointId = 1;
+    harness.snapshot(firstCheckpointId, ++timestamp);
+    assertFlinkManifests(1);
+
+    RowData row2 = SimpleDataUtil.createRowData(2, "world");
+    DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
+    processElement(jobId, 2, harness, 1, "flink-sink", dataFile2);
+    assertMaxCommittedCheckpointId(jobId, -1L);
+
+    // 2. snapshotState for checkpoint#2
+    long secondCheckpointId = 2;
+    harness.snapshot(secondCheckpointId, ++timestamp);
+    assertFlinkManifests(2);
+
+    // 3. notifyCheckpointComplete for checkpoint#1
+    harness.notifyOfCompletedCheckpoint(firstCheckpointId);
+    SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
+    assertMaxCommittedCheckpointId(jobId, firstCheckpointId);
+    assertFlinkManifests(1);
+
+    // 4. notifyCheckpointComplete for checkpoint#2
+    harness.notifyOfCompletedCheckpoint(secondCheckpointId);
+    SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), 
branch);
+    assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+    assertFlinkManifests(0);
+  }
+
+  @TestTemplate
+  public void testDisorderedEventsBetweenCheckpoints() throws Exception {
+    // It's possible that two checkpoints happen in the following orders:
+    //   1. snapshotState for checkpoint#1;
+    //   2. snapshotState for checkpoint#2;
+    //   3. notifyCheckpointComplete for checkpoint#1;
+    //   4. notifyCheckpointComplete for checkpoint#2;
+    long timestamp = 0;
+
+    final OneInputStreamOperatorTestHarness<
+            CommittableMessage<IcebergCommittable>, 
CommittableMessage<IcebergCommittable>>
+        harness = getTestHarness();
+    harness.open();
+    assertMaxCommittedCheckpointId(jobId, -1L);
+
+    RowData row1 = SimpleDataUtil.createRowData(1, "hello");
+    DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
+
+    processElement(jobId, 1, harness, 1, "flink-sink", dataFile1);
+    assertMaxCommittedCheckpointId(jobId, -1L);
+
+    // 1. snapshotState for checkpoint#1
+    long firstCheckpointId = 1;
+    harness.snapshot(firstCheckpointId, ++timestamp);
+    assertFlinkManifests(1);
+
+    RowData row2 = SimpleDataUtil.createRowData(2, "world");
+    DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
+    processElement(jobId, 2, harness, 1, "flink-sink", dataFile2);
+    assertMaxCommittedCheckpointId(jobId, -1L);
+
+    // 2. snapshotState for checkpoint#2
+    long secondCheckpointId = 2;
+    harness.snapshot(secondCheckpointId, ++timestamp);
+    assertFlinkManifests(2);
+
+    // 3. notifyCheckpointComplete for checkpoint#2
+    harness.notifyOfCompletedCheckpoint(secondCheckpointId);
+    SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), 
branch);
+    assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+    assertFlinkManifests(0);
+
+    // 4. notifyCheckpointComplete for checkpoint#1
+    harness.notifyOfCompletedCheckpoint(firstCheckpointId);
+    SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), 
branch);
+    assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+    assertFlinkManifests(0);
+  }
+
+  @TestTemplate
+  public void testEmitCommittablesMock() throws Exception {
+    final ForwardingCommitter committer = new ForwardingCommitter(tableLoader);
+    IcebergSink sink =
+        
spy(IcebergSink.forRowData(null).table(table).tableLoader(tableLoader).build());
+    doReturn(committer).when(sink).createCommitter(any());
+    String jobId1 = "jobId1";
+    final OneInputStreamOperatorTestHarness<
+            CommittableMessage<IcebergCommittable>, 
CommittableMessage<IcebergCommittable>>
+        testHarness =
+            new OneInputStreamOperatorTestHarness<>(
+                new CommitterOperatorFactory<>(sink, false, true));
+    testHarness.open();
+
+    WriteResult writeResult = WriteResult.builder().build();
+
+    IcebergCommittable commit =
+        new IcebergCommittable(
+            buildIcebergWriteAggregator(jobId1, "flink-sink")
+                .writeToManifest(Lists.newArrayList(writeResult), 1L),
+            jobId1,
+            "flink-sink",
+            1L);
+
+    final CommittableSummary<IcebergCommittable> committableSummary =
+        new CommittableSummary<>(1, 1, 1L, 1, 1, 0);
+    testHarness.processElement(new StreamRecord<>(committableSummary));
+    final CommittableWithLineage<IcebergCommittable> committableWithLineage =
+        new CommittableWithLineage<>(commit, 1L, 1);
+    testHarness.processElement(new StreamRecord<>(committableWithLineage));
+
+    // Trigger commit
+    testHarness.notifyOfCompletedCheckpoint(1);
+
+    assertThat(committer.getSuccessfulCommits()).isEqualTo(1);
+    final List<StreamElement> output = 
transformsToStreamElement(testHarness.getOutput());
+    
SinkV2Assertions.assertThat(extractAndAssertCommitableSummary(output.get(0)))
+        
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
+        .hasOverallCommittables(committableSummary.getNumberOfCommittables())
+        .hasPendingCommittables(0);
+    testHarness.close();
+  }
+
+  @TestTemplate
+  public void testEmitCommittables() throws Exception {
+    final OneInputStreamOperatorTestHarness<
+            CommittableMessage<IcebergCommittable>, 
CommittableMessage<IcebergCommittable>>
+        testHarness = getTestHarness();
+    testHarness.open();
+
+    String jobId1 = "jobId1";
+    long checkpointId = 0;
+    CommittableSummary<IcebergCommittable> committableSummary =
+        processElement(jobId1, checkpointId, testHarness, 1, "flink-sink", 
dataFileTest1);
+
+    // Trigger commit
+    testHarness.notifyOfCompletedCheckpoint(1);
+
+    assertSnapshotSize(1);
+    assertMaxCommittedCheckpointId(jobId1, 0L);
+
+    final List<StreamElement> output = 
transformsToStreamElement(testHarness.getOutput());
+    
SinkV2Assertions.assertThat(extractAndAssertCommitableSummary(output.get(0)))
+        
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
+        .hasOverallCommittables(committableSummary.getNumberOfCommittables())
+        .hasPendingCommittables(0);
+    testHarness.close();
+
+    table.refresh();
+    Snapshot currentSnapshot = table.snapshot(branch);
+
+    assertThat(currentSnapshot.summary())
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, 
String.valueOf(dataFIleRowCount))
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1");
+  }
+
+  /** The data was not committed in the previous job. */
+  @TestTemplate
+  public void testStateRestoreFromPreJob() throws Exception {
+    final OneInputStreamOperatorTestHarness<
+            CommittableMessage<IcebergCommittable>, 
CommittableMessage<IcebergCommittable>>
+        preJobTestHarness = getTestHarness();
+    preJobTestHarness.open();
+    preJobTestHarness.setup();
+
+    String jobId1 = "jobId1";
+    long checkpointId = 0;
+    CommittableSummary<IcebergCommittable> committableSummary =
+        processElement(jobId1, checkpointId, preJobTestHarness, 1, 
"flink-sink", dataFileTest1);
+
+    final OperatorSubtaskState snapshot = 
preJobTestHarness.snapshot(checkpointId, 2L);
+
+    assertThat(preJobTestHarness.getOutput()).isEmpty();
+    preJobTestHarness.close();
+
+    assertSnapshotSize(0);
+    assertMaxCommittedCheckpointId(jobId1, -1L);
+
+    final OneInputStreamOperatorTestHarness<
+            CommittableMessage<IcebergCommittable>, 
CommittableMessage<IcebergCommittable>>
+        restored = getTestHarness();
+    restored.setup(committableMessageTypeSerializer);
+    restored.initializeState(snapshot);
+    restored.open();
+
+    // Previous committables are immediately committed if possible
+    final List<StreamElement> output = 
transformsToStreamElement(restored.getOutput());
+    assertThat(output).hasSize(2);
+
+    
SinkV2Assertions.assertThat(extractAndAssertCommitableSummary(output.get(0)))
+        
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
+        .hasOverallCommittables(committableSummary.getNumberOfCommittables())
+        .hasPendingCommittables(0);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.snapshot(branch);
+
+    assertThat(currentSnapshot.summary())
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, 
String.valueOf(dataFIleRowCount))
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "1")
+        .containsEntry("flink.job-id", jobId1);
+
+    String jobId2 = "jobId2";
+    CommittableSummary<IcebergCommittable> committableSummary2 =
+        processElement(jobId2, checkpointId, restored, 1, "flink-sink", 
dataFileTest2);
+
+    // Trigger commit
+    restored.notifyOfCompletedCheckpoint(0);
+
+    final List<StreamElement> output2 = 
transformsToStreamElement(restored.getOutput());
+    
SinkV2Assertions.assertThat(extractAndAssertCommitableSummary(output2.get(0)))
+        
.hasFailedCommittables(committableSummary2.getNumberOfFailedCommittables())
+        .hasOverallCommittables(committableSummary2.getNumberOfCommittables())
+        .hasPendingCommittables(0);
+    restored.close();
+
+    assertSnapshotSize(2);
+    assertMaxCommittedCheckpointId(jobId2, 0);
+
+    table.refresh();
+    Snapshot currentSnapshot2 = table.snapshot(branch);
+
+    assertThat(currentSnapshot2.summary())
+        .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, 
String.valueOf(dataFIleRowCount * 2))
+        .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "2")
+        .containsEntry("flink.job-id", jobId2);
+  }
+
+  /** The data was committed in the previous job. */
+  @TestTemplate
+  public void testStateRestoreFromPreJob2() throws Exception {
+    final OneInputStreamOperatorTestHarness<
+            CommittableMessage<IcebergCommittable>, 
CommittableMessage<IcebergCommittable>>
+        preJobTestHarness = getTestHarness();
+
+    preJobTestHarness.open();
+
+    String jobId1 = "jobId1";
+    long checkpointId = 0;
+    CommittableSummary<IcebergCommittable> committableSummary =
+        processElement(jobId1, checkpointId, preJobTestHarness, 1, 
"flink-sink", dataFileTest1);
+
+    final OperatorSubtaskState snapshot = 
preJobTestHarness.snapshot(checkpointId, 2L);
+    // commit snapshot
+    preJobTestHarness.notifyOfCompletedCheckpoint(checkpointId);
+
+    final List<StreamElement> output = 
transformsToStreamElement(preJobTestHarness.getOutput());
+    assertThat(output).hasSize(2);
+
+    
SinkV2Assertions.assertThat(extractAndAssertCommitableSummary(output.get(0)))
+        
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
+        .hasOverallCommittables(committableSummary.getNumberOfCommittables())
+        .hasPendingCommittables(0);
+
+    preJobTestHarness.close();
+
+    assertSnapshotSize(1);
+    assertMaxCommittedCheckpointId(jobId1, 0L);
+    table.refresh();
+    long proJobSnapshotId = table.snapshot(branch).snapshotId();
+
+    String jobId2 = "jobId2";
+    final OneInputStreamOperatorTestHarness<
+            CommittableMessage<IcebergCommittable>, 
CommittableMessage<IcebergCommittable>>
+        restored = getTestHarness();
+
+    restored.initializeState(snapshot);
+    restored.open();
+
+    // The committed data will not be recommitted, but the commit records are 
emitted

Review Comment:
   TBH, I'm not sure if this comment actually makes sense or not. all we're 
doing in this test, is asserting that data committed in the previous job, can 
be read back in a new job. I changed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to