jeffkbkim commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1610394065
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##########
@@ -79,8 +80,16 @@ public void testTwoMembersNoTopicSubscription() {
Collections.emptyMap()
));
- AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS);
- GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+ GroupSpecImpl groupSpec = new GroupSpecImpl(
Review Comment:
nit: should all the `GroupSpecImpl groupSpec` initializations in the tests
be `GroupSpec groupSpec`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##########
@@ -82,4 +83,34 @@ public static void assertAssignment(
assertEquals(expectedAssignment.get(memberId),
computedAssignmentForMember);
});
}
+
+ /**
+ * Generate a reverse look up map of partition to member target assignments from the given member spec.
+ *
+ * @param memberSpec A map where the key is the member Id and the value is an
+ * AssignmentMemberSpec object containing the member's partition assignments.
+ * @return Map of topic partition to member assignments.
+ */
+ public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
+ Map<String, AssignmentMemberSpec> memberSpec
+ ) {
+ Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
+ for (Map.Entry<String, AssignmentMemberSpec> memberEntry : memberSpec.entrySet()) {
+ String memberId = memberEntry.getKey();
+ Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().assignedPartitions();
+
+ for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
+ Uuid topicId = topicEntry.getKey();
+ Set<Integer> partitions = topicEntry.getValue();
+
+ invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());
+ Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId);
Review Comment:
I think `computeIfAbsent` would work here in place of the `putIfAbsent` + `get` pair.
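A minimal sketch of what that could look like, not the PR's actual code. To keep it self-contained, `String` topic ids stand in for `Uuid`, and a plain nested map stands in for `AssignmentMemberSpec`; the class name `ComputeIfAbsentSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class ComputeIfAbsentSketch {
    // Input: memberId -> (topicId -> partitions). Output: topicId -> (partition -> memberId).
    // String topic ids are an assumption made for this sketch; the PR uses Uuid.
    static Map<String, Map<Integer, String>> invert(
        Map<String, Map<String, Set<Integer>>> members
    ) {
        Map<String, Map<Integer, String>> inverted = new HashMap<>();
        for (Map.Entry<String, Map<String, Set<Integer>>> memberEntry : members.entrySet()) {
            String memberId = memberEntry.getKey();
            for (Map.Entry<String, Set<Integer>> topicEntry : memberEntry.getValue().entrySet()) {
                // One call creates the inner map on first use and returns it,
                // replacing the putIfAbsent + get pair.
                Map<Integer, String> partitionMap =
                    inverted.computeIfAbsent(topicEntry.getKey(), k -> new HashMap<>());
                for (Integer partitionId : topicEntry.getValue()) {
                    partitionMap.put(partitionId, memberId);
                }
            }
        }
        return inverted;
    }
}
```

Besides being shorter, `computeIfAbsent` only allocates the inner `HashMap` when the key is actually missing, whereas `putIfAbsent(topicId, new HashMap<>())` allocates one on every iteration.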
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java:
##########
@@ -19,7 +19,7 @@
import org.apache.kafka.common.errors.ApiException;
/**
- * Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The exception
+ * Exception thrown by {@link PartitionAssignor#assign(GroupSpecImpl, SubscribedTopicDescriber)}}. The exception
Review Comment:
I think this needs to be `GroupSpec`, not `GroupSpecImpl`.
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
Collections.emptyMap()
));
}
- assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+ groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
}
+ public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
+ GroupAssignment groupAssignment
+ ) {
+ Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
+ for (Map.Entry<String, MemberAssignment> memberEntry : groupAssignment.members().entrySet()) {
+ String memberId = memberEntry.getKey();
+ Map<Uuid, Set<Integer>> topicsAndPartitions = memberEntry.getValue().targetPartitions();
+
+ for (Map.Entry<Uuid, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
+ Uuid topicId = topicEntry.getKey();
+ Set<Integer> partitions = topicEntry.getValue();
+
+ invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());
+ Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId);
+
+ for (Integer partitionId : partitions) {
+ partitionMap.put(partitionId, memberId);
+ }
+ }
+ }
+ return invertedTargetAssignment;
+ }
+
+
Review Comment:
nit: extra newline
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##########
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
Collections.emptyMap()
));
}
- assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+ groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap());
}
+ public Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
Review Comment:
Is it not possible to unify this with
`ServerSideAssignorBenchmark#invertedTargetAssignment`?
Some details differ, but the two have a lot in common.
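One possible shape for a shared helper, as a rough sketch only. I have not checked how `ServerSideAssignorBenchmark#invertedTargetAssignment` differs in detail, so this assumes the only real difference is how each member's partitions are read. The class `InvertedAssignmentSketch`, the method `invert`, and the `partitionsOf` parameter are hypothetical names, and the topic id type is left generic so the sketch compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class InvertedAssignmentSketch {
    // members:      memberId -> per-member object (type M)
    // partitionsOf: how to read topicId -> partitions from one member object
    // Returns topicId -> (partition -> memberId).
    static <T, M> Map<T, Map<Integer, String>> invert(
        Map<String, M> members,
        Function<M, Map<T, Set<Integer>>> partitionsOf
    ) {
        Map<T, Map<Integer, String>> inverted = new HashMap<>();
        for (Map.Entry<String, M> memberEntry : members.entrySet()) {
            String memberId = memberEntry.getKey();
            Map<T, Set<Integer>> topicsAndPartitions = partitionsOf.apply(memberEntry.getValue());
            for (Map.Entry<T, Set<Integer>> topicEntry : topicsAndPartitions.entrySet()) {
                Map<Integer, String> partitionMap =
                    inverted.computeIfAbsent(topicEntry.getKey(), k -> new HashMap<>());
                for (Integer partitionId : topicEntry.getValue()) {
                    partitionMap.put(partitionId, memberId);
                }
            }
        }
        return inverted;
    }
}
```

With something like this, the test util could pass `AssignmentMemberSpec::assignedPartitions` and the benchmark could pass `MemberAssignment::targetPartitions` over `groupAssignment.members()`, so the loop body would live in one place.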