gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1636921387


##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
     }
 
     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + 
idPartition + " to be assigned");
             
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
             assertTrue(handler.isPartitionLoaded.get(idPartition));
         }
     }
 
     @Test
-    public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+    public void testRemoveAssignmentsForPartitions() {
         final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
 
         final TopicIdPartition tpId = allPartitions.get(0);
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " 
to be assigned");
         addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
-            "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertTrue(() -> 
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
+                "Couldn't read record");
 
         final Set<TopicIdPartition> removePartitions = 
Collections.singleton(tpId);
         consumerTask.removeAssignmentsForPartitions(removePartitions);
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : allPartitions) {
-            final TestCondition condition = () -> 
removePartitions.contains(idPartition) == 
!consumerTask.isUserPartitionAssigned(idPartition);
-            TestUtils.waitForCondition(condition, "Timed out waiting for " + 
idPartition + " to be removed");
+            final BooleanSupplier condition = () -> 
removePartitions.contains(idPartition) == 
!consumerTask.isUserPartitionAssigned(idPartition);
+            assertTrue(condition, "Partition " + idPartition + " has not been 
removed");

Review Comment:
   This can be an assertEquals now.
   ```suggestion
               assertEquals(!removePartitions.contains(idPartition), 
consumerTask.isUserPartitionAssigned(idPartition), "Partition " + idPartition + 
" has not been removed");
   
   ```



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
     }
 
     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + 
idPartition + " to be assigned");
             
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
             assertTrue(handler.isPartitionLoaded.get(idPartition));
         }
     }
 
     @Test
-    public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+    public void testRemoveAssignmentsForPartitions() {
         final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
 
         final TopicIdPartition tpId = allPartitions.get(0);
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " 
to be assigned");
         addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
-            "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertTrue(() -> 
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),

Review Comment:
   Thanks so much for making these waitForCondition calls into assertTrue 
calls! I think you can use the (boolean, String) form instead of the 
(BooleanSupplier, String) form, because these no longer need to be recomputed.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -284,20 +279,19 @@ public void testCanReprocessSkippedRecords() throws 
InterruptedException {
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(anotherMetadataPartition),
 0L));
         final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
         consumerTask.addAssignmentsForPartitions(assignments);
-        thread.start();
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + 
" to be assigned");
+        consumerTask.ingestRecords();
 
         // Adding metadata records in the order opposite to the order of 
assignments
         addRecord(consumer, metadataPartition, tpId1, 0);
         addRecord(consumer, metadataPartition, tpId0, 1);
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertTrue(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
         // Only one record is processed, tpId1 record is skipped as unassigned
         // but read offset is 1 e.g., record for tpId1 has been read by 
consumer
         assertEquals(1, handler.metadataCounter);
 
         // Adding assignment for tpId1 after related metadata records have 
already been read
         consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));

Review Comment:
   In this test, we used doAnswer to mock out the consumer behavior, because of 
the multithreading. I don't think this mocking is necessary anymore now that 
there aren't any spurious poll() calls in between the two 
addAssignmentsForPartitions calls.
   
   You can remove doAnswer in this test, and the spy on the consumer in 
beforeEach.



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -122,33 +122,41 @@ public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataE
     public void run() {
         log.info("Starting consumer task thread.");
         while (!isClosed) {
-            try {
-                if (hasAssignmentChanged) {
-                    maybeWaitForPartitionAssignments();
-                }
+            ingestRecords();
+        }
+        log.info("Exited from consumer task thread");
+    }
 
-                log.trace("Polling consumer to receive remote log metadata 
topic records");
-                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
-                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
-                    processConsumerRecord(record);
-                }
-                maybeMarkUserPartitionsAsReady();
-            } catch (final WakeupException ex) {
-                // ignore logging the error
-                isClosed = true;
-            } catch (final RetriableException ex) {
-                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
-            } catch (final Exception ex) {
-                isClosed = true;
-                log.error("Error occurred while processing the records", ex);
+    public void ingestRecords() {
+        try {
+            if (hasAssignmentChanged) {
+                maybeWaitForPartitionAssignments();
+            }
+
+            log.trace("Polling consumer to receive remote log metadata topic 
records");
+            final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
+            for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                processConsumerRecord(record);
             }
+            maybeMarkUserPartitionsAsReady();
+        } catch (final WakeupException ex) {
+            // ignore logging the error
+            closeConsumer();
+        } catch (final RetriableException ex) {
+            log.warn("Retriable error occurred while processing the records. 
Retrying...", ex);
+        } catch (final Exception ex) {
+            log.error("Error occurred while processing the records", ex);
+            closeConsumer();
         }
+    }
+
+    private void closeConsumer() {

Review Comment:
   This introduces a race condition that can cause a consumer leak. Consider 
the following sequence:
   
   1. The ConsumerTask thread returns from maybeMarkUserPartitionsAsReady(), 
and then pauses
   2. An external thread calls close(), and sets isClosed = true
   3. The ConsumerTask thread resumes, returns from ingestRecords(), and checks 
the isClosed flag in run().
   4. The ConsumerTask thread returns from run() and exits, without ever 
closing the consumer.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
     }
 
     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + 
idPartition + " to be assigned");
             
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
             assertTrue(handler.isPartitionLoaded.get(idPartition));
         }
     }
 
     @Test
-    public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+    public void testRemoveAssignmentsForPartitions() {
         final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
 
         final TopicIdPartition tpId = allPartitions.get(0);
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " 
to be assigned");

Review Comment:
   I think these assertions for isUserPartitionAssigned can be retained as 
assertTrue calls.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -238,29 +230,32 @@ public void testCanProcessRecord() throws 
InterruptedException {
         
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
 0L));
         final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
         consumerTask.addAssignmentsForPartitions(assignments);
-        thread.start();
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + 
" to be assigned");
+        consumerTask.ingestRecords();
 
         addRecord(consumer, metadataPartition, tpId0, 0);
         addRecord(consumer, metadataPartition, tpId0, 1);
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertTrue(() -> 
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
 "Couldn't read record");

Review Comment:
   The "Couldn't read record" strings are pretty useless and potentially 
confusing. I would be fine with omitting them completely, or if you want, you 
could write some new strings based on the surrounding context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to