lucasbru commented on code in PR #17091:
URL: https://github.com/apache/kafka/pull/17091#discussion_r1771051009
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -462,23 +463,29 @@ public Map<TopicPartition, OffsetAndMetadata>
prepareCommit() {
}
}
- private Long findOffset(final TopicPartition partition) {
+ private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition
partition) {
Long offset = partitionGroup.headRecordOffset(partition);
+ Optional<Integer> leaderEpoch =
partitionGroup.headRecordLeaderEpoch(partition);
+ final long partitionTime =
partitionGroup.partitionTimestamp(partition);
if (offset == null) {
try {
- offset = mainConsumer.position(partition);
- } catch (final TimeoutException error) {
- // the `consumer.position()` call should never block, because
we know that we did process data
- // for the requested partition and thus the consumer should
have a valid local position
- // that it can return immediately
-
- // hence, a `TimeoutException` indicates a bug and thus we
rethrow it as fatal `IllegalStateException`
- throw new IllegalStateException(error);
+ if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition))
{
+ final OffsetAndMetadata offsetAndMetadata =
nextOffsetsAndMetadataToBeConsumed.get(partition);
+ offset = offsetAndMetadata.offset();
+ leaderEpoch = offsetAndMetadata.leaderEpoch();
+ } else {
+ // This indicates a bug and thus we rethrow it as fatal
`IllegalStateException`
+ final String errorMsg = String.format("Stream task " + id
+ " does not know the partition: " + partition);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg, new
NullPointerException());
Review Comment:
Not sure why you are creating a null pointer exception here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]