vvcephei commented on a change in pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#discussion_r561985705
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
} else {
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);
- if (!records.isEmpty()) {
- TopicPartition partition = nextInLineFetch.partition;
- List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
- if (currentRecords == null) {
- fetched.put(partition, records);
- } else {
- // this case shouldn't usually happen because we only send one fetch at a time per partition,
- // but it might conceivably happen in some rare cases (such as partition leader changes).
- // we have to copy to a new list because the old one may be immutable
- List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
- newRecords.addAll(currentRecords);
- newRecords.addAll(records);
- fetched.put(partition, newRecords);
+ TopicPartition partition = nextInLineFetch.partition;
+
+ if (subscriptions.isAssigned(partition)) {
+ // initializeCompletedFetch, above, has already persisted the metadata from the fetch in the
+ // SubscriptionState, so we can just read it out, which in particular lets us re-use the logic
+ // for determining the end offset
+ final long receivedTimestamp = nextInLineFetch.receivedTimestamp;
+ final Long beginningOffset = subscriptions.logStartOffset(partition);
+ final Long endOffset = subscriptions.logEndOffset(partition, isolationLevel);
+ final FetchPosition fetchPosition = subscriptions.position(partition);
+
+ final FetchedRecords.FetchMetadata fetchMetadata = fetched.metadata().get(partition);
+ if (fetchMetadata == null
+ || !fetchMetadata.position().offsetEpoch.isPresent()
+ || fetchPosition.offsetEpoch.isPresent()
+ && fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) {
Review comment:
Ah, good catch. It looks like this check was also left over from a previous
version.
I used to populate the returned metadata directly from the fetch response,
but now I populate it from the subscription state, which
`initializeCompletedFetch` has already updated.
The benefit is that we don't have to worry about cases like this one, since
they were already checked when the subscription state was updated.
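
To illustrate the idea in isolation, here is a minimal sketch of "validate once
when persisting, then just read". It is not the Kafka code: the class names
(`SubscriptionStateSketch`, `State`, `Metadata`) and the methods (`assign`,
`update`, `read`) are hypothetical stand-ins, and partitions are reduced to
plain strings. The only point it tries to show is that if stale responses are
rejected at the single place where state is written, readers need no epoch or
position checks of their own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch, not Kafka's SubscriptionState or Fetcher.
class SubscriptionStateSketch {

    /** Hypothetical per-partition metadata persisted from a fetch response. */
    static final class Metadata {
        final long logStartOffset;
        final long logEndOffset;
        final long position;

        Metadata(long logStartOffset, long logEndOffset, long position) {
            this.logStartOffset = logStartOffset;
            this.logEndOffset = logEndOffset;
            this.position = position;
        }
    }

    /** Hypothetical stand-in for the consumer's subscription state. */
    static final class State {
        private final Map<String, Metadata> byPartition = new HashMap<>();

        /** Marks a partition as assigned, with unknown offsets (-1). */
        void assign(String partition, long position) {
            byPartition.put(partition, new Metadata(-1L, -1L, position));
        }

        /**
         * The single write path: a response is persisted only if the partition
         * is still assigned and the response was fetched from the current position.
         */
        boolean update(String partition, long fetchedFrom, Metadata fromResponse) {
            Metadata current = byPartition.get(partition);
            if (current == null || current.position != fetchedFrom) {
                return false; // unassigned partition or stale response: drop it
            }
            byPartition.put(partition, fromResponse);
            return true;
        }

        /** Readers simply read; everything stored here was validated in update(). */
        Optional<Metadata> read(String partition) {
            return Optional.ofNullable(byPartition.get(partition));
        }
    }

    public static void main(String[] args) {
        State state = new State();
        state.assign("topic-0", 5L);

        boolean accepted = state.update("topic-0", 5L, new Metadata(0L, 10L, 5L));
        boolean stale = state.update("topic-0", 3L, new Metadata(0L, 99L, 3L));

        Metadata m = state.read("topic-0").get();
        System.out.println(accepted + " " + stale + " " + m.logEndOffset); // prints "true false 10"
    }
}
```

The trade-off in this sketch is that the read side trusts the write side
completely, so any new write path would have to apply the same checks.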