smjn commented on code in PR #19339:
URL: https://github.com/apache/kafka/pull/19339#discussion_r2024659495


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -533,15 +555,38 @@ private CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStat
             topicPartitionFor(groupId),
             Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.initializeShareGroupState(groupId, topicPartitionMap)
-        ).thenApply(
-            __ -> defaultResponse
-        ).exceptionally(exception -> {
-            log.error("Unable to initialize share group state partition metadata for {}.", groupId, exception);
-            Errors error = Errors.forException(exception);
-            return new ShareGroupHeartbeatResponseData()
-                .setErrorCode(error.code())
-                .setErrorMessage(error.message());
+        ).handle((__, exp) -> {
+            if (exp == null) {
+                return CompletableFuture.completedFuture(defaultResponse);
+            }
+            log.error("Unable to initialize share group state partition metadata for {}.", groupId, exp);
+            Errors error = Errors.forException(exp);
+            return uninitializeShareGroupState(error, groupId, topicPartitionMap);
+        }).thenCompose(resp -> resp);
+    }
+
+    // Visibility for testing
+    CompletableFuture<Void> reconcileShareGroupStateInitializingState() {
+        List<CompletableFuture<List<InitializeShareGroupStateParameters>>> requestsStages = runtime.scheduleReadAllOperation(
+            "reconcile-share-group-initializing-state",
+            GroupCoordinatorShard::reconcileShareGroupStateInitializingState
+        );
+
+        if (requestsStages.isEmpty()) {
+            log.info("Nothing to reconcile for share group initializing state.");

Review Comment:
   This will be invoked only during onElected, so the frequency will be
minimal.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -533,15 +555,38 @@ private CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStat
             topicPartitionFor(groupId),
             Duration.ofMillis(config.offsetCommitTimeoutMs()),
             coordinator -> coordinator.initializeShareGroupState(groupId, topicPartitionMap)
-        ).thenApply(
-            __ -> defaultResponse
-        ).exceptionally(exception -> {
-            log.error("Unable to initialize share group state partition metadata for {}.", groupId, exception);
-            Errors error = Errors.forException(exception);
-            return new ShareGroupHeartbeatResponseData()
-                .setErrorCode(error.code())
-                .setErrorMessage(error.message());
+        ).handle((__, exp) -> {
+            if (exp == null) {
+                return CompletableFuture.completedFuture(defaultResponse);
+            }
+            log.error("Unable to initialize share group state partition metadata for {}.", groupId, exp);
+            Errors error = Errors.forException(exp);
+            return uninitializeShareGroupState(error, groupId, topicPartitionMap);
+        }).thenCompose(resp -> resp);
+    }
+
+    // Visibility for testing
+    CompletableFuture<Void> reconcileShareGroupStateInitializingState() {
+        List<CompletableFuture<List<InitializeShareGroupStateParameters>>> requestsStages = runtime.scheduleReadAllOperation(
+            "reconcile-share-group-initializing-state",
+            GroupCoordinatorShard::reconcileShareGroupStateInitializingState
+        );
+
+        if (requestsStages.isEmpty()) {
+            log.info("Nothing to reconcile for share group initializing state.");
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> allRequestsStage = CompletableFuture.allOf(requestsStages.toArray(new CompletableFuture<?>[0]));
+        final List<CompletableFuture<ShareGroupHeartbeatResponseData>> persisterResponses = new ArrayList<>();
+        allRequestsStage.thenApply(__ -> {
+            requestsStages.forEach(requestsStage -> requestsStage.join().forEach(request -> {
+                log.info("Reconciling initializing state - {}", request);

Review Comment:
   This will be invoked only during onElected, so the frequency will be
minimal.


