lucasbru commented on code in PR #17091:
URL: https://github.com/apache/kafka/pull/17091#discussion_r1771051009
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -462,23 +463,29 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
}
}
- private Long findOffset(final TopicPartition partition) {
+ private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition partition) {
Long offset = partitionGroup.headRecordOffset(partition);
+ Optional<Integer> leaderEpoch = partitionGroup.headRecordLeaderEpoch(partition);
+ final long partitionTime = partitionGroup.partitionTimestamp(partition);
if (offset == null) {
try {
- offset = mainConsumer.position(partition);
- } catch (final TimeoutException error) {
- // the `consumer.position()` call should never block, because we know that we did process data
- // for the requested partition and thus the consumer should have a valid local position
- // that it can return immediately
-
- // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
- throw new IllegalStateException(error);
+ if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) {
+ final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
+ offset = offsetAndMetadata.offset();
+ leaderEpoch = offsetAndMetadata.leaderEpoch();
+ } else {
+ // This indicates a bug and thus we rethrow it as fatal `IllegalStateException`
+ final String errorMsg = String.format("Stream task " + id + " does not know the partition: " + partition);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg, new NullPointerException());
Review Comment:
Not sure why you are creating a `NullPointerException` here.
Also, are we sure that we will never hit this branch? In principle, we get
here whenever we try to commit offsets for a topic partition for which we have
not received any records. If I understand the code correctly, we should not
get here, but honestly I'm not 100% sure.
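
To make the first question concrete, here is a minimal sketch of the same lookup that throws the `IllegalStateException` without a synthetic `NullPointerException` as its cause. It is not the PR's code: the class `OffsetLookupSketch`, the `String`-keyed map, and the `findOffset` signature are hypothetical stand-ins for `StreamTask`, `TopicPartition`, and `nextOffsetsAndMetadataToBeConsumed`.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetLookupSketch {

    // Hypothetical stand-in for the task's map of next offsets, keyed by partition name.
    static long findOffset(final Map<String, Long> nextOffsets,
                           final String taskId,
                           final String partition) {
        // A single get() replaces the containsKey()/get() pair from the diff.
        final Long offset = nextOffsets.get(partition);
        if (offset == null) {
            // No artificial cause is attached; the message alone describes the failure.
            throw new IllegalStateException(
                "Stream task " + taskId + " does not know the partition: " + partition);
        }
        return offset;
    }

    public static void main(final String[] args) {
        final Map<String, Long> nextOffsets = new HashMap<>();
        nextOffsets.put("topic-0", 42L);

        // Known partition: the stored offset is returned.
        System.out.println(findOffset(nextOffsets, "0_0", "topic-0"));

        // Unknown partition: the exception carries no cause.
        try {
            findOffset(nextOffsets, "0_0", "topic-1");
        } catch (final IllegalStateException e) {
            System.out.println(e.getCause() == null);
        }
    }
}
```

Whether the unknown-partition branch is reachable in the real task is exactly the open question above; the sketch only shows the shape of the exception, not an answer to that.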