ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1621467009


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -555,18 +556,21 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe
 
         RackUtils.annotateTopicPartitionsWithRackInfo(cluster, internalTopicManager, allTopicPartitions);
 
-        final Set<TaskInfo> logicalTasks = logicalTaskIds.stream().map(taskId -> {
-            final Set<String> stateStoreNames = topologyMetadata
-                .stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
-                .keySet();
-            final Set<TaskTopicPartition> topicPartitions = topicPartitionsForTask.get(taskId);
-            return new DefaultTaskInfo(
-                taskId,
-                !stateStoreNames.isEmpty(),
-                stateStoreNames,
-                topicPartitions
-            );
-        }).collect(Collectors.toSet());
+        final Map<TaskId, TaskInfo> logicalTasks = logicalTaskIds.stream().collect(Collectors.toMap(
+            Function.identity(),
+            taskId -> {
+                final Set<String> stateStoreNames = topologyMetadata
+                    .stateStoreNameToSourceTopicsForTopology(taskId.topologyName())

Review Comment:
   Ah, somehow I missed this before: this is actually returning _all_ the state stores for this topology; it's not specific to the taskId. This was an existing issue, so we don't need to fix it in this PR, and it can be addressed in a follow-up. It might be a bit complicated, so I'll take a look at how we can get this info.
   
   We would've caught this during testing, since we definitely want tests with mixed stateless-and-stateful tasks, but it's still good to fix ASAP.
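Below is a minimal sketch of a per-task lookup. It assumes the store names have already been grouped by subtopology id. `PerTaskStateStores` and its `storesBySubtopology` input are hypothetical and are not part of the Kafka Streams API. The sketch contrasts two approaches. The first is the topology-wide union: because the stateful flag is `!stateStoreNames.isEmpty()`, this union marks every task stateful as soon as any subtopology owns a store. The second is a lookup keyed on the task's own subtopology.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not a Kafka Streams API.
// Assumes store names have already been grouped by subtopology id.
final class PerTaskStateStores {
    private PerTaskStateStores() {
    }

    // Topology-wide view: the union of every subtopology's stores. Using this for a
    // single task makes that task "stateful" whenever any subtopology owns a store.
    static Set<String> storesForWholeTopology(final Map<Integer, Set<String>> storesBySubtopology) {
        final Set<String> all = new HashSet<>();
        for (final Set<String> stores : storesBySubtopology.values()) {
            all.addAll(stores);
        }
        return all;
    }

    // Per-task view: only the stores owned by the task's own subtopology.
    static Set<String> storesForTask(final Map<Integer, Set<String>> storesBySubtopology,
                                     final int subtopologyId) {
        return storesBySubtopology.getOrDefault(subtopologyId, Collections.emptySet());
    }

    // A task is stateful only if its own subtopology owns at least one store.
    static boolean isStateful(final Map<Integer, Set<String>> storesBySubtopology,
                              final int subtopologyId) {
        return !storesForTask(storesBySubtopology, subtopologyId).isEmpty();
    }
}
```

With subtopology 0 stateless and subtopology 1 owning a store, the union is non-empty. Only tasks of subtopology 1 come out stateful under the per-task lookup, which is the mixed stateless-and-stateful case worth covering in tests.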



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -72,6 +80,27 @@ public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(final Ap
         return assignments;
     }
 
+    /**
+     * Assign standby tasks to KafkaStreams clients according to the default logic.
+     * <p>
+     * If rack-aware client tags are configured, the rack-aware standby task assignor will be used
+     *
+     * @param applicationState        the metadata and other info describing the current application state
+     * @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
+     *
+     * @return a new map containing the mappings from KafkaStreamsAssignments updated with the default standby assignment
+     */
+    public static Map<ProcessId, KafkaStreamsAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState,
+                                                                                      final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
+        if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
+            return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
+        } else if (canPerformRackAwareOptimization(applicationState, AssignedTask.Type.STANDBY)) {
+            return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);

Review Comment:
   Address in a follow-up:
   
   We should just remove this case entirely, right? Basically it's: if hasRackAwareTags, then do tag-based standby task assignment; if doesNotHaveTags, then do the default standby task assignment.
   
   Note that the tag-based rack-aware assignment has nothing to do with the rack ids, so `canPerformRackAwareOptimization` isn't really relevant to the question here.
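The simplified branching described above could look like the minimal sketch below. `StandbyStrategySketch` and its enum are hypothetical names. The only input is the configured set of rack-aware assignment tags. Rack ids are deliberately absent, because tag-based placement does not use them.

```java
import java.util.Set;

// Hypothetical sketch of the proposed dispatch in defaultStandbyTaskAssignment;
// these names are illustrative and not part of Kafka Streams.
final class StandbyStrategySketch {
    enum StandbyStrategy { TAG_BASED, DEFAULT }

    private StandbyStrategySketch() {
    }

    // Only the presence of client tags decides the strategy. Rack ids (and hence
    // any rack-aware optimization check) are never consulted here.
    static StandbyStrategy choose(final Set<String> rackAwareAssignmentTags) {
        return rackAwareAssignmentTags.isEmpty()
            ? StandbyStrategy.DEFAULT
            : StandbyStrategy.TAG_BASED;
    }
}
```

Keeping the decision to a single tag check leaves two paths: tags present, or no tags. There is no third, rack-id-dependent path that would route to the tag-based assignor even when no tags are configured.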



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final TaskInfo task,
         }
         return true;
     }
+
+    private static Map<ProcessId, KafkaStreamsAssignment> tagBasedStandbyTaskAssignment(final ApplicationState applicationState,
+                                                                                        final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
+        final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas();
+        final Map<TaskId, Integer> tasksToRemainingStandbys = applicationState.allTasks().values().stream()
+            .collect(Collectors.toMap(TaskInfo::id, taskInfo -> numStandbyReplicas));
+        final Map<ProcessId, KafkaStreamsState> streamStates = applicationState.kafkaStreamsStates(false);
+
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(getRackAwareAssignmentTags(applicationState));
+        final TagStatistics tagStatistics = new TagStatistics(applicationState);
+
+        final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
+        final Set<TaskId> statefulTaskIds = applicationState.allTasks().values().stream()
+            .filter(TaskInfo::isStateful)
+            .map(TaskInfo::id)
+            .collect(Collectors.toSet());
+        final Map<UUID, KafkaStreamsAssignment> clientsByUuid = kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
+            entry -> entry.getKey().id(),
+            Map.Entry::getValue
+        ));
+        final Function<KafkaStreamsState, Map<String, String>> clientTagGetter = createClientTagGetter(applicationState);
+
+        final Map<TaskId, ProcessId> pendingStandbyTasksToClientId = new HashMap<>();
+        for (final TaskId statefulTaskId : statefulTaskIds) {
+            for (final KafkaStreamsAssignment assignment : clientsByUuid.values()) {
+                if (assignment.tasks().containsKey(statefulTaskId)) {

Review Comment:
   @apourchet, make sure to address this in the follow-up PR.



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -244,18 +271,112 @@ public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareStandbyTas
         );
         LOG.info("Assignment before standby task optimization has cost {}", initialCost);
 
-        throw new UnsupportedOperationException("Not yet Implemented.");
+        final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState);
+        final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment, List<TaskId>> getMovableTasks = (source, destination) -> {
+            return source.tasks().values().stream()
+                .filter(task -> task.type() == AssignedTask.Type.STANDBY)
+                .filter(task -> !destination.tasks().containsKey(task.id()))
+                .filter(task -> {
+                    final KafkaStreamsState sourceState = kafkaStreamsStates.get(source.processId());
+                    final KafkaStreamsState destinationState = kafkaStreamsStates.get(source.processId());
+                    return moveablePredicate.canMoveStandbyTask(sourceState, destinationState, task.id(), kafkaStreamsAssignments);
+                })
+                .map(AssignedTask::id)
+                .sorted()
+                .collect(Collectors.toList());
+        };
+
+        final long startTime = System.currentTimeMillis();
+        boolean taskMoved = true;
+        int round = 0;
+        final RackAwareGraphConstructor<KafkaStreamsAssignment> graphConstructor = RackAwareGraphConstructorFactory.create(
+            applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
+        while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+            taskMoved = false;
+            round++;
+            for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
+                final UUID clientId1 = clientIds.get(i);
+                final KafkaStreamsAssignment clientState1 = kafkaStreamsAssignments.get(new ProcessId(clientId1));
+                for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
+                    final UUID clientId2 = clientIds.get(i);
+                    final KafkaStreamsAssignment clientState2 = kafkaStreamsAssignments.get(new ProcessId(clientId2));
+
+                    final String rack1 = clientRacks.get(clientState1.processId().id()).get();
+                    final String rack2 = clientRacks.get(clientState2.processId().id()).get();
+                    // Cross rack traffic can not be reduced if racks are the same
+                    if (rack1.equals(rack2)) {
+                        continue;
+                    }
+
+                    final List<TaskId> movable1 = getMovableTasks.apply(clientState1, clientState2);
+                    final List<TaskId> movable2 = getMovableTasks.apply(clientState2, clientState1);
+
+                    // There's no needed to optimize if one is empty because the optimization
+                    // can only swap tasks to keep the client's load balanced
+                    if (movable1.isEmpty() || movable2.isEmpty()) {
+                        continue;
+                    }
+
+                    final List<TaskId> taskIdList = Stream.concat(movable1.stream(), movable2.stream())
+                        .sorted()
+                        .collect(Collectors.toList());
+                    final List<UUID> clients = Stream.of(clientId1, clientId2).sorted().collect(Collectors.toList());
+
+                    final AssignmentGraph assignmentGraph = buildTaskGraph(
+                        assignmentsByUuid,
+                        clientRacks,
+                        taskIdList,
+                        clients,
+                        topicPartitionsByTaskId,
+                        crossRackTrafficCost,
+                        nonOverlapCost,
+                        false,
+                        false,

Review Comment:
   Follow-up PR: I think these two `false` arguments should be `true, true`?
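For reference, the pair-selection part of the optimization loop in the hunk above can be reduced to the minimal sketch below. It assumes clients are addressed by list index and racks are plain strings. `PairwiseStandbyScanSketch`, `candidatePairs`, and the `movableTasks` callback (a stand-in for `getMovableTasks`) are hypothetical. Both lookups for the second client use index `j`, so each unordered pair of distinct clients is examined exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical sketch of the client-pair scan in the standby optimizer; not Kafka code.
final class PairwiseStandbyScanSketch {
    private PairwiseStandbyScanSketch() {
    }

    // Returns index pairs {i, j} (i < j) worth handing to the swap step.
    // movableTasks.apply(src, dst) lists the standby tasks that could move from src to dst.
    static List<int[]> candidatePairs(final List<String> clientRacks,
                                      final BiFunction<Integer, Integer, List<String>> movableTasks) {
        final List<int[]> pairs = new ArrayList<>();
        for (int i = 0; i < clientRacks.size(); i++) {
            for (int j = i + 1; j < clientRacks.size(); j++) {
                // Cross-rack traffic cannot be reduced between clients on the same rack.
                if (clientRacks.get(i).equals(clientRacks.get(j))) {
                    continue;
                }
                // The optimizer only swaps tasks (to keep load balanced), so each side
                // needs at least one standby that could move to the other.
                final List<String> movable1 = movableTasks.apply(i, j);
                final List<String> movable2 = movableTasks.apply(j, i);
                if (movable1.isEmpty() || movable2.isEmpty()) {
                    continue;
                }
                pairs.add(new int[] {i, j});
            }
        }
        return pairs;
    }
}
```

With racks `rack-a, rack-a, rack-b` and a callback under which client 1 has nothing movable, only the pair (0, 2) survives. The pair (0, 1) shares a rack, and the pair (1, 2) has no standby to offer from client 1.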



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
