lucasbru commented on code in PR #17091:
URL: https://github.com/apache/kafka/pull/17091#discussion_r1771041249
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java:
##########
@@ -57,6 +75,22 @@ private Fetch(
this.records = records;
this.positionAdvanced = positionAdvanced;
this.numRecords = numRecords;
+ this.nextFetchOffsets = new HashMap<>();
Review Comment:
Same here: when is this constructor called? I'm worried that callers using it
will forget to pass the correct nextOffset information.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1232,47 +1231,51 @@ public void
shouldRespectPunctuateCancellationSystemTime() {
processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now + 10);
}
- @Test
- public void shouldRespectCommitNeeded() {
- when(stateManager.taskId()).thenReturn(taskId);
- when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
- task = createSingleSourceStateless(createConfig(AT_LEAST_ONCE, "0"),
StreamsConfig.METRICS_LATEST);
- task.initializeIfNeeded();
- task.completeRestoration(noOpResetter -> { });
-
- assertFalse(task.commitNeeded());
-
- task.addRecords(partition1,
singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
- assertTrue(task.process(0L));
- assertTrue(task.commitNeeded());
-
- task.prepareCommit();
- assertTrue(task.commitNeeded());
-
- task.postCommit(true);
- assertFalse(task.commitNeeded());
-
- assertTrue(task.canPunctuateStreamTime());
- assertTrue(task.maybePunctuateStreamTime());
- assertTrue(task.commitNeeded());
-
- task.prepareCommit();
- assertTrue(task.commitNeeded());
-
- task.postCommit(true);
- assertFalse(task.commitNeeded());
-
- time.sleep(10);
- assertTrue(task.canPunctuateSystemTime());
- assertTrue(task.maybePunctuateSystemTime());
- assertTrue(task.commitNeeded());
-
- task.prepareCommit();
- assertTrue(task.commitNeeded());
-
- task.postCommit(true);
- assertFalse(task.commitNeeded());
- }
+// @Test
Review Comment:
Reminder not to approve this as long as we have commented-out test cases.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java:
##########
@@ -637,7 +637,15 @@ private ConsumerRecords<K, V> poll(final Timer timer,
final boolean includeMetad
+ "since the consumer's position has advanced
for at least one topic partition");
}
- return this.interceptors.onConsume(new
ConsumerRecords<>(fetch.records()));
+ Map<TopicPartition, OffsetAndMetadata>
nextOffsetAndMetadata = new HashMap<>();
+ Map<TopicPartition, Long> nextFetchOffsets =
fetch.nextFetchOffsets();
Review Comment:
If both `nextFetchOffsets` and `lastEpochs` come from the fetch, could we
build the `OffsetAndMetadata` values directly inside a single map,
`fetch.nextOffsetAndMetadata`? That way we would avoid dealing with multiple
maps.
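A minimal sketch of that idea, using hypothetical stand-in types rather than
the real Kafka classes: `NextOffset` stands in for `OffsetAndMetadata`,
partitions are plain strings, and `NextOffsets.combine` is an illustrative
helper, not an existing method on `Fetch`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for OffsetAndMetadata: an offset plus an optional leader epoch.
final class NextOffset {
    final long offset;
    final Optional<Integer> leaderEpoch;

    NextOffset(long offset, Optional<Integer> leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

final class NextOffsets {
    // Builds the combined map once, so callers never handle two parallel maps.
    static Map<String, NextOffset> combine(Map<String, Long> nextFetchOffsets,
                                           Map<String, Integer> lastEpochs) {
        Map<String, NextOffset> combined = new HashMap<>();
        for (Map.Entry<String, Long> entry : nextFetchOffsets.entrySet()) {
            // A partition without a recorded epoch gets Optional.empty().
            Optional<Integer> epoch = Optional.ofNullable(lastEpochs.get(entry.getKey()));
            combined.put(entry.getKey(), new NextOffset(entry.getValue(), epoch));
        }
        return combined;
    }
}
```

If the fetch exposed a map like this, the `poll` path could pass it straight
through instead of merging two maps at the call site.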
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -35,11 +35,18 @@ public class ConsumerRecords<K, V> implements
Iterable<ConsumerRecord<K, V>> {
public static final ConsumerRecords<Object, Object> EMPTY = new
ConsumerRecords<>(Collections.emptyMap());
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+ private Map<TopicPartition, OffsetAndMetadata> nextOffsets;
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>>
records) {
Review Comment:
Who calls this constructor? Is it the Queues code? Do we really want
`nextOffsets` to be null here? It seems better to initialize it to an empty
map when no next offsets are available. It might be better still to have a
single constructor and pass an empty map for `nextOffsets` explicitly in the
share-consumer case: with this overload it's easy to "forget" to pass the
right `nextOffsets`.
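Roughly what I have in mind, as a sketch only: `RecordsSketch` is a
hypothetical, simplified stand-in for `ConsumerRecords` (string keys, no
generics), not the actual class.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical simplified stand-in for ConsumerRecords.
final class RecordsSketch {
    private final Map<String, List<String>> records;
    private final Map<String, Long> nextOffsets;

    // Single constructor: callers without offsets must pass an empty map explicitly,
    // so the field can be final and is never null.
    RecordsSketch(Map<String, List<String>> records, Map<String, Long> nextOffsets) {
        this.records = Objects.requireNonNull(records, "records");
        this.nextOffsets = Collections.unmodifiableMap(
            Objects.requireNonNull(nextOffsets, "nextOffsets"));
    }

    Map<String, List<String>> records() {
        return records;
    }

    Map<String, Long> nextOffsets() {
        return nextOffsets;
    }
}
```

A share-consumer caller would then write
`new RecordsSketch(records, Collections.emptyMap())`, which makes the missing
offsets a visible decision at the call site.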
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -35,11 +35,18 @@ public class ConsumerRecords<K, V> implements
Iterable<ConsumerRecord<K, V>> {
public static final ConsumerRecords<Object, Object> EMPTY = new
ConsumerRecords<>(Collections.emptyMap());
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+ private Map<TopicPartition, OffsetAndMetadata> nextOffsets;
Review Comment:
This field could be `final`.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########
@@ -181,6 +182,11 @@ public Long headRecordOffset() {
return headRecord == null ? null : headRecord.offset();
}
+ @SuppressWarnings("OptionalAssignedToNull")
+ public Optional<Integer> headRecordLeaderEpoch() {
+ return headRecord == null ? null : headRecord.leaderEpoch();
Review Comment:
Yes, we are distinguishing three cases here: `null` means the queue is empty;
`Optional.empty()` means the queue is not empty but the leader epoch is not
set on the head record; otherwise the value is the leader epoch. This is a bit
confusing, but I think it's correct.
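To spell out what a caller has to handle, here is a small sketch.
`HeadEpochSketch.describe` is a hypothetical helper for illustration; its
argument is whatever `headRecordLeaderEpoch()` returned.

```java
import java.util.Optional;

final class HeadEpochSketch {
    // Mirrors the three-way contract: null, Optional.empty(), or a present epoch.
    @SuppressWarnings("OptionalAssignedToNull")
    static String describe(Optional<Integer> headEpoch) {
        if (headEpoch == null) {
            return "queue empty";
        }
        return headEpoch.map(e -> "leader epoch " + e).orElse("epoch not set");
    }
}
```

The null check has to come first; calling any `Optional` method on the result
before it would throw when the queue is empty.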