cgpoh opened a new issue, #6630: URL: https://github.com/apache/iceberg/issues/6630
### Query engine

Flink

### Question

I have a Flink job that uses a side output to write to an Iceberg table when there are errors in the main processing function. If the processing function produces no errors, no data files are added for commit. I noticed that the Flink job keeps restarting and throws the following exception:

```
IcebergFilesCommitter -> Sink: iceberg-error-sink-FPL (1/1)#152 (2b5a4587aa3a50c531671a32a2f1538c) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: Cannot determine partition spec: no data files have been added
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:502)
	at org.apache.iceberg.MergingSnapshotProducer.dataSpec(MergingSnapshotProducer.java:150)
	at org.apache.iceberg.BaseReplacePartitions.apply(BaseReplacePartitions.java:133)
	at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:223)
	at org.apache.iceberg.BaseReplacePartitions.apply(BaseReplacePartitions.java:26)
	at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:369)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:367)
	at org.apache.iceberg.BaseReplacePartitions.commit(BaseReplacePartitions.java:26)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:372)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.replacePartitions(IcebergFilesCommitter.java:314)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitPendingResult(IcebergFilesCommitter.java:270)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:255)
	at org.apache.iceberg.flink.sink.IcebergFilesCommitter.notifyCheckpointComplete(IcebergFilesCommitter.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:409)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:343)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1384)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$14(StreamTask.java:1325)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.base/java.lang.Thread.run(Unknown Source)
```

In the `commitPendingResult` function of IcebergFilesCommitter.java, there is a condition that decides whether to skip an empty commit. However, once the number of consecutive empty checkpoints reaches MAX_CONTINUOUS_EMPTY_COMMITS, the committer proceeds to commit even if there are no data files. Because my sink has `replacePartitions` enabled, that empty commit goes through `replacePartitions(...)` and throws the exception above.

```java
long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
if (totalFiles != 0 || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0) {
  if (replacePartitions) {
    replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
  } else {
    commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
  }
  continuousEmptyCheckpoints = 0;
} else {
  LOG.info("Skipping committing empty checkpoint {}", checkpointId);
}
```

May I know what the purpose of this empty commit is?
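
To make the behaviour of that condition concrete, here is a minimal, self-contained sketch that reproduces only the counter logic from the snippet above. It does not call Iceberg or Flink; the class name `EmptyCommitSketch`, the method `commitDecisions`, and the threshold value `3` are hypothetical and chosen purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone model of the skip-empty-commit condition quoted above.
public class EmptyCommitSketch {

    // For each checkpoint, returns true if a commit would be attempted.
    // filesPerCheckpoint[i] stands in for dataFilesCount + deleteFilesCount.
    public static List<Boolean> commitDecisions(int[] filesPerCheckpoint,
                                                int maxContinuousEmptyCommits) {
        List<Boolean> decisions = new ArrayList<>();
        int continuousEmptyCheckpoints = 0;
        for (int totalFiles : filesPerCheckpoint) {
            // Same update rule as in the quoted snippet.
            continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints + 1 : 0;
            boolean commit = totalFiles != 0
                || continuousEmptyCheckpoints % maxContinuousEmptyCommits == 0;
            if (commit) {
                // A commit (empty or not) resets the counter.
                continuousEmptyCheckpoints = 0;
            }
            decisions.add(commit);
        }
        return decisions;
    }

    public static void main(String[] args) {
        // Three empty checkpoints, one with two files, then three more empty ones.
        int[] files = {0, 0, 0, 2, 0, 0, 0};
        // Prints [false, false, true, true, false, false, true]
        System.out.println(commitDecisions(files, 3));
    }
}
```

With a threshold of 3, the third and seventh checkpoints attempt a commit although they carry no files. In my job those are the commits that would go through `replacePartitions(...)` with nothing to commit.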