This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2da6a558d24 MINOR: Fix StreamsGroupHeartbeatRequest Test Typo (#20803)
2da6a558d24 is described below
commit 2da6a558d241821442be9370b00e32c725e9367a
Author: lucliu1108 <[email protected]>
AuthorDate: Mon Nov 3 03:06:00 2025 -0600
MINOR: Fix StreamsGroupHeartbeatRequest Test Typo (#20803)
## What
Fix the typos in
`StreamsGroupHeartBeatRequestTest#testDynamicGroupConfig()`, and add
additional assertions before modifying `standby.replicas` config to make
sure no standby replicas are available before the dynamic configuration
changes.
Reviewers: Lucas Brutschy <[email protected]>
---
.../server/StreamsGroupHeartbeatRequestTest.scala | 33 +++++++++++++++-------
1 file changed, 23 insertions(+), 10 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index ea955e72313..d3b0e3f2341 100644
---
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -660,7 +660,8 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
topology = topology,
processId = "process-1"
)
- streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code()
+ streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() &&
+ streamsGroupHeartbeatResponse1.activeTasks() != null
}, "First StreamsGroupHeartbeatRequest did not succeed within the
timeout period.")
val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
@@ -689,9 +690,14 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
topology = topology,
processId = "process-2"
)
- streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code()
+ streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() &&
+ streamsGroupHeartbeatResponse2.activeTasks() != null
}, "Second StreamsGroupHeartbeatRequest did not succeed within the
timeout period.")
+ // Verify both members do not have standby tasks initially
+ assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(),
"Member 1 should have no standby tasks initially")
+ assertEquals(0, streamsGroupHeartbeatResponse2.standbyTasks().size(),
"Member 2 should have no standby tasks initially")
+
// Both members continue to send heartbeats with their assigned tasks
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -709,7 +715,8 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
.map(r => convertTaskIds(r.warmupTasks()))
.getOrElse(List.empty),
)
- streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code()
+ streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() &&
+ streamsGroupHeartbeatResponse1.activeTasks() != null
}, "First member rebalance heartbeat did not succeed within the timeout
period.")
TestUtils.waitUntilTrue(() => {
@@ -718,22 +725,28 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
memberId = memberId2,
memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch(),
rebalanceTimeoutMs = 1000,
- activeTasks = Option(streamsGroupHeartbeatResponse1)
+ activeTasks = Option(streamsGroupHeartbeatResponse2)
.map(r => convertTaskIds(r.activeTasks()))
.getOrElse(List.empty),
- standbyTasks = Option(streamsGroupHeartbeatResponse1)
+ standbyTasks = Option(streamsGroupHeartbeatResponse2)
.map(r => convertTaskIds(r.standbyTasks()))
.getOrElse(List.empty),
- warmupTasks = Option(streamsGroupHeartbeatResponse1)
+ warmupTasks = Option(streamsGroupHeartbeatResponse2)
.map(r => convertTaskIds(r.warmupTasks()))
.getOrElse(List.empty)
)
streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code()
}, "Second member rebalance heartbeat did not succeed within the timeout
period.")
- // Verify initial state with no standby tasks
- assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(),
"Member 1 should have no standby tasks initially")
- assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(),
"Member 2 should have no standby tasks initially")
+ // Verify no standby tasks assigned in this configuration
+ assertEquals(Errors.NONE.code(),
streamsGroupHeartbeatResponse2.errorCode())
+ assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(),
"Member 1 should have no standby tasks in this configuration")
+ val member2StandbyTasksSize = if
(streamsGroupHeartbeatResponse2.standbyTasks() != null) {
+ streamsGroupHeartbeatResponse2.standbyTasks().size()
+ } else {
+ 0
+ }
+ assertEquals(0, member2StandbyTasksSize, "Member 2 should have no
standby tasks in this configuration")
// Change streams.num.standby.replicas = 1
val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP,
groupId)
@@ -757,7 +770,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
warmupTasks = List.empty
)
streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() &&
- streamsGroupHeartbeatResponse1.standbyTasks()!= null
+ streamsGroupHeartbeatResponse1.standbyTasks() != null
}, "First member heartbeat after config change did not succeed within
the timeout period.")
TestUtils.waitUntilTrue(() => {