apoorvmittal10 commented on code in PR #16573:
URL: https://github.com/apache/kafka/pull/16573#discussion_r1696643858


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1702,65 +2012,218 @@ private CoordinatorResult<Void, CoordinatorRecord> 
classicGroupJoinToConsumerGro
     }
 
     /**
-     * Gets or subscribes a new dynamic consumer group member.
+     * Handles a ShareGroupHeartbeat request.
      *
-     * @param group                 The consumer group.
-     * @param memberId              The member id.
-     * @param memberEpoch           The member epoch.
-     * @param ownedTopicPartitions  The owned partitions reported by the 
member.
-     * @param createIfNotExists     Whether the member should be created or 
not.
-     * @param useClassicProtocol    Whether the member uses the classic 
protocol.
+     * @param groupId               The group id from the request.
+     * @param memberId              The member id from the request.
+     * @param memberEpoch           The member epoch from the request.
+     * @param rackId                The rack id from the request or null.
+     * @param clientId              The client id.
+     * @param clientHost            The client host.
+     * @param subscribedTopicNames  The list of subscribed topic names from 
the request or null.
      *
-     * @return The existing consumer group member or a new one.
+     * @return A Result containing the ShareGroupHeartbeat response and
+     *         a list of records to update the state machine.
      */
-    private ConsumerGroupMember getOrMaybeSubscribeDynamicConsumerGroupMember(
-        ConsumerGroup group,
+    private CoordinatorResult<ShareGroupHeartbeatResponseData, 
CoordinatorRecord> shareGroupHeartbeat(
+        String groupId,
         String memberId,
         int memberEpoch,
-        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions,
-        boolean createIfNotExists,
-        boolean useClassicProtocol
-    ) {
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
createIfNotExists);
-        if (!useClassicProtocol) {
-            throwIfMemberEpochIsInvalid(member, memberEpoch, 
ownedTopicPartitions);
-        }
-        if (createIfNotExists) {
-            log.info("[GroupId {}] Member {} joins the consumer group using 
the {} protocol.",
-                group.groupId(), memberId, useClassicProtocol ? "classic" : 
"consumer");
-        }
-        return member;
-    }
+        String rackId,
+        String clientId,
+        String clientHost,
+        List<String> subscribedTopicNames
+    ) throws ApiException {
+        final long currentTimeMs = time.milliseconds();
+        // The records which persists.
+        final List<CoordinatorRecord> records = new ArrayList<>();
+        // The records which are replayed immediately.
+        final List<CoordinatorRecord> replayRecords = new ArrayList<>();
 
-    /**
-     * Gets or subscribes a static consumer group member. This method also 
replaces the
-     * previous static member if allowed.
-     *
-     * @param group                 The consumer group.
-     * @param memberId              The member id.
-     * @param memberEpoch           The member epoch.
-     * @param instanceId            The instance id.
-     * @param ownedTopicPartitions  The owned partitions reported by the 
member.
-     * @param createIfNotExists     Whether the member should be created or 
not.
-     * @param useClassicProtocol    Whether the member uses the classic 
protocol.
-     * @param records               The list to accumulate records created to 
replace
-     *                              the previous static member.
-     *                              
-     * @return The existing consumer group member or a new one.
-     */
-    private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(
-        ConsumerGroup group,
-        String memberId,
-        int memberEpoch,
-        String instanceId,
-        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions,
-        boolean createIfNotExists,
-        boolean useClassicProtocol,
-        List<CoordinatorRecord> records
-    ) {
-        ConsumerGroupMember existingStaticMemberOrNull = 
group.staticMember(instanceId);
+        // Get or create the share group.
+        boolean createIfNotExists = memberEpoch == 0;
+        final ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, 
createIfNotExists);
+        throwIfShareGroupIsFull(group, memberId);
 
-        if (createIfNotExists) {
+        // Get or create the member.
+        if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
+        ShareGroupMember member = getOrMaybeSubscribeShareGroupMember(
+            group,
+            memberId,
+            memberEpoch,
+            createIfNotExists
+        );
+
+        ShareGroupMember.Builder updatedMemberBuilder = new 
ShareGroupMember.Builder(member);
+        // 1. Create or update the member.
+        ShareGroupMember updatedMember = updatedMemberBuilder
+            .maybeUpdateRackId(Optional.ofNullable(rackId))
+            
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+            .setClientId(clientId)
+            .setClientHost(clientHost)
+            .build();
+
+        boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
+            groupId,
+            member,
+            updatedMember,
+            records
+        );
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+        SubscriptionType subscriptionType = group.subscriptionType();
+
+        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+            // The subscription metadata is updated in two cases:
+            // 1) The member has updated its subscriptions;
+            // 2) The refresh deadline has been reached.
+            Map<String, Integer> subscribedTopicNamesMap = 
group.computeSubscribedTopicNames(member, updatedMember);
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                subscribedTopicNamesMap,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
+
+            int numMembers = group.numMembers();
+            if (!group.hasMember(updatedMember.memberId())) {
+                numMembers++;
+            }
+
+            subscriptionType = ModernGroup.subscriptionType(
+                subscribedTopicNamesMap,
+                numMembers
+            );
+
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
+                    groupId, subscriptionMetadata);
+                bumpGroupEpoch = true;
+                replayRecords.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+            }
+
+            if (bumpGroupEpoch) {
+                groupEpoch += 1;
+                records.add(newGroupEpochRecord(groupId, groupEpoch, SHARE));
+                log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+            }
+
+            group.setMetadataRefreshDeadline(currentTimeMs + 
shareGroupMetadataRefreshIntervalMs, groupEpoch);
+        }
+
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch.
+        final int targetAssignmentEpoch;
+        final Assignment targetAssignment;
+
+        if (groupEpoch > group.assignmentEpoch()) {
+            targetAssignment = updateTargetAssignment(
+                group,
+                groupEpoch,
+                updatedMember,
+                subscriptionMetadata,
+                subscriptionType,
+                replayRecords
+            );
+            targetAssignmentEpoch = groupEpoch;

Review Comment:
   Yeah that's true. Though it's a simple iteration for share groups for now 
but it might get complex in future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to