pvary commented on code in PR #10526: URL: https://github.com/apache/iceberg/pull/10526#discussion_r1650849668
########## flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java: ########## @@ -426,30 +425,44 @@ private void commitOperation( } @Override - public void processElement(StreamRecord<WriteResult> element) { - this.writeResultsOfCurrentCkpt.add(element.getValue()); + public void processElement(StreamRecord<FlinkWriteResult> element) { + FlinkWriteResult flinkWriteResult = element.getValue(); + List<WriteResult> writeResults = + writeResultsSinceLastSnapshot.computeIfAbsent( + flinkWriteResult.checkpointId(), k -> Lists.newArrayList()); + writeResults.add(flinkWriteResult.writeResult()); } @Override public void endInput() throws IOException { // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly. - long currentCheckpointId = Long.MAX_VALUE; - dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId)); - writeResultsOfCurrentCkpt.clear(); - + long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID; + writeToManifestSinceLastSnapshot(currentCheckpointId); commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId); } + private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException { + if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) { + dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA); + } + + for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt : + writeResultsSinceLastSnapshot.entrySet()) { + dataFilesPerCheckpoint.put( Review Comment: Is this a scenario which could happen? - Writer 1 writes data - Writer 2 writes data - Writer 1 prepareSnapshotPreBarrier for Checkpoint 1, File 1-1 with checkpoint 1 created - Committer receives File 1-1 - Writer 2 prepareSnapshotPreBarrier for Checkpoint 1, File 1-2 with checkpoint 1 created - Committer receives File 1-2 - Writer 1 writes data - Writer 2 writes data - Writer 1 prepareSnapshotPreBarrier for Checkpoint 2, File 2-1 with checkpoint 2 created - Committer receives File 2-1 - Committer starts snapshotState for Checkpoint 1 - At this stage we have `writeResultsSinceLastSnapshot` with more data, but not all of them are ready to commit. - Committer receives File 2-1 - Committer starts snapshotState for Checkpoint 2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org