herbherbherb opened a new issue, #15783: URL: https://github.com/apache/iceberg/issues/15783
### Query engine

Flink

### Feature Request / Improvement

Add a `WriteObserver` plugin interface to `IcebergSink` (Sink V2) that observes each record written and produces per-checkpoint metadata that flows through the entire sink pipeline to the Iceberg snapshot summary.

### Motivation

Users need to extract per-record metadata (e.g., watermark timestamps, data quality scores) at the writer level and attach it to the committed Iceberg snapshot. Currently there is no way to propagate custom metadata from the writer through the aggregator to the committer without subclassing multiple internal classes (`IcebergSinkWriter`, `IcebergWriteAggregator`, `IcebergCommittable`, `IcebergCommitter`).

This PR adds a `WriteObserver` that is called for each record in `IcebergSinkWriter.write()`. At checkpoint time, the observer's accumulated metadata is carried through the serialization boundary via a ThreadLocal holder, serialized alongside `WriteResult` and `IcebergCommittable`, merged across parallel writer subtasks in the aggregator, and applied as additional Iceberg snapshot properties in the committer.

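To make the observe-then-merge flow above concrete, here is a minimal, dependency-free sketch. It is not the proposed implementation: `ObserverSketch` is a simplified stand-in for `WriteObserver` (the real `observe` takes `RowData` and `SinkWriter.Context`, replaced here by a generic record and a timestamp), and `MaxTimestampObserver`, the property name `observer.max-timestamp`, and the "keep the larger value" merge policy are all hypothetical. The issue does not specify how conflicting keys from different subtasks are resolved.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Simplified stand-in for the proposed WriteObserver (assumption: the real
// interface takes RowData and SinkWriter.Context instead of these parameters).
interface ObserverSketch<T> {
  void observe(T record, long timestampMillis);

  // Metadata to attach at checkpoint time; empty by default.
  default Map<String, String> snapshotMetadata() {
    return Collections.emptyMap();
  }
}

// Hypothetical observer that tracks the largest timestamp seen so far.
class MaxTimestampObserver<T> implements ObserverSketch<T> {
  static final String KEY = "observer.max-timestamp"; // hypothetical property name
  private long max = Long.MIN_VALUE;

  @Override
  public void observe(T record, long timestampMillis) {
    max = Math.max(max, timestampMillis);
  }

  @Override
  public Map<String, String> snapshotMetadata() {
    if (max == Long.MIN_VALUE) {
      return Collections.emptyMap(); // nothing observed yet
    }
    Map<String, String> metadata = new HashMap<>();
    metadata.put(KEY, Long.toString(max));
    return metadata;
  }
}

class ObserverMetadataMerge {
  // Merges metadata maps from parallel writer subtasks. The resolver decides
  // which value wins when two subtasks report the same key (assumed policy).
  static Map<String, String> merge(
      List<Map<String, String>> perSubtask, BinaryOperator<String> resolver) {
    Map<String, String> merged = new HashMap<>();
    for (Map<String, String> metadata : perSubtask) {
      metadata.forEach((key, value) -> merged.merge(key, value, resolver));
    }
    return merged;
  }

  // Example resolver: keep the numerically larger of two long-valued strings.
  static String maxLong(String a, String b) {
    return Long.parseLong(a) >= Long.parseLong(b) ? a : b;
  }
}
```

In this sketch, each writer subtask would own one observer instance, and the aggregator would call something like `ObserverMetadataMerge.merge(...)` on the maps it receives before handing the result to the committer.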
### Changes

- New: `WriteObserver.java`: interface with `observe(RowData, SinkWriter.Context)` and `default Map<String, String> snapshotMetadata()`
- New: `WriteObserverMetadataHolder.java`: ThreadLocal holder for passing metadata through serialization boundaries
- Modified: `IcebergSinkWriter`: calls observer per-record, collects metadata at checkpoint via `prepareCommit()`
- Modified: `WriteResultSerializer`: v2 format carries observer metadata alongside WriteResult bytes
- Modified: `IcebergWriteAggregator`: reads metadata from deserialized WriteResults, merges across writer subtasks, passes to IcebergCommittable
- Modified: `IcebergCommittable` + `IcebergCommittableSerializer`: new `observerMetadata` field with v2 serialization format (backward-compatible with v1)
- Modified: `IcebergCommitter`: merges observer metadata from committables and applies as snapshot properties
- Modified: `IcebergSink.Builder`: new `writeObserver()` method

### Compatibility

- No behavioral change when the observer is not set (null default)
- `IcebergCommittableSerializer` v2 can deserialize v1 payloads (backward compatible)
- `WriteResultSerializer` v2 can deserialize v1 payloads (backward compatible)
- No changes to public API signatures of existing methods

### Use cases

- Per-record watermark extraction for downstream freshness tracking
- Data quality score aggregation per checkpoint
- Custom metadata that should appear in Iceberg snapshot summaries
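The backward-compatible v2 serialization described under Compatibility could look roughly like the sketch below. It mirrors the shape of Flink's `SimpleVersionedSerializer`, where the version is passed to `deserialize` separately from the bytes. The byte layout is an assumption made for illustration and is not the format proposed in the issue: v1 is treated as the raw payload bytes with no metadata, and v2 as a length-prefixed payload followed by a count of key/value pairs. `VersionedPayloadSketch` and `Decoded` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical versioned codec: always writes v2, reads both v1 and v2.
class VersionedPayloadSketch {
  static final int VERSION = 2;

  // Result of decoding: the original payload bytes plus observer metadata.
  static final class Decoded {
    final byte[] payload;
    final Map<String, String> metadata;

    Decoded(byte[] payload, Map<String, String> metadata) {
      this.payload = payload;
      this.metadata = metadata;
    }
  }

  // Assumed v2 layout: [payload length][payload][entry count][key, value]*
  static byte[] serialize(byte[] payload, Map<String, String> metadata) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(payload.length);
      out.write(payload);
      out.writeInt(metadata.size());
      for (Map.Entry<String, String> entry : metadata.entrySet()) {
        out.writeUTF(entry.getKey());
        out.writeUTF(entry.getValue());
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static Decoded deserialize(int version, byte[] serialized) {
    if (version == 1) {
      // Assumed v1 layout: the bytes are the payload, with no metadata.
      return new Decoded(serialized, Collections.<String, String>emptyMap());
    }
    if (version != VERSION) {
      throw new IllegalArgumentException("Unrecognized version: " + version);
    }
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      int entries = in.readInt();
      Map<String, String> metadata = new LinkedHashMap<>();
      for (int i = 0; i < entries; i++) {
        String key = in.readUTF();
        String value = in.readUTF();
        metadata.put(key, value);
      }
      return new Decoded(payload, metadata);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Branching on the version number lets state written by the older serializer restore cleanly with empty metadata, which matches the "no behavioral change when the observer is not set" goal.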
