This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2da6a558d24 MINOR: Fix StreamsGroupHeartbeatRequest Test Typo (#20803)
2da6a558d24 is described below

commit 2da6a558d241821442be9370b00e32c725e9367a
Author: lucliu1108 <[email protected]>
AuthorDate: Mon Nov 3 03:06:00 2025 -0600

    MINOR: Fix StreamsGroupHeartbeatRequest Test Typo (#20803)
    
    ## What
    
    Fix the typos in
    `StreamsGroupHeartbeatRequestTest#testDynamicGroupConfig()`, and add
    assertions before modifying the `standby.replicas` config to make
    sure no standby tasks are assigned before the dynamic configuration
    changes.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../server/StreamsGroupHeartbeatRequestTest.scala  | 33 +++++++++++++++-------
 1 file changed, 23 insertions(+), 10 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index ea955e72313..d3b0e3f2341 100644
--- 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -660,7 +660,8 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
           topology = topology,
           processId = "process-1"
         )
-        streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code()
+        streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse1.activeTasks() != null
       }, "First StreamsGroupHeartbeatRequest did not succeed within the 
timeout period.")
 
       val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
@@ -689,9 +690,14 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
           topology = topology,
           processId = "process-2"
         )
-        streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code()
+        streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse2.activeTasks() != null
       }, "Second StreamsGroupHeartbeatRequest did not succeed within the 
timeout period.")
 
+      // Verify both members do not have standby tasks initially
+      assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), 
"Member 1 should have no standby tasks initially")
+      assertEquals(0, streamsGroupHeartbeatResponse2.standbyTasks().size(), 
"Member 2 should have no standby tasks initially")
+
       // Both members continue to send heartbeats with their assigned tasks
       TestUtils.waitUntilTrue(() => {
         streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -709,7 +715,8 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
             .map(r => convertTaskIds(r.warmupTasks()))
             .getOrElse(List.empty),
         )
-        streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code()
+        streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse1.activeTasks() != null
       }, "First member rebalance heartbeat did not succeed within the timeout 
period.")
 
       TestUtils.waitUntilTrue(() => {
@@ -718,22 +725,28 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
           memberId = memberId2,
           memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch(),
           rebalanceTimeoutMs = 1000,
-          activeTasks = Option(streamsGroupHeartbeatResponse1)
+          activeTasks = Option(streamsGroupHeartbeatResponse2)
             .map(r => convertTaskIds(r.activeTasks()))
             .getOrElse(List.empty),
-          standbyTasks = Option(streamsGroupHeartbeatResponse1)
+          standbyTasks = Option(streamsGroupHeartbeatResponse2)
             .map(r => convertTaskIds(r.standbyTasks()))
             .getOrElse(List.empty),
-          warmupTasks = Option(streamsGroupHeartbeatResponse1)
+          warmupTasks = Option(streamsGroupHeartbeatResponse2)
             .map(r => convertTaskIds(r.warmupTasks()))
             .getOrElse(List.empty)
         )
         streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code()
       }, "Second member rebalance heartbeat did not succeed within the timeout 
period.")
 
-      // Verify initial state with no standby tasks
-      assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), 
"Member 1 should have no standby tasks initially")
-      assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), 
"Member 2 should have no standby tasks initially")
+      // Verify no standby tasks assigned in this configuration
+      assertEquals(Errors.NONE.code(), 
streamsGroupHeartbeatResponse2.errorCode())
+      assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), 
"Member 1 should have no standby tasks in this configuration")
+      val member2StandbyTasksSize = if 
(streamsGroupHeartbeatResponse2.standbyTasks() != null) {
+        streamsGroupHeartbeatResponse2.standbyTasks().size()
+      } else {
+        0
+      }
+      assertEquals(0, member2StandbyTasksSize, "Member 2 should have no 
standby tasks in this configuration")
 
       // Change streams.num.standby.replicas = 1
       val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP, 
groupId)
@@ -757,7 +770,7 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
           warmupTasks = List.empty
         )
         streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() &&
-          streamsGroupHeartbeatResponse1.standbyTasks()!= null
+          streamsGroupHeartbeatResponse1.standbyTasks() != null
       }, "First member heartbeat after config change did not succeed within 
the timeout period.")
 
       TestUtils.waitUntilTrue(() => {

Reply via email to