AndrewJSchofield commented on code in PR #19026:
URL: https://github.com/apache/kafka/pull/19026#discussion_r1993364145
##########
core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala:
##########
@@ -261,7 +273,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
val topicPartitionsAssignedToMember2 = shareGroupHeartbeatResponse.data.assignment.topicPartitions()
// Verify the response.
- assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
Review Comment:
I would still expect this epoch to be 3. Because the first member has
already received its assignment, I would have thought that a single
heartbeat from the second member would be enough to assign the partition.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int partition) {
return Set.of();
}
+ /**
+ * Returns a set of assignable partitions from the topic metadata.
+ * If the allowed partition map is null, all the partitions in the corresponding
+ * topic metadata are returned for the argument topic id. If allowed map is empty,
+ * empty set is returned.
+ *
+ * @param topicId The uuid of the topic
+ * @return Set of integers if assignable partitions available, empty otherwise.
+ * @throws UnknownTopicIdException if the topicId is not found in the metadata.
Review Comment:
I do not think you should throw `UnknownTopicIdException` here. The
partition assignor only throws `PartitionAssignorException`, so a second
exception type is likely a bit inconvenient. I would return `null` for a
missing topic ID, an empty set if the topic ID is known but there are no
assignable partitions, and otherwise the set of partition indices for the
assignable partitions.
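
A minimal sketch of that three-way return contract, for illustration only. It does not use the real Kafka types: `AssignablePartitionsSketch` is a hypothetical stand-in, topic IDs are plain strings, topic metadata is reduced to a partition count, and treating a topic that is absent from a non-null allowed map as "no assignable partitions" is an assumption.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-in for the describer; not the real Kafka class.
class AssignablePartitionsSketch {
    // Topic ID -> number of partitions (simplified topic metadata).
    private final Map<String, Integer> numPartitionsByTopic;
    // Topic ID -> allowed partitions; null means "no filter, everything is assignable".
    private final Map<String, Set<Integer>> allowedPartitions;

    AssignablePartitionsSketch(
        Map<String, Integer> numPartitionsByTopic,
        Map<String, Set<Integer>> allowedPartitions
    ) {
        this.numPartitionsByTopic = numPartitionsByTopic;
        this.allowedPartitions = allowedPartitions;
    }

    // Returns null for an unknown topic ID, an empty set when the topic is known
    // but nothing is assignable, and the partition indices otherwise.
    Set<Integer> assignablePartitions(String topicId) {
        Integer numPartitions = numPartitionsByTopic.get(topicId);
        if (numPartitions == null) {
            return null; // unknown topic: no exception thrown
        }
        if (allowedPartitions == null) {
            return IntStream.range(0, numPartitions)
                .boxed()
                .collect(Collectors.toUnmodifiableSet());
        }
        // Assumption: a topic missing from the allowed map has no assignable partitions.
        return Set.copyOf(allowedPartitions.getOrDefault(topicId, Set.of()));
    }

    public static void main(String[] args) {
        AssignablePartitionsSketch describer =
            new AssignablePartitionsSketch(Map.of("topicA", 3), null);
        System.out.println(describer.assignablePartitions("topicA"));
        System.out.println(describer.assignablePartitions("missing"));
    }
}
```

With this shape the assignor can check for `null` itself and decide whether to raise `PartitionAssignorException`.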
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int partition) {
return Set.of();
}
+ /**
+ * Returns a set of assignable partitions from the topic metadata.
+ * If the allowed partition map is null, all the partitions in the corresponding
+ * topic metadata are returned for the argument topic id. If allowed map is empty,
+ * empty set is returned.
+ *
+ * @param topicId The uuid of the topic
+ * @return Set of integers if assignable partitions available, empty otherwise.
+ * @throws UnknownTopicIdException if the topicId is not found in the metadata.
+ */
+ @Override
+ public Set<Integer> assignablePartitions(Uuid topicId) throws UnknownTopicIdException {
+ TopicMetadata topic = this.topicMetadata.get(topicId);
+ if (topic == null) {
+ throw new UnknownTopicIdException(topicId.toString());
+ }
+
+ if (topicPartitionAllowedMap == null) {
+ return IntStream.range(0, topic.numPartitions()).boxed().collect(Collectors.toSet());
Review Comment:
There is also `Collectors.toUnmodifiableSet()`, which is probably better here.
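
A small sketch of the difference, assuming Java 10 or later where `Collectors.toUnmodifiableSet()` is available. `UnmodifiableSetSketch` and `allPartitions` are hypothetical names used only for this illustration.

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper; mirrors the IntStream pipeline in the diff above.
class UnmodifiableSetSketch {
    // Collects partition indices 0..numPartitions-1 into a set callers cannot mutate.
    static Set<Integer> allPartitions(int numPartitions) {
        return IntStream.range(0, numPartitions)
            .boxed()
            .collect(Collectors.toUnmodifiableSet());
    }

    public static void main(String[] args) {
        Set<Integer> partitions = allPartitions(3);
        try {
            partitions.add(3); // rejected: the returned set is unmodifiable
        } catch (UnsupportedOperationException e) {
            System.out.println("add rejected");
        }
    }
}
```

`Collectors.toSet()` makes no guarantee about the mutability of the returned set, so the unmodifiable collector makes the contract explicit to callers.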
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java:
##########
@@ -812,6 +814,44 @@ public static CoordinatorRecord newShareGroupCurrentAssignmentTombstoneRecord(
);
}
+ /**
+ * Creates a ShareGroupStatePartitionMetadata record.
+ *
+ * @param groupId The share group id.
+ * @param initializedTopics Topics which have been initialized.
+ * @param deletingTopics Topics which are being deleted.
+ * @return The record.
+ */
+ public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord(
+ String groupId,
+ Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics,
+ Map<Uuid, String> deletingTopics
+ ) {
+ List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> initializedTopicPartitionInfo = initializedTopics.entrySet().stream()
+ .map(entry -> new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(entry.getKey())
+ .setTopicName(entry.getValue().getKey())
+ .setPartitions(entry.getValue().getValue().stream().toList()))
+ .toList();
+
+ List<ShareGroupStatePartitionMetadataValue.TopicInfo> deletingTopicsInfo = deletingTopics.entrySet().stream()
+ .map(entry -> new ShareGroupStatePartitionMetadataValue.TopicInfo()
+ .setTopicId(entry.getKey())
+ .setTopicName(entry.getValue()))
+ .toList();
+
+ return CoordinatorRecord.record(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId),
+ new ApiMessageAndVersion(
+ new ShareGroupStatePartitionMetadataValue()
+ .setInitializedTopics(initializedTopicPartitionInfo)
+ .setDeletingTopics(deletingTopicsInfo),
+ (short) 0
+ )
+ );
+ }
+
Review Comment:
I think you will also need a tombstone record; without one, deleting a
group leaves this new record behind.
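
To illustrate why the tombstone matters, here is a self-contained sketch that does not use the real coordinator classes. `Rec`, `replay`, and `newShareGroupStatePartitionMetadataTombstoneRecord` are hypothetical names (the last one only follows the naming of the existing tombstone helpers), and the key and value are reduced to strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model of keyed records; not the Kafka API.
class TombstoneSketch {
    // A record with a key and an optional value; a null value marks a tombstone.
    record Rec(String key, String value) {
        boolean isTombstone() {
            return value == null;
        }
    }

    // Tombstone counterpart of the metadata record: same key, no value.
    static Rec newShareGroupStatePartitionMetadataTombstoneRecord(String groupId) {
        Objects.requireNonNull(groupId, "groupId");
        return new Rec(groupId, null);
    }

    // Applies a record to in-memory state: tombstones delete, other records upsert.
    static void replay(Map<String, String> state, Rec rec) {
        if (rec.isTombstone()) {
            state.remove(rec.key());
        } else {
            state.put(rec.key(), rec.value());
        }
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        replay(state, new Rec("group-1", "initialized=[topicA]"));
        // Deleting the group must also emit the tombstone, or the entry stays behind.
        replay(state, newShareGroupStatePartitionMetadataTombstoneRecord("group-1"));
        System.out.println(state.containsKey("group-1"));
    }
}
```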