Lianet Magrans created KAFKA-17066:
--------------------------------------
Summary: New consumer updateFetchPositions should perform all
operations in background thread
Key: KAFKA-17066
URL: https://issues.apache.org/jira/browse/KAFKA-17066
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 3.8.0
Reporter: Lianet Magrans
Fix For: 3.9.0
The updateFetchPositions function in the new consumer performs several actions
based on the assigned partitions from the subscriptionState. As currently
implemented, it first fetches committed offsets for the partitions that require
a position (retrieved from the subscription state in the app thread), and then
resets positions for the partitions still needing one (retrieved from the
subscription state again, but this time in the background thread).
This is problematic because the assignment/subscriptionState may change in the
background thread at any time (e.g. when new partitions are reconciled), so we
could end up resetting a partition's position to the partition offsets even
though we never even attempted to retrieve committed offsets for it.
Consider this sequence for a consumer that owns partition tp0:
* consumer owns tp0
* app thread -> updateFetchPositions triggers
initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned
partitions requiring a position (taking them from
subscriptions.initializingPartitions()). This will fetch committed offsets for
tp0 only.
* background thread -> receives new partition tp1 and completes reconciliation
(adds it to the subscription state as INITIALIZING, so it requires a position)
* app thread -> updateFetchPositions resets positions for all partitions that
still don't have a valid position after initWithCommittedOffsetsIfNeeded
(taking them from subscriptionState.partitionsNeedingReset). This will
mistakenly reset tp1 to the partition offsets, even though it never tried
fetching the committed offsets for tp1, because tp1 wasn't assigned yet when
initWithCommittedOffsetsIfNeeded ran.
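The interleaving above can be reproduced deterministically with a toy model. This is a hypothetical, heavily simplified sketch: `ToySubscriptionState`, `twoStepUpdate`, the in-memory `COMMITTED` map (standing in for the committed-offset fetch) and `RESET_OFFSET` (standing in for the offset chosen by the reset policy) are illustrative names, not Kafka's real classes or APIs. The background reconciliation is simulated inline between the two steps rather than run on a real thread; the point is only that reading the partition set twice lets tp1 be reset even though it has a committed offset.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical toy model of the race; not Kafka's real consumer internals.
public class FetchPositionsRace {

    // Hypothetical committed offsets: both partitions have one.
    static final Map<String, Long> COMMITTED = Map.of("tp0", 42L, "tp1", 7L);
    // Hypothetical offset chosen by the reset policy.
    static final long RESET_OFFSET = 100L;

    // Toy stand-in for the subscription state.
    static final class ToySubscriptionState {
        private final Set<String> needingPosition = new TreeSet<>();
        private final Map<String, Long> positions = new TreeMap<>();

        // Reconciled partition, added as INITIALIZING (needs a position).
        void addAssigned(String tp) { needingPosition.add(tp); }
        // Both views return copies, so callers can seek while iterating.
        Set<String> initializingPartitions() { return new TreeSet<>(needingPosition); }
        Set<String> partitionsNeedingReset() { return new TreeSet<>(needingPosition); }
        void seek(String tp, long offset) {
            positions.put(tp, offset);
            needingPosition.remove(tp);
        }
    }

    // Returns the partitions that were reset instead of using a committed offset.
    static Set<String> twoStepUpdate(boolean tp1ArrivesMidway) {
        ToySubscriptionState subs = new ToySubscriptionState();
        subs.addAssigned("tp0");

        // Step 1 (app thread): committed offsets for the partitions needing a position now.
        for (String tp : subs.initializingPartitions()) {
            Long committed = COMMITTED.get(tp);
            if (committed != null) subs.seek(tp, committed);
        }

        // Simulated background reconciliation landing between the two steps.
        if (tp1ArrivesMidway) subs.addAssigned("tp1");

        // Step 2: the set is read again, so it can include partitions step 1 never saw.
        Set<String> reset = subs.partitionsNeedingReset();
        for (String tp : reset) subs.seek(tp, RESET_OFFSET);
        return reset;
    }

    public static void main(String[] args) {
        System.out.println("no race: reset " + twoStepUpdate(false)); // no race: reset []
        System.out.println("race:    reset " + twoStepUpdate(true));  // race:    reset [tp1]
    }
}
```

With `tp1ArrivesMidway` set to true, tp1 is moved to `RESET_OFFSET` instead of its committed offset 7, which mirrors the sequence described above.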
We should consider moving updateFetchPositions to the background thread as a
single event, which would use the subscriptionState object safely and apply all
actions involved in updateFetchPositions to the same consistent set of
partitions assigned at that moment.
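A minimal sketch of that direction, assuming a single-threaded executor stands in for the consumer's background thread and that reconciliation is also submitted to it. `SingleEventUpdate`, `reconcile`, `COMMITTED` and `RESET_OFFSET` are hypothetical names, and the in-memory map replaces the real committed-offset request. The event takes one snapshot of the partitions needing a position and applies both steps to that snapshot, so a partition reconciled later is handled by a later event rather than reset halfway through this one.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch; not Kafka's real event types or background thread.
public class SingleEventUpdate {
    // Hypothetical committed offsets (stand-in for the committed-offset fetch).
    static final Map<String, Long> COMMITTED = Map.of("tp0", 42L, "tp1", 7L);
    // Hypothetical offset chosen by the reset policy.
    static final long RESET_OFFSET = 100L;

    // Only touched from the single background thread, so no locking is needed.
    private final Map<String, Long> positions = new TreeMap<>();
    private final Set<String> needingPosition = new TreeSet<>();
    private final ExecutorService background = Executors.newSingleThreadExecutor();

    // Reconciliation runs on the same background thread as the update event.
    Future<?> reconcile(String tp) {
        return background.submit(() -> { needingPosition.add(tp); });
    }

    // updateFetchPositions as one event: both steps use the same snapshot.
    CompletableFuture<Map<String, Long>> updateFetchPositions() {
        return CompletableFuture.supplyAsync(() -> {
            Set<String> snapshot = new TreeSet<>(needingPosition);
            // Step 1: committed offsets for the snapshot.
            for (String tp : snapshot) {
                Long committed = COMMITTED.get(tp);
                if (committed != null) positions.put(tp, committed);
            }
            // Step 2: reset only snapshot partitions that still lack a position.
            for (String tp : snapshot) {
                positions.putIfAbsent(tp, RESET_OFFSET);
                needingPosition.remove(tp);
            }
            return new TreeMap<>(positions);
        }, background);
    }

    void close() { background.shutdown(); }

    public static void main(String[] args) {
        SingleEventUpdate consumer = new SingleEventUpdate();
        consumer.reconcile("tp0");
        // tp1 is reconciled after the event was queued, never in the middle of it.
        CompletableFuture<Map<String, Long>> first = consumer.updateFetchPositions();
        consumer.reconcile("tp1");
        System.out.println(first.join());                          // {tp0=42}
        System.out.println(consumer.updateFetchPositions().join()); // {tp0=42, tp1=7}
        consumer.close();
    }
}
```

Because the executor runs tasks one at a time in submission order, tp1 is never observed by only one of the two steps; the second event picks it up and uses its committed offset 7 instead of resetting it.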
--
This message was sent by Atlassian Jira
(v8.20.10#820010)