AndrewJSchofield commented on code in PR #19339:
URL: https://github.com/apache/kafka/pull/19339#discussion_r2024467706


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -533,15 +555,38 @@ private 
CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStat
             topicPartitionFor(groupId),
             Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.initializeShareGroupState(groupId, 
topicPartitionMap)
-        ).thenApply(
-            __ -> defaultResponse
-        ).exceptionally(exception -> {
-            log.error("Unable to initialize share group state partition 
metadata for {}.", groupId, exception);
-            Errors error = Errors.forException(exception);
-            return new ShareGroupHeartbeatResponseData()
-                .setErrorCode(error.code())
-                .setErrorMessage(error.message());
+        ).handle((__, exp) -> {
+            if (exp == null) {
+                return CompletableFuture.completedFuture(defaultResponse);
+            }
+            log.error("Unable to initialize share group state partition 
metadata for {}.", groupId, exp);
+            Errors error = Errors.forException(exp);
+            return uninitializeShareGroupState(error, groupId, 
topicPartitionMap);
+        }).thenCompose(resp -> resp);
+    }
+
+    // Visibility for testing
+    CompletableFuture<Void> reconcileShareGroupStateInitializingState() {
+        List<CompletableFuture<List<InitializeShareGroupStateParameters>>> 
requestsStages = runtime.scheduleReadAllOperation(
+            "reconcile-share-group-initializing-state",
+            GroupCoordinatorShard::reconcileShareGroupStateInitializingState
+        );
+
+        if (requestsStages.isEmpty()) {
+            log.info("Nothing to reconcile for share group initializing 
state.");

Review Comment:
   This is probably too much detail for info-level logging. I suggest either debug or trace.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -533,15 +555,38 @@ private 
CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStat
             topicPartitionFor(groupId),
             Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.initializeShareGroupState(groupId, 
topicPartitionMap)
-        ).thenApply(
-            __ -> defaultResponse
-        ).exceptionally(exception -> {
-            log.error("Unable to initialize share group state partition 
metadata for {}.", groupId, exception);
-            Errors error = Errors.forException(exception);
-            return new ShareGroupHeartbeatResponseData()
-                .setErrorCode(error.code())
-                .setErrorMessage(error.message());
+        ).handle((__, exp) -> {
+            if (exp == null) {
+                return CompletableFuture.completedFuture(defaultResponse);
+            }
+            log.error("Unable to initialize share group state partition 
metadata for {}.", groupId, exp);
+            Errors error = Errors.forException(exp);
+            return uninitializeShareGroupState(error, groupId, 
topicPartitionMap);
+        }).thenCompose(resp -> resp);
+    }
+
+    // Visibility for testing
+    CompletableFuture<Void> reconcileShareGroupStateInitializingState() {
+        List<CompletableFuture<List<InitializeShareGroupStateParameters>>> 
requestsStages = runtime.scheduleReadAllOperation(
+            "reconcile-share-group-initializing-state",
+            GroupCoordinatorShard::reconcileShareGroupStateInitializingState
+        );
+
+        if (requestsStages.isEmpty()) {
+            log.info("Nothing to reconcile for share group initializing 
state.");
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> allRequestsStage = 
CompletableFuture.allOf(requestsStages.toArray(new CompletableFuture<?>[0]));
+        final List<CompletableFuture<ShareGroupHeartbeatResponseData>> 
persisterResponses = new ArrayList<>();
+        allRequestsStage.thenApply(__ -> {
+            requestsStages.forEach(requestsStage -> 
requestsStage.join().forEach(request -> {
+                log.info("Reconciling initializing state - {}", request);

Review Comment:
   This is too much detail for info-level logging.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2868,26 +2870,27 @@ private boolean initializedAssignmentPending(ShareGroup 
group) {
      * @return  A map of topic partitions which are subscribed by the share 
group but not initialized yet.
      */
     // Visibility for testing
-    Map<Uuid, Map.Entry<String, Set<Integer>>> 
subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> 
subscriptionMetadata) {
-        TopicsImage topicsImage = metadataImage.topics();
-        if (topicsImage == null || topicsImage.isEmpty() || 
subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
+    Map<Uuid, Set<Integer>> subscribedTopicsChangeMap(String groupId, 
Map<String, TopicMetadata> subscriptionMetadata) {
+        if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
             return Map.of();
         }
 
-        Map<Uuid, Map.Entry<String, Set<Integer>>> topicPartitionChangeMap = 
new HashMap<>();
+        Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>();
         ShareGroupStatePartitionMetadataInfo info = 
shareGroupPartitionMetadata.get(groupId);
-        Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? Map.of() : 
info.initializedTopics();
+        Map<Uuid, Set<Integer>> alreadyInitialized = new HashMap<>();
+        if (info != null) {
+            alreadyInitialized.putAll(info.initializedTopics());
+            // We do not want to re-initialize tps which are in middle of 
initialization.
+            alreadyInitialized.putAll(info.initializingTopics());

Review Comment:
   I think this does an overwrite, not a merge. If you had `T1->{0,1}` in 
initialized, and `T1->{2,3}` in initializing, I think you'd get `T1->{2,3}` in 
the alreadyInitialized map, not `T1->{0,1,2,3}`, which I expect is what was 
intended.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4870,37 +4921,127 @@ public CoordinatorResult<Void, CoordinatorRecord> 
initializeShareGroupState(
         }
         ShareGroup group = (ShareGroup) groups.get(groupId);
 
-        // We must combine the existing information in the record with the
-        // topicPartitionMap argument.
-        Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
-
         ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
         if (currentMap == null) {
-            topicPartitionMap.forEach((k, v) -> finalMap.put(k, 
Map.entry(metadataImage.topics().getTopic(k).name(), v)));
             return new CoordinatorResult<>(
-                
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap, 
Map.of())),
+                
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), 
attachTopicName(topicPartitionMap), Map.of())),
                 null
             );
         }
 
-        Set<Uuid> combinedTopicIdSet = new 
HashSet<>(topicPartitionMap.keySet());
-        combinedTopicIdSet.addAll(currentMap.initializedTopics.keySet());
+        // We must combine the existing information in the record with the 
topicPartitionMap argument so that the final
+        // record has up-to-date information.
+        Map<Uuid, Set<Integer>> finalInitializedMap = 
mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap);
 
-        for (Uuid topicId : combinedTopicIdSet) {
-            String topicName = metadataImage.topics().getTopic(topicId).name();
-            Set<Integer> partitions = new 
HashSet<>(currentMap.initializedTopics.getOrDefault(topicId, new HashSet<>()));
-            if (topicPartitionMap.containsKey(topicId)) {
-                partitions.addAll(topicPartitionMap.get(topicId));
+        // Fetch initializing info from state metadata.
+        Map<Uuid, Set<Integer>> finalInitializingMap = new 
HashMap<>(currentMap.initializingTopics());
+
+        // Remove any entries which are already initialized.
+        for (Map.Entry<Uuid, Set<Integer>> entry : 
topicPartitionMap.entrySet()) {
+            Uuid topicId = entry.getKey();
+            if (finalInitializingMap.containsKey(topicId)) {
+                Set<Integer> partitions = finalInitializingMap.get(topicId);
+                partitions.removeAll(entry.getValue());
+                if (partitions.isEmpty()) {
+                    finalInitializingMap.remove(topicId);
+                }
+            }
+        }
+
+        return new CoordinatorResult<>(List.of(
+            newShareGroupStatePartitionMetadataRecord(
+                group.groupId(),
+                attachTopicName(finalInitializingMap),
+                attachTopicName(finalInitializedMap),
+                Map.of()
+            )),
+            null
+        );
+    }
+
+    /**
+     * Removes specific topic partitions from the initializing state for a 
share group. This is usually part of
+     * shareGroupHeartbeat code flow, specifically, if there is a persister 
exception.
+     * @param groupId The group id corresponding to the share group whose 
share partitions have been initialized.
+     * @param topicPartitionMap Map representing topic partition data to be 
cleaned from the share state partition metadata.
+     *
+     * @return A Result containing ShareGroupStatePartitionMetadata records 
and Void response.
+     */
+    public CoordinatorResult<Void, CoordinatorRecord> 
uninitializeShareGroupState(
+        String groupId,
+        Map<Uuid, Set<Integer>> topicPartitionMap
+    ) {
+        ShareGroupStatePartitionMetadataInfo info = 
shareGroupPartitionMetadata.get(groupId);
+        if (info == null || info.initializingTopics().isEmpty() || 
topicPartitionMap.isEmpty()) {
+            return new CoordinatorResult<>(List.of(), null);
+        }
+
+        Map<Uuid, Set<Integer>> initializingTopics = info.initializingTopics();
+        Map<Uuid, Set<Integer>> finalInitializingTopics = new HashMap<>();
+
+        for (Map.Entry<Uuid, Set<Integer>> entry : 
initializingTopics.entrySet()) {
+            Uuid topicId = entry.getKey();
+            // If topicId to clean is not present in topicPartitionMap map, 
retain it.
+            if (!topicPartitionMap.containsKey(topicId)) {
+                finalInitializingTopics.put(entry.getKey(), entry.getValue());
+            } else {
+                Set<Integer> partitions = new HashSet<>(entry.getValue());
+                partitions.removeAll(topicPartitionMap.get(topicId));
+                if (!partitions.isEmpty()) {
+                    finalInitializingTopics.put(entry.getKey(), partitions);
+                }
             }
-            finalMap.computeIfAbsent(topicId, k -> Map.entry(topicName, 
partitions));
         }
 
         return new CoordinatorResult<>(
-            List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), 
finalMap, Map.of())),
+            List.of(
+                newShareGroupStatePartitionMetadataRecord(
+                    groupId,
+                    attachTopicName(finalInitializingTopics),
+                    attachTopicName(info.initializedTopics()),
+                    Map.of()

Review Comment:
   Wouldn't this be `info.deletingTopics()` in principle?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4870,37 +4921,127 @@ public CoordinatorResult<Void, CoordinatorRecord> 
initializeShareGroupState(
         }
         ShareGroup group = (ShareGroup) groups.get(groupId);
 
-        // We must combine the existing information in the record with the
-        // topicPartitionMap argument.
-        Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
-
         ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
         if (currentMap == null) {
-            topicPartitionMap.forEach((k, v) -> finalMap.put(k, 
Map.entry(metadataImage.topics().getTopic(k).name(), v)));
             return new CoordinatorResult<>(
-                
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap, 
Map.of())),
+                
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), 
attachTopicName(topicPartitionMap), Map.of())),
                 null
             );
         }
 
-        Set<Uuid> combinedTopicIdSet = new 
HashSet<>(topicPartitionMap.keySet());
-        combinedTopicIdSet.addAll(currentMap.initializedTopics.keySet());
+        // We must combine the existing information in the record with the 
topicPartitionMap argument so that the final
+        // record has up-to-date information.
+        Map<Uuid, Set<Integer>> finalInitializedMap = 
mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap);
 
-        for (Uuid topicId : combinedTopicIdSet) {
-            String topicName = metadataImage.topics().getTopic(topicId).name();
-            Set<Integer> partitions = new 
HashSet<>(currentMap.initializedTopics.getOrDefault(topicId, new HashSet<>()));
-            if (topicPartitionMap.containsKey(topicId)) {
-                partitions.addAll(topicPartitionMap.get(topicId));
+        // Fetch initializing info from state metadata.
+        Map<Uuid, Set<Integer>> finalInitializingMap = new 
HashMap<>(currentMap.initializingTopics());
+
+        // Remove any entries which are already initialized.
+        for (Map.Entry<Uuid, Set<Integer>> entry : 
topicPartitionMap.entrySet()) {
+            Uuid topicId = entry.getKey();
+            if (finalInitializingMap.containsKey(topicId)) {
+                Set<Integer> partitions = finalInitializingMap.get(topicId);
+                partitions.removeAll(entry.getValue());
+                if (partitions.isEmpty()) {
+                    finalInitializingMap.remove(topicId);
+                }
+            }
+        }
+
+        return new CoordinatorResult<>(List.of(
+            newShareGroupStatePartitionMetadataRecord(
+                group.groupId(),
+                attachTopicName(finalInitializingMap),
+                attachTopicName(finalInitializedMap),
+                Map.of()

Review Comment:
   In principle, I think there could be deleting topics too which would be 
`currentMap.deletingTopics()`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4870,37 +4921,127 @@ public CoordinatorResult<Void, CoordinatorRecord> 
initializeShareGroupState(
         }
         ShareGroup group = (ShareGroup) groups.get(groupId);
 
-        // We must combine the existing information in the record with the
-        // topicPartitionMap argument.
-        Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
-
         ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
         if (currentMap == null) {
-            topicPartitionMap.forEach((k, v) -> finalMap.put(k, 
Map.entry(metadataImage.topics().getTopic(k).name(), v)));
             return new CoordinatorResult<>(
-                
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap, 
Map.of())),
+                
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), 
attachTopicName(topicPartitionMap), Map.of())),
                 null
             );
         }
 
-        Set<Uuid> combinedTopicIdSet = new 
HashSet<>(topicPartitionMap.keySet());
-        combinedTopicIdSet.addAll(currentMap.initializedTopics.keySet());
+        // We must combine the existing information in the record with the 
topicPartitionMap argument so that the final
+        // record has up-to-date information.
+        Map<Uuid, Set<Integer>> finalInitializedMap = 
mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap);
 
-        for (Uuid topicId : combinedTopicIdSet) {
-            String topicName = metadataImage.topics().getTopic(topicId).name();
-            Set<Integer> partitions = new 
HashSet<>(currentMap.initializedTopics.getOrDefault(topicId, new HashSet<>()));
-            if (topicPartitionMap.containsKey(topicId)) {
-                partitions.addAll(topicPartitionMap.get(topicId));
+        // Fetch initializing info from state metadata.
+        Map<Uuid, Set<Integer>> finalInitializingMap = new 
HashMap<>(currentMap.initializingTopics());
+
+        // Remove any entries which are already initialized.
+        for (Map.Entry<Uuid, Set<Integer>> entry : 
topicPartitionMap.entrySet()) {
+            Uuid topicId = entry.getKey();
+            if (finalInitializingMap.containsKey(topicId)) {
+                Set<Integer> partitions = finalInitializingMap.get(topicId);
+                partitions.removeAll(entry.getValue());
+                if (partitions.isEmpty()) {
+                    finalInitializingMap.remove(topicId);
+                }
+            }
+        }
+
+        return new CoordinatorResult<>(List.of(
+            newShareGroupStatePartitionMetadataRecord(
+                group.groupId(),
+                attachTopicName(finalInitializingMap),
+                attachTopicName(finalInitializedMap),
+                Map.of()
+            )),
+            null
+        );
+    }
+
+    /**
+     * Removes specific topic partitions from the initializing state for a 
share group. This is usually part of
+     * shareGroupHeartbeat code flow, specifically, if there is a persister 
exception.
+     * @param groupId The group id corresponding to the share group whose 
share partitions have been initialized.
+     * @param topicPartitionMap Map representing topic partition data to be 
cleaned from the share state partition metadata.
+     *
+     * @return A Result containing ShareGroupStatePartitionMetadata records 
and Void response.
+     */
+    public CoordinatorResult<Void, CoordinatorRecord> 
uninitializeShareGroupState(
+        String groupId,
+        Map<Uuid, Set<Integer>> topicPartitionMap
+    ) {
+        ShareGroupStatePartitionMetadataInfo info = 
shareGroupPartitionMetadata.get(groupId);
+        if (info == null || info.initializingTopics().isEmpty() || 
topicPartitionMap.isEmpty()) {
+            return new CoordinatorResult<>(List.of(), null);
+        }
+
+        Map<Uuid, Set<Integer>> initializingTopics = info.initializingTopics();
+        Map<Uuid, Set<Integer>> finalInitializingTopics = new HashMap<>();
+
+        for (Map.Entry<Uuid, Set<Integer>> entry : 
initializingTopics.entrySet()) {
+            Uuid topicId = entry.getKey();
+            // If topicId to clean is not present in topicPartitionMap map, 
retain it.
+            if (!topicPartitionMap.containsKey(topicId)) {
+                finalInitializingTopics.put(entry.getKey(), entry.getValue());
+            } else {
+                Set<Integer> partitions = new HashSet<>(entry.getValue());
+                partitions.removeAll(topicPartitionMap.get(topicId));
+                if (!partitions.isEmpty()) {
+                    finalInitializingTopics.put(entry.getKey(), partitions);
+                }
             }
-            finalMap.computeIfAbsent(topicId, k -> Map.entry(topicName, 
partitions));
         }
 
         return new CoordinatorResult<>(
-            List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), 
finalMap, Map.of())),
+            List.of(
+                newShareGroupStatePartitionMetadataRecord(
+                    groupId,
+                    attachTopicName(finalInitializingTopics),
+                    attachTopicName(info.initializedTopics()),
+                    Map.of()
+                )
+            ),
             null
         );
     }
 
+    /**
+     * Iterates over all share groups and returns persister initialize 
requests corresponding to any initializing
+     * topic partitions found in the group associated {@link 
ShareGroupStatePartitionMetadataInfo}.
+     * @param offset The last committed offset for the {@link 
ShareGroupStatePartitionMetadataInfo} timeline hashmap.
+     *
+     * @return A list containing {@link InitializeShareGroupStateParameters} 
requests, could be empty.
+     */
+    public List<InitializeShareGroupStateParameters> 
reconcileShareGroupStateInitializingState(long offset) {
+        List<InitializeShareGroupStateParameters> requests = new 
LinkedList<>();
+        for (Group group : groups.values()) {
+            if (!(group instanceof ShareGroup shareGroup)) {
+                continue;
+            }
+            if 
(!(shareGroupPartitionMetadata.containsKey(shareGroup.groupId()))) {
+                continue;
+            }
+            Map<Uuid, Set<Integer>> initializing = 
shareGroupPartitionMetadata.get(shareGroup.groupId(), 
offset).initializingTopics();
+            if (initializing == null || initializing.isEmpty()) {
+                continue;
+            }
+            
requests.add(buildInitializeShareGroupStateRequest(shareGroup.groupId(), 
shareGroup.groupEpoch(), initializing));

Review Comment:
   Just a question here. Could the group epoch have been increased by the time 
an initialize request is repeated? I don't think that would be a problem, but 
maybe it can happen.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to