kumarpritam863 opened a new pull request, #14701:
URL: https://github.com/apache/iceberg/pull/14701
## Summary
This PR enhances the Kafka Connect sink to track offset ranges (start and
end offsets) for each topic partition, providing better traceability of which
source data was included in each Iceberg
snapshot.
## Motivation
Previously, the connector only tracked a single offset value per
partition, which represented the next offset to consume. This made it difficult
to determine the exact range of data that was
committed in a particular snapshot. By tracking both start and end
offsets, we can now:
1. Precisely identify the range of Kafka messages included in each Iceberg
snapshot
2. Improve debugging and data lineage capabilities
3. Enable better auditing and compliance tracking
4. Facilitate data recovery and reprocessing scenarios
5. Enable downstream consumers (e.g. Flink jobs) to resume from the exact
topic-partition offsets when running in a hybrid Iceberg -> Kafka setup
## Changes
### Core Data Models
**`TopicPartitionOffset` class:**
- Changed from single `offset` field to `startOffset` and `endOffset`
fields
- Updated Iceberg schema to include both offset fields
- Field IDs: `START_OFFSET = 10_702`, `END_OFFSET = 10_703`, `TIMESTAMP =
10_704`
- Added getter methods: `startOffset()` and `endOffset()`
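
The changes above can be sketched as follows. This is a hedged reconstruction from the PR summary, not the actual class: the field IDs `10_702`-`10_704` come from the description, while the constructor shape and the topic/partition fields are illustrative.

```java
import java.time.OffsetDateTime;

// Sketch of the updated TopicPartitionOffset class; field IDs are from the
// PR description, everything else is an assumption for illustration.
public class TopicPartitionOffset {
  public static final int START_OFFSET = 10_702;
  public static final int END_OFFSET = 10_703;
  public static final int TIMESTAMP = 10_704;

  private final String topic;
  private final int partition;
  private final Long startOffset;
  private final Long endOffset;
  private final OffsetDateTime timestamp;

  public TopicPartitionOffset(
      String topic, int partition, Long startOffset, Long endOffset, OffsetDateTime timestamp) {
    this.topic = topic;
    this.partition = partition;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.timestamp = timestamp;
  }

  public String topic() { return topic; }
  public int partition() { return partition; }
  // New getters added by this PR
  public Long startOffset() { return startOffset; }
  public Long endOffset() { return endOffset; }
  public OffsetDateTime timestamp() { return timestamp; }
}
```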
**`Offset` class:**
- Modified to track both `startOffset` and `endOffset` instead of a single
offset
- Added backward compatibility method `offset()` that returns `endOffset`
- Updated constructor signature: `Offset(Long startOffset, Long endOffset,
OffsetDateTime timestamp)`
### Offset Tracking
**`SinkWriter` class:**
- Enhanced `save()` method to track offset ranges per partition
- For the first record in a partition: `startOffset = currentOffset`,
`endOffset = currentOffset + 1`
- For subsequent records: preserves the original `startOffset`, updates
`endOffset = currentOffset + 1` (i.e. `endOffset` is exclusive, matching
Kafka's next-offset-to-consume convention)
- Ensures accurate range tracking across multiple records in the same
commit cycle
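
The first-record / subsequent-record logic above can be sketched with `Map.merge`. This is a simplified illustration (the real `SinkWriter` keys by `TopicPartition` and stores `Offset` objects; the `OffsetRangeTracker` name and `String` key are inventions for this sketch):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the range-tracking step in SinkWriter.save(): the first record
// seen on a partition in a commit cycle fixes the start offset; later
// records only advance the (exclusive) end offset.
public class OffsetRangeTracker {
  // value is {startOffset, endOffset}; key simplified to "topic-partition"
  private final Map<String, long[]> ranges = new HashMap<>();

  public void track(String topicPartition, long currentOffset) {
    ranges.merge(
        topicPartition,
        // first record: start = current, end = current + 1
        new long[] {currentOffset, currentOffset + 1},
        // subsequent records: keep original start, advance end
        (existing, ignored) -> new long[] {existing[0], currentOffset + 1});
  }

  public long startOffset(String tp) { return ranges.get(tp)[0]; }
  public long endOffset(String tp) { return ranges.get(tp)[1]; }
}
```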
**`Worker` class:**
- Updated to pass both `startOffset` and `endOffset` when creating
`TopicPartitionOffset` objects
### Snapshot Metadata
**`CommitState` class:**
- Added `topicPartitionOffsets()` method to extract all topic partition
offsets from the ready buffer
- Returns complete offset information for the current commit
**`Coordinator` class:**
- Added new snapshot property: `kafka.connect.topic-partition-offsets`
- Implemented `topicPartitionOffsetsToJson()` to serialize offset ranges
to JSON
- JSON format includes: `topic`, `partition`, `startOffset`, `endOffset`,
and `timestamp` for each partition
- Updated `commitToTable()` to store topic partition offsets in both
append and delta operations
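
The serialization step can be sketched as below. This is a hand-rolled illustration of the JSON shape only; the `OffsetJson` class and method names are hypothetical, and the real `topicPartitionOffsetsToJson()` presumably uses the sink's existing JSON utilities:

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.StringJoiner;

// Sketch of serializing offset ranges into the
// kafka.connect.topic-partition-offsets snapshot property.
public class OffsetJson {

  /** One entry of the topic-partition-offsets array (illustrative). */
  public static String entryJson(
      String topic, int partition, long startOffset, long endOffset, OffsetDateTime timestamp) {
    return String.format(
        "{\"topic\":\"%s\",\"partition\":%d,\"startOffset\":%d,\"endOffset\":%d,\"timestamp\":\"%s\"}",
        topic, partition, startOffset, endOffset,
        timestamp.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
  }

  /** Joins the per-partition entries into the final JSON array. */
  public static String toJson(List<String> entries) {
    StringJoiner array = new StringJoiner(",", "[", "]");
    entries.forEach(array::add);
    return array.toString();
  }
}
```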
### Example Snapshot Metadata
After this change, Iceberg snapshots will include metadata like:
```json
{
"kafka.connect.topic-partition-offsets": [
{
"topic": "events",
"partition": 0,
"startOffset": 100,
"endOffset": 250,
"timestamp": "2024-01-15T10:30:00Z"
},
{
"topic": "events",
"partition": 1,
"startOffset": 50,
"endOffset": 175,
"timestamp": "2024-01-15T10:30:05Z"
}
]
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]