AndrewJSchofield commented on code in PR #19339:
URL: https://github.com/apache/kafka/pull/19339#discussion_r2024467706
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -533,15 +555,38 @@ private
CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStat
topicPartitionFor(groupId),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.initializeShareGroupState(groupId,
topicPartitionMap)
- ).thenApply(
- __ -> defaultResponse
- ).exceptionally(exception -> {
- log.error("Unable to initialize share group state partition
metadata for {}.", groupId, exception);
- Errors error = Errors.forException(exception);
- return new ShareGroupHeartbeatResponseData()
- .setErrorCode(error.code())
- .setErrorMessage(error.message());
+ ).handle((__, exp) -> {
+ if (exp == null) {
+ return CompletableFuture.completedFuture(defaultResponse);
+ }
+ log.error("Unable to initialize share group state partition
metadata for {}.", groupId, exp);
+ Errors error = Errors.forException(exp);
+ return uninitializeShareGroupState(error, groupId,
topicPartitionMap);
+ }).thenCompose(resp -> resp);
+ }
+
+ // Visibility for testing
+ CompletableFuture<Void> reconcileShareGroupStateInitializingState() {
+ List<CompletableFuture<List<InitializeShareGroupStateParameters>>>
requestsStages = runtime.scheduleReadAllOperation(
+ "reconcile-share-group-initializing-state",
+ GroupCoordinatorShard::reconcileShareGroupStateInitializingState
+ );
+
+ if (requestsStages.isEmpty()) {
+ log.info("Nothing to reconcile for share group initializing
state.");
Review Comment:
Probably too much detail for info logging. I suggest either debug or trace.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -533,15 +555,38 @@ private
CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStat
topicPartitionFor(groupId),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.initializeShareGroupState(groupId,
topicPartitionMap)
- ).thenApply(
- __ -> defaultResponse
- ).exceptionally(exception -> {
- log.error("Unable to initialize share group state partition
metadata for {}.", groupId, exception);
- Errors error = Errors.forException(exception);
- return new ShareGroupHeartbeatResponseData()
- .setErrorCode(error.code())
- .setErrorMessage(error.message());
+ ).handle((__, exp) -> {
+ if (exp == null) {
+ return CompletableFuture.completedFuture(defaultResponse);
+ }
+ log.error("Unable to initialize share group state partition
metadata for {}.", groupId, exp);
+ Errors error = Errors.forException(exp);
+ return uninitializeShareGroupState(error, groupId,
topicPartitionMap);
+ }).thenCompose(resp -> resp);
+ }
+
+ // Visibility for testing
+ CompletableFuture<Void> reconcileShareGroupStateInitializingState() {
+ List<CompletableFuture<List<InitializeShareGroupStateParameters>>>
requestsStages = runtime.scheduleReadAllOperation(
+ "reconcile-share-group-initializing-state",
+ GroupCoordinatorShard::reconcileShareGroupStateInitializingState
+ );
+
+ if (requestsStages.isEmpty()) {
+ log.info("Nothing to reconcile for share group initializing
state.");
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture<Void> allRequestsStage =
CompletableFuture.allOf(requestsStages.toArray(new CompletableFuture<?>[0]));
+ final List<CompletableFuture<ShareGroupHeartbeatResponseData>>
persisterResponses = new ArrayList<>();
+ allRequestsStage.thenApply(__ -> {
+ requestsStages.forEach(requestsStage ->
requestsStage.join().forEach(request -> {
+ log.info("Reconciling initializing state - {}", request);
Review Comment:
Too much detail for info logging.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2868,26 +2870,27 @@ private boolean initializedAssignmentPending(ShareGroup
group) {
* @return A map of topic partitions which are subscribed by the share
group but not initialized yet.
*/
// Visibility for testing
- Map<Uuid, Map.Entry<String, Set<Integer>>>
subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata>
subscriptionMetadata) {
- TopicsImage topicsImage = metadataImage.topics();
- if (topicsImage == null || topicsImage.isEmpty() ||
subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
+ Map<Uuid, Set<Integer>> subscribedTopicsChangeMap(String groupId,
Map<String, TopicMetadata> subscriptionMetadata) {
+ if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
return Map.of();
}
- Map<Uuid, Map.Entry<String, Set<Integer>>> topicPartitionChangeMap =
new HashMap<>();
+ Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>();
ShareGroupStatePartitionMetadataInfo info =
shareGroupPartitionMetadata.get(groupId);
- Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? Map.of() :
info.initializedTopics();
+ Map<Uuid, Set<Integer>> alreadyInitialized = new HashMap<>();
+ if (info != null) {
+ alreadyInitialized.putAll(info.initializedTopics());
+ // We do not want to re-initialize tps which are in middle of
initialization.
+ alreadyInitialized.putAll(info.initializingTopics());
Review Comment:
I think this does an overwrite, not a merge. If you had `T1->{0,1}` in
initialized, and `T1->{2,3}` in initializing, I think you'd get `T1->{2,3}` in
the alreadyInitialized map, not `T1->{0,1,2,3}`, which I expect is what was
intended.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4870,37 +4921,127 @@ public CoordinatorResult<Void, CoordinatorRecord>
initializeShareGroupState(
}
ShareGroup group = (ShareGroup) groups.get(groupId);
- // We must combine the existing information in the record with the
- // topicPartitionMap argument.
- Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
-
ShareGroupStatePartitionMetadataInfo currentMap =
shareGroupPartitionMetadata.get(groupId);
if (currentMap == null) {
- topicPartitionMap.forEach((k, v) -> finalMap.put(k,
Map.entry(metadataImage.topics().getTopic(k).name(), v)));
return new CoordinatorResult<>(
-
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap,
Map.of())),
+
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(),
attachTopicName(topicPartitionMap), Map.of())),
null
);
}
- Set<Uuid> combinedTopicIdSet = new
HashSet<>(topicPartitionMap.keySet());
- combinedTopicIdSet.addAll(currentMap.initializedTopics.keySet());
+ // We must combine the existing information in the record with the
topicPartitionMap argument so that the final
+ // record has up-to-date information.
+ Map<Uuid, Set<Integer>> finalInitializedMap =
mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap);
- for (Uuid topicId : combinedTopicIdSet) {
- String topicName = metadataImage.topics().getTopic(topicId).name();
- Set<Integer> partitions = new
HashSet<>(currentMap.initializedTopics.getOrDefault(topicId, new HashSet<>()));
- if (topicPartitionMap.containsKey(topicId)) {
- partitions.addAll(topicPartitionMap.get(topicId));
+ // Fetch initializing info from state metadata.
+ Map<Uuid, Set<Integer>> finalInitializingMap = new
HashMap<>(currentMap.initializingTopics());
+
+ // Remove any entries which are already initialized.
+ for (Map.Entry<Uuid, Set<Integer>> entry :
topicPartitionMap.entrySet()) {
+ Uuid topicId = entry.getKey();
+ if (finalInitializingMap.containsKey(topicId)) {
+ Set<Integer> partitions = finalInitializingMap.get(topicId);
+ partitions.removeAll(entry.getValue());
+ if (partitions.isEmpty()) {
+ finalInitializingMap.remove(topicId);
+ }
+ }
+ }
+
+ return new CoordinatorResult<>(List.of(
+ newShareGroupStatePartitionMetadataRecord(
+ group.groupId(),
+ attachTopicName(finalInitializingMap),
+ attachTopicName(finalInitializedMap),
+ Map.of()
+ )),
+ null
+ );
+ }
+
+ /**
+ * Removes specific topic partitions from the initializing state for a
share group. This is usually part of
+ * shareGroupHeartbeat code flow, specifically, if there is a persister
exception.
+ * @param groupId The group id corresponding to the share group whose
share partitions have been initialized.
+ * @param topicPartitionMap Map representing topic partition data to be
cleaned from the share state partition metadata.
+ *
+ * @return A Result containing ShareGroupStatePartitionMetadata records
and Void response.
+ */
+ public CoordinatorResult<Void, CoordinatorRecord>
uninitializeShareGroupState(
+ String groupId,
+ Map<Uuid, Set<Integer>> topicPartitionMap
+ ) {
+ ShareGroupStatePartitionMetadataInfo info =
shareGroupPartitionMetadata.get(groupId);
+ if (info == null || info.initializingTopics().isEmpty() ||
topicPartitionMap.isEmpty()) {
+ return new CoordinatorResult<>(List.of(), null);
+ }
+
+ Map<Uuid, Set<Integer>> initializingTopics = info.initializingTopics();
+ Map<Uuid, Set<Integer>> finalInitializingTopics = new HashMap<>();
+
+ for (Map.Entry<Uuid, Set<Integer>> entry :
initializingTopics.entrySet()) {
+ Uuid topicId = entry.getKey();
+ // If topicId to clean is not present in topicPartitionMap map,
retain it.
+ if (!topicPartitionMap.containsKey(topicId)) {
+ finalInitializingTopics.put(entry.getKey(), entry.getValue());
+ } else {
+ Set<Integer> partitions = new HashSet<>(entry.getValue());
+ partitions.removeAll(topicPartitionMap.get(topicId));
+ if (!partitions.isEmpty()) {
+ finalInitializingTopics.put(entry.getKey(), partitions);
+ }
}
- finalMap.computeIfAbsent(topicId, k -> Map.entry(topicName,
partitions));
}
return new CoordinatorResult<>(
- List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(),
finalMap, Map.of())),
+ List.of(
+ newShareGroupStatePartitionMetadataRecord(
+ groupId,
+ attachTopicName(finalInitializingTopics),
+ attachTopicName(info.initializedTopics()),
+ Map.of()
Review Comment:
Wouldn't this be `info.deletingTopics()` in principle?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4870,37 +4921,127 @@ public CoordinatorResult<Void, CoordinatorRecord>
initializeShareGroupState(
}
ShareGroup group = (ShareGroup) groups.get(groupId);
- // We must combine the existing information in the record with the
- // topicPartitionMap argument.
- Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
-
ShareGroupStatePartitionMetadataInfo currentMap =
shareGroupPartitionMetadata.get(groupId);
if (currentMap == null) {
- topicPartitionMap.forEach((k, v) -> finalMap.put(k,
Map.entry(metadataImage.topics().getTopic(k).name(), v)));
return new CoordinatorResult<>(
-
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap,
Map.of())),
+
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(),
attachTopicName(topicPartitionMap), Map.of())),
null
);
}
- Set<Uuid> combinedTopicIdSet = new
HashSet<>(topicPartitionMap.keySet());
- combinedTopicIdSet.addAll(currentMap.initializedTopics.keySet());
+ // We must combine the existing information in the record with the
topicPartitionMap argument so that the final
+ // record has up-to-date information.
+ Map<Uuid, Set<Integer>> finalInitializedMap =
mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap);
- for (Uuid topicId : combinedTopicIdSet) {
- String topicName = metadataImage.topics().getTopic(topicId).name();
- Set<Integer> partitions = new
HashSet<>(currentMap.initializedTopics.getOrDefault(topicId, new HashSet<>()));
- if (topicPartitionMap.containsKey(topicId)) {
- partitions.addAll(topicPartitionMap.get(topicId));
+ // Fetch initializing info from state metadata.
+ Map<Uuid, Set<Integer>> finalInitializingMap = new
HashMap<>(currentMap.initializingTopics());
+
+ // Remove any entries which are already initialized.
+ for (Map.Entry<Uuid, Set<Integer>> entry :
topicPartitionMap.entrySet()) {
+ Uuid topicId = entry.getKey();
+ if (finalInitializingMap.containsKey(topicId)) {
+ Set<Integer> partitions = finalInitializingMap.get(topicId);
+ partitions.removeAll(entry.getValue());
+ if (partitions.isEmpty()) {
+ finalInitializingMap.remove(topicId);
+ }
+ }
+ }
+
+ return new CoordinatorResult<>(List.of(
+ newShareGroupStatePartitionMetadataRecord(
+ group.groupId(),
+ attachTopicName(finalInitializingMap),
+ attachTopicName(finalInitializedMap),
+ Map.of()
Review Comment:
In principle, I think there could be deleting topics too which would be
`currentMap.deletingTopics()`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4870,37 +4921,127 @@ public CoordinatorResult<Void, CoordinatorRecord>
initializeShareGroupState(
}
ShareGroup group = (ShareGroup) groups.get(groupId);
- // We must combine the existing information in the record with the
- // topicPartitionMap argument.
- Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
-
ShareGroupStatePartitionMetadataInfo currentMap =
shareGroupPartitionMetadata.get(groupId);
if (currentMap == null) {
- topicPartitionMap.forEach((k, v) -> finalMap.put(k,
Map.entry(metadataImage.topics().getTopic(k).name(), v)));
return new CoordinatorResult<>(
-
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap,
Map.of())),
+
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(),
attachTopicName(topicPartitionMap), Map.of())),
null
);
}
- Set<Uuid> combinedTopicIdSet = new
HashSet<>(topicPartitionMap.keySet());
- combinedTopicIdSet.addAll(currentMap.initializedTopics.keySet());
+ // We must combine the existing information in the record with the
topicPartitionMap argument so that the final
+ // record has up-to-date information.
+ Map<Uuid, Set<Integer>> finalInitializedMap =
mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap);
- for (Uuid topicId : combinedTopicIdSet) {
- String topicName = metadataImage.topics().getTopic(topicId).name();
- Set<Integer> partitions = new
HashSet<>(currentMap.initializedTopics.getOrDefault(topicId, new HashSet<>()));
- if (topicPartitionMap.containsKey(topicId)) {
- partitions.addAll(topicPartitionMap.get(topicId));
+ // Fetch initializing info from state metadata.
+ Map<Uuid, Set<Integer>> finalInitializingMap = new
HashMap<>(currentMap.initializingTopics());
+
+ // Remove any entries which are already initialized.
+ for (Map.Entry<Uuid, Set<Integer>> entry :
topicPartitionMap.entrySet()) {
+ Uuid topicId = entry.getKey();
+ if (finalInitializingMap.containsKey(topicId)) {
+ Set<Integer> partitions = finalInitializingMap.get(topicId);
+ partitions.removeAll(entry.getValue());
+ if (partitions.isEmpty()) {
+ finalInitializingMap.remove(topicId);
+ }
+ }
+ }
+
+ return new CoordinatorResult<>(List.of(
+ newShareGroupStatePartitionMetadataRecord(
+ group.groupId(),
+ attachTopicName(finalInitializingMap),
+ attachTopicName(finalInitializedMap),
+ Map.of()
+ )),
+ null
+ );
+ }
+
+ /**
+ * Removes specific topic partitions from the initializing state for a
share group. This is usually part of
+ * shareGroupHeartbeat code flow, specifically, if there is a persister
exception.
+ * @param groupId The group id corresponding to the share group whose
share partitions have been initialized.
+ * @param topicPartitionMap Map representing topic partition data to be
cleaned from the share state partition metadata.
+ *
+ * @return A Result containing ShareGroupStatePartitionMetadata records
and Void response.
+ */
+ public CoordinatorResult<Void, CoordinatorRecord>
uninitializeShareGroupState(
+ String groupId,
+ Map<Uuid, Set<Integer>> topicPartitionMap
+ ) {
+ ShareGroupStatePartitionMetadataInfo info =
shareGroupPartitionMetadata.get(groupId);
+ if (info == null || info.initializingTopics().isEmpty() ||
topicPartitionMap.isEmpty()) {
+ return new CoordinatorResult<>(List.of(), null);
+ }
+
+ Map<Uuid, Set<Integer>> initializingTopics = info.initializingTopics();
+ Map<Uuid, Set<Integer>> finalInitializingTopics = new HashMap<>();
+
+ for (Map.Entry<Uuid, Set<Integer>> entry :
initializingTopics.entrySet()) {
+ Uuid topicId = entry.getKey();
+ // If topicId to clean is not present in topicPartitionMap map,
retain it.
+ if (!topicPartitionMap.containsKey(topicId)) {
+ finalInitializingTopics.put(entry.getKey(), entry.getValue());
+ } else {
+ Set<Integer> partitions = new HashSet<>(entry.getValue());
+ partitions.removeAll(topicPartitionMap.get(topicId));
+ if (!partitions.isEmpty()) {
+ finalInitializingTopics.put(entry.getKey(), partitions);
+ }
}
- finalMap.computeIfAbsent(topicId, k -> Map.entry(topicName,
partitions));
}
return new CoordinatorResult<>(
- List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(),
finalMap, Map.of())),
+ List.of(
+ newShareGroupStatePartitionMetadataRecord(
+ groupId,
+ attachTopicName(finalInitializingTopics),
+ attachTopicName(info.initializedTopics()),
+ Map.of()
+ )
+ ),
null
);
}
+ /**
+ * Iterates over all share groups and returns persister initialize
requests corresponding to any initializing
+ * topic partitions found in the group associated {@link
ShareGroupStatePartitionMetadataInfo}.
+ * @param offset The last committed offset for the {@link
ShareGroupStatePartitionMetadataInfo} timeline hashmap.
+ *
+ * @return A list containing {@link InitializeShareGroupStateParameters}
requests, could be empty.
+ */
+ public List<InitializeShareGroupStateParameters>
reconcileShareGroupStateInitializingState(long offset) {
+ List<InitializeShareGroupStateParameters> requests = new
LinkedList<>();
+ for (Group group : groups.values()) {
+ if (!(group instanceof ShareGroup shareGroup)) {
+ continue;
+ }
+ if
(!(shareGroupPartitionMetadata.containsKey(shareGroup.groupId()))) {
+ continue;
+ }
+ Map<Uuid, Set<Integer>> initializing =
shareGroupPartitionMetadata.get(shareGroup.groupId(),
offset).initializingTopics();
+ if (initializing == null || initializing.isEmpty()) {
+ continue;
+ }
+
requests.add(buildInitializeShareGroupStateRequest(shareGroup.groupId(),
shareGroup.groupEpoch(), initializing));
Review Comment:
Just a question here. Will the group epoch potentially have been increased
for a repeated initialize request? I don't think that would be a
problem, but maybe it can happen.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]