mxm commented on code in PR #14182:
URL: https://github.com/apache/iceberg/pull/14182#discussion_r2379354566


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##########
@@ -137,38 +139,68 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
         commitRequestMap.entrySet()) {
       Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
       DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
-      long maxCommittedCheckpointId =
+
+      CheckpointInfo maxCommittedCheckpointIds =
           getMaxCommittedCheckpointId(
               table, last.jobId(), last.operatorId(), entry.getKey().branch());
       // Mark the already committed FilesCommittable(s) as finished
       entry
           .getValue()
-          .headMap(maxCommittedCheckpointId, true)
+          .headMap(maxCommittedCheckpointIds.maxProcessedCheckpointId, false)
           .values()
           .forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted));
+      // Filter out all committables until the last checkpoint id, which may be partially committed.
       NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> uncommitted =
-          entry.getValue().tailMap(maxCommittedCheckpointId, false);
+          entry.getValue().tailMap(maxCommittedCheckpointIds.maxProcessedCheckpointId, true);
+
       if (!uncommitted.isEmpty()) {
         commitPendingRequests(
-            table, entry.getKey().branch(), uncommitted, last.jobId(), last.operatorId());
+            table,
+            entry.getKey().branch(),
+            uncommitted,
+            last.jobId(),
+            last.operatorId(),
+            maxCommittedCheckpointIds);
+      }
+    }
+  }
+
+  private static class CheckpointInfo {
+    private final long maxProcessedCheckpointId;
+    private final int maxCommittedWriteResultIndex;
+
+    private CheckpointInfo(long maxProcessedCheckpointId, int maxCommittedWriteResultIndex) {
+      this.maxProcessedCheckpointId = maxProcessedCheckpointId;
+      this.maxCommittedWriteResultIndex = maxCommittedWriteResultIndex;
+    }
+
+    int writeResultIndexFor(long checkpointId) {
+      if (checkpointId == this.maxProcessedCheckpointId) {
+        return this.maxCommittedWriteResultIndex;
       }
+      return -1;
     }
   }
 
-  private static long getMaxCommittedCheckpointId(
+  private static CheckpointInfo getMaxCommittedCheckpointId(
       Table table, String flinkJobId, String operatorId, String branch) {
     Snapshot snapshot = table.snapshot(branch);
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+    int writeResultIndex = INITIAL_WRITE_RESULT_INDEX;
 
     while (snapshot != null) {
       Map<String, String> summary = snapshot.summary();
       String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
       String snapshotOperatorId = summary.get(OPERATOR_ID);
       if (flinkJobId.equals(snapshotFlinkJobId)
           && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
-        String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
-        if (value != null) {
-          lastCommittedCheckpointId = Long.parseLong(value);
+        String maxCheckpointIdString = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+        if (maxCheckpointIdString != null) {
+          lastCommittedCheckpointId = Long.parseLong(maxCheckpointIdString);
+          String writeResultIndexString = summary.get(MAX_WRITE_RESULT_INDEX);
+          if (writeResultIndexString != null) {
+            writeResultIndex = Integer.parseInt(writeResultIndexString);
+          }

Review Comment:
   Good point. This exact code path isn't exercised in the tests, although it 
is semantically identical to writing to an empty table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to