lucasbru commented on code in PR #17091:
URL: https://github.com/apache/kafka/pull/17091#discussion_r1771041249


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java:
##########
@@ -57,6 +75,22 @@ private Fetch(
         this.records = records;
         this.positionAdvanced = positionAdvanced;
         this.numRecords = numRecords;
+        this.nextFetchOffsets = new HashMap<>();

Review Comment:
   Same here: when is this constructor called? I'm worried that callers of
this constructor will forget to pass the correct nextOffset information.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1232,47 +1231,51 @@ public void shouldRespectPunctuateCancellationSystemTime() {
         processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
     }
 
-    @Test
-    public void shouldRespectCommitNeeded() {
-        when(stateManager.taskId()).thenReturn(taskId);
-        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
-        task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"), StreamsConfig.METRICS_LATEST);
-        task.initializeIfNeeded();
-        task.completeRestoration(noOpResetter -> { });
-
-        assertFalse(task.commitNeeded());
-
-        task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
-        assertTrue(task.process(0L));
-        assertTrue(task.commitNeeded());
-
-        task.prepareCommit();
-        assertTrue(task.commitNeeded());
-
-        task.postCommit(true);
-        assertFalse(task.commitNeeded());
-
-        assertTrue(task.canPunctuateStreamTime());
-        assertTrue(task.maybePunctuateStreamTime());
-        assertTrue(task.commitNeeded());
-
-        task.prepareCommit();
-        assertTrue(task.commitNeeded());
-
-        task.postCommit(true);
-        assertFalse(task.commitNeeded());
-
-        time.sleep(10);
-        assertTrue(task.canPunctuateSystemTime());
-        assertTrue(task.maybePunctuateSystemTime());
-        assertTrue(task.commitNeeded());
-
-        task.prepareCommit();
-        assertTrue(task.commitNeeded());
-
-        task.postCommit(true);
-        assertFalse(task.commitNeeded());
-    }
+//    @Test

Review Comment:
   Reminder not to approve this as long as we have commented-out test cases.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java:
##########
@@ -637,7 +637,15 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetad
                                 + "since the consumer's position has advanced for at least one topic partition");
                     }
 
-                    return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
+                    Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
+                    Map<TopicPartition, Long> nextFetchOffsets = fetch.nextFetchOffsets();

Review Comment:
   If both nextFetchOffsets and lastEpochs come from the fetch, could we build
the `OffsetAndMetadata` values directly in a single map,
`fetch.nextOffsetAndMetadata`? That way we'd avoid dealing with multiple maps.
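
   A minimal sketch of that idea, not the PR's code: `FetchSketch` and
`recordNextOffset` are hypothetical names, and the nested `TopicPartition` and
`OffsetAndMetadata` records are stand-ins for the Kafka classes so the example
compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, trimmed-down Fetch that keeps one combined map
// instead of separate nextFetchOffsets and lastEpochs maps.
final class FetchSketch {
    // Stand-ins for the Kafka types of the same names (assumption for this sketch).
    record TopicPartition(String topic, int partition) {}
    record OffsetAndMetadata(long offset, Optional<Integer> leaderEpoch, String metadata) {}

    private final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();

    // Assumed call site: invoked once per partition while records are collected.
    void recordNextOffset(TopicPartition tp, long nextFetchOffset, Optional<Integer> lastEpoch) {
        nextOffsetAndMetadata.put(tp, new OffsetAndMetadata(nextFetchOffset, lastEpoch, ""));
    }

    // Returns an immutable snapshot so callers cannot mutate the fetch state.
    Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata() {
        return Map.copyOf(nextOffsetAndMetadata);
    }
}
```

   With something like this, the poll path could hand
`fetch.nextOffsetAndMetadata()` straight to `ConsumerRecords` without merging
two maps first.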



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -35,11 +35,18 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
     public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private Map<TopicPartition, OffsetAndMetadata> nextOffsets;
 
     public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {

Review Comment:
   Who calls this constructor? Is it the Queues code? Do we really want
`nextOffsets` to be null here? It seems better to initialize it to an empty map
when no nextOffsets are available. It might be better still to have just one
constructor and pass an empty map for `nextOffsets` explicitly in the share
consumer case: with this overload it's easy to "forget" to pass the right
nextOffsets.
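
   Roughly what I have in mind, as a sketch only: `ConsumerRecordsSketch` is a
hypothetical cut-down class, the record lists are reduced to `List<String>`,
and the nested `TopicPartition` and `OffsetAndMetadata` records are stand-ins
for the Kafka classes.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, trimmed-down ConsumerRecords with a single constructor.
final class ConsumerRecordsSketch {
    // Stand-ins for the Kafka types of the same names (assumption for this sketch).
    record TopicPartition(String topic, int partition) {}
    record OffsetAndMetadata(long offset) {}

    private final Map<TopicPartition, List<String>> records;
    private final Map<TopicPartition, OffsetAndMetadata> nextOffsets; // final, never null

    // The only constructor: every caller has to state what nextOffsets is.
    ConsumerRecordsSketch(Map<TopicPartition, List<String>> records,
                          Map<TopicPartition, OffsetAndMetadata> nextOffsets) {
        this.records = Objects.requireNonNull(records, "records");
        this.nextOffsets = Collections.unmodifiableMap(
                Objects.requireNonNull(nextOffsets, "nextOffsets"));
    }

    Map<TopicPartition, OffsetAndMetadata> nextOffsets() {
        return nextOffsets;
    }

    // Total number of records across all partitions.
    int count() {
        return records.values().stream().mapToInt(List::size).sum();
    }
}
```

   A share consumer would then write
`new ConsumerRecordsSketch(records, Collections.emptyMap())`, which makes the
"no next offsets" decision visible at the call site.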



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -35,11 +35,18 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
     public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private Map<TopicPartition, OffsetAndMetadata> nextOffsets;

Review Comment:
   Could be final



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########
@@ -181,6 +182,11 @@ public Long headRecordOffset() {
         return headRecord == null ? null : headRecord.offset();
     }
 
+    @SuppressWarnings("OptionalAssignedToNull")
+    public Optional<Integer> headRecordLeaderEpoch() {
+        return headRecord == null ? null : headRecord.leaderEpoch();

Review Comment:
   Yeah, we are distinguishing three cases here: `null` means the queue is
empty; `Optional.empty()` means the queue is not empty but the head record has
no leader epoch set; otherwise the value is the head record's leader epoch.
This is a bit confusing, but I think it's correct.
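
   To spell the three cases out, here is a small self-contained sketch.
`RecordQueueSketch`, `Rec`, and `describe` are hypothetical names for
illustration; only `headRecordLeaderEpoch` mirrors the method in the diff.

```java
import java.util.ArrayDeque;
import java.util.Optional;

// Hypothetical, trimmed-down queue that reproduces the three-state return value.
final class RecordQueueSketch {
    // Stand-in for a queued record: only the offset and the optional leader epoch.
    record Rec(long offset, Optional<Integer> leaderEpoch) {}

    private final ArrayDeque<Rec> queue = new ArrayDeque<>();

    void add(Rec rec) {
        queue.add(rec);
    }

    // null: queue empty; Optional.empty(): head record has no epoch; otherwise the epoch.
    @SuppressWarnings("OptionalAssignedToNull")
    Optional<Integer> headRecordLeaderEpoch() {
        Rec head = queue.peek(); // peek() returns null when the queue is empty
        return head == null ? null : head.leaderEpoch();
    }

    // Hypothetical helper: a caller has to branch on all three cases.
    static String describe(Optional<Integer> epoch) {
        if (epoch == null) {
            return "queue empty";
        }
        return epoch.map(e -> "leader epoch " + e).orElse("leader epoch not set");
    }
}
```

   Any caller needs the explicit `null` check before touching the `Optional`,
which is the confusing part.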



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
