gharris1727 commented on code in PR #15165:
URL: https://github.com/apache/kafka/pull/15165#discussion_r1603758912
##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -248,7 +248,10 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
subscriptions.position(entry.getKey(), newPosition);
}
}
- toClear.add(entry.getKey());
+ // Since reassignment logic for tiered topics relies on seekToBeginning,
Review Comment:
I don't think we can change this logic, because it would re-deliver the same
records over and over. That differs from the real KafkaConsumer, which
(without seeking) tries to deliver each record only once.
I do think that maybe seekToBeginning should clear the buffered records for the
reset partitions, to mirror how the real KafkaConsumer uses
FetchState.AWAIT_RESET#hasValidPosition to prevent fetching data from a
partition which has just been reset, even if there is data buffered.
Then the test can explicitly re-add the records to re-consume them.
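To make the suggestion concrete, here is a minimal sketch of the behaviour I have in mind. It assumes a toy stand-in class (`ToyMockConsumer`, with string partition names and string records, all hypothetical) rather than the real MockConsumer, so treat it as an illustration of the idea and not as a proposed patch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MockConsumer; it does NOT mirror the real class or its API.
class ToyMockConsumer {
    // Records buffered per partition name, waiting to be returned by poll().
    private final Map<String, List<String>> buffered = new HashMap<>();

    // Tests call this to stage a record for the next poll().
    void addRecord(String partition, String record) {
        buffered.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
    }

    // Suggested behaviour: resetting a partition discards anything buffered for it.
    void seekToBeginning(Collection<String> partitions) {
        partitions.forEach(buffered::remove);
    }

    // Every buffered record is handed out once and then cleared, so nothing is re-delivered.
    List<String> poll() {
        List<String> out = new ArrayList<>();
        buffered.values().forEach(out::addAll);
        buffered.clear();
        return out;
    }
}
```

With this shape, a test that wants to re-consume records after a reset calls `addRecord` again after `seekToBeginning`, and `poll` keeps its deliver-once behaviour.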
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -241,16 +258,25 @@ public void testCanProcessRecord() throws InterruptedException {
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
assertEquals(2, handler.metadataCounter);
- // should only read the tpId1 records
+ // Adding assignment for partition 1 after related metadata records have already been read
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
- TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned");
- addRecord(consumer, metadataPartition, tpId1, 2);
Review Comment:
This addRecord call is gone now, and that changes the final waitForCondition in
this test. Could both of these changes be reverted, or are they integral to the
change to the test?
##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -238,10 +238,15 @@ void maybeWaitForPartitionAssignments() throws InterruptedException {
this.assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot);
// for newly assigned user-partitions, read from the beginning of the corresponding metadata partition
final Set<TopicPartition> seekToBeginOffsetPartitions = assignedUserTopicIdPartitionsSnapshot
- .stream()
- .filter(utp -> !utp.isAssigned)
- .map(utp -> toRemoteLogPartition(utp.metadataPartition))
- .collect(Collectors.toSet());
+ .stream()
+ .filter(utp -> !utp.isAssigned)
+ .map(utp -> utp.metadataPartition)
+ // When reset to beginning is happening, we also need to reset the last read offset
+ // Otherwise if the next reassignment request for the same metadata partition comes in
+ // before the record of already assigned topic has been read, then the reset will happen again to the last read offset
+ .peek(readOffsetsByMetadataPartition::remove)
Review Comment:
Connect's WorkerSinkTask follows a very similar pattern, where consumer.seek
is followed by some bookkeeping. We set the internal state to the offset we
seek to rather than removing it, but I think removing the offset would probably be
safer. :+1:
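For illustration, the two bookkeeping styles can be sketched side by side. This is a rough sketch under assumptions: `OffsetBookkeeping` and its method names are hypothetical and are not taken from ConsumerTask or WorkerSinkTask.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for tracking the last read offset per metadata partition.
class OffsetBookkeeping {
    private final Map<Integer, Long> lastReadOffsets = new HashMap<>();

    // Called after a record at the given offset has been processed.
    void recordRead(int partition, long offset) {
        lastReadOffsets.put(partition, offset);
    }

    // Style A: after a seek, overwrite the tracked offset with the seek target.
    void onSeekTrackTarget(int partition, long target) {
        lastReadOffsets.put(partition, target);
    }

    // Style B: after a seek, forget the tracked offset entirely.
    void onSeekForget(int partition) {
        lastReadOffsets.remove(partition);
    }

    // Empty means "nothing read since the last reset".
    Optional<Long> lastRead(int partition) {
        return Optional.ofNullable(lastReadOffsets.get(partition));
    }
}
```

Style B leaves no stale value behind, so a later lookup cannot mistake a pre-reset offset (or the seek target) for a record that was actually read.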
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -241,16 +258,25 @@ public void testCanProcessRecord() throws InterruptedException {
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
assertEquals(2, handler.metadataCounter);
- // should only read the tpId1 records
+ // Adding assignment for partition 1 after related metadata records have already been read
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
- TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned");
- addRecord(consumer, metadataPartition, tpId1, 2);
+ TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId0 + " to be assigned");
+
+ // Adding assignment for partition0
+ // to trigger the reset to last read offset and assignment for another partition
+ // that has different metadata partition to trigger the update of metadata snapshot
+ HashSet<TopicIdPartition> partitions = new HashSet<>();
+ partitions.add(tpId0);
+ partitions.add(tpId3);
+ consumerTask.addAssignmentsForPartitions(partitions);
+ // Waiting for all metadata records to be re-read from metadata partition 2
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)), "Couldn't read record");
- assertEquals(3, handler.metadataCounter);
+ // Verifying that all the metadata records form metadata partition 2 were processed properly.
+ TestUtils.waitForCondition(() -> handler.metadataCounter == 3, "Couldn't read record");
// shouldn't read tpId2 records because it's not assigned
addRecord(consumer, metadataPartition, tpId2, 3);
- TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)), "Couldn't read record");
Review Comment:
See comment on addRecord.
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -241,16 +258,25 @@ public void testCanProcessRecord() throws InterruptedException {
TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
assertEquals(2, handler.metadataCounter);
- // should only read the tpId1 records
+ // Adding assignment for partition 1 after related metadata records have already been read
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
- TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned");
- addRecord(consumer, metadataPartition, tpId1, 2);
+ TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId0 + " to be assigned");
Review Comment:
nit: typo tpId0 -> tpId1