ableegoldman commented on code in PR #16114:
URL: https://github.com/apache/kafka/pull/16114#discussion_r1618047686
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,72 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+ final Set<TaskId> activeTasksInOutput = new HashSet<>();
+ final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ final Set<TaskId> tasksForAssignment = new HashSet<>();
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (activeTasksInOutput.contains(task.id()) && task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ log.error("Assignment is invalid: an active task was
assigned multiple times: {}", task.id());
Review Comment:
It would be nice to include the ProcessId of both clients. We'll need to
change `activeTasksInOutput` into a Map<TaskId, ProcessId> so we can look up
the other ProcessId, then do something like this:
```suggestion
log.error("Assignment is invalid: active task {} was
assigned to multiple KafkaStreams clients: {} and {}", task.id(),
assignment.processId().id(), activeTasksInOutput.get(task.id()).id());
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,72 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+ final Set<TaskId> activeTasksInOutput = new HashSet<>();
+ final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ final Set<TaskId> tasksForAssignment = new HashSet<>();
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (activeTasksInOutput.contains(task.id()) && task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ log.error("Assignment is invalid: an active task was
assigned multiple times: {}", task.id());
+ return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+ }
+
+ if (tasksForAssignment.contains(task.id())) {
+ log.error("Assignment is invalid: both an active and
standby assignment of a task were assigned to the same client: {}", task.id());
Review Comment:
nit: we should include the client id as well
```suggestion
log.error("Assignment is invalid: both an active and
standby copy of task {} were assigned to KafkaStreams client {}", task.id(),
assignment.processId().id());
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,72 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+ final Set<TaskId> activeTasksInOutput = new HashSet<>();
+ final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ final Set<TaskId> tasksForAssignment = new HashSet<>();
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (activeTasksInOutput.contains(task.id()) && task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ log.error("Assignment is invalid: an active task was
assigned multiple times: {}", task.id());
+ return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+ }
+
+ if (tasksForAssignment.contains(task.id())) {
+ log.error("Assignment is invalid: both an active and
standby assignment of a task were assigned to the same client: {}", task.id());
+ return
AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
+ }
+
+ tasksForAssignment.add(task.id());
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ activeTasksInOutput.add(task.id());
+ } else {
+ standbyTasksInOutput.add(task.id());
+ }
+ }
+ }
+
+ for (final TaskInfo task : applicationState.allTasks()) {
+ if (!task.isStateful() &&
standbyTasksInOutput.contains(task.id())) {
+ log.error("Assignment is invalid: a standby task was found for
a stateless task: {}", task.id());
Review Comment:
Same here, would be nice to log which client it was assigned to (which means
turning `standbyTasksInOutput` into a Map<TaskId, ProcessId> as well). Then:
```suggestion
log.error("Assignment is invalid: standby task for stateless
task {} was assigned to KafkaStreams client {}", task.id(),
standbyTasksInOutput.get(task.id()).id());
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,72 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+ final Set<TaskId> activeTasksInOutput = new HashSet<>();
+ final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ final Set<TaskId> tasksForAssignment = new HashSet<>();
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (activeTasksInOutput.contains(task.id()) && task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ log.error("Assignment is invalid: an active task was
assigned multiple times: {}", task.id());
+ return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+ }
+
+ if (tasksForAssignment.contains(task.id())) {
+ log.error("Assignment is invalid: both an active and
standby assignment of a task were assigned to the same client: {}", task.id());
+ return
AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
+ }
+
+ tasksForAssignment.add(task.id());
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ activeTasksInOutput.add(task.id());
+ } else {
+ standbyTasksInOutput.add(task.id());
+ }
+ }
+ }
+
+ for (final TaskInfo task : applicationState.allTasks()) {
+ if (!task.isStateful() &&
standbyTasksInOutput.contains(task.id())) {
+ log.error("Assignment is invalid: a standby task was found for
a stateless task: {}", task.id());
+ return AssignmentError.INVALID_STANDBY_TASK;
+ }
+ }
+
+ final Map<ProcessId, KafkaStreamsState> clientStates =
applicationState.kafkaStreamsStates(false);
+ final Set<ProcessId> clientsInOutput =
assignments.stream().map(KafkaStreamsAssignment::processId)
+ .collect(Collectors.toSet());
+ for (final Map.Entry<ProcessId, KafkaStreamsState> entry :
clientStates.entrySet()) {
+ final ProcessId processIdInInput = entry.getKey();
+ if (!clientsInOutput.contains(processIdInInput)) {
+ log.error("Assignment is invalid: one of the clients has no
assignment: {}", processIdInInput.id());
Review Comment:
nit (clarify "KafkaStreams client" since "client" as a term is so massively
overloaded):
```suggestion
log.error("Assignment is invalid: KafkaStreams client {} has
no assignment", processIdInInput.id());
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,72 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+ final Set<TaskId> activeTasksInOutput = new HashSet<>();
+ final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ final Set<TaskId> tasksForAssignment = new HashSet<>();
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (activeTasksInOutput.contains(task.id()) && task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ log.error("Assignment is invalid: an active task was
assigned multiple times: {}", task.id());
+ return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+ }
+
+ if (tasksForAssignment.contains(task.id())) {
+ log.error("Assignment is invalid: both an active and
standby assignment of a task were assigned to the same client: {}", task.id());
+ return
AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
+ }
+
+ tasksForAssignment.add(task.id());
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ activeTasksInOutput.add(task.id());
+ } else {
+ standbyTasksInOutput.add(task.id());
+ }
+ }
+ }
+
+ for (final TaskInfo task : applicationState.allTasks()) {
+ if (!task.isStateful() &&
standbyTasksInOutput.contains(task.id())) {
+ log.error("Assignment is invalid: a standby task was found for
a stateless task: {}", task.id());
+ return AssignmentError.INVALID_STANDBY_TASK;
+ }
+ }
+
+ final Map<ProcessId, KafkaStreamsState> clientStates =
applicationState.kafkaStreamsStates(false);
+ final Set<ProcessId> clientsInOutput =
assignments.stream().map(KafkaStreamsAssignment::processId)
+ .collect(Collectors.toSet());
+ for (final Map.Entry<ProcessId, KafkaStreamsState> entry :
clientStates.entrySet()) {
+ final ProcessId processIdInInput = entry.getKey();
+ if (!clientsInOutput.contains(processIdInInput)) {
+ log.error("Assignment is invalid: one of the clients has no
assignment: {}", processIdInInput.id());
+ return AssignmentError.MISSING_PROCESS_ID;
+ }
+ }
+
+ for (final ProcessId processIdInOutput : clientsInOutput) {
+ if (!clientStates.containsKey(processIdInOutput)) {
+ log.error("Assignment is invalid: one of the clients in the
assignment is unknown: {}", processIdInOutput.id());
+ return AssignmentError.UNKNOWN_PROCESS_ID;
+ }
+ }
+
+ final Set<TaskId> taskIdsInInput =
applicationState.allTasks().stream().map(TaskInfo::id)
+ .collect(Collectors.toSet());
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (!taskIdsInInput.contains(task.id())) {
+ log.error("Assignment is invalid: one of the tasks in the
assignment is unknown: {}", task.id());
Review Comment:
```suggestion
log.error("Assignment is invalid: task {} assigned to
KafkaStreams client {} was unknown", task.id(), assignment.processId().id());
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -1522,6 +1525,72 @@ private void maybeScheduleFollowupRebalance(final long
encodedNextScheduledRebal
}
}
+ private AssignmentError validateTaskAssignment(final ApplicationState
applicationState,
+ final TaskAssignment
taskAssignment) {
+ final Collection<KafkaStreamsAssignment> assignments =
taskAssignment.assignment();
+ final Set<TaskId> activeTasksInOutput = new HashSet<>();
+ final Set<TaskId> standbyTasksInOutput = new HashSet<>();
+ for (final KafkaStreamsAssignment assignment : assignments) {
+ final Set<TaskId> tasksForAssignment = new HashSet<>();
+ for (final KafkaStreamsAssignment.AssignedTask task :
assignment.assignment()) {
+ if (activeTasksInOutput.contains(task.id()) && task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ log.error("Assignment is invalid: an active task was
assigned multiple times: {}", task.id());
+ return AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
+ }
+
+ if (tasksForAssignment.contains(task.id())) {
+ log.error("Assignment is invalid: both an active and
standby assignment of a task were assigned to the same client: {}", task.id());
+ return
AssignmentError.ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS;
+ }
+
+ tasksForAssignment.add(task.id());
+ if (task.type() ==
KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
+ activeTasksInOutput.add(task.id());
+ } else {
+ standbyTasksInOutput.add(task.id());
+ }
+ }
+ }
+
+ for (final TaskInfo task : applicationState.allTasks()) {
+ if (!task.isStateful() &&
standbyTasksInOutput.contains(task.id())) {
+ log.error("Assignment is invalid: a standby task was found for
a stateless task: {}", task.id());
+ return AssignmentError.INVALID_STANDBY_TASK;
+ }
+ }
+
+ final Map<ProcessId, KafkaStreamsState> clientStates =
applicationState.kafkaStreamsStates(false);
+ final Set<ProcessId> clientsInOutput =
assignments.stream().map(KafkaStreamsAssignment::processId)
+ .collect(Collectors.toSet());
+ for (final Map.Entry<ProcessId, KafkaStreamsState> entry :
clientStates.entrySet()) {
+ final ProcessId processIdInInput = entry.getKey();
+ if (!clientsInOutput.contains(processIdInInput)) {
+ log.error("Assignment is invalid: one of the clients has no
assignment: {}", processIdInInput.id());
+ return AssignmentError.MISSING_PROCESS_ID;
+ }
+ }
+
+ for (final ProcessId processIdInOutput : clientsInOutput) {
+ if (!clientStates.containsKey(processIdInOutput)) {
+ log.error("Assignment is invalid: one of the clients in the
assignment is unknown: {}", processIdInOutput.id());
Review Comment:
```suggestion
log.error("Assignment is invalid: the KafkaStreams client {}
is unknown", processIdInOutput.id());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]