Schiewatir opened a new pull request, #16360:
URL: https://github.com/apache/iceberg/pull/16360
## Problem
After a Kafka cluster is recreated (old cluster deleted, new one started),
the Apache Iceberg Kafka Connect sink stops committing metadata. Parquet files
accumulate in object storage but no snapshots are created, and the logs show
`"Coordinator … found nothing to commit to table"` on every commit cycle.
(Reported in #15293.)
**Root cause:** On the new cluster the control topic starts again at offset 0, but
the latest Iceberg snapshot still stores the control-topic offsets committed on the
old cluster (e.g. `{"0":100}`). The deduplication filter in `Coordinator.commitToTable()`
rejects every `DataWritten` event whose offset is below that stored baseline, so the
data files they describe are silently left out of every commit.
```java
// before fix: stale minOffset = 100, envelope.offset() = 0..4 → all filtered
Long minOffset = committedOffsets.get(envelope.partition());
return minOffset == null || envelope.offset() >= minOffset;
```
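To make the failure mode concrete, here is a minimal, self-contained sketch that runs the same predicate against a stale baseline. `StaleOffsetFilterDemo`, the `Envelope` record and `filterEvents` are hypothetical stand-ins, not the connector's actual classes; only the filter expression is taken from the snippet above.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StaleOffsetFilterDemo {

  // Hypothetical stand-in for a control-topic event; only partition and offset matter here.
  record Envelope(int partition, long offset) {}

  // Same predicate as the snippet above: keep an event only if no baseline exists for its
  // partition or its offset is at or beyond that baseline.
  static List<Envelope> filterEvents(List<Envelope> events, Map<Integer, Long> committedOffsets) {
    return events.stream()
        .filter(envelope -> {
          Long minOffset = committedOffsets.get(envelope.partition());
          return minOffset == null || envelope.offset() >= minOffset;
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Baseline left over from the old cluster: partition 0 at offset 100.
    Map<Integer, Long> committedOffsets = Map.of(0, 100L);

    // Recreated cluster: the control topic restarted, so events arrive at offsets 0..4.
    List<Envelope> events = List.of(
        new Envelope(0, 0), new Envelope(0, 1), new Envelope(0, 2),
        new Envelope(0, 3), new Envelope(0, 4));

    List<Envelope> accepted = filterEvents(events, committedOffsets);
    System.out.println("accepted " + accepted.size() + " of " + events.size()); // prints "accepted 0 of 5"
  }
}
```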
## Fix
When a partition's current control-topic offset is **lower** than its stored
committed offset, the committed offset for that partition is reset to the
current offset minus one, so that the next write is accepted. The check runs
per partition inside `commitToTable()` and only fires when the offset has gone
backwards, so the normal (non-reset) path behaves exactly as before.
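The reset rule can be sketched as a standalone function. This is an illustration under assumptions, not the PR's code: `OffsetResetSketch` and `resetStaleBaselines` are hypothetical names, and because the description does not say how the coordinator obtains a partition's current control-topic offset, that value is simply passed in as a map.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetResetSketch {

  /**
   * Applies the reset rule to a copy of the stored baselines.
   *
   * @param committedOffsets per-partition offsets stored in the last snapshot (hypothetical shape)
   * @param currentOffsets   per-partition current control-topic offsets (source assumed, passed in)
   * @return adjusted baselines; partitions on the normal path are returned unchanged
   */
  static Map<Integer, Long> resetStaleBaselines(
      Map<Integer, Long> committedOffsets, Map<Integer, Long> currentOffsets) {
    Map<Integer, Long> adjusted = new HashMap<>(committedOffsets);
    currentOffsets.forEach((partition, current) -> {
      Long stored = adjusted.get(partition);
      // The offset went backwards: treat the control topic as recreated.
      if (stored != null && current < stored) {
        // Lower the baseline so the filter (offset >= minOffset) accepts the new events.
        adjusted.put(partition, current - 1);
      }
    });
    return adjusted;
  }

  public static void main(String[] args) {
    // Partition 0 has a stale baseline from the old cluster; partition 1 is on the normal path.
    Map<Integer, Long> committed = Map.of(0, 100L, 1, 50L);
    Map<Integer, Long> current = Map.of(0, 0L, 1, 57L);

    Map<Integer, Long> adjusted = resetStaleBaselines(committed, current);
    System.out.println("partition 0 baseline: " + adjusted.get(0)); // -1
    System.out.println("partition 1 baseline: " + adjusted.get(1)); // 50
  }
}
```

Partitions whose current offset is at or above the stored baseline are copied through untouched, which is what keeps the normal path unchanged in this sketch.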
After the reset, the filter accepts the new events, a snapshot is committed,
and the offset stored in that snapshot becomes the new baseline, so commits
resume normally.
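The recovery sequence can be simulated over two commit cycles on the recreated cluster: one where the reset fires and one on the normal path. Besides the hypothetical names (`RecoveryCycleSketch`, `Envelope`, `commitCycle`), this sketch assumes that a committed snapshot records, per partition, the offset after the highest accepted event, and it again takes the current control-topic offsets as an input; the real connector may derive both differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecoveryCycleSketch {

  // Hypothetical control-topic event; only partition and offset matter here.
  record Envelope(int partition, long offset) {}

  /**
   * One simplified commit cycle: reset stale baselines, filter, then record new baselines.
   * Mutates committedOffsets (a hypothetical stand-in for the offsets stored in the snapshot)
   * and returns the number of events that would be committed.
   */
  static int commitCycle(
      List<Envelope> buffered, Map<Integer, Long> currentOffsets, Map<Integer, Long> committedOffsets) {
    // 1. Reset rule: an offset that went backwards lowers the baseline to current - 1.
    currentOffsets.forEach((partition, current) -> {
      Long stored = committedOffsets.get(partition);
      if (stored != null && current < stored) {
        committedOffsets.put(partition, current - 1);
      }
    });

    // 2. Deduplication filter, same predicate as in commitToTable().
    List<Envelope> accepted = new ArrayList<>();
    for (Envelope e : buffered) {
      Long minOffset = committedOffsets.get(e.partition());
      if (minOffset == null || e.offset() >= minOffset) {
        accepted.add(e);
      }
    }

    // 3. Assumed commit step: store the offset after the highest accepted event as the baseline.
    for (Envelope e : accepted) {
      committedOffsets.merge(e.partition(), e.offset() + 1, Math::max);
    }
    return accepted.size();
  }

  public static void main(String[] args) {
    // Baseline left over from the old cluster.
    Map<Integer, Long> committed = new HashMap<>(Map.of(0, 100L));

    // Cycle 1 on the recreated cluster: offsets restart at 0, so the reset fires.
    int first = commitCycle(
        List.of(new Envelope(0, 0), new Envelope(0, 1), new Envelope(0, 2)),
        Map.of(0, 0L), committed);
    System.out.println("cycle 1: accepted " + first + ", baseline " + committed.get(0));

    // Cycle 2: normal path; the current offset is not below the baseline, so no reset.
    int second = commitCycle(
        List.of(new Envelope(0, 3), new Envelope(0, 4)),
        Map.of(0, 3L), committed);
    System.out.println("cycle 2: accepted " + second + ", baseline " + committed.get(0));
  }
}
```

With the values in `main`, the first cycle accepts all three events and moves the baseline from 100 down to -1 and then up to 3; the second cycle accepts both events without a reset and leaves the baseline at 5.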
Fixes #15293
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]