pvary commented on PR #11497:
URL: https://github.com/apache/iceberg/pull/11497#issuecomment-2519720233

   To summarise the state, I see the following open questions:
   - State handling and restoration
   - How to handle table schema, specification, configuration changes
   - How to handle errors
   
   **State handling and restoration**
   What we know:
   - There will be no issues with aligned checkpoints: the checkpoint barriers are aligned with the triggers, so every event is processed within a single checkpoint
   - With unaligned checkpoints
       - The state of the compaction job is not super important as the user 
could always trigger a new compaction and drop old pending results
       - There are good barriers against double committing the same compaction results: if a replaced file doesn't exist in the table anymore, the commit will fail (see the sketch after this list)
       - If we want to use the core CommitManager strategy (which handles partial commits), then we can't guarantee consistency between the state and the Iceberg table, since the CommitManager uses its own thread to do the actual commits - here the Flink folks might come up with a solution currently unknown to me
       - If we restore from an old checkpoint or savepoint, we are almost guaranteed to fail to commit the inProgress records, as they were most probably already committed by the previous run of the job
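
   A minimal sketch of that double-commit barrier, assuming the commit goes through Iceberg's RewriteFiles API (the class and variable names below are illustrative, not the actual implementation):
   ```java
   import java.util.Set;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.RewriteFiles;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.exceptions.ValidationException;

   class RewriteCommitSketch {
     /**
      * Commits one rewrite group. Returns false when the replaced files are no
      * longer in the table, i.e. an earlier run most likely committed the same
      * group already, so the pending result can simply be dropped.
      */
     static boolean commitGroup(Table table, Set<DataFile> rewritten, Set<DataFile> added) {
       RewriteFiles rewrite = table.newRewrite().rewriteFiles(rewritten, added);
       try {
         rewrite.commit();
         return true;
       } catch (ValidationException e) {
         // Validation fails because the files to be replaced are already gone
         return false;
       }
     }
   }
   ```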
   
   So far, the following ideas have emerged to handle the situation:
   - Use the core CommitManager, store the state, and do a best effort when committing the inProgress records
       - Helps if the job stopped ungracefully (without calling close on the DataFileRewriteCommitter)
       - Generates errors on start (gracefully handled)
       - Generates errors when receiving groups restored from the in-flight (wire) state (gracefully handled)
   - Create our own strategy for committing
       - Needs an elaborate strategy to store the group ids in the commits
       - Before adding a new group we need to scan the table to check whether the group is already committed - either with a cache or with continuous checks (a sketch follows this list)
       - We lose the features provided by the core CommitManager (group handling and parallel commits)
   - Remove state from the Compaction Task
       - Simplest implementation
       - We might lose some in-flight work - nothing on a graceful stop, but there might be some loss when the job crashes
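
   For the "create our own strategy" option above, a rough sketch of how already-committed group ids could be discovered from the snapshot history. The summary property name is hypothetical; our commits would have to set it, e.g. via RewriteFiles.set:
   ```java
   import java.util.HashSet;
   import java.util.Set;
   import org.apache.iceberg.Snapshot;
   import org.apache.iceberg.Table;

   class CommittedGroupScanner {
     // Hypothetical snapshot summary key written by our rewrite commits
     static final String GROUP_ID_KEY = "flink.maintenance.rewrite-group-id";

     /** Collects the group ids already committed, so a restored job can skip them. */
     static Set<String> committedGroupIds(Table table) {
       Set<String> ids = new HashSet<>();
       for (Snapshot snapshot : table.snapshots()) {
         String id = snapshot.summary().get(GROUP_ID_KEY);
         if (id != null) {
           ids.add(id);
         }
       }
       return ids;
     }
   }
   ```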
   
   **How to handle table schema, specification, configuration changes**
   We need to decide whether we want to handle table schema/specification/configuration changes.
   - We could opt to forgo those changes and use the values available when the job started (like every current Flink job does)
   - We need to add guardrails to prevent compaction when there are changes in the table that could cause issues, like losing data from new columns when new files are read with the old schema
   - We could opt for keeping the schema/specification/configuration up-to-date. For this we have the following options:
       - Generate a SerializableTable object at planning time and send it over the wire to the DataFileRewriteExecutors
           - The size of the table object could increase the network traffic
           - There is an issue with S3FileIO serialization which needs to be fixed
       - Send only the snapshotId to the DataFileRewriteExecutors and load the table on every executor (sketched below)
           - Adds extra load to the Catalog, as every executor needs to fetch the table data once for every trigger
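
   A sketch of the second variant (shipping only the snapshotId), assuming every executor holds a serializable TableLoader that was opened in its own open(); names are illustrative:
   ```java
   import org.apache.iceberg.Table;
   import org.apache.iceberg.flink.TableLoader;

   class ExecutorTableRefresh {
     /** One catalog round-trip per trigger: reload the table and pin the planned snapshot. */
     static Table loadForTrigger(TableLoader tableLoader, long snapshotId) {
       Table table = tableLoader.loadTable();
       if (table.snapshot(snapshotId) == null) {
         // The snapshot was expired or the id is stale - fail fast instead of rewriting blindly
         throw new IllegalStateException("Snapshot " + snapshotId + " is no longer available");
       }
       return table;
     }
   }
   ```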
   
   **How to handle errors**
   We have two options for propagating the errors from the specific operators:
   - Use a side output to emit the errors and aggregate them in a final operator (see the sketch after this list)
       - Better separation of concerns
   - Send a Pair<Data, Exception> as an output and handle the exceptions in every operator
       - Simpler flow
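
   A sketch of the side-output option with a plain Flink ProcessFunction (the String payloads and the rewrite step are placeholders for the real types and logic):
   ```java
   import org.apache.flink.streaming.api.functions.ProcessFunction;
   import org.apache.flink.util.Collector;
   import org.apache.flink.util.OutputTag;

   class ErrorSideOutputSketch extends ProcessFunction<String, String> {
     // Tag used by the final operator to pick up the error stream
     static final OutputTag<String> ERRORS = new OutputTag<String>("rewrite-errors") {};

     @Override
     public void processElement(String value, Context ctx, Collector<String> out) {
       try {
         out.collect(doRewriteStep(value));
       } catch (Exception e) {
         // The main stream only carries results; failures travel on the side output
         ctx.output(ERRORS, e.getMessage());
       }
     }

     private String doRewriteStep(String value) throws Exception {
       return value; // placeholder for the actual operator logic
     }
   }
   ```
   The aggregating operator would then consume stream.getSideOutput(ErrorSideOutputSketch.ERRORS).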
   
   Another, orthogonal question wrt. the exceptions is what to propagate:
   - Propagate the whole exception
       - We have the full stack trace available at the aggregation point
       - Might cause state compatibility issues if the Exception class changes
   - Propagate the exception message only and log the exception locally (sketched below)
       - We have less network traffic
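
   A sketch of the message-only variant: log the full stack trace where the failure happened and ship a small, serialization-stable record downstream (the record and its field names are illustrative):
   ```java
   import java.io.Serializable;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class RewriteError implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(RewriteError.class);

     final String taskName; // which operator / rewrite group failed
     final String message;  // exception class + message only, no stack trace

     RewriteError(String taskName, Throwable t) {
       LOG.warn("Rewrite step {} failed", taskName, t); // full stack trace stays in the local log
       this.taskName = taskName;
       this.message = t.getClass().getName() + ": " + t.getMessage();
     }
   }
   ```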
   
   My current preferences:
   - **State handling and restoration**: 3
   - **How to handle table schema, specification, configuration changes**: 2/a
   - **How to handle errors**: 1, 1

