lucasbru commented on code in PR #19722:
URL: https://github.com/apache/kafka/pull/19722#discussion_r2126496827
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java:
##########
@@ -113,14 +103,4 @@ void
testIsStatefulThrowsExceptionWhenSubtopologyIdDoesNotExist() {
void
testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyIdDoesNotExist() {
assertThrows(NoSuchElementException.class, () ->
topologyMetadata.maxNumInputPartitions("non_existent_subtopology"));
}
-
- @Test
Review Comment:
Since the number of tasks is precomputed, we won't check for empty source
topics anymore, since we don't really look at the source topics anymore.
We will throw this exception earlier in the flow (inside
`computeNumberOfTasks`). At this point.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1677,6 +1679,34 @@ private static void
throwIfStreamsGroupMemberEpochIsInvalid(
}
}
+ /**
+ * Validates that the requested tasks exist in the configured topology and
partitions are valid.
+ * If tasks is null, does nothing. If an invalid task is found, throws
InvalidRequestException.
+ *
+ * @param topology The configured topology.
Review Comment:
Done
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1677,6 +1679,34 @@ private static void
throwIfStreamsGroupMemberEpochIsInvalid(
}
}
+ /**
+ * Validates that the requested tasks exist in the configured topology and
partitions are valid.
+ * If tasks is null, does nothing. If an invalid task is found, throws
InvalidRequestException.
+ *
+ * @param topology The configured topology.
+ * @param tasks The list of requested tasks.
+ */
+ private static void throwIfRequestContainsInvalidTasks(
+ SortedMap<String, ConfiguredSubtopology> subtopologySortedMap,
+ List<StreamsGroupHeartbeatRequestData.TaskIds> tasks
+ ) {
+ if (tasks == null) return;
Review Comment:
Done
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java:
##########
@@ -77,18 +76,9 @@ void testIsStateful() {
@Test
void testMaxNumInputPartitions() {
- ConfiguredInternalTopic internalTopic =
mock(ConfiguredInternalTopic.class);
ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
subtopologyMap.put("subtopology1", subtopology);
- when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
-
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic",
internalTopic));
Review Comment:
Since the number of tasks is precomputed, we don't need to mock any other
details about the subtopology here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1677,6 +1679,34 @@ private static void
throwIfStreamsGroupMemberEpochIsInvalid(
}
}
+ /**
+ * Validates that the requested tasks exist in the configured topology and
partitions are valid.
+ * If tasks is null, does nothing. If an invalid task is found, throws
InvalidRequestException.
+ *
+ * @param topology The configured topology.
+ * @param tasks The list of requested tasks.
+ */
+ private static void throwIfRequestContainsInvalidTasks(
+ SortedMap<String, ConfiguredSubtopology> subtopologySortedMap,
+ List<StreamsGroupHeartbeatRequestData.TaskIds> tasks
+ ) {
+ if (tasks == null) return;
+ for (StreamsGroupHeartbeatRequestData.TaskIds task : tasks) {
+ String subtopologyId = task.subtopologyId();
+ if (!subtopologySortedMap.containsKey(subtopologyId)) {
+ throw new InvalidRequestException("Subtopology " +
subtopologyId + " does not exist in the topology.");
+ }
+ ConfiguredSubtopology subtopology =
subtopologySortedMap.get(subtopologyId);
+ int numTasks = subtopology.numberOfTasks();
+ for (Integer partition : task.partitions()) {
Review Comment:
I think it's fine like this
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java:
##########
@@ -30,17 +30,23 @@
* <p>
* Configured subtopologies may be recreated every time the input topics used
by the subtopology are modified.
*
+ * @param numberOfTasks Precomputed number of tasks. Note that not
every source topic may have a partition for
+ * every task.
Review Comment:
Done
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1677,6 +1679,34 @@ private static void
throwIfStreamsGroupMemberEpochIsInvalid(
}
}
+ /**
+ * Validates that the requested tasks exist in the configured topology and
partitions are valid.
+ * If tasks is null, does nothing. If an invalid task is found, throws
InvalidRequestException.
+ *
+ * @param topology The configured topology.
+ * @param tasks The list of requested tasks.
+ */
+ private static void throwIfRequestContainsInvalidTasks(
+ SortedMap<String, ConfiguredSubtopology> subtopologySortedMap,
+ List<StreamsGroupHeartbeatRequestData.TaskIds> tasks
+ ) {
+ if (tasks == null) return;
+ for (StreamsGroupHeartbeatRequestData.TaskIds task : tasks) {
+ String subtopologyId = task.subtopologyId();
+ if (!subtopologySortedMap.containsKey(subtopologyId)) {
+ throw new InvalidRequestException("Subtopology " +
subtopologyId + " does not exist in the topology.");
+ }
+ ConfiguredSubtopology subtopology =
subtopologySortedMap.get(subtopologyId);
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]