UladzislauBlok commented on code in PR #20600:
URL: https://github.com/apache/kafka/pull/20600#discussion_r2384097202


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1960,19 +1960,28 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
             updatedConfiguredTopology = group.configuredTopology().get();
         }
 
+        // 3b. If the topology is validated, persist the fact that it is 
validated.
+        int validatedTopologyEpoch;

Review Comment:
   Simplification:
   initialize the variable to -1 by default and override it only inside the 'if':
   int validatedTopologyEpoch = -1;



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -837,7 +853,7 @@ private void maybeUpdateGroupState() {
         if (members.isEmpty()) {
             newState = EMPTY;
             clearShutdownRequestMemberId();
-        } else if (topology().isEmpty() || configuredTopology().isEmpty() || 
!configuredTopology().get().isReady()) {
+        } else if (topology().stream().allMatch(x -> x.topologyEpoch() != 
validatedTopologyEpoch.get())) {

Review Comment:
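   Minor one:
   To me, using Optional#stream here is a bit misleading.
   It could also be written with Optional#filter(predicate) plus a presence check, e.g.
   !topology().filter(t -> t.topologyEpoch() == validatedTopologyEpoch.get()).isPresent()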



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4504,6 +4506,55 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @Test
+  def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
+    val streamsGroupId = "stream_group_id"
+    val testTopicName = "test_topic"
+    val testNumPartitions = 1
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streams = createStreamsGroup(
+      inputTopics = Set(testTopicName),
+      streamsGroupId = streamsGroupId
+    )
+    streams.poll(JDuration.ofMillis(500L))
+
+    try {
+      TestUtils.waitUntilTrue(() => {
+        val firstGroup = 
client.listGroups().all().get().stream().findFirst().orElse(null)
+        firstGroup.groupState().orElse(null) == GroupState.STABLE && 
firstGroup.groupId() == streamsGroupId
+      }, "Stream group not stable yet")

Review Comment:
   Error message: "Stream group not stable yet"
   Correct me if I'm wrong, but this is the final error message (shown when the timeout is 
reached), so it should be worded differently.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16296,6 +16299,98 @@ barTopicName, computeTopicHash(barTopicName, 
metadataImage)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 
5)))
+            .build();
+
+        // Commit the offset and test again
+        context.commit();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), 
context.lastCommittedOffset);
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new 
StreamsGroupDescribeResponseData.DescribedGroup()

Review Comment:
   The naming is a bit inconsistent here: there is 'expectedMember', but then 'describedGroup' and 
just 'actual'. I think it would be good to have more consistency here, e.g.: 
expected member, expected described group, actual described group.
   



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16296,6 +16299,98 @@ barTopicName, computeTopicHash(barTopicName, 
metadataImage)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)

Review Comment:
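   minor: this uses the fully qualified class name (org.apache.kafka.coordinator.group.streams.MemberState); consider a regular import instead, if there is no name clash.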



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to