This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ab88df74f21 KAFKA-20535: Improve async consumer CPU usage under low
max.poll.records. (#22199)
ab88df74f21 is described below
commit ab88df74f21af96b59b6d45f257404772873ef12
Author: ChickenchickenLove <[email protected]>
AuthorDate: Mon May 11 22:23:45 2026 +0900
KAFKA-20535: Improve async consumer CPU usage under low max.poll.records.
(#22199)
### Description
KAFKA-20332 fixed a correctness issue in the async consumer where the
application thread could collect buffered records before the background
thread had checked for pending reconciliations. The fix added a wait on
`inflightPoll.reconciliationCheckFuture()` in
`AsyncKafkaConsumer.collectFetch()`.
This restored the correctness guarantee, but it also increased CPU usage
in low `max.poll.records` scenarios. With `max.poll.records=5`,
profiling shows that the additional cost mainly comes from the
application thread waiting on
`ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(),
timeoutMs)` even when the consumer group member is not reconciling.
This patch avoids that unnecessary wait by tracking the consumer group
member state in `AsyncKafkaConsumer`. `AbstractMembershipManager` now
notifies `MemberStateListener` whenever the consumer member transitions
to a new state. `AsyncKafkaConsumer` uses this signal to wait for the
reconciliation check only while the member is in
`MemberState.RECONCILING`.
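
The gating idea described above can be sketched in isolation. This is a minimal illustration and not the actual `AsyncKafkaConsumer` code: the class `ReconciliationGateSketch`, its two-value `MemberState` enum, the `readyToCollect` method, and the `waits` counter are all hypothetical names introduced here. It assumes the flag is written by the background thread and read by the application thread, which is why it is `volatile`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the gating logic; not the real AsyncKafkaConsumer.
class ReconciliationGateSketch {
    // Simplified enum for illustration; the real MemberState has more values.
    enum MemberState { STABLE, RECONCILING }

    // Assumed to be written by the background thread and read by the application thread.
    private volatile boolean hasPendingReconciliation = false;
    // Counts how often the caller actually blocked on the future (illustration only).
    private int waits = 0;

    // Listener callback: remember whether the member is currently reconciling.
    void onMemberStateChange(MemberState state) {
        hasPendingReconciliation = (state == MemberState.RECONCILING);
    }

    // Returns true if the caller may go on to collect buffered records.
    boolean readyToCollect(CompletableFuture<Void> reconciliationCheck, long timeoutMs) {
        // Only pay for the blocking wait while a reconciliation is pending.
        if (hasPendingReconciliation && !reconciliationCheck.isDone()) {
            waits++;
            try {
                reconciliationCheck.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException | ExecutionException e) {
                return false; // check not completed in time (or failed): do not collect yet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    int waits() {
        return waits;
    }

    public static void main(String[] args) {
        ReconciliationGateSketch gate = new ReconciliationGateSketch();
        CompletableFuture<Void> check = new CompletableFuture<>();
        System.out.println("not reconciling: " + gate.readyToCollect(check, 10));
        gate.onMemberStateChange(MemberState.RECONCILING);
        System.out.println("reconciling, check incomplete: " + gate.readyToCollect(check, 10));
        check.complete(null);
        System.out.println("reconciling, check complete: " + gate.readyToCollect(check, 10));
    }
}
```

In this sketch the common case (member not reconciling) never touches the future, so the application thread avoids the blocking wait entirely.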
### Test Condition
- Broker
  - Standalone
  - 12 vCPU, 32GB RAM.
- Producer
  - Use `bin/kafka-producer-perf-test.sh` on the broker.
  - throughput 50000
  - record-size 100
- Consumer (before, after, optimized)
  - Kubernetes environment.
  - All consumers are scheduled on the same worker node.
- Profiler
  - async-profiler
  - duration 180 seconds.
- Branch
  - `before`: 5a2dcf8fd0
  - `after`: 7e1c9db92f
  - `optimized`: this PR
### Test Result
1. Check the throughput of each consumer
```
Before - RATE: 49997.400259974005 records/sec, total=7050826
After - RATE: 50002.199780022 records/sec, total=7557854
Optimized - RATE: 50002.199780022 records/sec, total=7584198
```
- All consumers have the same throughput.
2. Average CPU usage from `kubectl top pod`
| Revision | Average CPU |
|---|---:|
| Before | 225.7m |
| After | 325.7m |
| Optimized | 248.4m |

Averaged over 10 samples taken every 30 seconds. The optimized version
reduced CPU usage by about 23.7% compared with `after`.
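
The percentage can be re-derived from the averages in the table. The following is a small sketch of that arithmetic only; the class `CpuDeltaSketch` and the helper `percentChange` are hypothetical names, and the inputs are the millicore averages reported above.

```java
// Hypothetical helper that re-derives the relative CPU changes from the reported averages.
class CpuDeltaSketch {
    // Percentage change from baseline to value; a negative result means a reduction.
    static double percentChange(double baseline, double value) {
        return (value - baseline) / baseline * 100.0;
    }

    public static void main(String[] args) {
        // Average CPU in millicores, as reported by `kubectl top pod`.
        double before = 225.7;
        double after = 325.7;
        double optimized = 248.4;

        System.out.printf("after vs before:     %+.1f%%%n", percentChange(before, after));
        System.out.printf("optimized vs after:  %+.1f%%%n", percentChange(after, optimized));
        System.out.printf("optimized vs before: %+.1f%%%n", percentChange(before, optimized));
    }
}
```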
### Flame Graph Summary
| Metric | Before | After | Optimized |
|---|---:|---:|---:|
| Samples | 2,402 | 3,160 | 2,542 |
| markReconciliationCheckComplete | 0.00% | 2.82% | 0.51% |
| setActiveTask | 0.00% | 0.06% | 0.00% |
| pollTimeMs | 0.00% | 0.00% | 0.00% |
| AsyncPollEvent | 0.37% | 2.91% | 0.87% |
| processBackgroundEvents | 2.37% | 1.93% | 2.28% |
| Reaper | 0.17% | 0.19% | 0.35% |
| parkNanos | 1.08% | 4.05% | 0.94% |
| unpark | 0.37% | 2.06% | 0.63% |
| AsyncKafkaConsumer.poll | 38.68% | 38.32% | 38.20% |
| AsyncKafkaConsumer.collectFetch | 20.32% | 23.61% | 19.39% |
| ApplicationEventProcessor.process | 2.66% | 6.46% | 2.83% |
| ApplicationEventHandler.add | 7.87% | 7.37% | 8.06% |
`After` shows higher CPU usage, and the profile also shows increased
time in `parkNanos` and `unpark`. This suggests that the additional wait
on `reconciliationCheckFuture` introduced more application/background
thread coordination overhead.
### AsyncKafkaConsumer.collectFetch
| Metric | Before | After | Optimized |
|---|---:|---:|---:|
| AsyncKafkaConsumer.collectFetch samples | 488 | 746 | 493 |
| AsyncKafkaConsumer.collectFetch % | 20.32% | 23.61% | 19.39% |
| FetchCollector.collectFetch samples | 482 | 512 | 477 |
| FetchCollector.collectFetch % | 20.07% | 16.20% | 18.76% |
| ConsumerUtils.getResult samples | 0 | 194 | 0 |
| ConsumerUtils.getResult % | 0.00% | 6.14% | 0.00% |
In `after`, the application thread spends additional time in
`ConsumerUtils.getResult()` while waiting for the reconciliation check
future. This also increases related park/unpark activity and application
event processing on the background thread. In the optimized version,
this wait is skipped unless the member is actually in `RECONCILING`
state.
### ConsumerNetworkThread.runOnce
| Metric | Before | After | Optimized |
|---|---:|---:|---:|
| ConsumerNetworkThread.runOnce samples | 1,216 | 1,621 | 1,295 |
| ConsumerNetworkThread.runOnce % | 50.62% | 51.30% | 50.94% |
| ConsumerNetworkThread.processApplicationEvents samples | 283 | 442 | 263 |
| ConsumerNetworkThread.processApplicationEvents % | 11.78% | 13.99% | 10.35% |
| FetchRequestManager.poll samples | 169 | 194 | 158 |
| FetchRequestManager.poll % | 7.04% | 6.13% | 6.22% |
| NetworkClientDelegate.poll samples | 600 | 755 | 684 |
| NetworkClientDelegate.poll % | 24.98% | 23.89% | 26.91% |
The higher CPU usage in `ConsumerNetworkThread` appears to be a secondary
effect of the application thread waiting on `reconciliationCheckFuture`
more often. Each wait requires coordination between the application
thread and the background thread: the app thread enqueues an
`AsyncPollEvent`, waits for the reconciliation check to complete, and
the background thread processes that event and completes the future. As
a result, `ConsumerNetworkThread.processApplicationEvents` and related
`AsyncPollEvent` processing show higher CPU usage in the after profile.
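
The handoff described above can be sketched with plain JDK concurrency primitives. This is an illustrative model under stated assumptions, not Kafka code: `PollHandoffSketch`, `PollEvent`, `checkDone`, and `run` are hypothetical names, a `LinkedBlockingQueue` stands in for the application event queue, and a single daemon thread stands in for the background thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the app-thread/background-thread handoff; not the Kafka implementation.
class PollHandoffSketch {
    // Stand-in for an event carrying a future that the background thread completes.
    static final class PollEvent {
        final CompletableFuture<Void> checkDone = new CompletableFuture<>();
    }

    // Runs `polls` iterations and returns how many times the caller blocked on the future.
    static int run(int polls, boolean waitForCheck) {
        BlockingQueue<PollEvent> queue = new LinkedBlockingQueue<>();
        Thread background = new Thread(() -> {
            try {
                while (true) {
                    // Process each event and complete its future.
                    queue.take().checkDone.complete(null);
                }
            } catch (InterruptedException e) {
                // Interrupted on shutdown: exit the loop.
            }
        });
        background.setDaemon(true);
        background.start();

        int blockingWaits = 0;
        try {
            for (int i = 0; i < polls; i++) {
                PollEvent event = new PollEvent();
                queue.put(event);
                if (waitForCheck) {
                    // Every wait is a round trip to the background thread.
                    event.checkDone.get(1, TimeUnit.SECONDS);
                    blockingWaits++;
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException("handoff failed", e);
        } finally {
            background.interrupt();
        }
        return blockingWaits;
    }

    public static void main(String[] args) {
        System.out.println("waits when always waiting: " + run(1000, true));
        System.out.println("waits when skipping:       " + run(1000, false));
    }
}
```

When the wait is skipped, the events are still enqueued and processed, but the calling thread no longer synchronizes with the background thread on every iteration, which is the kind of coordination the profile attributes to `parkNanos` and `unpark`.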
Reviewers: Lianet Magrans <[email protected]>
---
.../internals/AbstractMembershipManager.java | 1 +
.../consumer/internals/AsyncKafkaConsumer.java | 13 ++++-
.../consumer/internals/MemberStateListener.java | 9 ++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 55 ++++++++++++++++++++++
4 files changed, 77 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 5782e138202..4d02d447d2b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -245,6 +245,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
log.info("Member {} with epoch {} transitioned from {} to {}.",
memberId, memberEpoch, state, nextState);
this.state = nextState;
+ stateUpdatesListeners.forEach(listener ->
listener.onMemberStateChange(nextState));
}
private static boolean isCompletingRebalance(MemberState currentState,
MemberState nextState) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 08e5ce30ed6..38dd6cb23c7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -420,6 +420,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
private final AtomicInteger refCount = new AtomicInteger(0);
+ private volatile boolean hasPendingReconciliation = false;
+
private final MemberStateListener memberStateListener = new
MemberStateListener() {
@Override
public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String
memberId) {
@@ -430,6 +432,11 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
setGroupAssignmentSnapshot(partitions);
}
+
+ @Override
+ public void onMemberStateChange(MemberState memberState) {
+ setHasPendingReconciliation(memberState ==
MemberState.RECONCILING);
+ }
};
public AsyncKafkaConsumer(final ConsumerConfig config,
@@ -873,6 +880,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
groupAssignmentSnapshot.set(Collections.unmodifiableSet(partitions));
}
+ void setHasPendingReconciliation(final boolean hasPendingReconciliation) {
+ this.hasPendingReconciliation = hasPendingReconciliation;
+ }
+
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
if (!metrics().containsKey(metric.metricName())) {
@@ -2028,7 +2039,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// This is key because partitions may need revocation, so we need to
wait for the reconciliation check
// that triggers commits and marks partitions as pending revocation,
before we can
// safely collect records from the buffer.
- if (inflightPoll != null &&
!inflightPoll.isReconciliationCheckComplete()) {
+ if (hasPendingReconciliation && inflightPoll != null &&
!inflightPoll.isReconciliationCheckComplete()) {
// If the background hasn't had the time to check for pending
reconciliation,
// we need to wait for that check before moving on (instead of
returning empty right away,
// which will lead to blocking on buffer data)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
index 98b6271fcc0..cbf28b13af3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
@@ -47,4 +47,13 @@ public interface MemberStateListener {
default void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
}
+
+ /**
+ * Called whenever the member transitions to a new state.
+ *
+ * @param memberState The member state.
+ */
+ default void onMemberStateChange(MemberState memberState) {
+
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index f64c8102280..c6833d6aee4 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -609,6 +609,7 @@ public class AsyncKafkaConsumerTest {
// Do NOT mark reconciliation check complete - simulating
background hasn't processed it yet
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+ consumer.setHasPendingReconciliation(true);
// Poll should return empty because reconciliation check is not
complete.
ConsumerRecords<?, ?> result1 = consumer.poll(Duration.ZERO);
@@ -623,6 +624,60 @@ public class AsyncKafkaConsumerTest {
assertEquals(2, result2.count(), "Expected 2 records after
reconciliation check is complete");
}
+ @Test
+ public void
testPollDoesNotWaitForReconciliationCheckIfNoPendingReconciliation() {
+ final String topicName = "foo";
+ final int partition = 3;
+ final TopicPartition tp = new TopicPartition(topicName, partition);
+ final List<ConsumerRecord<String, String>> records = asList(
+ new ConsumerRecord<>(topicName, partition, 2, "key1",
"value1"),
+ new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
+ );
+
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(
+ mock(FetchBuffer.class),
+ new ConsumerInterceptors<>(Collections.emptyList(), metrics),
+ mock(ConsumerRebalanceListenerInvoker.class),
+ subscriptions);
+
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ // PositionsValidator starts with metadataUpdateVersion=-1. Stub
metadata.updateVersion() to match,
+ // so canSkipUpdateFetchPositions() passes and we test the
reconciliation check path.
+ doReturn(-1).when(metadata).updateVersion();
+
+ completeTopicSubscriptionChangeEventSuccessfully();
+ consumer.subscribe(singleton(topicName),
mock(ConsumerRebalanceListener.class));
+ // Simulate partition assignment from group coordinator
+ subscriptions.assignFromSubscribed(singleton(tp));
+
+ // Set up position so canSkipUpdateFetchPositions() returns true
(partition in FETCHING state)
+ completeSeekUnvalidatedEventSuccessfully();
+ subscriptions.seek(tp, 0);
+
+ // Set up fetch collector to return records when called
+ doReturn(Fetch.forPartition(tp, records, true, new
OffsetAndMetadata(4, Optional.of(0), "")))
+ .when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+ // Capture the AsyncPollEvent but leave the reconciliation check
incomplete.
+ // Since there is no pending reconciliation, poll should not wait for
it.
+ AtomicReference<AsyncPollEvent> capturedEvent = new
AtomicReference<>();
+ doAnswer(invocation -> {
+ AsyncPollEvent event = invocation.getArgument(0);
+ assertTrue(capturedEvent.compareAndSet(null, event));
+ // Do NOT mark reconciliation check complete - simulating
background hasn't processed it yet
+ return null;
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+ consumer.setHasPendingReconciliation(false);
+
+ // Poll does not wait AsyncPollEvent if there is no pending
reconciliation.
+ ConsumerRecords<?, ?> result = consumer.poll(Duration.ZERO);
+
+ assertNotNull(capturedEvent.get(), "AsyncPollEvent should have been
captured");
+ assertFalse(capturedEvent.get().isReconciliationCheckComplete(),
"Reconciliation check should still be incomplete");
+ assertEquals(2, result.count(), "Expected records without waiting when
no reconciliation is pending");
+ }
+
@Test
public void testEnsureCallbackExecutedByApplicationThread() {
consumer = newConsumer();