cgpoh opened a new issue, #6630:
URL: https://github.com/apache/iceberg/issues/6630

   ### Query engine
   
   Flink
   
   ### Question
   
   I have a Flink job that uses a side output to write to an Iceberg table when there are errors in the main processing function. If the processing function produces no errors, no data files are added for commit. I noticed that the Flink job is restarting and throwing the following exception:
   ```
   IcebergFilesCommitter -> Sink: iceberg-error-sink-FPL (1/1)#152 (2b5a4587aa3a50c531671a32a2f1538c) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Cannot determine partition spec: no data files have been added
        at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:502)
        at org.apache.iceberg.MergingSnapshotProducer.dataSpec(MergingSnapshotProducer.java:150)
        at org.apache.iceberg.BaseReplacePartitions.apply(BaseReplacePartitions.java:133)
        at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:223)
        at org.apache.iceberg.BaseReplacePartitions.apply(BaseReplacePartitions.java:26)
        at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:369)
        at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)
        at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
        at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:367)
        at org.apache.iceberg.BaseReplacePartitions.commit(BaseReplacePartitions.java:26)
        at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:372)
        at org.apache.iceberg.flink.sink.IcebergFilesCommitter.replacePartitions(IcebergFilesCommitter.java:314)
        at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitPendingResult(IcebergFilesCommitter.java:270)
        at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:255)
        at org.apache.iceberg.flink.sink.IcebergFilesCommitter.notifyCheckpointComplete(IcebergFilesCommitter.java:229)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.base/java.lang.Thread.run(Unknown Source)
   ```
   I saw that the `commitPendingResult` function in IcebergFilesCommitter.java has a condition that skips empty commits, but once MAX_CONTINUOUS_EMPTY_COMMITS is reached, it proceeds to commit even when there are no data files to commit, which throws the above exception.
   ```java
   long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
   continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
   if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
     if (replacePartitions) {
       replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
     } else {
       commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
     }
     continuousEmptyCheckpoints = 0;
   } else {
     LOG.info("Skipping committing empty checkpoint {}", checkpointId);
   }
   ```
   May I know what's the purpose of this empty commit?
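
To show when the condition quoted above lets an empty commit through, here is a minimal standalone sketch that replays only the counter logic. It does not use Iceberg or Flink. The class name `EmptyCommitSimulation`, the method `emptyCommitCheckpoints`, and the threshold value 3 are hypothetical and chosen purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: replays the skip/commit decision from the snippet above.
final class EmptyCommitSimulation {

  // filesPerCheckpoint[i] is the total file count (data + delete) for checkpoint i + 1.
  // Returns the checkpoint IDs at which a commit is attempted with zero files.
  static List<Long> emptyCommitCheckpoints(int[] filesPerCheckpoint, int maxContinuousEmptyCommits) {
    List<Long> emptyCommits = new ArrayList<>();
    int continuousEmptyCheckpoints = 0;
    for (int i = 0; i < filesPerCheckpoint.length; i++) {
      long checkpointId = i + 1;
      long totalFiles = filesPerCheckpoint[i];
      // Same counter update and condition as in the quoted code.
      continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
      if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
        if (totalFiles == 0) {
          // In replace-partitions mode this is where the exception in the issue surfaces.
          emptyCommits.add(checkpointId);
        }
        continuousEmptyCheckpoints = 0;
      }
    }
    return emptyCommits;
  }

  public static void main(String[] args) {
    // Checkpoints 1-3 and 5-7 are empty; checkpoint 4 carries two files.
    System.out.println(emptyCommitCheckpoints(new int[] {0, 0, 0, 2, 0, 0, 0}, 3));
  }
}
```

With a threshold of 3, every third consecutive empty checkpoint reaches the commit branch with no files. That matches the situation described in the issue, where `replacePartitions` is called with nothing to commit.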

