lucasbru commented on code in PR #17091:
URL: https://github.com/apache/kafka/pull/17091#discussion_r1771053314


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -462,23 +463,29 @@ public Map<TopicPartition, OffsetAndMetadata> 
prepareCommit() {
         }
     }
 
-    private Long findOffset(final TopicPartition partition) {
+    private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition 
partition) {
         Long offset = partitionGroup.headRecordOffset(partition);
+        Optional<Integer> leaderEpoch = 
partitionGroup.headRecordLeaderEpoch(partition);
+        final long partitionTime = 
partitionGroup.partitionTimestamp(partition);
         if (offset == null) {
             try {
-                offset = mainConsumer.position(partition);
-            } catch (final TimeoutException error) {
-                // the `consumer.position()` call should never block, because 
we know that we did process data
-                // for the requested partition and thus the consumer should 
have a valid local position
-                // that it can return immediately
-
-                // hence, a `TimeoutException` indicates a bug and thus we 
rethrow it as fatal `IllegalStateException`
-                throw new IllegalStateException(error);
+                if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) 
{
+                    final OffsetAndMetadata offsetAndMetadata = 
nextOffsetsAndMetadataToBeConsumed.get(partition);
+                    offset = offsetAndMetadata.offset();
+                    leaderEpoch = offsetAndMetadata.leaderEpoch();
+                } else {
+                    // This indicates a bug and thus we rethrow it as fatal 
`IllegalStateException`
+                    final String errorMsg = String.format("Stream task " + id 
+ " does not know the partition: " + partition);
+                    log.error(errorMsg);

Review Comment:
   I think if you throw a fatal exception, you don't need to also log it. It 
should be logged by the code that catches the exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to