pvary commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1695119335
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -426,30 +425,44 @@ private void commitOperation(
   }

   @Override
-  public void processElement(StreamRecord<WriteResult> element) {
-    this.writeResultsOfCurrentCkpt.add(element.getValue());
+  public void processElement(StreamRecord<FlinkWriteResult> element) {
+    FlinkWriteResult flinkWriteResult = element.getValue();
+    List<WriteResult> writeResults =
+        writeResultsSinceLastSnapshot.computeIfAbsent(
+            flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
+    writeResults.add(flinkWriteResult.writeResult());
   }

   @Override
   public void endInput() throws IOException {
     // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-    long currentCheckpointId = Long.MAX_VALUE;
-    dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
-    writeResultsOfCurrentCkpt.clear();
-
+    long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
+    writeToManifestSinceLastSnapshot(currentCheckpointId);
     commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId);
   }

+  private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException {
+    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+      dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
+    }
+
+    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :

Review Comment:
I think the process above translates to the following messages on the wire:
- Writer-1 emits File 1-1, generated in `prepareSnapshotPreBarrier`
- Writer-1 emits the checkpoint barrier for CHK-1
- Writer-2 emits File 1-2, generated in `prepareSnapshotPreBarrier`
- Writer-2 emits the checkpoint barrier for CHK-1
- Committer receives File 1-1
- Committer receives File 1-2
- Writer-1 emits File 2-1, generated in `prepareSnapshotPreBarrier`
- Writer-1 emits the checkpoint barrier for CHK-2
- Committer will **NOT** receive File 2-1 until it has received the checkpoint barrier for CHK-1 from every upstream operator and executed the `snapshotState` method.

Based on this, the fix should work if aligned checkpoints are used. I think all bets are off when unaligned checkpoints are used, as in that case the checkpoint barriers can "overtake" normal records (for example, File 1-2 may still be in flight when the committer snapshots CHK-1).

@gyfora: Could you please verify whether my understanding above is correct? If I am right, it would be good to document this in the Iceberg Flink connector documentation.
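The message sequence above can be checked with a toy model. This is a hypothetical, heavily simplified simulation, not Flink's or Iceberg's implementation: `BarrierAlignmentSketch`, `Event`, `Committer` and the scenario methods are made-up names, each writer subtask is modeled as one input channel, and "unaligned" mode is reduced to "snapshot on the first barrier" without modeling channel state. Received files are grouped by checkpoint id, similar to the `computeIfAbsent` call in the diff, and the log records when `snapshotState` would run.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical toy model of barrier handling at a committer; not Flink or Iceberg code. */
public class BarrierAlignmentSketch {

  /** A data file flushed for a checkpoint; file == null marks a checkpoint barrier. */
  record Event(int channel, String file, long checkpointId) {}

  static Event file(int channel, String name, long checkpointId) {
    return new Event(channel, name, checkpointId);
  }

  static Event barrier(int channel, long checkpointId) {
    return new Event(channel, null, checkpointId);
  }

  static final class Committer {
    private final int numChannels;
    private final boolean aligned;
    // Like the PR: pending results grouped by the checkpoint id they were flushed for.
    private final Map<Long, List<String>> resultsSinceLastSnapshot = new TreeMap<>();
    // Channels that delivered the barrier of the in-progress checkpoint. Assumes all barriers
    // of one checkpoint are handled before the next one's (always true when aligned).
    private final Set<Integer> barrierSeen = new HashSet<>();
    private final Deque<Event> held = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();

    Committer(int numChannels, boolean aligned) {
      this.numChannels = numChannels;
      this.aligned = aligned;
    }

    void receive(Event e) {
      // Aligned mode: a channel that already delivered the barrier is blocked; buffer its input.
      if (aligned && barrierSeen.contains(e.channel())) {
        held.add(e);
        return;
      }
      if (e.file() != null) {
        log.add("process " + e.file());
        resultsSinceLastSnapshot
            .computeIfAbsent(e.checkpointId(), k -> new ArrayList<>())
            .add(e.file());
        return;
      }
      boolean firstBarrier = barrierSeen.isEmpty();
      barrierSeen.add(e.channel());
      // Aligned: snapshot once every channel delivered the barrier.
      // Simplified unaligned: snapshot on the first barrier; overtaken records are simply
      // processed later (real channel-state persistence is not modeled).
      boolean snapshotNow = aligned ? barrierSeen.size() == numChannels : firstBarrier;
      if (snapshotNow) {
        log.add("snapshotState CHK-" + e.checkpointId() + " " + resultsSinceLastSnapshot);
        resultsSinceLastSnapshot.clear();
      }
      if (barrierSeen.size() == numChannels) {
        // Barrier seen on every channel: unblock and replay buffered input in arrival order.
        barrierSeen.clear();
        List<Event> replay = new ArrayList<>(held);
        held.clear();
        replay.forEach(this::receive);
      }
    }
  }

  static List<String> run(boolean aligned, List<Event> arrivals) {
    Committer committer = new Committer(2, aligned);
    arrivals.forEach(committer::receive);
    return committer.log;
  }

  /** Arrival order from the comment (channel 0 = Writer-1, channel 1 = Writer-2). */
  static List<String> alignedScenario() {
    return run(true, List.of(
        file(0, "File 1-1", 1), file(1, "File 1-2", 1),
        barrier(0, 1), file(0, "File 2-1", 2), barrier(0, 2),
        barrier(1, 1)));
  }

  /** Same writers, but Writer-2's CHK-1 barrier overtakes File 1-2. */
  static List<String> unalignedScenario() {
    return run(false, List.of(
        file(0, "File 1-1", 1), barrier(0, 1), barrier(1, 1), file(1, "File 1-2", 1)));
  }

  public static void main(String[] args) {
    System.out.println(alignedScenario());
    System.out.println(unalignedScenario());
  }
}
```

With alignment, the log is `process File 1-1`, `process File 1-2`, `snapshotState CHK-1 {1=[File 1-1, File 1-2]}`, `process File 2-1`: File 2-1 is buffered until the CHK-1 snapshot. In the simplified unaligned run, the CHK-1 snapshot contains only File 1-1, and File 1-2 is processed afterwards.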