pvary commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1695119335


##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -426,30 +425,44 @@ private void commitOperation(
   }
 
   @Override
-  public void processElement(StreamRecord<WriteResult> element) {
-    this.writeResultsOfCurrentCkpt.add(element.getValue());
+  public void processElement(StreamRecord<FlinkWriteResult> element) {
+    FlinkWriteResult flinkWriteResult = element.getValue();
+    List<WriteResult> writeResults =
+        writeResultsSinceLastSnapshot.computeIfAbsent(
+            flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
+    writeResults.add(flinkWriteResult.writeResult());
   }
 
   @Override
   public void endInput() throws IOException {
     // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-    long currentCheckpointId = Long.MAX_VALUE;
-    dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
-    writeResultsOfCurrentCkpt.clear();
-
+    long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
+    writeToManifestSinceLastSnapshot(currentCheckpointId);
     commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId);
   }
 
+  private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException {
+    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+      dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
+    }
+
+    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :

Review Comment:
   I think the process above will translate to the following messages on the wire:
   - Writer-1 `prepareSnapshotPreBarrier` will emit File 1-1 generated by the method
   - Writer-1 will emit the checkpoint barrier for CHK-1
   - Writer-2 `prepareSnapshotPreBarrier` will emit File 1-2 generated by the method
   - Writer-2 will emit the checkpoint barrier for CHK-1
   - Committer receives File 1-1
   - Committer receives File 1-2
   - Writer-1 `prepareSnapshotPreBarrier` will emit File 2-1 generated by the method
   - Writer-1 will emit the checkpoint barrier for CHK-2
   - Committer will **NOT** receive File 2-1 until it has received the checkpoint barrier from every upstream operator and executed the `snapshotState` method.
   
   Based on this, the fix should work if aligned checkpoints are used. I think all bets are off when unaligned checkpoints are used, as in this case the checkpoint barriers can "overtake" normal records.
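
To make the aligned case concrete, here is a minimal, Flink-free sketch of the ordering described above. Everything in it (`AlignedBarrierSketch`, `Message`, `Committer`, `simulate`) is a hypothetical model written for this comment, not Flink or Iceberg code. It assumes a two-input committer that blocks a channel once that channel's barrier has arrived and snapshots only when all channels are blocked. Unaligned checkpoints are deliberately not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of aligned checkpoint barriers; not the real Flink runtime.
final class AlignedBarrierSketch {

  // A wire message: a file tagged with its checkpoint id, or a barrier (file == null).
  static final class Message {
    final long checkpointId;
    final String file;

    private Message(long checkpointId, String file) {
      this.checkpointId = checkpointId;
      this.file = file;
    }

    static Message file(long checkpointId, String name) {
      return new Message(checkpointId, name);
    }

    static Message barrier(long checkpointId) {
      return new Message(checkpointId, null);
    }

    boolean isBarrier() {
      return file == null;
    }
  }

  // Buffers files per checkpoint id, loosely mirroring writeResultsSinceLastSnapshot.
  static final class Committer {
    final TreeMap<Long, List<String>> pending = new TreeMap<>();
    final List<String> log = new ArrayList<>();

    void processElement(Message m) {
      pending.computeIfAbsent(m.checkpointId, k -> new ArrayList<>()).add(m.file);
    }

    // Records and drops everything buffered for checkpoints <= checkpointId.
    void snapshotState(long checkpointId) {
      Map<Long, List<String>> upTo = pending.headMap(checkpointId, true);
      log.add("CHK-" + checkpointId + " -> " + upTo);
      upTo.clear();
    }
  }

  static Committer simulate() {
    // Writer-1 has already run ahead into CHK-2; Writer-2 has only finished CHK-1.
    Deque<Message> writer1 = new ArrayDeque<>(List.of(
        Message.file(1, "File 1-1"), Message.barrier(1),
        Message.file(2, "File 2-1"), Message.barrier(2)));
    Deque<Message> writer2 = new ArrayDeque<>(List.of(
        Message.file(1, "File 1-2"), Message.barrier(1)));
    List<Deque<Message>> channels = List.of(writer1, writer2);

    Committer committer = new Committer();
    boolean[] blocked = new boolean[channels.size()];
    boolean progress = true;
    while (progress) {
      progress = false;
      for (int i = 0; i < channels.size(); i++) {
        Deque<Message> channel = channels.get(i);
        // Drain each channel greedily until its barrier blocks it.
        while (!blocked[i] && !channel.isEmpty()) {
          Message m = channel.poll();
          progress = true;
          if (!m.isBarrier()) {
            committer.processElement(m);
            continue;
          }
          blocked[i] = true;
          if (allBlocked(blocked)) {
            // Barrier seen on every input: snapshot, then unblock all channels.
            committer.snapshotState(m.checkpointId);
            Arrays.fill(blocked, false);
          }
        }
      }
    }
    return committer;
  }

  private static boolean allBlocked(boolean[] blocked) {
    for (boolean b : blocked) {
      if (!b) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Committer committer = simulate();
    System.out.println(committer.log);
    System.out.println("still pending: " + committer.pending);
  }
}
```

In this model Writer-1's channel is drained first, yet File 2-1 stays queued behind the CHK-1 barrier until Writer-2's barrier arrives, so the CHK-1 snapshot sees only File 1-1 and File 1-2. That is the property the fix seems to rely on.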
   
   @gyfora: Could you please validate if my understanding above is correct?
   
   If I am right, it would be good to state this somewhere in the Iceberg Flink 
connector documentation.
   


