bryanck commented on code in PR #14525:
URL: https://github.com/apache/iceberg/pull/14525#discussion_r2503975452
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -206,7 +206,15 @@ private void commitToTable(
String branch = config.tableConfig(tableIdentifier.toString()).commitBranch();
+ // Control topic partition offsets may include a subset of partition ids if there were no
+ // records for other partitions. Merge the updated topic partitions with the last committed
+ // offsets.
Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, branch);
+ Map<Integer, Long> mergedOffsets =
+ Stream.of(committedOffsets, controlTopicOffsets)
+ .flatMap(map -> map.entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
Review Comment:
This is the correct behavior. The offsets in the table correspond to the
data committed. If the user does want to roll back the offsets, they also need
to roll back the offsets in the table, either by rolling back the snapshot or
by updating the offsets manually. (The hope was we'd have tools to help with
that at some point.)
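
To illustrate the merge in the hunk above, here is a minimal standalone sketch. The class name `OffsetMergeSketch`, the helper `mergeOffsets`, and the offset values are made up for illustration and are not part of the PR; only the `Stream.of(...).flatMap(...).collect(Collectors.toMap(..., Long::max))` expression is taken from the diff.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical standalone class; not part of Coordinator.java.
public class OffsetMergeSketch {

  // Merges two partition-to-offset maps, keeping the larger offset for each partition.
  static Map<Integer, Long> mergeOffsets(
      Map<Integer, Long> committedOffsets, Map<Integer, Long> controlTopicOffsets) {
    return Stream.of(committedOffsets, controlTopicOffsets)
        .flatMap(map -> map.entrySet().stream())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
  }

  public static void main(String[] args) {
    // Illustrative values: offsets already recorded in the table for partitions 0, 1 and 2.
    Map<Integer, Long> committed = Map.of(0, 100L, 1, 200L, 2, 300L);
    // Illustrative values: partition 2 had no records, partition 1 reports a lower offset.
    Map<Integer, Long> control = Map.of(0, 150L, 1, 50L);

    // TreeMap is used only to print the entries in partition order.
    System.out.println(new TreeMap<>(mergeOffsets(committed, control)));
    // prints {0=150, 1=200, 2=300}
  }
}
```

In this sketch partition 2 keeps its committed offset because the control topic map has no entry for it, and partition 1 stays at 200 because `Long::max` never moves an offset backwards. Lowering it would require changing the offsets stored in the table, as described above.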
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]