This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0db9f32eb1d MINOR: misc improvements in consumer test & events (#21797)
0db9f32eb1d is described below
commit 0db9f32eb1d089a088a9a57c3bca4233f583ae8e
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Mar 18 16:39:52 2026 -0400
MINOR: misc improvements in consumer test & events (#21797)
Minor improvements in test and events:
- fix test to create the actual event used in the real-life flow
- use unmodifiable collections in the new events, aligned with the existing ones
Reviewers: Andrew Schofield <[email protected]>, Nilesh Kumar
<[email protected]>
---
.../clients/consumer/internals/events/ApplyAssignmentEvent.java | 5 +++--
.../consumer/internals/events/PartitionsAssignedEvent.java | 8 +++++---
.../kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java | 8 +++++++-
3 files changed, 15 insertions(+), 6 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
index b6acbb07cb3..20c907955d3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.common.TopicPartition;
+import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
@@ -45,8 +46,8 @@ public class ApplyAssignmentEvent extends
CompletableApplicationEvent<Void> {
public ApplyAssignmentEvent(Set<TopicPartition> assignedPartitions,
SortedSet<TopicPartition> addedPartitions) {
super(Type.APPLY_ASSIGNMENT, Long.MAX_VALUE);
- this.assignedPartitions = Objects.requireNonNull(assignedPartitions);
- this.addedPartitions = Objects.requireNonNull(addedPartitions);
+ this.assignedPartitions =
Collections.unmodifiableSet(Objects.requireNonNull(assignedPartitions));
+ this.addedPartitions =
Collections.unmodifiableSortedSet(Objects.requireNonNull(addedPartitions));
}
public Set<TopicPartition> assignedPartitions() {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
index 82317cb2627..82fd263b071 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
+import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
@@ -25,8 +27,8 @@ import java.util.SortedSet;
/**
* Event sent from the background to the app thread, to notify that a new
assignment has been reconciled.
- * The app thread is expected to apply the assignment change to the
subscription state in the next call to consumer.poll,
- * and invoke the onPartitionsAssigned callback if needed.
+ * The app thread is expected to apply the assignment change to the
subscription state in the next call to
+ * {@link Consumer#poll(Duration)} and invoke the onPartitionsAssigned
callback if needed.
*/
public class PartitionsAssignedEvent extends CompletableBackgroundEvent<Void> {
@@ -42,7 +44,7 @@ public class PartitionsAssignedEvent extends
CompletableBackgroundEvent<Void> {
public PartitionsAssignedEvent(final Set<TopicPartition>
assignedPartitions,
final SortedSet<TopicPartition>
addedPartitions) {
super(Type.PARTITIONS_ASSIGNED, Long.MAX_VALUE);
- this.assignedPartitions = Objects.requireNonNull(assignedPartitions);
+ this.assignedPartitions =
Collections.unmodifiableSet(Objects.requireNonNull(assignedPartitions));
this.addedPartitions =
Collections.unmodifiableSortedSet(Objects.requireNonNull(addedPartitions));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index c8efa6bb69f..4c7b97524ca 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -47,6 +47,7 @@ import
org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
+import
org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
import
org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
@@ -1450,7 +1451,12 @@ public class AsyncKafkaConsumerTest {
SortedSet<TopicPartition> partitions = Collections.emptySortedSet();
for (ConsumerRebalanceListenerMethodName methodName : methodNames) {
- CompletableBackgroundEvent<Void> e = new
PartitionsRemovedEvent(methodName, partitions);
+ CompletableBackgroundEvent<Void> e;
+ if (methodName == ON_PARTITIONS_ASSIGNED) {
+ e = new PartitionsAssignedEvent(Set.of(), partitions);
+ } else {
+ e = new PartitionsRemovedEvent(methodName, partitions);
+ }
backgroundEventQueue.add(e);
}