smjn commented on code in PR #19339:
URL: https://github.com/apache/kafka/pull/19339#discussion_r2024659495
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -533,15 +555,38 @@ private
CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStat
topicPartitionFor(groupId),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.initializeShareGroupState(groupId,
topicPartitionMap)
- ).thenApply(
- __ -> defaultResponse
- ).exceptionally(exception -> {
- log.error("Unable to initialize share group state partition
metadata for {}.", groupId, exception);
- Errors error = Errors.forException(exception);
- return new ShareGroupHeartbeatResponseData()
- .setErrorCode(error.code())
- .setErrorMessage(error.message());
+ ).handle((__, exp) -> {
+ if (exp == null) {
+ return CompletableFuture.completedFuture(defaultResponse);
+ }
+ log.error("Unable to initialize share group state partition
metadata for {}.", groupId, exp);
+ Errors error = Errors.forException(exp);
+ return uninitializeShareGroupState(error, groupId,
topicPartitionMap);
+ }).thenCompose(resp -> resp);
+ }
+
+ // Visibility for testing
+ CompletableFuture<Void> reconcileShareGroupStateInitializingState() {
+ List<CompletableFuture<List<InitializeShareGroupStateParameters>>>
requestsStages = runtime.scheduleReadAllOperation(
+ "reconcile-share-group-initializing-state",
+ GroupCoordinatorShard::reconcileShareGroupStateInitializingState
+ );
+
+ if (requestsStages.isEmpty()) {
+ log.info("Nothing to reconcile for share group initializing
state.");
Review Comment:
This will we invoked during onElected only, hence the frequency will be
minimal.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -533,15 +555,38 @@ private
CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStat
topicPartitionFor(groupId),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.initializeShareGroupState(groupId,
topicPartitionMap)
- ).thenApply(
- __ -> defaultResponse
- ).exceptionally(exception -> {
- log.error("Unable to initialize share group state partition
metadata for {}.", groupId, exception);
- Errors error = Errors.forException(exception);
- return new ShareGroupHeartbeatResponseData()
- .setErrorCode(error.code())
- .setErrorMessage(error.message());
+ ).handle((__, exp) -> {
+ if (exp == null) {
+ return CompletableFuture.completedFuture(defaultResponse);
+ }
+ log.error("Unable to initialize share group state partition
metadata for {}.", groupId, exp);
+ Errors error = Errors.forException(exp);
+ return uninitializeShareGroupState(error, groupId,
topicPartitionMap);
+ }).thenCompose(resp -> resp);
+ }
+
+ // Visibility for testing
+ CompletableFuture<Void> reconcileShareGroupStateInitializingState() {
+ List<CompletableFuture<List<InitializeShareGroupStateParameters>>>
requestsStages = runtime.scheduleReadAllOperation(
+ "reconcile-share-group-initializing-state",
+ GroupCoordinatorShard::reconcileShareGroupStateInitializingState
+ );
+
+ if (requestsStages.isEmpty()) {
+ log.info("Nothing to reconcile for share group initializing
state.");
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture<Void> allRequestsStage =
CompletableFuture.allOf(requestsStages.toArray(new CompletableFuture<?>[0]));
+ final List<CompletableFuture<ShareGroupHeartbeatResponseData>>
persisterResponses = new ArrayList<>();
+ allRequestsStage.thenApply(__ -> {
+ requestsStages.forEach(requestsStage ->
requestsStage.join().forEach(request -> {
+ log.info("Reconciling initializing state - {}", request);
Review Comment:
This will we invoked during onElected only, hence the frequency will be
minimal.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]