gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1638488900


##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
     }
 
     @Test
-    public void testAddAssignmentsForPartitions() throws InterruptedException {
+    public void testAddAssignmentsForPartitions() {
         final List<TopicIdPartition> idPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
         for (final TopicIdPartition idPartition : idPartitions) {
-            TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " + 
idPartition + " to be assigned");
             
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
             assertTrue(handler.isPartitionLoaded.get(idPartition));
         }
     }
 
     @Test
-    public void testRemoveAssignmentsForPartitions() throws 
InterruptedException {
+    public void testRemoveAssignmentsForPartitions() {
         final List<TopicIdPartition> allPartitions = getIdPartitions("sample", 
3);
         final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
             .map(idp -> 
toRemoteLogPartition(partitioner.metadataPartition(idp)))
             .collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) -> 
b));
         consumer.updateEndOffsets(endOffsets);
         consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
-        thread.start();
+        consumerTask.ingestRecords();
 
         final TopicIdPartition tpId = allPartitions.get(0);
-        TestUtils.waitForCondition(() -> 
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + " 
to be assigned");
         addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
-        TestUtils.waitForCondition(() -> 
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
-            "Couldn't read record");
+        consumerTask.ingestRecords();
+        assertTrue(() -> 
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),

Review Comment:
   Hey, this wasn't addressed. Can you remove the `() ->` where it isn't 
necessary?



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -115,40 +118,54 @@ public ConsumerTask(RemotePartitionMetadataEventHandler 
remotePartitionMetadataE
         this.pollTimeoutMs = pollTimeoutMs;
         this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
         this.time = Objects.requireNonNull(time);
+        this.isInternalConsumerClosed = new AtomicBoolean(false);
         this.uninitializedAt = time.milliseconds();
     }
 
     @Override
     public void run() {
         log.info("Starting consumer task thread.");
         while (!isClosed) {
-            try {
-                if (hasAssignmentChanged) {
-                    maybeWaitForPartitionAssignments();
-                }
+            ingestRecords();
+        }
+        closeConsumer();
+        log.info("Exited from consumer task thread");
+    }
 
-                log.trace("Polling consumer to receive remote log metadata 
topic records");
-                final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
-                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
-                    processConsumerRecord(record);
-                }
-                maybeMarkUserPartitionsAsReady();
-            } catch (final WakeupException ex) {
-                // ignore logging the error
-                isClosed = true;
-            } catch (final RetriableException ex) {
-                log.warn("Retriable error occurred while processing the 
records. Retrying...", ex);
-            } catch (final Exception ex) {
-                isClosed = true;
-                log.error("Error occurred while processing the records", ex);
+    // public for testing
+    public void ingestRecords() {
+        try {
+            if (hasAssignmentChanged) {
+                maybeWaitForPartitionAssignments();
             }
+
+            log.trace("Polling consumer to receive remote log metadata topic 
records");
+            final ConsumerRecords<byte[], byte[]> consumerRecords = 
consumer.poll(Duration.ofMillis(pollTimeoutMs));
+            for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                processConsumerRecord(record);
+            }
+            maybeMarkUserPartitionsAsReady();
+        } catch (final WakeupException ex) {
+            // ignore logging the error
+            isClosed = true;
+            closeConsumer();
+        } catch (final RetriableException ex) {
+            log.warn("Retriable error occurred while processing the records. 
Retrying...", ex);
+        } catch (final Exception ex) {
+            isClosed = true;
+            log.error("Error occurred while processing the records", ex);
+            closeConsumer();
         }
-        try {
-            consumer.close(Duration.ofSeconds(30));
-        } catch (final Exception e) {
-            log.error("Error encountered while closing the consumer", e);
+    }
+
+    private void closeConsumer() {
+        if (isInternalConsumerClosed.compareAndSet(false, true)) {

Review Comment:
   While the AtomicBoolean makes this implementation correct, it seems more 
complicated than the existing implementation.
   
   I think if you made closeConsumer more visible and called it in the 
ConsumerTaskTest, we could avoid needing to change this logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to