pvary commented on PR #11497: URL: https://github.com/apache/iceberg/pull/11497#issuecomment-2519720233
To summarise the current state, I see the following open questions:
- State handling and restoration
- How to handle table schema, specification, and configuration changes
- How to handle errors

**State handling and restoration**

What we know:
- There will be no issues with aligned checkpoints: the checkpoint barriers are aligned with the triggers, so every event is processed within a single checkpoint
- With unaligned checkpoints:
  - The state of the compaction job is not that important, as the user can always trigger a new compaction and drop old pending results
  - There are good barriers against committing the same compaction results twice: if a replaced file no longer exists in the table, the commit fails
  - If we want to use the core CommitManager strategy (which handles partial commits), we can't guarantee that the state and the Iceberg table are consistent, since the CommitManager uses its own thread to do the actual commits
    - The Flink folks might come up with a solution here that is currently unknown to me
- If we restore an old checkpoint or savepoint, we are almost guaranteed to fail to commit the inProgress records, as they were most probably already committed by the previous iteration of the job

Currently the following ideas have emerged to handle the situation:
1. Use the core CommitManager, store the state, and make a best effort when committing the inProgress records
   - Helps if the job stopped ungracefully (without calling close on the DataFileRewriteCommitter)
   - Generates errors on start (gracefully handled)
   - Generates errors on receiving groups stored in the wire state (gracefully handled)
2. Create our own strategy for committing
   - Needs an elaborate strategy to store the group ids in the commits
   - Before adding a new group, we need to scan the table to check whether the group is already committed
     - either with a cache, or with continuous checks
   - We lose the features provided by the core CommitManager (group handling and parallel commits)
3. Remove state from the Compaction Task
   - Simplest implementation
   - We might lose some in-flight work
     - nothing on a graceful stop, but there might be some loss when a job crashes

**How to handle table schema, specification, configuration changes**

We need to decide whether we want to handle table schema/specification/configuration changes at all.

1. We could opt to forgo those changes and use the values available when the job started (like every current Flink job)
   - We need to add guardrails to prevent compaction when there are changes in the table which could cause issues, like losing data from new columns when reading new files with the old schema
2. We could opt to keep the schema/specification/configuration up to date. For this we have the following options:
   - a) Generate a SerializableTable object at planning time, and send it over the wire to the DataFileRewriteExecutors
     - The size of the table could increase the network traffic
     - There is an issue with S3FileIO serialization which needs to be fixed
   - b) Send only the snapshotId to the DataFileRewriteExecutors and load the table on every executor
     - Adds extra load to the Catalog, as every executor needs to load the table once for every trigger

**How to handle errors**

We have 2 options for propagating the errors in the specific operators:

1. Use a side output to emit the error and aggregate it in a final operator
   - Better separation of concerns
2. Send a `Pair<Data, Exception>` as an output and handle the exceptions in every operator
   - Simpler flow

Another orthogonal question regarding the exceptions is what to propagate:

1. Propagate the whole exception
   - We have the full stack trace available at the aggregation point
   - Might cause state issues if the Exception class is changed
2. Propagate only the exception message, and log the exception locally
   - Less network traffic

My current preferences:
- **State handling and restoration**: 3 (remove state from the Compaction Task)
- **How to handle table schema, specification, configuration changes**: 2/a (keep them up to date by sending a SerializableTable)
- **How to handle errors**: 1, 1 (side output, propagating the whole exception)
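Dropping the Compaction Task state (state-handling option 3) relies largely on the double-commit barrier noted under **State handling and restoration**. If an already committed group is replayed, for example after restoring an old checkpoint, the commit fails instead of applying the same rewrite twice. The plain-Java toy model below illustrates only that reasoning. It does not use the Iceberg API, and `ToyTable` and `commitRewrite` are hypothetical names. The one behaviour it assumes from the discussion is that a rewrite commit fails when a file it replaces is no longer in the table.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy illustration of the double-commit barrier; hypothetical, NOT the Iceberg API. */
public class RewriteCommitBarrier {

    /** Hypothetical stand-in for the set of data files currently live in a table. */
    public static final class ToyTable {
        private final Set<String> liveFiles = new HashSet<>();

        public ToyTable(List<String> initialFiles) {
            liveFiles.addAll(initialFiles);
        }

        /**
         * Replaces {@code replaced} with {@code added}. Every replaced file is validated
         * before anything is mutated, so a rejected commit leaves the table unchanged.
         */
        public synchronized void commitRewrite(Set<String> replaced, Set<String> added) {
            for (String file : replaced) {
                if (!liveFiles.contains(file)) {
                    // The file was already rewritten by an earlier commit: reject the duplicate.
                    throw new IllegalStateException("Missing replaced file: " + file);
                }
            }
            liveFiles.removeAll(replaced);
            liveFiles.addAll(added);
        }

        public synchronized Set<String> files() {
            return new HashSet<>(liveFiles);
        }
    }

    public static void main(String[] args) {
        ToyTable table = new ToyTable(List.of("a.parquet", "b.parquet", "c.parquet"));
        Set<String> replaced = Set.of("a.parquet", "b.parquet");
        Set<String> added = Set.of("ab.parquet");

        // The original job commits the compaction group before crashing.
        table.commitRewrite(replaced, added);

        // A job restored from an old checkpoint replays the same in-progress group.
        try {
            table.commitRewrite(replaced, added);
            System.out.println("unexpected: duplicate commit accepted");
        } catch (IllegalStateException e) {
            System.out.println("duplicate commit rejected: " + e.getMessage());
        }
        System.out.println("live files: " + table.files());
    }
}
```

In this model a rejected replay leaves the table untouched. The remaining cost of dropping state is therefore the lost in-flight work, which a newly triggered compaction can redo.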