lucasbru commented on code in PR #20457:
URL: https://github.com/apache/kafka/pull/20457#discussion_r2330173867


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16216,6 +16216,98 @@ barTopicName, computeTopicHash(barTopicName, 
metadataImage)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 
5)))
+            .build();
+
+        // Commit the offset and test again
+        context.commit();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), 
context.lastCommittedOffset);
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new 
StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setAssignmentEpoch(1)
+            .setTopology(
+                new StreamsGroupDescribeResponseData.Topology()
+                    .setEpoch(0)
+                    .setSubtopologies(List.of(
+                        new StreamsGroupDescribeResponseData.Subtopology()
+                            .setSubtopologyId(subtopology1)
+                            .setSourceTopics(List.of(fooTopicName))
+                    ))
+            )
+            .setMembers(Collections.singletonList(
+                
expectedMember.asStreamsGroupDescribeMember(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)))
+            ))
+            .setGroupState(StreamsGroupState.STABLE.toString())

Review Comment:
   yes



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -850,11 +850,16 @@ StreamsGroup getOrCreateStreamsGroup(
     ) {
         Group group = groups.get(groupId);
 
+        // Streams groups are inserted immediately into the `groups` map to 
allow soft-state
         if (group == null) {
-            return new StreamsGroup(logContext, snapshotRegistry, groupId, 
metrics);
+            StreamsGroup newGroup = new StreamsGroup(logContext, 
snapshotRegistry, groupId, metrics);
+            groups.put(groupId, newGroup);

Review Comment:
   Yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to