gharris1727 commented on code in PR #15165:
URL: https://github.com/apache/kafka/pull/15165#discussion_r1603758912


##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -248,7 +248,10 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
                         subscriptions.position(entry.getKey(), newPosition);
                     }
                 }
-                toClear.add(entry.getKey());
+                // Since reassignment logic for tiered topics relies on seekToBeginning,

Review Comment:
   I don't think we can change this logic, because it would re-deliver the same records over and over, which is distinct from the real KafkaConsumer that (without seeking) tries to deliver records only once.
   
   I do think that maybe seekToBeginning should clear the records from the reset partitions, to mirror how the real KafkaConsumer uses FetchState.AWAIT_RESET#hasValidPosition to prevent fetching data from a partition which has just been reset, even if there is data buffered.
   
   Then the test can explicitly re-add the records to re-consume them.
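   To make the suggestion concrete, here is a minimal sketch of the clear-on-seek behavior. This is not the real MockConsumer API; the class name and the String-based partition/record types are simplified stand-ins for illustration only.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for MockConsumer: partitions and records are plain Strings.
class ClearingMockConsumer {
    private final Map<String, List<String>> buffered = new HashMap<>();

    void addRecord(String partition, String record) {
        buffered.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
    }

    // Mirrors the idea behind FetchState.AWAIT_RESET#hasValidPosition: a partition
    // that has just been reset has no valid position, so its buffered records
    // are dropped rather than re-delivered.
    void seekToBeginning(Collection<String> partitions) {
        partitions.forEach(buffered::remove);
    }

    // Deliver (and consume) whatever is currently buffered for the partition.
    List<String> poll(String partition) {
        List<String> out = buffered.remove(partition);
        return out == null ? Collections.emptyList() : out;
    }
}
```

   With this behavior, a test that seeks to the beginning gets nothing from poll until it explicitly re-adds the records it wants to re-consume.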



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -241,16 +258,25 @@ public void testCanProcessRecord() throws InterruptedException {
         TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
         assertEquals(2, handler.metadataCounter);
 
-        // should only read the tpId1 records
+        // Adding assignment for partition 1 after related metadata records have already been read
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
-        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned");
-        addRecord(consumer, metadataPartition, tpId1, 2);

Review Comment:
   This addRecord call is gone now, and that changes the final waitForCondition in this test. Could both of these changes be reverted, or are they integral to the change to the test?



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -238,10 +238,15 @@ void maybeWaitForPartitionAssignments() throws InterruptedException {
             this.assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot);
             // for newly assigned user-partitions, read from the beginning of the corresponding metadata partition
             final Set<TopicPartition> seekToBeginOffsetPartitions = assignedUserTopicIdPartitionsSnapshot
-                .stream()
-                .filter(utp -> !utp.isAssigned)
-                .map(utp -> toRemoteLogPartition(utp.metadataPartition))
-                .collect(Collectors.toSet());
+                    .stream()
+                    .filter(utp -> !utp.isAssigned)
+                    .map(utp -> utp.metadataPartition)
+                    // When reset to beginning is happening, we also need to reset the last read offset.
+                    // Otherwise, if the next reassignment request for the same metadata partition comes in
+                    // before the record of an already assigned topic has been read, the reset will happen again to the last read offset.
+                    .peek(readOffsetsByMetadataPartition::remove)

Review Comment:
   Connect's WorkerSinkTask follows a very similar pattern, where consumer.seek is followed by some bookkeeping. We set the internal state to the offset we seek rather than removing it, but I think removing the offset would probably be safer. :+1:
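   For illustration, here is a hedged sketch of the two bookkeeping options being compared. The class and method names are invented for this example (not the actual ConsumerTask or WorkerSinkTask code): removing the tracked offset leaves no stale value for a later reset check to misread, while overwriting keeps a value around that must be kept consistent with the seek.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Invented names for illustration; not the actual ConsumerTask fields.
class SeekBookkeeping {
    private final Map<Integer, Long> readOffsetByMetadataPartition = new HashMap<>();

    void recordRead(int metadataPartition, long offset) {
        readOffsetByMetadataPartition.put(metadataPartition, offset);
    }

    // Option 1 (what this PR does): drop the last-read offset on seekToBeginning,
    // so no stale offset can short-circuit a later reset for the same partition.
    void seekToBeginningAndRemove(int metadataPartition) {
        readOffsetByMetadataPartition.remove(metadataPartition);
    }

    // Option 2 (WorkerSinkTask-style): overwrite the tracked offset with the
    // position we are seeking to, here the beginning of the partition.
    void seekToBeginningAndOverwrite(int metadataPartition, long beginningOffset) {
        readOffsetByMetadataPartition.put(metadataPartition, beginningOffset);
    }

    Optional<Long> lastReadOffset(int metadataPartition) {
        return Optional.ofNullable(readOffsetByMetadataPartition.get(metadataPartition));
    }
}
```

   Either option keeps the bookkeeping in step with the seek; removal is simply the more conservative of the two because an absent entry can never be mistaken for a valid position.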



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -241,16 +258,25 @@ public void testCanProcessRecord() throws InterruptedException {
         TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
         assertEquals(2, handler.metadataCounter);
 
-        // should only read the tpId1 records
+        // Adding assignment for partition 1 after related metadata records have already been read
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
-        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned");
-        addRecord(consumer, metadataPartition, tpId1, 2);
+        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId0 + " to be assigned");
+
+        // Adding assignment for partition0
+        // to trigger the reset to last read offset and assignment for another partition
+        // that has different metadata partition to trigger the update of metadata snapshot
+        HashSet<TopicIdPartition> partitions = new HashSet<>();
+        partitions.add(tpId0);
+        partitions.add(tpId3);
+        consumerTask.addAssignmentsForPartitions(partitions);
+        // Waiting for all metadata records to be re-read from metadata partition 2
         TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(2L)), "Couldn't read record");
-        assertEquals(3, handler.metadataCounter);
+        // Verifying that all the metadata records form metadata partition 2 were processed properly.
+        TestUtils.waitForCondition(() -> handler.metadataCounter == 3, "Couldn't read record");
 
         // shouldn't read tpId2 records because it's not assigned
         addRecord(consumer, metadataPartition, tpId2, 3);
-        TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(3L)), "Couldn't read record");

Review Comment:
   See comment on addRecord.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -241,16 +258,25 @@ public void testCanProcessRecord() throws InterruptedException {
         TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
         assertEquals(2, handler.metadataCounter);
 
-        // should only read the tpId1 records
+        // Adding assignment for partition 1 after related metadata records have already been read
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
-        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned");
-        addRecord(consumer, metadataPartition, tpId1, 2);
+        TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId0 + " to be assigned");

Review Comment:
   nit: typo tpId0 -> tpId1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
