kumarpritam863 opened a new pull request, #14701:
URL: https://github.com/apache/iceberg/pull/14701

   ## Summary
   
     This PR enhances the Kafka Connect sink to track offset ranges (start and 
end offsets) for each topic partition, providing better traceability of which 
source data was included in each Iceberg
     snapshot.
   
     ## Motivation
   
     Previously, the connector only tracked a single offset value per 
partition, which represented the next offset to consume. This made it difficult 
to determine the exact range of data that was
     committed in a particular snapshot. By tracking both start and end 
offsets, we can now:
   
     1. Precisely identify the range of Kafka messages included in each Iceberg 
snapshot
     2. Improve debugging and data lineage capabilities
     3. Enable better auditing and compliance tracking
     4. Facilitate data recovery and reprocessing scenarios
     5. Enable Flink jobs to resume from the exact topic-partition offsets when 
running in a hybrid Iceberg -> Kafka pipeline
   
     ## Changes
   
     ### Core Data Models
   
     **`TopicPartitionOffset` class:**
     - Changed from single `offset` field to `startOffset` and `endOffset` 
fields
     - Updated Iceberg schema to include both offset fields
     - Field IDs: `START_OFFSET = 10_702`, `END_OFFSET = 10_703`, `TIMESTAMP = 
10_704`
     - Added getter methods: `startOffset()` and `endOffset()`
   
     **`Offset` class:**
     - Modified to track both `startOffset` and `endOffset` instead of a single 
offset
     - Added backward compatibility method `offset()` that returns `endOffset`
     - Updated constructor signature: `Offset(Long startOffset, Long endOffset, 
OffsetDateTime timestamp)`
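
     The `Offset` change above can be sketched roughly as follows (the field 
names, constructor signature, and backward-compatible `offset()` accessor follow 
this description; the class shape itself is illustrative, not the actual 
connector source):

     ```java
     import java.time.OffsetDateTime;

     // Illustrative sketch of the updated Offset model; not the actual
     // connector code.
     class Offset {
       private final Long startOffset;
       private final Long endOffset;
       private final OffsetDateTime timestamp;

       Offset(Long startOffset, Long endOffset, OffsetDateTime timestamp) {
         this.startOffset = startOffset;
         this.endOffset = endOffset;
         this.timestamp = timestamp;
       }

       Long startOffset() { return startOffset; }

       Long endOffset() { return endOffset; }

       // Backward-compatible accessor: callers expecting the old
       // single-offset semantics receive the end offset.
       Long offset() { return endOffset; }

       OffsetDateTime timestamp() { return timestamp; }
     }
     ```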
   
     ### Offset Tracking
   
     **`SinkWriter` class:**
     - Enhanced `save()` method to track offset ranges per partition
     - For the first record in a partition: `startOffset = currentOffset`, 
`endOffset = currentOffset + 1`
     - For subsequent records: preserves original `startOffset`, updates 
`endOffset = currentOffset + 1`
     - Ensures accurate range tracking across multiple records in the same 
commit cycle
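
     The per-partition bookkeeping described above can be sketched like this (a 
simplified stand-in for the `save()` logic, assuming a map keyed by partition; 
the class and method names are illustrative):

     ```java
     import java.util.HashMap;
     import java.util.Map;

     // Simplified sketch of the offset-range tracking described for
     // SinkWriter.save(); not the actual connector code.
     class OffsetRangeTracker {
       // partition -> {startOffset, endOffset}
       private final Map<Integer, long[]> ranges = new HashMap<>();

       void record(int partition, long currentOffset) {
         ranges.merge(
             partition,
             // First record for this partition in the commit cycle:
             // startOffset = currentOffset, endOffset = currentOffset + 1.
             new long[] {currentOffset, currentOffset + 1},
             // Subsequent records: keep the original start, advance the end.
             (existing, incoming) -> new long[] {existing[0], incoming[1]});
       }

       long[] range(int partition) { return ranges.get(partition); }
     }
     ```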
   
     **`Worker` class:**
     - Updated to pass both `startOffset` and `endOffset` when creating 
`TopicPartitionOffset` objects
   
     ### Snapshot Metadata
   
     **`CommitState` class:**
     - Added `topicPartitionOffsets()` method to extract all topic partition 
offsets from ready buffer
     - Returns complete offset information for the current commit
   
     **`Coordinator` class:**
     - Added new snapshot property: `kafka.connect.topic-partition-offsets`
     - Implemented `topicPartitionOffsetsToJson()` to serialize offset ranges 
to JSON
     - JSON format includes: `topic`, `partition`, `startOffset`, `endOffset`, 
and `timestamp` for each partition
     - Updated `commitToTable()` to store topic partition offsets in both 
append and delta operations
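
     A serialization along the lines of `topicPartitionOffsetsToJson()` could 
look roughly like this (a hand-rolled sketch using only the JDK; the actual 
implementation presumably uses the connector's JSON library, and all names here 
are illustrative):

     ```java
     import java.time.OffsetDateTime;
     import java.util.List;
     import java.util.stream.Collectors;

     // Illustrative sketch of serializing offset ranges to the JSON shape
     // shown below; not the actual Coordinator code.
     class OffsetJson {
       record PartitionOffsets(
           String topic, int partition, long startOffset, long endOffset,
           OffsetDateTime timestamp) {}

       static String toJson(List<PartitionOffsets> offsets) {
         return offsets.stream()
             .map(o -> String.format(
                 "{\"topic\":\"%s\",\"partition\":%d,\"startOffset\":%d,"
                     + "\"endOffset\":%d,\"timestamp\":\"%s\"}",
                 o.topic(), o.partition(), o.startOffset(), o.endOffset(),
                 o.timestamp()))
             .collect(Collectors.joining(",", "[", "]"));
       }
     }
     ```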
   
     ### Example Snapshot Metadata
   
     After this change, Iceberg snapshots will include metadata like:
   
     ```json
     {
       "kafka.connect.topic-partition-offsets": [
         {
           "topic": "events",
           "partition": 0,
           "startOffset": 100,
           "endOffset": 250,
           "timestamp": "2024-01-15T10:30:00Z"
         },
         {
           "topic": "events",
           "partition": 1,
           "startOffset": 50,
           "endOffset": 175,
           "timestamp": "2024-01-15T10:30:05Z"
         }
       ]
     }
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

