lucasbru commented on code in PR #20600:
URL: https://github.com/apache/kafka/pull/20600#discussion_r2390423868
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16296,6 +16299,98 @@ barTopicName, computeTopicHash(barTopicName,
metadataImage)
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testJoinEmptyStreamsGroupAndDescribe() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ assignor.prepareGroupAssignment(Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+ )));
+
+ assertThrows(GroupIdNotFoundException.class, () ->
+ context.groupMetadataManager.streamsGroup(groupId));
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setProcessId("process-id")
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ result.response().data()
+ );
+
+ StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
Review Comment:
Yeah, we are using fully qualified names in this file since there are multiple
MemberState classes
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]