ableegoldman commented on code in PR #16033:
URL: https://github.com/apache/kafka/pull/16033#discussion_r1619561495
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -16,78 +16,395 @@
*/
package org.apache.kafka.streams.processor.assignment;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.SortedSet;
+import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.TaskId;
+import
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.internals.assignment.Graph;
+import
org.apache.kafka.streams.processor.internals.assignment.MinTrafficGraphConstructor;
+import
org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructor;
+import
org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructorFactory;
+import org.apache.kafka.streams.StreamsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A set of utilities to help implement task assignment via the {@link
TaskAssignor}
*/
public final class TaskAssignmentUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskAssignmentUtils.class);
+
+ private TaskAssignmentUtils() {}
+
/**
- * Assign standby tasks to KafkaStreams clients according to the default
logic.
- * <p>
- * If rack-aware client tags are configured, the rack-aware standby task
assignor will be used
+ * Return a "no-op" assignment that just copies the previous assignment of
tasks to KafkaStreams clients
*
- * @param applicationState the metadata and other info describing
the current application state
- * @param kafkaStreamsAssignments the current assignment of tasks to
KafkaStreams clients
+ * @param applicationState the metadata and other info describing the
current application state
*
- * @return a new map containing the mappings from KafkaStreamsAssignments
updated with the default
- * standby assignment
+ * @return a new map containing an assignment that replicates exactly the
previous assignment reported
+ * in the applicationState
*/
- public static Map<ProcessId, KafkaStreamsAssignment>
defaultStandbyTaskAssignment(
- final ApplicationState applicationState,
- final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
- ) {
- throw new UnsupportedOperationException("Not Implemented.");
+ public static Map<ProcessId, KafkaStreamsAssignment>
identityAssignment(final ApplicationState applicationState) {
+ final Map<ProcessId, KafkaStreamsAssignment> assignments = new
HashMap<>();
+ applicationState.kafkaStreamsStates(false).forEach((processId, state)
-> {
+ final Set<AssignedTask> tasks = new HashSet<>();
+ state.previousActiveTasks().forEach(taskId -> {
+ tasks.add(new AssignedTask(taskId,
+ AssignedTask.Type.ACTIVE));
+ });
+ state.previousStandbyTasks().forEach(taskId -> {
+ tasks.add(new AssignedTask(taskId,
+ AssignedTask.Type.STANDBY));
+ });
+
+ final KafkaStreamsAssignment newAssignment =
KafkaStreamsAssignment.of(processId, tasks);
+ assignments.put(processId, newAssignment);
+ });
+ return assignments;
}
/**
- * Optimize the active task assignment for rack-awareness
+ * Optimize active task assignment for rack awareness. This optimization
is based on the
+ * {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG
trafficCost}
+ * and {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG
nonOverlapCost}
+ * configs which balance cross-rack traffic minimization and task movement.
+ * Setting {@code trafficCost} to a larger number reduces the overall
cross-rack traffic of the resulting
+ * assignment, but can increase the number of tasks shuffled around
between clients.
+ * Setting {@code nonOverlapCost} to a larger number increases the
affinity of tasks to their intended client
+ * and reduces the amount by which the rack-aware optimization can shuffle
tasks around, at the cost of higher
+ * cross-rack traffic.
+ * In an extreme case, if we set {@code nonOverlapCost} to 0 and {@code
trafficCost} to a positive value,
+ * the resulting assignment will have an absolute minimum of cross-rack
traffic. If we set {@code trafficCost} to 0,
+ * and {@code nonOverlapCost} to a positive value, the resulting
assignment will be identical to the input assignment.
+ * <p>
+ * Note: this method will modify the input {@link KafkaStreamsAssignment}
objects and return the same map.
+ * It does not make a copy of the map or the KafkaStreamsAssignment
objects.
+ * <p>
+ * This method optimizes cross-rack traffic for active tasks only. For
standby task optimization,
+ * use {@link #optimizeRackAwareStandbyTasks}.
*
* @param applicationState the metadata and other info describing
the current application state
* @param kafkaStreamsAssignments the current assignment of tasks to
KafkaStreams clients
- * @param tasks the set of tasks to reassign if
possible. Must already be assigned
- * to a KafkaStreams client
+ * @param tasks the set of tasks to reassign if
possible. Must already be assigned to a KafkaStreams client
*
- * @return a new map containing the mappings from KafkaStreamsAssignments
updated with the default
- * rack-aware assignment for active tasks
+ * @return a map with the KafkaStreamsAssignments updated to minimize
cross-rack traffic for active tasks
*/
public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareActiveTasks(
final ApplicationState applicationState,
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
final SortedSet<TaskId> tasks
Review Comment:
Need to fix the indentation here (this can be done in a follow-up PR).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]