stevenzwu commented on code in PR #7171:
URL: https://github.com/apache/iceberg/pull/7171#discussion_r1161386084
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -145,6 +151,7 @@ public void initializeState(StateInitializationContext context) throws Exception
     this.tableLoader.open();
     this.table = tableLoader.loadTable();
     this.committerMetrics = new IcebergFilesCommitterMetrics(super.metrics, table.name());
+    this.spec = table.specs().get(specId);

Review Comment:
   why don't we pass the `spec` in directly?

##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -877,6 +877,64 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }

+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+
+    JobID jobID = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobID)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+
+      int checkpointId = 1;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      DataFile dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      table.updateSpec().addField("id").commit();

Review Comment:
   I think we should verify that restore works after a partition spec change.

##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
(same testSpecEvolution hunk, continued)
+
+      checkpointId = 2;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec again

Review Comment:
   I don't know if it is necessary to test a partition spec change again.

##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -443,7 +451,7 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
     WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), table.spec());
+            result, () -> manifestOutputFileFactory.create(checkpointId), spec);

Review Comment:
   looks like this is the key fix
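   To make the pinned-spec pattern concrete, here is a minimal sketch of what the diff does, assuming the committer is constructed with a spec id; `SpecPinningCommitter` and `manifestSpec()` are hypothetical names for illustration, not the PR's actual classes:

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Table;

    // Sketch: resolve the PartitionSpec once, from the spec id the operator
    // was created with, instead of re-reading table.spec() at write time.
    class SpecPinningCommitter {
      private final int specId;   // fixed when the operator is built
      private PartitionSpec spec; // resolved once in initializeState()

      SpecPinningCommitter(int specId) {
        this.specId = specId;
      }

      void initializeState(Table table) {
        // table.specs() maps specId -> PartitionSpec across the spec history,
        // so the pinned spec survives later updateSpec() commits.
        this.spec = table.specs().get(specId);
      }

      PartitionSpec manifestSpec() {
        // Staging manifests use the pinned spec; table.spec() may already
        // point at a newer spec after a refresh.
        return spec;
      }
    }

   With this in place, a `table.refresh()` between checkpoints cannot silently change the spec that `writeToManifest` stamps on the staging manifests.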
##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
(same testSpecEvolution hunk, at the second checkpoint)
+      harness.snapshot(checkpointId, ++timestamp);
+      harness.notifyOfCompletedCheckpoint(checkpointId);

Review Comment:
   we should verify that the staging manifest file is still written with the old partition spec, the one in effect when the committer operator was created.
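   As a minimal sketch of that verification, assuming the test can get hold of the staging `ManifestFile` (for example from the `DeltaManifests` the committer serialized) and the table's `FileIO`; `assertManifestSpec` is a hypothetical helper, not an existing method of the test class:

    import java.io.IOException;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.ManifestReader;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.io.FileIO;
    import org.junit.Assert;

    final class ManifestSpecAssertions {
      private ManifestSpecAssertions() {}

      // Check that a staging manifest still carries the spec it was written
      // with, rather than the table's current spec.
      static void assertManifestSpec(ManifestFile manifest, FileIO io, PartitionSpec expectedSpec)
          throws IOException {
        // The spec id is recorded on the manifest entry itself.
        Assert.assertEquals(
            "staging manifest should keep the spec it was written with",
            expectedSpec.specId(),
            manifest.partitionSpecId());

        // Reading the manifest resolves the same spec from the partition-spec
        // metadata embedded in the manifest file.
        try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io)) {
          Assert.assertEquals(expectedSpec.specId(), reader.spec().specId());
        }
      }
    }

   In the test above, `expectedSpec` would be captured from `table.spec()` before the `updateSpec().addField("id").commit()` call, and the assertion run against the manifest staged for checkpoint 2.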