zhongqishang commented on code in PR #10526: URL: https://github.com/apache/iceberg/pull/10526#discussion_r1683742893
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -426,30 +425,44 @@ private void commitOperation(
   }
 
   @Override
-  public void processElement(StreamRecord<WriteResult> element) {
-    this.writeResultsOfCurrentCkpt.add(element.getValue());
+  public void processElement(StreamRecord<FlinkWriteResult> element) {
+    FlinkWriteResult flinkWriteResult = element.getValue();
+    List<WriteResult> writeResults =
+        writeResultsSinceLastSnapshot.computeIfAbsent(
+            flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
+    writeResults.add(flinkWriteResult.writeResult());
   }
 
   @Override
   public void endInput() throws IOException {
     // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-    long currentCheckpointId = Long.MAX_VALUE;
-    dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
-    writeResultsOfCurrentCkpt.clear();
-
+    long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
+    writeToManifestSinceLastSnapshot(currentCheckpointId);
     commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId);
   }
 
+  private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException {
+    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+      dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
+    }
+
+    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :

Review Comment:
> should we add Preconditions check that the writeResultsSinceLastSnapshot contains checkpointIds <= the current checkpointId?

I think we can't add a Preconditions check, because the method needs to consider the case where writeResultsSinceLastSnapshot is empty.

> We may have a problem in case of concurrent checkpoints (say 1 and 2). when checkpoint 1 snapshot method is executed, we shall not write the data files partially received for checkpoint 2 to manifest file.
> we should only process the map entries up to the checkpointId

I discussed this scenario (if I understand it correctly) with @pvary, and we concluded that it is not possible.
https://github.com/apache/iceberg/pull/10526#discussion_r1650849668
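
For reference, the quoted suggestion of processing only the map entries up to the checkpointId could look roughly like the sketch below. It is not code from this PR: the class `BoundedFlushSketch`, the `pending` field, and the `drainUpTo` method are hypothetical names, strings stand in for `WriteResult`, and it assumes the buffer is a `NavigableMap` (the diff does not show the actual type of `writeResultsSinceLastSnapshot`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration only; not part of IcebergFilesCommitter.
class BoundedFlushSketch {
  // Stand-in for writeResultsSinceLastSnapshot, keyed by checkpoint id.
  private final NavigableMap<Long, List<String>> pending = new TreeMap<>();

  public void add(long checkpointId, String result) {
    pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(result);
  }

  // Removes and returns only the results whose checkpoint id is <= checkpointId.
  public List<String> drainUpTo(long checkpointId) {
    // headMap(..., true) is a live view of all keys <= checkpointId.
    NavigableMap<Long, List<String>> head = pending.headMap(checkpointId, true);
    List<String> drained = new ArrayList<>();
    head.values().forEach(drained::addAll);
    head.clear(); // clearing the view removes those entries from 'pending'
    return drained;
  }

  public int pendingCheckpoints() {
    return pending.size();
  }

  public static void main(String[] args) {
    BoundedFlushSketch sketch = new BoundedFlushSketch();
    sketch.add(1L, "file-a");
    sketch.add(2L, "file-b"); // arrives early for a later checkpoint
    System.out.println(sketch.drainUpTo(1L));
    System.out.println(sketch.pendingCheckpoints());
  }
}
```

With this shape, results buffered for checkpoint 2 would stay in the map when checkpoint 1 is snapshotted. Whether that situation can occur at all is the open question here; per the linked discussion, we concluded it cannot.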