gharris1727 commented on code in PR #16303:
URL: https://github.com/apache/kafka/pull/16303#discussion_r1636921387
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
}
@Test
- public void testAddAssignmentsForPartitions() throws InterruptedException {
+ public void testAddAssignmentsForPartitions() {
final List<TopicIdPartition> idPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
- thread.start();
+ consumerTask.ingestRecords();
for (final TopicIdPartition idPartition : idPartitions) {
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " +
idPartition + " to be assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
assertTrue(handler.isPartitionLoaded.get(idPartition));
}
}
@Test
- public void testRemoveAssignmentsForPartitions() throws
InterruptedException {
+ public void testRemoveAssignmentsForPartitions() {
final List<TopicIdPartition> allPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
- thread.start();
+ consumerTask.ingestRecords();
final TopicIdPartition tpId = allPartitions.get(0);
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + "
to be assigned");
addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
- "Couldn't read record");
+ consumerTask.ingestRecords();
+ assertTrue(() ->
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
+ "Couldn't read record");
final Set<TopicIdPartition> removePartitions =
Collections.singleton(tpId);
consumerTask.removeAssignmentsForPartitions(removePartitions);
+ consumerTask.ingestRecords();
for (final TopicIdPartition idPartition : allPartitions) {
- final TestCondition condition = () ->
removePartitions.contains(idPartition) ==
!consumerTask.isUserPartitionAssigned(idPartition);
- TestUtils.waitForCondition(condition, "Timed out waiting for " +
idPartition + " to be removed");
+ final BooleanSupplier condition = () ->
removePartitions.contains(idPartition) ==
!consumerTask.isUserPartitionAssigned(idPartition);
+ assertTrue(condition, "Partition " + idPartition + " has not been
removed");
Review Comment:
This can be an assertEquals now.
```suggestion
assertEquals(!removePartitions.contains(idPartition),
consumerTask.isUserPartitionAssigned(idPartition), "Partition " + idPartition +
" has not been removed");
```
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
}
@Test
- public void testAddAssignmentsForPartitions() throws InterruptedException {
+ public void testAddAssignmentsForPartitions() {
final List<TopicIdPartition> idPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
- thread.start();
+ consumerTask.ingestRecords();
for (final TopicIdPartition idPartition : idPartitions) {
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " +
idPartition + " to be assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
assertTrue(handler.isPartitionLoaded.get(idPartition));
}
}
@Test
- public void testRemoveAssignmentsForPartitions() throws
InterruptedException {
+ public void testRemoveAssignmentsForPartitions() {
final List<TopicIdPartition> allPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
- thread.start();
+ consumerTask.ingestRecords();
final TopicIdPartition tpId = allPartitions.get(0);
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + "
to be assigned");
addRecord(consumer, partitioner.metadataPartition(tpId), tpId, 0);
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
- "Couldn't read record");
+ consumerTask.ingestRecords();
+ assertTrue(() ->
consumerTask.readOffsetForMetadataPartition(partitioner.metadataPartition(tpId)).isPresent(),
Review Comment:
Thanks so much for making these waitForCondition calls into assertTrue
calls! I think you can use the (boolean, String) form instead of the
(BooleanSupplier, String) form because these no longer need to be recomputed.
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -284,20 +279,19 @@ public void testCanReprocessSkippedRecords() throws
InterruptedException {
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(anotherMetadataPartition),
0L));
final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
consumerTask.addAssignmentsForPartitions(assignments);
- thread.start();
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 +
" to be assigned");
+ consumerTask.ingestRecords();
// Adding metadata records in the order opposite to the order of
assignments
addRecord(consumer, metadataPartition, tpId1, 0);
addRecord(consumer, metadataPartition, tpId0, 1);
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
"Couldn't read record");
+ consumerTask.ingestRecords();
+ assertTrue(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
"Couldn't read record");
// Only one record is processed, tpId1 record is skipped as unassigned
// but read offset is 1 e.g., record for tpId1 has been read by
consumer
assertEquals(1, handler.metadataCounter);
// Adding assignment for tpId1 after related metadata records have
already been read
consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
Review Comment:
In this test, we used doAnswer to mock out the consumer behavior, because of
the multithreading. I don't think this mocking is necessary anymore now that
there aren't any spurious poll() calls in between the two
addAssignmentsForPartitions calls.
You can remove doAnswer in this test, and the spy on the consumer in
beforeEach.
##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##########
@@ -122,33 +122,41 @@ public ConsumerTask(RemotePartitionMetadataEventHandler
remotePartitionMetadataE
public void run() {
log.info("Starting consumer task thread.");
while (!isClosed) {
- try {
- if (hasAssignmentChanged) {
- maybeWaitForPartitionAssignments();
- }
+ ingestRecords();
+ }
+ log.info("Exited from consumer task thread");
+ }
- log.trace("Polling consumer to receive remote log metadata
topic records");
- final ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(pollTimeoutMs));
- for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
- processConsumerRecord(record);
- }
- maybeMarkUserPartitionsAsReady();
- } catch (final WakeupException ex) {
- // ignore logging the error
- isClosed = true;
- } catch (final RetriableException ex) {
- log.warn("Retriable error occurred while processing the
records. Retrying...", ex);
- } catch (final Exception ex) {
- isClosed = true;
- log.error("Error occurred while processing the records", ex);
+ public void ingestRecords() {
+ try {
+ if (hasAssignmentChanged) {
+ maybeWaitForPartitionAssignments();
+ }
+
+ log.trace("Polling consumer to receive remote log metadata topic
records");
+ final ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(pollTimeoutMs));
+ for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+ processConsumerRecord(record);
}
+ maybeMarkUserPartitionsAsReady();
+ } catch (final WakeupException ex) {
+ // ignore logging the error
+ closeConsumer();
+ } catch (final RetriableException ex) {
+ log.warn("Retriable error occurred while processing the records.
Retrying...", ex);
+ } catch (final Exception ex) {
+ log.error("Error occurred while processing the records", ex);
+ closeConsumer();
}
+ }
+
+ private void closeConsumer() {
Review Comment:
This introduces a race condition that can cause a consumer leak. Consider
the following sequence:
1. The ConsumerTask thread returns from maybeMarkUserPartitionsAsReady(), and
then pauses
2. An external thread calls close(), and sets isClosed = true
3. The ConsumerTask thread resumes, returns from ingestRecords(), and checks
the isClosed flag in run().
4. The ConsumerTask thread returns from run() and exits, without ever
closing the consumer.
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -136,46 +128,46 @@ public void testUserTopicIdPartitionEquals() {
}
@Test
- public void testAddAssignmentsForPartitions() throws InterruptedException {
+ public void testAddAssignmentsForPartitions() {
final List<TopicIdPartition> idPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = idPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(idPartitions));
- thread.start();
+ consumerTask.ingestRecords();
for (final TopicIdPartition idPartition : idPartitions) {
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(idPartition), "Timed out waiting for " +
idPartition + " to be assigned");
assertTrue(consumerTask.isMetadataPartitionAssigned(partitioner.metadataPartition(idPartition)));
assertTrue(handler.isPartitionLoaded.get(idPartition));
}
}
@Test
- public void testRemoveAssignmentsForPartitions() throws
InterruptedException {
+ public void testRemoveAssignmentsForPartitions() {
final List<TopicIdPartition> allPartitions = getIdPartitions("sample",
3);
final Map<TopicPartition, Long> endOffsets = allPartitions.stream()
.map(idp ->
toRemoteLogPartition(partitioner.metadataPartition(idp)))
.collect(Collectors.toMap(Function.identity(), e -> 0L, (a, b) ->
b));
consumer.updateEndOffsets(endOffsets);
consumerTask.addAssignmentsForPartitions(new HashSet<>(allPartitions));
- thread.start();
+ consumerTask.ingestRecords();
final TopicIdPartition tpId = allPartitions.get(0);
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId), "Timed out waiting for " + tpId + "
to be assigned");
Review Comment:
I think these assertions for isUserPartitionAssigned can be retained as
assertTrue calls.
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -238,29 +230,32 @@ public void testCanProcessRecord() throws
InterruptedException {
consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition),
0L));
final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
consumerTask.addAssignmentsForPartitions(assignments);
- thread.start();
- TestUtils.waitForCondition(() ->
consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 +
" to be assigned");
+ consumerTask.ingestRecords();
addRecord(consumer, metadataPartition, tpId0, 0);
addRecord(consumer, metadataPartition, tpId0, 1);
- TestUtils.waitForCondition(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
"Couldn't read record");
+ consumerTask.ingestRecords();
+ assertTrue(() ->
consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)),
"Couldn't read record");
Review Comment:
The "Couldn't read record" strings are pretty useless and potentially
confusing. I would be fine with omitting them completely, or if you want, you
could write some new strings based on the surrounding context.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]