pvary commented on code in PR #14182:
URL: https://github.com/apache/iceberg/pull/14182#discussion_r2381787017


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##########
@@ -274,26 +274,25 @@ private void replacePartitions(
       CommitSummary summary,
       String newFlinkJobId,
       String operatorId) {
-    for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      // We don't commit the merged result into a single transaction because for the sequential
-      // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
-      // to data files from txn1. Committing the merged one will lead to the incorrect delete
-      // semantic.
-      for (WriteResult result : e.getValue()) {
-        ReplacePartitions dynamicOverwrite =
-            table.newReplacePartitions().scanManifestsWith(workerPool);
+    // Iceberg tables are unsorted. So the order of the append data does not matter.
+    // Hence, we commit everything in one snapshot.
+    ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
+
+    for (List<WriteResult> writeResults : pendingResults.values()) {
+      for (WriteResult result : writeResults) {
         Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
-        commitOperation(
-            table,
-            branch,
-            dynamicOverwrite,
-            summary,
-            "dynamic partition overwrite",
-            newFlinkJobId,
-            operatorId,
-            e.getKey());
       }
     }
+
+    commitOperation(
+        table,
+        branch,
+        dynamicOverwrite,
+        summary,
+        "dynamic partition overwrite",
+        newFlinkJobId,
+        operatorId,
+        pendingResults.lastKey());

Review Comment:
   If I understand correctly, `overwrite-mode` should only be enabled in batch 
jobs, as it is very hard to make any claims about the results in a streaming 
job.
   
   The current implementation is also problematic: we could have multiple data 
files for a given partition, and then they will replace each other, so only 
the last one wins 😢 
   
   With checkpointing turned on, we have the same issue as mentioned above, 
just with a somewhat larger amount of data. The 2nd checkpoint might delete 
data from the 1st checkpoint, because it replaces the same partition.
   
   So this means that replace partitions only works if checkpointing is 
turned off (or you are lucky 😄)
   
   So essentially, it doesn't matter which solution we choose 😄 
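
To make the "last one wins" effect concrete, here is a minimal toy model of dynamic partition overwrite. It does not use the Iceberg API: `ReplacePartitionsModel`, `FileRef`, `replacePartitions`, `commitSeparately` and `commitMerged` are hypothetical names invented for this sketch, and the table is just a map from partition value to file paths. The only assumption carried over from the discussion is that one overwrite commit drops the existing files of every partition it touches before adding its own.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReplacePartitionsModel {

  /** Hypothetical stand-in for a data file: a partition value and a path. */
  static final class FileRef {
    final String partition;
    final String path;

    FileRef(String partition, String path) {
      this.partition = partition;
      this.path = path;
    }
  }

  /**
   * Toy dynamic partition overwrite: every partition touched by newFiles is
   * emptied first, then the new files are added. Untouched partitions are kept.
   */
  static void replacePartitions(Map<String, List<String>> table, List<FileRef> newFiles) {
    Set<String> touched = new LinkedHashSet<>();
    for (FileRef f : newFiles) {
      touched.add(f.partition);
    }
    for (String p : touched) {
      table.put(p, new ArrayList<>());
    }
    for (FileRef f : newFiles) {
      table.get(f.partition).add(f.path);
    }
  }

  /** One overwrite commit per batch of files (per write result or per checkpoint). */
  static Map<String, List<String>> commitSeparately(List<List<FileRef>> batches) {
    Map<String, List<String>> table = new LinkedHashMap<>();
    for (List<FileRef> batch : batches) {
      replacePartitions(table, batch);
    }
    return table;
  }

  /** All batches merged into a single overwrite commit. */
  static Map<String, List<String>> commitMerged(List<List<FileRef>> batches) {
    Map<String, List<String>> table = new LinkedHashMap<>();
    List<FileRef> all = new ArrayList<>();
    for (List<FileRef> batch : batches) {
      all.addAll(batch);
    }
    replacePartitions(table, all);
    return table;
  }

  public static void main(String[] args) {
    // Two batches that both write to partition "p1".
    List<List<FileRef>> batches = List.of(
        List.of(new FileRef("p1", "a.parquet")),
        List.of(new FileRef("p1", "b.parquet")));

    System.out.println("separate: " + commitSeparately(batches)); // {p1=[b.parquet]}
    System.out.println("merged:   " + commitMerged(batches));     // {p1=[a.parquet, b.parquet]}
  }
}
```

In this model, merging only helps for the batches that end up in the same commit. If each inner list is read as one checkpoint, the separate-commit outcome is what a checkpointed job would see regardless of how results are grouped within a checkpoint.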
   


