ableegoldman commented on code in PR #16201:
URL: https://github.com/apache/kafka/pull/16201#discussion_r1628527914
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -53,6 +55,76 @@ public final class TaskAssignmentUtils {
private TaskAssignmentUtils() {}
+ /**
+ * Return an {@code AssignmentError} for a task assignment created for an
application.
+ *
+ * @param applicationState The application for which this task assignment
is being assessed.
+ * @param taskAssignment The task assignment that will be validated.
+ *
+ * @return {@code AssignmentError.NONE} if the assignment created for this
application is valid,
+ * or another {@code AssignmentError} otherwise.
+ */
+ public static AssignmentError validateTaskAssignment(final
ApplicationState applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+ final Map<TaskId, ProcessId> activeTasksInOutput = new HashMap<>();
+ final Map<TaskId, ProcessId> standbyTasksInOutput = new HashMap<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.tasks().values()) {
+ if (activeTasksInOutput.containsKey(task.id()) && task.type()
== KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ LOG.error("Assignment is invalid: active task {} was
assigned to multiple KafkaStreams clients: {} and {}",
+ task.id(), assignment.processId().id(),
activeTasksInOutput.get(task.id()).id());
+ return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+ }
+
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ activeTasksInOutput.put(task.id(), assignment.processId());
+ } else {
+ standbyTasksInOutput.put(task.id(),
assignment.processId());
+ }
+ }
+ }
+
+ for (final TaskInfo task : applicationState.allTasks().values()) {
+ if (!task.isStateful() &&
standbyTasksInOutput.containsKey(task.id())) {
+ LOG.error("Assignment is invalid: standby task for stateless
task {} was assigned to KafkaStreams client {}",
+ task.id(), standbyTasksInOutput.get(task.id()).id());
+ return AssignmentError.INVALID_STANDBY_TASK;
+ }
+ }
+
+ final Map<ProcessId, KafkaStreamsState> clientStates =
applicationState.kafkaStreamsStates(false);
+ final Set<ProcessId> clientsInOutput =
assignments.stream().map(KafkaStreamsAssignment::processId)
+ .collect(Collectors.toSet());
+ for (final Map.Entry<ProcessId, KafkaStreamsState> entry :
clientStates.entrySet()) {
+ final ProcessId processIdInInput = entry.getKey();
+ if (!clientsInOutput.contains(processIdInInput)) {
+ LOG.error("Assignment is invalid: KafkaStreams client {} has
no assignment", processIdInInput.id());
+ return AssignmentError.MISSING_PROCESS_ID;
+ }
+ }
+
+ for (final ProcessId processIdInOutput : clientsInOutput) {
+ if (!clientStates.containsKey(processIdInOutput)) {
+ LOG.error("Assignment is invalid: the KafkaStreams client {}
is unknown", processIdInOutput.id());
+ return AssignmentError.UNKNOWN_PROCESS_ID;
+ }
+ }
+
+ final Set<TaskId> taskIdsInInput =
applicationState.allTasks().keySet();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.tasks().values()) {
+ if (!taskIdsInInput.contains(task.id())) {
+ LOG.error("Assignment is invalid: task {} assigned to
KafkaStreams client {} was unknown",
+ task.id(), assignment.processId().id());
+ return AssignmentError.UNKNOWN_TASK_ID;
Review Comment:
Just noticed we're running exactly the same loop here as at the
beginning of this method. Can we just move this unknown-task-ID check into that
first loop? (Not a performance thing: I just think it's easier to read if we
don't have a bunch of separate loops whose conditions I have to re-check each
time.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]