ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1619896882
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final
TaskInfo task,
}
return true;
}
+
+ // Tag-based standby assignment: for every stateful task, distribute its
+ // standby replicas across clients whose rack-aware tag values differ from
+ // those of the client(s) already hosting the task, recording any replicas
+ // that could not be placed so a least-loaded fallback pass can finish.
+ // Returns the (mutated) kafkaStreamsAssignments map for chaining.
+ private static Map<ProcessId, KafkaStreamsAssignment>
tagBasedStandbyTaskAssignment(final ApplicationState applicationState,
+
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
+ final int numStandbyReplicas =
applicationState.assignmentConfigs().numStandbyReplicas();
+ // Every task initially owes the full configured number of standbys.
+ final Map<TaskId, Integer> tasksToRemainingStandbys =
applicationState.allTasks().values().stream()
+ .collect(Collectors.toMap(TaskInfo::id, taskInfo ->
numStandbyReplicas));
+ final Map<ProcessId, KafkaStreamsState> streamStates =
applicationState.kafkaStreamsStates(false);
+
+ // Tag keys used for rack awareness, plus precomputed per-tag statistics
+ // (all values per key, and the clients on each key/value pair).
+ final Set<String> rackAwareAssignmentTags = new
HashSet<>(getRackAwareAssignmentTags(applicationState));
+ final TagStatistics tagStatistics = new
TagStatistics(applicationState);
+
+ final ConstrainedPrioritySet standbyTaskClientsByTaskLoad =
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
+ final Set<TaskId> statefulTaskIds =
applicationState.allTasks().values().stream()
+ .filter(TaskInfo::isStateful)
+ .map(TaskInfo::id)
+ .collect(Collectors.toSet());
+ // Re-key the assignments by raw client UUID for lookup convenience.
+ final Map<UUID, KafkaStreamsAssignment> clientsByUuid =
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+ entry -> entry.getKey().id(),
+ Map.Entry::getValue
+ ));
+ final Function<KafkaStreamsState, Map<String, String>> clientTagGetter
= createClientTagGetter(applicationState);
+
+ // For each stateful task, find every client currently assigned it and
+ // place standbys on clients with different tag values than that client.
+ final Map<TaskId, ProcessId> pendingStandbyTasksToClientId = new
HashMap<>();
+ for (final TaskId statefulTaskId : statefulTaskIds) {
+ for (final KafkaStreamsAssignment assignment :
clientsByUuid.values()) {
+ if (assignment.tasks().containsKey(statefulTaskId)) {
+ assignStandbyTasksToClientsWithDifferentTags(
+ numStandbyReplicas,
+ standbyTaskClientsByTaskLoad,
+ statefulTaskId,
+ assignment.processId(),
+ rackAwareAssignmentTags,
+ streamStates,
+ kafkaStreamsAssignments,
+ tasksToRemainingStandbys,
+ tagStatistics.tagKeyToValues,
+ tagStatistics.tagEntryToClients,
+ pendingStandbyTasksToClientId,
+ clientTagGetter
+ );
+ }
+ }
+ }
+
+ // Replicas that could not be placed on distinct tag dimensions fall
+ // back to the least-loaded clients.
+ if (!tasksToRemainingStandbys.isEmpty()) {
+ assignPendingStandbyTasksToLeastLoadedClients(clientsByUuid,
+ numStandbyReplicas,
+ standbyTaskClientsByTaskLoad,
+ tasksToRemainingStandbys);
+ }
+
+ return kafkaStreamsAssignments;
+ }
+
+ private static Map<ProcessId, KafkaStreamsAssignment>
loadBasedStandbyTaskAssignment(final ApplicationState applicationState,
+
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments)
{
+ final int numStandbyReplicas =
applicationState.assignmentConfigs().numStandbyReplicas();
+ final Map<TaskId, Integer> tasksToRemainingStandbys =
applicationState.allTasks().values().stream()
+ .collect(Collectors.toMap(TaskInfo::id, taskInfo ->
numStandbyReplicas));
+ final Map<ProcessId, KafkaStreamsState> streamStates =
applicationState.kafkaStreamsStates(false);
+
+ final Set<TaskId> statefulTaskIds =
applicationState.allTasks().values().stream()
+ .filter(TaskInfo::isStateful)
+ .map(TaskInfo::id)
+ .collect(Collectors.toSet());
+ final Map<UUID, KafkaStreamsAssignment> clients =
kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+ entry -> entry.getKey().id(),
+ Map.Entry::getValue
+ ));
+
+ final ConstrainedPrioritySet standbyTaskClientsByTaskLoad =
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
standbyTaskClientsByTaskLoad.offerAll(streamStates.keySet().stream().map(ProcessId::id).collect(Collectors.toSet()));
+ for (final TaskId task : statefulTaskIds) {
+ assignStandbyTasksForActiveTask(numStandbyReplicas, clients,
+ tasksToRemainingStandbys, standbyTaskClientsByTaskLoad, task);
+ }
+ return kafkaStreamsAssignments;
+ }
+
+ private static void assignStandbyTasksForActiveTask(final int
numStandbyReplicas,
+ final Map<UUID,
KafkaStreamsAssignment> clients,
+ final Map<TaskId,
Integer> tasksToRemainingStandbys,
+ final
ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
+ final TaskId
activeTaskId) {
+ int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
+ while (numRemainingStandbys > 0) {
+ final UUID client =
standbyTaskClientsByTaskLoad.poll(activeTaskId);
+ if (client == null) {
+ break;
+ }
+ clients.get(client).assignTask(new AssignedTask(activeTaskId,
AssignedTask.Type.STANDBY));
+ numRemainingStandbys--;
+ standbyTaskClientsByTaskLoad.offer(client);
+ tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
+ }
+
+ if (numRemainingStandbys > 0) {
+ LOG.warn("Unable to assign {} of {} standby tasks for task [{}]. "
+
+ "There is not enough available capacity. You should " +
+ "increase the number of application instances " +
+ "to maintain the requested number of standby replicas.",
+ numRemainingStandbys, numStandbyReplicas, activeTaskId);
+ }
+ }
+
+ // Places up to numberOfStandbyClients standby replicas of the given active
+ // task on clients whose rack-aware tag values differ from the clients
+ // already used for this task. Any shortfall is recorded in
+ // tasksToRemainingStandbys and pendingStandbyTasksToClientId so that a
+ // least-loaded fallback pass can place the remainder.
+ private static void assignStandbyTasksToClientsWithDifferentTags(final int
numberOfStandbyClients,
+ final
ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
+ final
TaskId activeTaskId,
+ final
ProcessId activeTaskClient,
+ final
Set<String> rackAwareAssignmentTags,
+ final
Map<ProcessId, KafkaStreamsState> clientStates,
+ final
Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
+ final
Map<TaskId, Integer> tasksToRemainingStandbys,
+ final
Map<String, Set<String>> tagKeyToValues,
+ final
Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients,
+ final
Map<TaskId, ProcessId> pendingStandbyTasksToClientId,
+ final
Function<KafkaStreamsState, Map<String, String>> clientTagGetter) {
+ standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet().stream()
+ .map(ProcessId::id).collect(Collectors.toSet()));
+
+ // We set countOfUsedClients as 1 because client where active task is
located has to be considered as used.
+ int countOfUsedClients = 1;
+ int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
+
+ // Tracks, per tag key/value pair, the clients that must be avoided
+ // because that pair is already taken by a previously chosen client.
+ final Map<KeyValue<String, String>, Set<ProcessId>>
tagEntryToUsedClients = new HashMap<>();
+
+ // Greedily pick one client per standby; each round first marks the tag
+ // entries of the most recently used client as "used".
+ ProcessId lastUsedClient = activeTaskClient;
+ do {
+ updateClientsOnAlreadyUsedTagEntries(
+ clientStates.get(lastUsedClient),
+ countOfUsedClients,
+ rackAwareAssignmentTags,
+ tagEntryToClients,
+ tagKeyToValues,
+ tagEntryToUsedClients,
+ clientTagGetter
+ );
+
+ // Least-loaded candidate that is not on any already-used tag entry.
+ final UUID clientOnUnusedTagDimensions =
standbyTaskClientsByTaskLoad.poll(
+ activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(new
ProcessId(uuid), tagEntryToUsedClients)
+ );
+
+ if (clientOnUnusedTagDimensions == null) {
+ break;
+ }
+
+ final KafkaStreamsState clientStateOnUsedTagDimensions =
clientStates.get(new ProcessId(clientOnUnusedTagDimensions));
+ countOfUsedClients++;
+ numRemainingStandbys--;
+
+ LOG.debug("Assigning {} out of {} standby tasks for an active task
[{}] with client tags {}. " +
+ "Standby task client tags are {}.",
+ numberOfStandbyClients - numRemainingStandbys,
numberOfStandbyClients, activeTaskId,
+ clientTagGetter.apply(clientStates.get(activeTaskClient)),
+ clientTagGetter.apply(clientStateOnUsedTagDimensions));
+
+
kafkaStreamsAssignments.get(clientStateOnUsedTagDimensions.processId()).assignTask(
+ new AssignedTask(activeTaskId, AssignedTask.Type.STANDBY)
+ );
+ lastUsedClient = new ProcessId(clientOnUnusedTagDimensions);
+ } while (numRemainingStandbys > 0);
+
+ // Shortfall: remember the active client so the pending standbys can be
+ // placed by the least-loaded fallback; on full success, drop the task
+ // from the remaining-standbys bookkeeping.
+ if (numRemainingStandbys > 0) {
+ pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient);
+ tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
+ LOG.warn("Rack aware standby task assignment was not able to
assign {} of {} standby tasks for the " +
+ "active task [{}] with the rack aware assignment tags {}.
" +
+ "This may happen when there aren't enough application
instances on different tag " +
+ "dimensions compared to an active and corresponding
standby task. " +
+ "Consider launching application instances on different
tag dimensions than [{}]. " +
+ "Standby task assignment will fall back to assigning
standby tasks to the least loaded clients.",
+ numRemainingStandbys, numberOfStandbyClients,
+ activeTaskId, rackAwareAssignmentTags,
+ clientTagGetter.apply(clientStates.get(activeTaskClient)));
+
+ } else {
+ tasksToRemainingStandbys.remove(activeTaskId);
+ }
+ }
+
+ private static boolean isClientUsedOnAnyOfTheTagEntries(final ProcessId
client,
+ final
Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToUsedClients) {
+ return tagEntryToUsedClients.values().stream().anyMatch(usedClients ->
usedClients.contains(client));
+ }
+
+ // Refreshes tagEntryToUsedClients after a client has been chosen: each
+ // rack-aware tag entry of the newly used client becomes "used", which
+ // excludes every client sharing that entry. When a tag key has no unused
+ // values left for the number of clients consumed so far, its entries are
+ // cleared so that dimension stops constraining the search.
+ private static void updateClientsOnAlreadyUsedTagEntries(final
KafkaStreamsState usedClient,
+ final int
countOfUsedClients,
+ final Set<String>
rackAwareAssignmentTags,
+ final
Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients,
+ final Map<String,
Set<String>> tagKeyToValues,
+ final
Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToUsedClients,
+ final
Function<KafkaStreamsState, Map<String, String>> clientTagGetter) {
+ final Map<String, String> usedClientTags =
clientTagGetter.apply(usedClient);
+
+ for (final Map.Entry<String, String> usedClientTagEntry :
usedClientTags.entrySet()) {
+ final String tagKey = usedClientTagEntry.getKey();
+
+ // Tags outside the configured rack-awareness set are ignored.
+ if (!rackAwareAssignmentTags.contains(tagKey)) {
+ LOG.warn("Client tag with key [{}] will be ignored when
computing rack aware standby " +
+ "task assignment because it is not part of the
configured rack awareness [{}].",
+ tagKey, rackAwareAssignmentTags);
+ continue;
+ }
+
+ // All distinct values that exist for this tag key across the app.
+ final Set<String> allTagValues = tagKeyToValues.get(tagKey);
+
+ // If every value of this key has already been consumed, this
+ // dimension is exhausted — clear its entries so it no longer
+ // excludes any client.
+ if (allTagValues.size() <= countOfUsedClients) {
+ allTagValues.forEach(tagValue ->
tagEntryToUsedClients.remove(new KeyValue<>(tagKey, tagValue)));
+ } else {
+ final String tagValue = usedClientTagEntry.getValue();
+ final KeyValue<String, String> tagEntry = new
KeyValue<>(tagKey, tagValue);
+ // NOTE(review): stores the shared set from tagEntryToClients
+ // without copying — it must not be mutated via this map.
+ final Set<ProcessId> clientsOnUsedTagValue =
tagEntryToClients.get(tagEntry);
+ tagEntryToUsedClients.put(tagEntry, clientsOnUsedTagValue);
+ }
+ }
+ }
+
+ private static Function<KafkaStreamsState, Map<String, String>>
createClientTagGetter(final ApplicationState applicationState) {
+ final boolean hasRackAwareAssignmentTags =
!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty();
+ final boolean canPerformRackAwareOptimization =
canPerformRackAwareOptimization(applicationState, AssignedTask.Type.STANDBY);
+
+ if (hasRackAwareAssignmentTags || !canPerformRackAwareOptimization) {
+ return KafkaStreamsState::clientTags;
+ } else {
+ return state -> mkMap(mkEntry("rack", state.rackId().get()));
+ }
+ }
+
+ private static List<String> getRackAwareAssignmentTags(final
ApplicationState applicationState) {
+ final boolean hasRackAwareAssignmentTags =
!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty();
+
+ if (hasRackAwareAssignmentTags) {
+ return
applicationState.assignmentConfigs().rackAwareAssignmentTags();
+ } else if (canPerformRackAwareOptimization(applicationState,
AssignedTask.Type.STANDBY)) {
+ return Collections.singletonList("rack");
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ private static MoveStandbyTaskPredicate getStandbyTaskMovePredicate(final
ApplicationState applicationState) {
+ final boolean hasRackAwareAssignmentTags =
!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty();
+ final boolean canPerformRackAwareOptimization =
canPerformRackAwareOptimization(applicationState, AssignedTask.Type.STANDBY);
+ if (hasRackAwareAssignmentTags || canPerformRackAwareOptimization) {
Review Comment:
Was thinking about this a bit and decided it should just be `if
(hasRackAwareAssignmentTags)` -- for one thing, this method should only ever be
invoked when `canPerformRackAwareOptimization` is true, since it is called
partway through `#optimizeRackAwareStandbyTasks`.
For another, everything inside this `if` case is about client tags, so
`hasRackAwareAssignmentTags` must be true for this branch to be meaningful.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]