This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0db9f32eb1d MINOR: misc improvements in consumer test & events (#21797)
0db9f32eb1d is described below

commit 0db9f32eb1d089a088a9a57c3bca4233f583ae8e
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Mar 18 16:39:52 2026 -0400

    MINOR: misc improvements in consumer test & events (#21797)
    
    Minor improvements in the consumer test and events:
    - fix the test to create the actual event used in the real-life flow
    - wrap collections in the new events as unmodifiable, aligned with existing ones
    
    Reviewers: Andrew Schofield <[email protected]>, Nilesh Kumar
     <[email protected]>
---
 .../clients/consumer/internals/events/ApplyAssignmentEvent.java   | 5 +++--
 .../consumer/internals/events/PartitionsAssignedEvent.java        | 8 +++++---
 .../kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java  | 8 +++++++-
 3 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
index b6acbb07cb3..20c907955d3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplyAssignmentEvent.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 import java.util.SortedSet;
@@ -45,8 +46,8 @@ public class ApplyAssignmentEvent extends CompletableApplicationEvent<Void> {
     public ApplyAssignmentEvent(Set<TopicPartition> assignedPartitions,
                                 SortedSet<TopicPartition> addedPartitions) {
         super(Type.APPLY_ASSIGNMENT, Long.MAX_VALUE);
-        this.assignedPartitions = Objects.requireNonNull(assignedPartitions);
-        this.addedPartitions = Objects.requireNonNull(addedPartitions);
+        this.assignedPartitions = Collections.unmodifiableSet(Objects.requireNonNull(assignedPartitions));
+        this.addedPartitions = Collections.unmodifiableSortedSet(Objects.requireNonNull(addedPartitions));
     }
 
     public Set<TopicPartition> assignedPartitions() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
index 82317cb2627..82fd263b071 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PartitionsAssignedEvent.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
@@ -25,8 +27,8 @@ import java.util.SortedSet;
 
 /**
  * Event sent from the background to the app thread, to notify that a new assignment has been reconciled.
- * The app thread is expected to apply the assignment change to the subscription state in the next call to consumer.poll,
- * and invoke the onPartitionsAssigned callback if needed.
+ * The app thread is expected to apply the assignment change to the subscription state in the next call to
+ * {@link Consumer#poll(Duration)} and invoke the onPartitionsAssigned callback if needed.
  */
 public class PartitionsAssignedEvent extends CompletableBackgroundEvent<Void> {
 
@@ -42,7 +44,7 @@ public class PartitionsAssignedEvent extends CompletableBackgroundEvent<Void> {
     public PartitionsAssignedEvent(final Set<TopicPartition> assignedPartitions,
                                    final SortedSet<TopicPartition> addedPartitions) {
         super(Type.PARTITIONS_ASSIGNED, Long.MAX_VALUE);
-        this.assignedPartitions = Objects.requireNonNull(assignedPartitions);
+        this.assignedPartitions = Collections.unmodifiableSet(Objects.requireNonNull(assignedPartitions));
         this.addedPartitions = Collections.unmodifiableSortedSet(Objects.requireNonNull(addedPartitions));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index c8efa6bb69f..4c7b97524ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
 import org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
@@ -1450,7 +1451,12 @@ public class AsyncKafkaConsumerTest {
         SortedSet<TopicPartition> partitions = Collections.emptySortedSet();
 
         for (ConsumerRebalanceListenerMethodName methodName : methodNames) {
-            CompletableBackgroundEvent<Void> e = new PartitionsRemovedEvent(methodName, partitions);
+            CompletableBackgroundEvent<Void> e;
+            if (methodName == ON_PARTITIONS_ASSIGNED) {
+                e = new PartitionsAssignedEvent(Set.of(), partitions);
+            } else {
+                e = new PartitionsRemovedEvent(methodName, partitions);
+            }
             backgroundEventQueue.add(e);
         }
 

Reply via email to