pvary commented on code in PR #14559:
URL: https://github.com/apache/iceberg/pull/14559#discussion_r2513672740


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##########
@@ -302,30 +303,58 @@ private void commitDeltaTxn(
       CommitSummary summary,
       String newFlinkJobId,
       String operatorId) {
-    for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
-      long checkpointId = e.getKey();
-      List<WriteResult> writeResults = e.getValue();
-
-      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
-      for (WriteResult result : writeResults) {
-        // Row delta validations are not needed for streaming changes that 
write equality deletes.
-        // Equality deletes are applied to data in all previous sequence 
numbers, so retries may
-        // push deletes further in the future, but do not affect correctness. 
Position deletes
-        // committed to the table in this path are used only to delete rows 
from data files that are
-        // being added in this commit. There is no way for data files added 
along with the delete
-        // files to be concurrently removed, so there is no need to validate 
the files referenced by
-        // the position delete files that are being committed.
-        Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
-        Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
+    if (summary.deleteFilesCount() == 0) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend().scanManifestsWith(workerPool);
+      for (List<WriteResult> resultList : pendingResults.values()) {
+        for (WriteResult result : resultList) {
+          Preconditions.checkState(
+              result.referencedDataFiles().length == 0,
+              "Should have no referenced data files for append.");
+          Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
+        }
       }
+      String description = "append";
 
-      // Every Flink checkpoint contains a set of independent changes which can be committed
-      // together. While it is technically feasible to combine append-only data across checkpoints,
-      // for the sake of simplicity, we do not implement this (premature) optimization. Multiple
-      // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint
-      // intervals or when concurrent checkpointing is enabled.
+      // All pending results are combined into a single commit, so a failure fails them all.
       commitOperation(
-          table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId);
+          table,
+          branch,
+          appendFiles,
+          summary,
+          description,

Review Comment:
   Can we use the string value here? We don't reuse the `description` variable 
anywhere else.
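
   A possible inline form of the suggestion (a sketch only; the trailing arguments are assumed to mirror the removed `rowDelta` call, and are elided here because the quoted hunk is truncated):

   ```java
   // Sketch: pass the string literal directly and drop the single-use `description` local.
   commitOperation(table, branch, appendFiles, summary, "append", newFlinkJobId, operatorId, ...);
   ```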



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

